Automate Customer Segmentation in Amazon Connect: Complete CSV Upload and API Guide
Step-by-step guide for AWS administrators to programmatically upload CSV data and create customer segments in Amazon Connect Customer Profiles. Covers the complete 5-step API workflow with CLI commands, Python code, field mapping, file encryption, and segment creation. Includes real examples, troubleshooting, and best practices for automated campaign management and customer targeting.
Introduction
Amazon Connect Customer Profiles enables administrators to create targeted customer segments by uploading CSV data programmatically. This guide demonstrates the complete API workflow (a five-step upload process followed by segment creation) for importing data and building segments for campaign management.
Understanding the 5-Step API Workflow
The Complete Process
The workflow involves these sequential API calls with specific purposes:
1. create-upload-job → creates the upload job with field mappings
2. get-upload-job-path → retrieves a pre-signed S3 URL for the secure file upload
3. File upload → uploads the CSV to encrypted S3 storage
4. start-upload-job → initiates data processing and profile creation
5. get-upload-job → monitors job status and completion
6. create-segment-definition → creates segments using the uploaded profile data
Data Flow Architecture
Architecture Components:
Step 1-2: Job Creation & Path Generation
- Upload job metadata is created with field mappings
- Pre-signed S3 URL is generated with encryption parameters
- URL expires in 15 minutes for security
Step 3: Secure File Upload
- CSV files are uploaded to encrypted S3 storage using AES256
- Client-side encryption headers ensure data security
- MD5 hash validation prevents data corruption
Step 4-5: Data Processing
- Customer Profiles processes CSV and maps data to profile fields
- Records are created or updated based on the unique key
- Job status transitions: CREATED → IN_PROGRESS → SUCCEEDED/FAILED
Step 6: Segment Creation
- Segments filter customers based on uploaded profile data
- Multiple segment criteria can be combined
- Segments become available for campaign targeting
Prerequisites
Required IAM Permissions
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "profile:CreateUploadJob",
        "profile:GetUploadJobPath",
        "profile:StartUploadJob",
        "profile:GetUploadJob",
        "profile:CreateSegmentDefinition",
        "profile:ListSegmentDefinitions"
      ],
      "Resource": "*"
    }
  ]
}
```
Environment Setup
```shell
# Configure the AWS CLI with the appropriate region
aws configure set region us-east-1
aws configure set output json

# Verify that a Customer Profiles domain exists
aws customer-profiles list-domains
```
Step-by-Step Implementation
Step 1: Create Upload Job
Purpose: Define how CSV columns map to Customer Profiles fields and create job metadata.
Command:
```shell
aws customer-profiles create-upload-job \
  --domain-name "<DOMAIN_NAME>" \
  --display-name "asset-data-import" \
  --fields '{
    "asset_id":       { "Source": "asset_id",       "Target": "_profile.AccountNumber",         "ContentType": "STRING" },
    "account_number": { "Source": "account_number", "Target": "_profile.BusinessName",          "ContentType": "STRING" },
    "asset_name":     { "Source": "asset_name",     "Target": "_profile.AdditionalInformation", "ContentType": "STRING" },
    "price":          { "Source": "price",          "Target": "_profile.PartyTypeString",       "ContentType": "STRING" }
  }' \
  --unique-key "asset_id" \
  --data-expiry 90
```
Parameter Explanations:
- --domain-name: the unique Customer Profiles domain identifier (1-64 characters; alphanumeric, hyphens, underscores)
- --display-name: human-readable job name for identification (1-255 characters)
- --fields: JSON object mapping CSV columns to profile attributes:
  - Source: exact CSV column header name (must match the CSV file)
  - Target: valid Customer Profiles field path (see the valid fields below)
  - ContentType: data-type validation (STRING, NUMBER, PHONE_NUMBER, EMAIL_ADDRESS, NAME)
- --unique-key: CSV column used for record deduplication (prevents duplicates)
- --data-expiry: profile retention period in days (1-1098; default: 14)
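Because rows sharing a unique-key value are deduplicated into a single profile, it can be worth checking the CSV for duplicate keys before creating the job. A minimal sketch (the helper name is illustrative, not part of any AWS SDK):

```python
import csv
from collections import Counter
from typing import Dict, Iterable, List

def find_duplicate_keys(rows: Iterable[Dict[str, str]], unique_key: str) -> List[str]:
    """Return unique-key values that appear more than once; such rows
    would collapse into one profile during processing."""
    counts = Counter(row[unique_key] for row in rows)
    return sorted(value for value, n in counts.items() if n > 1)

# Usage against a CSV file:
# with open('data.csv', newline='', encoding='utf-8') as f:
#     duplicates = find_duplicate_keys(csv.DictReader(f), 'asset_id')
```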
Valid Profile Target Fields:
```text
# Basic profile fields
_profile.AccountNumber          # Account/customer ID
_profile.BusinessName           # Company/business name
_profile.FirstName              # First name
_profile.LastName               # Last name
_profile.MiddleName             # Middle name
_profile.EmailAddress           # Primary email
_profile.PersonalEmailAddress   # Personal email
_profile.BusinessEmailAddress   # Business email
_profile.PhoneNumber            # Primary phone
_profile.MobilePhoneNumber      # Mobile phone
_profile.HomePhoneNumber        # Home phone
_profile.BusinessPhoneNumber    # Business phone
_profile.AdditionalInformation  # Custom text data
_profile.PartyTypeString        # Party type as string
_profile.GenderString           # Gender as string
_profile.BirthDate              # Date of birth

# Address fields (nested objects)
_profile.Address.City           # City
_profile.Address.Country        # Country
_profile.Address.State          # State/province
_profile.Address.PostalCode     # ZIP/postal code
```
Output:
```json
{
  "JobId": "<JOB_ID>"
}
```
Output Explanation:
- JobId: unique identifier for the upload job (a 32-character hex string); this ID is used in all subsequent API calls for this upload
Step 2: Get Upload Job Path
Purpose: Retrieve pre-signed S3 URL and encryption parameters for secure file upload.
Command:
```shell
aws customer-profiles get-upload-job-path \
  --domain-name "<DOMAIN_NAME>" \
  --job-id "<JOB_ID>"
```
Parameter Explanations:
- --domain-name: same domain name from Step 1
- --job-id: job ID returned by create-upload-job
Output:
```json
{
  "Url": "<PRE_SIGNED_S3_URL>",
  "ClientToken": "<BASE64_ENCRYPTION_KEY>",
  "ValidUntil": "<EXPIRATION_TIMESTAMP>"
}
```
Output Explanation:
- Url: pre-signed S3 URL for the file upload (includes authentication parameters); masked for security
- ClientToken: Base64-encoded AES256 encryption key for server-side encryption; masked for security
- ValidUntil: URL expiration timestamp (15 minutes from generation); masked for security
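Since the URL expires 15 minutes after generation, a pre-upload check of ValidUntil avoids confusing 403 errors later. A small illustrative helper, assuming ValidUntil is returned as an ISO 8601 timestamp string (adjust the parsing if your SDK returns a different type):

```python
from datetime import datetime, timezone
from typing import Optional

def url_seconds_remaining(valid_until: str, now: Optional[datetime] = None) -> float:
    """Seconds until the pre-signed URL expires; negative once expired.
    Assumes an ISO 8601 timestamp string (an illustrative assumption)."""
    expires = datetime.fromisoformat(valid_until)
    if expires.tzinfo is None:
        # Treat naive timestamps as UTC
        expires = expires.replace(tzinfo=timezone.utc)
    if now is None:
        now = datetime.now(timezone.utc)
    return (expires - now).total_seconds()
```

If the remaining time is small or negative, call get-upload-job-path again for a fresh URL rather than retrying the upload.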
Step 3: Upload CSV File with Encryption
Purpose: Securely upload CSV file to S3 with proper encryption headers.
Python Implementation with Detailed Explanations:
```python
import base64
import hashlib
import json
import subprocess

import requests

# Step 3a: Create the CSV file with the proper structure.
# CSV headers MUST exactly match the "Source" values from Step 1.
with open('data.csv', 'w') as f:
    # Header row - must match field mapping sources exactly
    f.write('asset_id,account_number,asset_name,price\n')
    # Data rows - ensure unique values for the unique_key column (asset_id)
    f.write('ASSET001,ACC123,Laptop,1200\n')
    f.write('ASSET002,ACC456,Server,5000\n')
    f.write('ASSET003,ACC789,Monitor,300\n')

# Step 3b: Get a fresh upload path (URLs expire in 15 minutes)
cmd = ['aws', 'customer-profiles', 'get-upload-job-path',
       '--domain-name', '<DOMAIN_NAME>', '--job-id', '<JOB_ID>']
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
    print(f"Error getting upload path: {result.stderr}")
    exit(1)

response = json.loads(result.stdout)
url = response['Url']                   # Pre-signed S3 URL (masked for security)
client_token = response['ClientToken']  # Base64 encryption key (masked)

# Step 3c: Calculate the MD5 hash for encryption validation.
# This is CRITICAL - AWS requires the MD5 of the *decoded* client token.
decoded_token = base64.b64decode(client_token)        # Decode base64 to bytes
md5_hash = hashlib.md5(decoded_token).digest()        # Calculate MD5 of the bytes
md5_b64 = base64.b64encode(md5_hash).decode('utf-8')  # Encode back to base64
print(f"Calculated MD5: {md5_b64}")

# Step 3d: Prepare the encryption headers (required by AWS)
headers = {
    # Specify the AES256 encryption algorithm
    'x-amz-server-side-encryption-customer-algorithm': 'AES256',
    # Provide the encryption key
    'x-amz-server-side-encryption-customer-key': client_token,
    # Provide the MD5 hash for key validation
    'x-amz-server-side-encryption-customer-key-md5': md5_b64,
}

# Step 3e: Upload the file using a PUT request
with open('data.csv', 'rb') as f:
    response = requests.put(url, headers=headers, data=f)

print(f"Upload Status: {response.status_code}")
if response.status_code in (200, 204):
    print("✓ File uploaded successfully!")
else:
    print(f"✗ Upload failed: {response.text}")
    exit(1)
```
Code Explanation:
- CSV Creation: headers must exactly match the Source field names from Step 1
- Fresh URL Retrieval: pre-signed URLs expire, so get a fresh one just before uploading
- MD5 Calculation: AWS requires MD5 hash of decoded ClientToken for validation
- Encryption Headers: Three required headers for server-side encryption
- File Upload: Binary mode upload using PUT request to pre-signed URL
Expected Output:
```text
Upload Status: 200
✓ File uploaded successfully!
```
Step 4: Start Upload Job Processing
Purpose: Initiate processing of uploaded CSV data into Customer Profiles.
Command:
```shell
aws customer-profiles start-upload-job \
  --domain-name "<DOMAIN_NAME>" \
  --job-id "<JOB_ID>"
```
Parameter Explanations:
- --domain-name: Customer Profiles domain name
- --job-id: job ID from Step 1
Response: HTTP 200 with empty body (indicates successful initiation)
Step 5: Monitor Job Status
Purpose: Track processing progress and wait for completion.
Command:
```shell
aws customer-profiles get-upload-job \
  --domain-name "<DOMAIN_NAME>" \
  --job-id "<JOB_ID>"
```
Output During Processing:
```json
{
  "JobId": "<JOB_ID>",
  "DisplayName": "asset-data-import",
  "Status": "IN_PROGRESS",
  "CreatedAt": "<TIMESTAMP>",
  "Fields": {
    "asset_id": {
      "Source": "asset_id",
      "Target": "_profile.AccountNumber",
      "ContentType": "STRING"
    }
  },
  "UniqueKey": "asset_id",
  "DataExpiry": 90
}
```
Final Output (Success):
```json
{
  "JobId": "<JOB_ID>",
  "DisplayName": "asset-data-import",
  "Status": "SUCCEEDED",
  "CreatedAt": "<TIMESTAMP>",
  "CompletedAt": "<TIMESTAMP>",
  "Fields": {
    "account_number": {
      "Source": "account_number",
      "Target": "_profile.BusinessName",
      "ContentType": "STRING"
    },
    "asset_id": {
      "Source": "asset_id",
      "Target": "_profile.AccountNumber",
      "ContentType": "STRING"
    },
    "asset_name": {
      "Source": "asset_name",
      "Target": "_profile.AdditionalInformation",
      "ContentType": "STRING"
    },
    "price": {
      "Source": "price",
      "Target": "_profile.PartyTypeString",
      "ContentType": "STRING"
    }
  },
  "UniqueKey": "asset_id",
  "ResultsSummary": {
    "UpdatedRecords": 0,
    "CreatedRecords": 3,
    "FailedRecords": 0
  },
  "DataExpiry": 90
}
```
Status Values Explained:
- CREATED: job created but not started
- IN_PROGRESS: processing uploaded data
- SUCCEEDED: all records processed successfully
- PARTIALLY_SUCCEEDED: some records failed
- FAILED: job failed completely
ResultsSummary Explanation:
- CreatedRecords: new profiles created
- UpdatedRecords: existing profiles updated
- FailedRecords: records that could not be processed
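For logging or alerting, the ResultsSummary block can be reduced to a one-line report with a failure rate. A small illustrative helper (the function name is not part of any AWS SDK):

```python
def summarize_results(results: dict) -> str:
    """Render a ResultsSummary dict as a one-line summary with a failure rate."""
    created = results.get('CreatedRecords', 0)
    updated = results.get('UpdatedRecords', 0)
    failed = results.get('FailedRecords', 0)
    total = created + updated + failed
    # Guard against an empty job to avoid division by zero
    rate = (failed / total * 100) if total else 0.0
    return f"{created} created, {updated} updated, {failed} failed ({rate:.1f}% failure rate)"
```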
Step 6: Create Segment Definitions
Purpose: Create customer segments using uploaded profile data for targeting.
Segment Definition Parameters
Required Parameters:
- --domain-name: Customer Profiles domain name
- --segment-definition-name: unique segment identifier (1-64 characters; alphanumeric, hyphens, underscores)
- --display-name: human-readable segment name (1-255 characters)
- --segment-groups: JSON structure defining the filtering criteria
SegmentGroups Structure:
```
{
  "Groups": [                            // Array of dimension groups
    {
      "Dimensions": [                    // Filtering criteria
        {
          "ProfileAttributes": {         // Profile field filters
            "FieldName": {
              "DimensionType": "INCLUSIVE",    // Filter operation
              "Values": ["value1", "value2"]   // Values to match
            }
          }
        }
      ],
      "SourceType": "ALL",               // Logical operator for dimensions
      "Type": "ALL"                      // Logical operator for profiles
    }
  ],
  "Include": "ALL"                       // Include/exclude matching profiles
}
```
DimensionType Options:
- INCLUSIVE: include profiles with matching values
- EXCLUSIVE: exclude profiles with matching values
- CONTAINS: include profiles where the field contains the value
- BEGINS_WITH: include profiles where the field starts with the value
- ENDS_WITH: include profiles where the field ends with the value
- BEFORE / AFTER: date comparisons (e.g., BirthDate)
- BETWEEN / NOT_BETWEEN: range queries
- GREATER_THAN / LESS_THAN: numeric comparisons
Logical Operators:
- ALL: all conditions must be met (AND logic)
- ANY: any condition can be met (OR logic)
- NONE: no conditions may be met (NOT logic)
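The SegmentGroups structure is verbose to hand-write for simple cases, so single-criterion definitions can be generated. A sketch of an assembly helper (illustrative; not part of any AWS SDK) producing the dict shape shown above:

```python
from typing import List

def build_segment_groups(field: str, values: List[str],
                         dimension_type: str = 'INCLUSIVE') -> dict:
    """Assemble a single-criterion SegmentGroups structure with ALL/ALL
    logic, matching the annotated layout above."""
    return {
        'Groups': [{
            'Dimensions': [{
                'ProfileAttributes': {
                    field: {
                        'DimensionType': dimension_type,
                        'Values': values,
                    }
                }
            }],
            'SourceType': 'ALL',
            'Type': 'ALL',
        }],
        'Include': 'ALL',
    }
```

The result can be passed directly as the SegmentGroups argument of boto3's create_segment_definition, or serialized with json.dumps for the CLI.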
Sample Segment Definitions
1. Asset Owners Segment (by AccountNumber field):
```shell
aws customer-profiles create-segment-definition \
  --domain-name "<DOMAIN_NAME>" \
  --segment-definition-name "asset-owners-segment" \
  --display-name "Asset Owners" \
  --description "Customers who own specific assets" \
  --segment-groups '{
    "Groups": [{
      "Dimensions": [{
        "ProfileAttributes": {
          "AccountNumber": {
            "DimensionType": "INCLUSIVE",
            "Values": ["ASSET001", "ASSET002", "ASSET003"]
          }
        }
      }],
      "SourceType": "ALL",
      "Type": "ALL"
    }],
    "Include": "ALL"
  }'
```
Explanation: Creates segment including customers whose AccountNumber field matches any of the specified asset IDs.
2. High-Value Customers (by PartyTypeString field):
```shell
aws customer-profiles create-segment-definition \
  --domain-name "<DOMAIN_NAME>" \
  --segment-definition-name "high-value-customers" \
  --display-name "High Value Customers" \
  --description "Customers with assets over $1000" \
  --segment-groups '{
    "Groups": [{
      "Dimensions": [{
        "ProfileAttributes": {
          "PartyTypeString": {
            "DimensionType": "CONTAINS",
            "Values": ["5000", "1200"]
          }
        }
      }],
      "SourceType": "ALL",
      "Type": "ALL"
    }],
    "Include": "ALL"
  }'
```
Explanation: Creates segment including customers whose PartyTypeString field contains high-value amounts (using CONTAINS for partial matching).
3. Laptop Owners (by AdditionalInformation field):
```shell
aws customer-profiles create-segment-definition \
  --domain-name "<DOMAIN_NAME>" \
  --segment-definition-name "laptop-owners" \
  --display-name "Laptop Owners" \
  --description "Customers who own laptops" \
  --segment-groups '{
    "Groups": [{
      "Dimensions": [{
        "ProfileAttributes": {
          "AdditionalInformation": {
            "DimensionType": "CONTAINS",
            "Values": ["Laptop"]
          }
        }
      }],
      "SourceType": "ALL",
      "Type": "ALL"
    }],
    "Include": "ALL"
  }'
```
Explanation: Creates segment including customers whose AdditionalInformation field contains "Laptop".
4. Multi-Criteria Segment:
```shell
aws customer-profiles create-segment-definition \
  --domain-name "<DOMAIN_NAME>" \
  --segment-definition-name "premium-business-segment" \
  --display-name "Premium Business Segment" \
  --description "Business customers with high-value assets" \
  --segment-groups '{
    "Groups": [{
      "Dimensions": [
        {
          "ProfileAttributes": {
            "BusinessName": {
              "DimensionType": "INCLUSIVE",
              "Values": ["ACC456"]
            }
          }
        },
        {
          "ProfileAttributes": {
            "PartyTypeString": {
              "DimensionType": "CONTAINS",
              "Values": ["5000"]
            }
          }
        }
      ],
      "SourceType": "ALL",
      "Type": "ALL"
    }],
    "Include": "ALL"
  }'
```
Explanation: Creates segment requiring BOTH conditions: specific business name AND high-value asset (ALL logic).
Sample Output:
```json
{
  "SegmentDefinitionName": "asset-owners-segment",
  "DisplayName": "Asset Owners",
  "Description": "Customers who own specific assets",
  "CreatedAt": "<TIMESTAMP>",
  "SegmentDefinitionArn": "arn:aws:customer-profiles:<REGION>:<ACCOUNT_ID>:domains/<DOMAIN_NAME>/segment-definitions/asset-owners-segment"
}
```
Complete Python Implementation
Comprehensive automation class with detailed explanations:
```python
import base64
import hashlib
import json
import time
from typing import Any, Dict, List

import boto3
import requests


class CustomerProfilesManager:
    """
    Comprehensive class for managing Customer Profiles upload jobs and segments.
    Handles the complete workflow from CSV upload to segment creation.
    """

    def __init__(self, domain_name: str, region: str = 'us-east-1'):
        """
        Initialize the Customer Profiles manager.

        Args:
            domain_name: Customer Profiles domain name
            region: AWS region (default: us-east-1)
        """
        self.domain_name = domain_name
        self.region = region
        self.client = boto3.client('customer-profiles', region_name=region)
        print(f"Initialized Customer Profiles manager for domain: {domain_name}")

    def create_upload_job(self, display_name: str, fields: Dict[str, Any],
                          unique_key: str, data_expiry: int = 90) -> str:
        """
        Step 1: Create an upload job with field mappings.

        Args:
            display_name: Human-readable job name
            fields: Dictionary mapping CSV columns to profile fields
            unique_key: Column name for deduplication
            data_expiry: Profile retention in days (1-1098)

        Returns:
            Job ID for subsequent operations
        """
        try:
            print(f"Creating upload job: {display_name}")
            response = self.client.create_upload_job(
                DomainName=self.domain_name,
                DisplayName=display_name,
                Fields=fields,
                UniqueKey=unique_key,
                DataExpiry=data_expiry,
            )
            job_id = response['JobId']
            print(f"✓ Upload job created successfully: {job_id}")
            return job_id
        except Exception as e:
            print(f"✗ Failed to create upload job: {e}")
            raise

    def upload_file(self, job_id: str, file_path: str) -> bool:
        """
        Steps 2-3: Get the upload path and upload the file with encryption.

        Args:
            job_id: Upload job ID from create_upload_job
            file_path: Path to the CSV file to upload

        Returns:
            True if the upload succeeded, False otherwise
        """
        try:
            print(f"Getting upload path for job: {job_id}")

            # Step 2: Get the pre-signed URL and encryption parameters
            path_response = self.client.get_upload_job_path(
                DomainName=self.domain_name,
                JobId=job_id,
            )
            url = path_response['Url']
            client_token = path_response['ClientToken']
            valid_until = path_response['ValidUntil']
            print("✓ Got pre-signed URL (expires in 15 minutes)")

            # Step 3: Calculate the MD5 hash for encryption validation.
            # This is critical - AWS validates the MD5 of the decoded token.
            decoded_token = base64.b64decode(client_token)
            md5_hash = hashlib.md5(decoded_token).digest()
            md5_b64 = base64.b64encode(md5_hash).decode('utf-8')
            print(f"Calculated MD5 hash: {md5_b64}")

            # Prepare the encryption headers (all three are required)
            headers = {
                'x-amz-server-side-encryption-customer-algorithm': 'AES256',
                'x-amz-server-side-encryption-customer-key': client_token,
                'x-amz-server-side-encryption-customer-key-md5': md5_b64,
            }

            # Upload the file using a PUT request
            print(f"Uploading file: {file_path}")
            with open(file_path, 'rb') as f:
                response = requests.put(url, headers=headers, data=f)

            if response.status_code in (200, 204):
                print(f"✓ File uploaded successfully (Status: {response.status_code})")
                return True
            print(f"✗ Upload failed (Status: {response.status_code}): {response.text}")
            return False
        except Exception as e:
            print(f"✗ Upload failed: {e}")
            return False

    def start_and_monitor_job(self, job_id: str, poll_interval: int = 10) -> Dict[str, Any]:
        """
        Steps 4-5: Start job processing and monitor until completion.

        Args:
            job_id: Upload job ID
            poll_interval: Seconds between status checks (default: 10)

        Returns:
            Final job status response
        """
        try:
            print(f"Starting upload job: {job_id}")

            # Step 4: Start job processing
            self.client.start_upload_job(
                DomainName=self.domain_name,
                JobId=job_id,
            )
            print("✓ Job started successfully")

            # Step 5: Monitor job status with polling
            print("Monitoring job status...")
            while True:
                response = self.client.get_upload_job(
                    DomainName=self.domain_name,
                    JobId=job_id,
                )
                status = response['Status']
                print(f"Job status: {status}")

                if status == 'SUCCEEDED':
                    results = response['ResultsSummary']
                    print("✓ Job completed successfully!")
                    print(f"  Created records: {results['CreatedRecords']}")
                    print(f"  Updated records: {results['UpdatedRecords']}")
                    print(f"  Failed records: {results['FailedRecords']}")
                    return response
                elif status == 'PARTIALLY_SUCCEEDED':
                    results = response['ResultsSummary']
                    print("⚠ Job partially succeeded")
                    print(f"  Created records: {results['CreatedRecords']}")
                    print(f"  Failed records: {results['FailedRecords']}")
                    return response
                elif status == 'FAILED':
                    reason = response.get('StatusReason', 'Unknown error')
                    print(f"✗ Job failed: {reason}")
                    raise Exception(f"Upload job failed: {reason}")
                elif status in ('CREATED', 'IN_PROGRESS'):
                    print(f"  Waiting {poll_interval} seconds...")
                    time.sleep(poll_interval)
                else:
                    print(f"✗ Unknown job status: {status}")
                    raise Exception(f"Unknown job status: {status}")
        except Exception as e:
            print(f"✗ Job monitoring failed: {e}")
            raise

    def create_segment(self, segment_name: str, display_name: str, description: str,
                       profile_field: str, values: List[str],
                       dimension_type: str = 'INCLUSIVE') -> str:
        """
        Step 6: Create a segment definition using uploaded profile data.

        Args:
            segment_name: Unique segment identifier
            display_name: Human-readable segment name
            description: Segment description
            profile_field: Profile field to filter on (e.g., 'AccountNumber')
            values: List of values to match
            dimension_type: Filter type (INCLUSIVE, EXCLUSIVE, CONTAINS, etc.)

        Returns:
            Segment definition ARN
        """
        try:
            print(f"Creating segment: {display_name}")
            response = self.client.create_segment_definition(
                DomainName=self.domain_name,
                SegmentDefinitionName=segment_name,
                DisplayName=display_name,
                Description=description,
                SegmentGroups={
                    'Groups': [{
                        'Dimensions': [{
                            'ProfileAttributes': {
                                profile_field: {
                                    'DimensionType': dimension_type,
                                    'Values': values,
                                }
                            }
                        }],
                        'SourceType': 'ALL',
                        'Type': 'ALL',
                    }],
                    'Include': 'ALL',
                },
            )
            segment_arn = response['SegmentDefinitionArn']
            print(f"✓ Segment created successfully: {segment_arn}")
            return segment_arn
        except Exception as e:
            print(f"✗ Failed to create segment: {e}")
            raise

    def list_segments(self) -> List[Dict[str, Any]]:
        """
        List all segment definitions in the domain.

        Returns:
            List of segment definitions
        """
        try:
            response = self.client.list_segment_definitions(
                DomainName=self.domain_name,
            )
            segments = response.get('Items', [])
            print(f"Found {len(segments)} segments in domain")
            return segments
        except Exception as e:
            print(f"✗ Failed to list segments: {e}")
            raise


# Usage example with the complete workflow
def main():
    """Complete example demonstrating the entire workflow."""
    # Initialize the manager
    manager = CustomerProfilesManager("<DOMAIN_NAME>")

    # Define field mappings (CSV columns to profile fields)
    fields = {
        "asset_id": {
            "Source": "asset_id",
            "Target": "_profile.AccountNumber",
            "ContentType": "STRING",
        },
        "account_number": {
            "Source": "account_number",
            "Target": "_profile.BusinessName",
            "ContentType": "STRING",
        },
        "asset_name": {
            "Source": "asset_name",
            "Target": "_profile.AdditionalInformation",
            "ContentType": "STRING",
        },
        "price": {
            "Source": "price",
            "Target": "_profile.PartyTypeString",
            "ContentType": "STRING",
        },
    }

    try:
        # Execute the complete workflow
        print("=== Starting Customer Profiles Upload Workflow ===")

        # Step 1: Create the upload job
        job_id = manager.create_upload_job("Asset Import", fields, "asset_id")

        # Steps 2-3: Upload the file
        success = manager.upload_file(job_id, "data.csv")
        if not success:
            raise Exception("File upload failed")

        # Steps 4-5: Start and monitor the job
        job_result = manager.start_and_monitor_job(job_id)

        # Step 6: Create segments based on the uploaded data
        print("\n=== Creating Customer Segments ===")

        # Create the asset owners segment
        manager.create_segment(
            "asset-owners-segment",
            "Asset Owners",
            "Customers who own assets",
            "AccountNumber",
            ["ASSET001", "ASSET002", "ASSET003"],
        )

        # Create the high-value customers segment
        manager.create_segment(
            "high-value-customers",
            "High Value Customers",
            "Customers with expensive assets",
            "PartyTypeString",
            ["5000"],
            "CONTAINS",
        )

        # List all segments
        segments = manager.list_segments()

        print("\n=== Workflow Complete ===")
        print(f"Created {job_result['ResultsSummary']['CreatedRecords']} profiles")
        print(f"Created {len(segments)} segments")
    except Exception as e:
        print(f"Workflow failed: {e}")


if __name__ == "__main__":
    main()
```
Troubleshooting Common Issues
Upload Job Creation Issues
Issue 1: Invalid Target Field Error
```text
BadRequestException: Provided nested target is invalid. Found _profile.AssetName, expected [AccountNumber, BusinessName, FirstName, ...]
```
Solution: Use only valid Customer Profiles fields
```text
# ✓ Valid targets
"Target": "_profile.AccountNumber"
"Target": "_profile.BusinessName"
"Target": "_profile.AdditionalInformation"

# ✗ Invalid targets (custom field names are not allowed)
"Target": "_profile.AssetName"
"Target": "_profile.AssetId"
"Target": "_profile.CustomField"
```
Issue 2: Field Mapping Mismatch
```text
ValidationException: Source field 'asset_name' not found in CSV headers
```
Solution: Ensure CSV headers exactly match Source field names
```text
# CSV must have headers matching the Source values
asset_id,account_number,asset_name,price
ASSET001,ACC123,Laptop,1200
```
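This mismatch can be caught before creating the job by comparing the CSV header row against the Source values in the field mapping. A minimal sketch (the helper name is illustrative):

```python
from typing import Dict, List

def missing_sources(csv_header: List[str], fields: Dict[str, dict]) -> List[str]:
    """Return Source names from the field mapping that are absent from
    the CSV header row; a non-empty result predicts this validation error."""
    header = set(csv_header)
    return sorted(m['Source'] for m in fields.values() if m['Source'] not in header)
```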
File Upload Issues
Issue 3: 403 Forbidden Error
```text
ClientError: 403 Forbidden - The request signature we calculated does not match
```
Solutions:
- URL Expiry: Pre-signed URLs expire in 15 minutes - get fresh URL
- MD5 Calculation: Ensure correct MD5 calculation of decoded ClientToken
- Headers: All three encryption headers must be present and correct
```python
# Correct MD5 calculation
decoded_token = base64.b64decode(client_token)  # Decode first
md5_hash = hashlib.md5(decoded_token).digest()  # Hash the bytes
md5_b64 = base64.b64encode(md5_hash).decode()   # Encode the result
```
Issue 4: File Size Limits
- Maximum file size: 5GB
- Maximum records: 1 million per file
- Use multiple smaller files if needed
Job Processing Issues
Issue 5: Job Stuck in IN_PROGRESS

Causes and solutions:
- CSV Format: Ensure UTF-8 encoding, proper comma separation
- Unique Key: Verify unique key values are actually unique
- Data Types: Check ContentType matches actual data format
- Field Count: Ensure all rows have same number of columns
Issue 6: High Failed Records Count

Common causes:
- Invalid email formats (when using EMAIL_ADDRESS ContentType)
- Invalid phone formats (when using PHONE_NUMBER ContentType)
- Missing required fields
- Data too long for field limits
Debugging Steps:
```shell
# Check job details for error information
aws customer-profiles get-upload-job \
  --domain-name "<DOMAIN_NAME>" \
  --job-id "<JOB_ID>"

# Look for the StatusReason field in failed jobs
```
Segment Creation Issues
Issue 7: Empty Segment Results

Solutions:
- Case Sensitivity: Values are case-sensitive - match exactly
- DimensionType: Use CONTAINS for partial matches, INCLUSIVE for exact
- Field Verification: Confirm uploaded data using get-upload-job
- Value Format: Ensure Values array contains strings, not numbers
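For the last point, a one-line coercion (illustrative) guards against numeric values sneaking into the Values array:

```python
from typing import Iterable, List

def as_string_values(values: Iterable) -> List[str]:
    """Coerce filter values to strings - the Values array expects strings,
    so numeric CSV data such as 1200 must be passed as "1200"."""
    return [str(v) for v in values]
```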
Issue 8: Segment Definition Name Conflicts
```text
ConflictException: Segment definition already exists
```
Solution: Use unique segment names or delete existing segment first
```shell
# Delete the existing segment
aws customer-profiles delete-segment-definition \
  --domain-name "<DOMAIN_NAME>" \
  --segment-definition-name "existing-segment"
```
Best Practices
Data Preparation
CSV Format Standards:
- Use UTF-8 encoding without BOM
- Ensure consistent column count across all rows
- Remove special characters from headers
- Validate data before upload
Field Mapping Strategy:
- Map similar data types (text to STRING, numbers to NUMBER)
- Use descriptive Source names matching CSV headers
- Choose appropriate unique key (customer ID, email, etc.)
- Document field mappings for team reference
Data Quality:
- Clean data before upload (remove duplicates, fix formats)
- Validate email/phone formats if using specific ContentTypes
- Ensure unique key values are truly unique
- Test with small dataset first
Security Best Practices
IAM Permissions:
- Use least privilege principle
- Create specific roles for Customer Profiles operations
- Enable CloudTrail logging for audit trails
- Rotate access keys regularly
Data Protection:
- Use encryption in transit (HTTPS) and at rest (AES256)
- Mask sensitive data in logs and documentation
- Implement proper error handling to avoid data exposure
- Set appropriate data expiry periods
Performance Optimization
Upload Efficiency:
- Batch records in single CSV (up to 1M records)
- Use appropriate polling intervals (10-30 seconds)
- Implement exponential backoff for retries
- Monitor upload job metrics
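The exponential-backoff recommendation can be sketched as a small retry wrapper (illustrative; boto3 also offers built-in retry behavior via botocore's Config retries setting):

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar('T')

def with_backoff(call: Callable[[], T], max_attempts: int = 5,
                 base_delay: float = 1.0,
                 sleep: Callable[[float], None] = time.sleep) -> T:
    """Retry a zero-argument callable with exponential backoff and jitter;
    re-raises the last exception once max_attempts is exhausted."""
    for attempt in range(max_attempts):
        try:
            return call()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            # Sleep base * 2^attempt, scaled by jitter in [0.5, 1.0)
            sleep(base_delay * (2 ** attempt) * (0.5 + random.random() / 2))
    raise RuntimeError('unreachable')

# Usage sketch, e.g. wrapping a throttled API call:
# job = with_backoff(lambda: client.get_upload_job(DomainName=d, JobId=j))
```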
Segment Design:
- Start with simple single-criteria segments
- Use CONTAINS for flexible matching
- Combine criteria efficiently (ALL vs ANY logic)
- Test segment performance with large datasets
Monitoring and Maintenance
```python
import logging

import boto3

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)


def monitor_upload_jobs(domain_name: str, days_back: int = 7):
    """
    Monitor upload jobs from the last N days.

    Args:
        domain_name: Customer Profiles domain
        days_back: Number of days to look back
    """
    client = boto3.client('customer-profiles')

    try:
        # List recent upload jobs (note: the API may not support date filtering)
        response = client.list_upload_jobs(DomainName=domain_name)
        jobs = response.get('Items', [])
        logger.info(f"Found {len(jobs)} upload jobs")

        if not jobs:
            return  # Avoid division by zero below

        # Analyze job success rates
        succeeded = sum(1 for job in jobs if job.get('Status') == 'SUCCEEDED')
        failed = sum(1 for job in jobs if job.get('Status') == 'FAILED')
        logger.info(f"Success rate: {succeeded}/{len(jobs)} "
                    f"({succeeded / len(jobs) * 100:.1f}%)")

        if failed > 0:
            logger.warning(f"Found {failed} failed jobs - investigate causes")
    except Exception as e:
        logger.error(f"Monitoring failed: {e}")


def cleanup_old_segments(domain_name: str, pattern: str = "test-"):
    """
    Clean up test segments (use with caution).

    Args:
        domain_name: Customer Profiles domain
        pattern: Name prefix to match for deletion
    """
    client = boto3.client('customer-profiles')

    try:
        response = client.list_segment_definitions(DomainName=domain_name)
        segments = response.get('Items', [])

        for segment in segments:
            name = segment['SegmentDefinitionName']
            if name.startswith(pattern):
                logger.info(f"Deleting test segment: {name}")
                client.delete_segment_definition(
                    DomainName=domain_name,
                    SegmentDefinitionName=name,
                )
    except Exception as e:
        logger.error(f"Cleanup failed: {e}")


# Usage
if __name__ == "__main__":
    monitor_upload_jobs("<DOMAIN_NAME>")
    # cleanup_old_segments("<DOMAIN_NAME>", "test-")  # Uncomment with caution
```
Conclusion
This comprehensive guide provides everything needed to programmatically upload CSV data and create customer segments in Amazon Connect Customer Profiles. The 5-step API workflow, combined with proper error handling and security practices, enables robust automation of customer segmentation for campaign management.
Key takeaways:
- Follow the exact API sequence for reliable results
- Use proper encryption headers for secure file uploads
- Implement comprehensive error handling and monitoring
- Test with small datasets before production deployment
- Document field mappings and maintain data quality standards
Resources
AWS Documentation
- Customer Profiles API Reference
- CreateUploadJob API
- CreateSegmentDefinition API
- Amazon Connect Administrator Guide