AWS DMS: Replication Instance Fails to Connect to Kinesis Target Endpoint

0

I am working on setting up an AWS CloudFormation stack to deploy a database using Aurora Postgres Serverless V2, some network resources, a Jumpbox, and some AWS DMS resources to use real-time CDC as trigger for downstream services.

The key components of the setup include:

  • Initiation of an Amazon Aurora PostgreSQL database within a Virtual Private Cloud (VPC).

  • Establishing a specific security group for the Aurora PostgreSQL database. This group will be configured to accept connections solely from distinct security groups.

  • Configuring a second security group, defined for database access, that's authorized to interact with the PostgreSQL database. This security group will be granted outbound traffic permissions to the internet.

  • Establishing network communication between the resources residing in the VPC and the internet with NAT gateways and necessary routing rules.

  • Setting up a public subnet along with necessary configuration within the VPC to enable local database connection via a Jumpbox.

  • Implementing AWS Database Migration Service (DMS) resources to enable real-time Change Data Capture (CDC) from the Aurora PostgreSQL database to an Amazon Kinesis Data Stream.

What I am having issues with is the last point in the list: so far, I have correctly set up my replication instance to connect succesfully to my Aurora Postgres DB (Source Endpoint), though, I am not able to connect my replication to Kinesis Data Stream (Target Endpoint) for downstream services triggering.

Following the official AWS documentation found here, here, and here, I've also set up the VPC endpoints for my Kinesis and DMS, as well as created the roles dms-vpc-role and dms-cloudwatch-logs-role with policy, respecitvely, AmazonDMSVPCManagementRole and AmazonDMSCloudWatchLogsRole.

The connection from my replication instance to my Kinesis Data Stream fails with the error: Test Endpoint failed: Application-Status: 1020912, Application-Message: Failed to connect to database.

I am not a network engineer, therefore there might be some details I am missing in my setup and that would fix the replication instance to Kinesis Data Stream connection issue. Seeking help to identify these source of the issue.

Below, you will find my CloudFormation setup:

Parameters:
  StackStage:
    Description: The stage/environment
    Type: String
    Default: ${opt:stage}

Resources:
  MyVPC:
    Type: AWS::EC2::VPC
    Properties:
      CidrBlock: "10.0.0.0/16"
      EnableDnsSupport: true
      EnableDnsHostnames: true
      Tags:
        - Key: Name
          Value: MyVPC

  MyInternetGateway:
    Type: AWS::EC2::InternetGateway
    Properties:
      Tags:
        - Key: Name
          Value: MyInternetGateway

  MyVPCGatewayAttachment:
    Type: AWS::EC2::VPCGatewayAttachment
    Properties:
      VpcId: !Ref MyVPC
      InternetGatewayId: !Ref MyInternetGateway

  MyPrivateSubnet1a:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref MyVPC
      CidrBlock: "10.0.1.0/24"
      AvailabilityZone: !Select [ 0, !GetAZs '' ]
      Tags:
        - Key: Name
          Value: MyPrivateSubnet1a

  MyPrivateSubnet1b:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref MyVPC
      CidrBlock: "10.0.2.0/24"
      AvailabilityZone: !Select [ 1, !GetAZs '' ]
      Tags:
        - Key: Name
          Value: MyPrivateSubnet1b

  MyDBAccessSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupName: MyDBAccessSecurityGroup
      GroupDescription: Security Group for My DB Access
      VpcId: !Ref MyVPC
      SecurityGroupEgress:
        - IpProtocol: '-1'
          CidrIp: 0.0.0.0/0
      Tags:
        - Key: Name
          Value: MyDBAccessSecurityGroup

  MyDBSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupName: MyDBSecurityGroup
      GroupDescription: Security Group for MyDB
      VpcId: !Ref MyVPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 5432
          ToPort: 5432
          SourceSecurityGroupId: !GetAtt MyDBAccessSecurityGroup.GroupId
        - IpProtocol: tcp
          FromPort: 5432
          ToPort: 5432
          SourceSecurityGroupId: !GetAtt MyJumpBoxSecurityGroup.GroupId
      Tags:
        - Key: Name
          Value: MyDBSecurityGroup

  MyNATGateway1aEIP:
    Type: AWS::EC2::EIP
    Properties:
      Domain: vpc

  MyNATGateway1bEIP:
    Type: AWS::EC2::EIP
    Properties:
      Domain: vpc

  MyNATGateway1a:
    Type: AWS::EC2::NatGateway
    Properties:
      AllocationId: !GetAtt MyNATGateway1aEIP.AllocationId
      SubnetId: !Ref MyPrivateSubnet1a

  MyNATGateway1b:
    Type: AWS::EC2::NatGateway
    Properties:
      AllocationId: !GetAtt MyNATGateway1bEIP.AllocationId
      SubnetId: !Ref MyPrivateSubnet1b

  MyRouteTable1a:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref MyVPC
      Tags:
        - Key: Name
          Value: MyRouteTable1a

  MyRouteTable1b:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref MyVPC
      Tags:
        - Key: Name
          Value: MyRouteTable1b

  MyDefaultRouteSubnet1a:
    Type: AWS::EC2::Route
    DependsOn: MyNATGateway1a
    Properties:
      RouteTableId: !Ref MyRouteTable1a
      DestinationCidrBlock: 0.0.0.0/0
      NatGatewayId: !Ref MyNATGateway1a

  MyDefaultRouteSubnet1b:
    Type: AWS::EC2::Route
    DependsOn: MyNATGateway1b
    Properties:
      RouteTableId: !Ref MyRouteTable1b
      DestinationCidrBlock: 0.0.0.0/0
      NatGatewayId: !Ref MyNATGateway1b

  MyPrivateSubnet1aRouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      SubnetId: !Ref MyPrivateSubnet1a
      RouteTableId: !Ref MyRouteTable1a

  MyPrivateSubnet1bRouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      SubnetId: !Ref MyPrivateSubnet1b
      RouteTableId: !Ref MyRouteTable1b

  MyJumpBoxSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupName: MyJumpBoxSecurityGroup
      GroupDescription: Security Group for My Jump Box
      VpcId: !Ref MyVPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 22
          ToPort: 22
          CidrIp: 0.0.0.0/0
      SecurityGroupEgress:
        - IpProtocol: '-1'
          CidrIp: 0.0.0.0/0
      Tags:
        - Key: Name
          Value: MyJumpBoxSecurityGroup

  MyPublicSubnet1a:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref MyVPC
      CidrBlock: "10.0.3.0/24"
      MapPublicIpOnLaunch: true
      AvailabilityZone: !Select [ 0, !GetAZs '' ]
      Tags:
        - Key: Name
          Value: MyPublicSubnet1a

  MyPublicRouteTable:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref MyVPC
      Tags:
        - Key: Name
          Value: MyPublicRouteTable

  MyDefaultPublicRoute:
    Type: AWS::EC2::Route
    DependsOn: MyVPCGatewayAttachment
    Properties:
      RouteTableId: !Ref MyPublicRouteTable
      DestinationCidrBlock: 0.0.0.0/0
      GatewayId: !Ref MyInternetGateway

  MyPublicSubnet1aRouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      SubnetId: !Ref MyPublicSubnet1a
      RouteTableId: !Ref MyPublicRouteTable

  MyJumpboxKeyPair:
    Type: AWS::EC2::KeyPair
    Properties:
      KeyName: MyJumpBoxKeyPair
      KeyFormat: pem
      KeyType: rsa
      Tags:
        - Key: Name
          Value: MyJumpBoxKeyPair

  MyJumpBoxInstance:
    Type: AWS::EC2::Instance
    DependsOn: MyJumpboxKeyPair
    Properties:
      ImageId: ami-0fb2f0b847d44d4f0
      InstanceType: t2.micro
      KeyName: !Ref MyJumpboxKeyPair
      SubnetId: !Ref MyPublicSubnet1a
      SecurityGroupIds:
        - !Ref MyJumpBoxSecurityGroup
      Tags:
        - Key: Name
          Value: MyJumpBoxInstance

  MyDBSecret:
    Type: AWS::SecretsManager::Secret
    Properties:
      Name: !Sub '${StackStage}/MyDBSecret'
      GenerateSecretString:
        SecretStringTemplate: '{"username": "admin"}'
        GenerateStringKey: "password"
        PasswordLength: 32
        ExcludeCharacters: '"@/\\:;+%=.,?<>{}[]()~`#$^&*|'

  MyDBCluster:
    DependsOn: MyDBSecret                      
    Type: AWS::RDS::DBCluster
    Properties:
      Engine: aurora-postgresql
      Port: 5432
      ServerlessV2ScalingConfiguration:
        MinCapacity: 1
        MaxCapacity: 4
      MasterUsername: !Sub '{{resolve:secretsmanager:${MyDBSecret}::username}}'
      MasterUserPassword: !Sub '{{resolve:secretsmanager:${MyDBSecret}::password}}'
      DBClusterIdentifier: My-db-cluster
      DatabaseName: database
      BackupRetentionPeriod: 7
      DBSubnetGroupName: !Ref MyDBSubnetGroup
      VpcSecurityGroupIds:
        - !GetAtt MyDBSecurityGroup.GroupId
      StorageEncrypted: true
      KmsKeyId: "alias/aws/rds"
      Tags:
        - Key: Name
          Value: MyDBCluster

  MyDBInstance1:
    Type: AWS::RDS::DBInstance
    Properties:
      Engine: aurora-postgresql
      AvailabilityZone: !Select [ 0, !GetAZs '' ]
      DBInstanceClass: db.serverless
      DBClusterIdentifier: !Ref MyDBCluster
      DBInstanceIdentifier: My-instance-1
      Tags:
        - Key: Name
          Value: MyDBInstance1

  MyDBSubnetGroup:
    Type: AWS::RDS::DBSubnetGroup
    Properties:
      DBSubnetGroupName: MyDBSubnetGroup
      DBSubnetGroupDescription: Subnet group for MyDBCluster
      SubnetIds:
        - !Ref MyPrivateSubnet1a
        - !Ref MyPrivateSubnet1b
      Tags:
        - Key: Name
          Value: MyDBSubnetGroup

 MyDMSVpcEndpoint:
    Type: 'AWS::EC2::VPCEndpoint'
    Properties:
      VpcId: !Ref MyVPC
      ServiceName: !Sub 'com.amazonaws.${AWS::Region}.dms'
      VpcEndpointType: 'Interface'
      PrivateDnsEnabled: true
      SubnetIds: 
        - !Ref MyPrivateSubnet1a
        - !Ref MyPrivateSubnet1b
      SecurityGroupIds: 
        - !GetAtt MyDBAccessSecurityGroup.GroupId

  MyKinesisVPCEndpoint:
    Type: 'AWS::EC2::VPCEndpoint'
    Properties:
      VpcId: !Ref MyVPC
      ServiceName: !Sub 'com.amazonaws.${AWS::Region}.kinesis-streams'
      VpcEndpointType: 'Interface'
      PrivateDnsEnabled: true
      SubnetIds: 
        - !Ref MyPrivateSubnet1a
        - !Ref MyPrivateSubnet1b
      SecurityGroupIds: 
        - !GetAtt MyDBAccessSecurityGroup.GroupId

  MyReplicationSubnetGroup:
    Type: "AWS::DMS::ReplicationSubnetGroup"
    Properties: 
      ReplicationSubnetGroupIdentifier: MyReplicationSubnetGroup
      ReplicationSubnetGroupDescription: 'Subnet group for DMS Replication'
      SubnetIds:
        - !Ref MyPrivateSubnet1a
        - !Ref MyPrivateSubnet1b

  MyReplicationInstance:
    Type: 'AWS::DMS::ReplicationInstance'
    Properties: 
      ReplicationInstanceIdentifier: 'MyReplicationInstance'
      ReplicationInstanceClass: 'dms.t2.micro'
      AllocatedStorage: 5
      AvailabilityZone: !Select [ 0, !GetAZs '' ]
      VpcSecurityGroupIds: 
        - !GetAtt MyDBAccessSecurityGroup.GroupId
      ReplicationSubnetGroupIdentifier: !Ref MyReplicationSubnetGroup
      MultiAZ: false
      EngineVersion: '3.5.1'
      PubliclyAccessible: false
      AutoMinorVersionUpgrade: true

  MyDMSSourceEndpoint:
    Type: 'AWS::DMS::Endpoint'
    DependsOn: MyDBSecret
    Properties: 
      EndpointIdentifier: 'MyDMSSourceEndpoint'
      EndpointType: 'source'
      EngineName: aurora-postgresql
      DatabaseName: database
      Username: !Sub '{{resolve:secretsmanager:${MyDBSecret}::username}}'
      Password: !Sub '{{resolve:secretsmanager:${MyDBSecret}::password}}'
      ServerName: !GetAtt MyDBCluster.Endpoint.Address
      Port: 5432

  MyKinesisStream:
    Type: 'AWS::Kinesis::Stream'
    Properties: 
      Name: 'MyKinesisStream'
      ShardCount: 1

  MyDMSTargetEndpoint:
    Type: 'AWS::DMS::Endpoint'
    DependsOn: MyDMSVpcEndpoint
    Properties:
      EndpointIdentifier: 'MyDMSTargetEndpoint'
      EndpointType: 'target'
      EngineName: 'kinesis'
      KinesisSettings:
        MessageFormat: 'json'
        StreamArn: !GetAtt MyKinesisStream.Arn
        ServiceAccessRoleArn: !GetAtt MyDMSTargetAccessRole.Arn

  MyDMSTargetAccessRole:
    Type: 'AWS::IAM::Role'
    Properties:
      RoleName: 'MyDMSTargetAccessRole'
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Principal:
            Service:
            - 'dms.amazonaws.com'
          Action: 
          - 'sts:AssumeRole'
      Policies:
        - PolicyName: 'DMSAccessPolicy'
          PolicyDocument:
            Version: '2012-10-17'
            Statement:

            - Effect: Allow
              Action: 
              - 'kinesis:*'
              Resource: '*'

            - Effect: Allow
              Action: iam:PassRole
              Resource: "*"

      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AmazonDMSVPCManagementRole
        - arn:aws:iam::aws:policy/service-role/AmazonDMSCloudWatchLogsRole

Thank you for your time, and any help provided would be greatly appreciated.

1 Answer
0

My two cents.

The error indicates that there is a connectivity issue between the application connecting to Kinesis Data Streams and the Kinesis service endpoint. A few things you can check: -Verify that the application can connect to the internet and the Kinesis service endpoint specifically. -If the application is running in an AWS VPC, check that the VPC configuration like route tables, security groups and network ACLs allow outbound access to the Kinesis endpoint.

As an alternative to public internet access, you can use Kinesis VPC endpoint to communicate privately within your VPC. -Increase the request timeout period configuration in the application connecting to Kinesis. For example: producerConfig.put("RequestTimeout", "5000"); -Check for any temporary network issues that could be causing timeouts. -Monitor application logs and Kinesis metrics to identify if there are too many pending requests overwhelming the Kinesis Producer Library daemon.

profile picture
EXPERT
answered 3 months ago

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions