Replicate topics and messages between 2 MSK clusters using MirrorMaker2



I followed this blog post to set up MSK Connect connectors, plugins, and other configuration to replicate Kafka topics and messages between two MSK clusters in the same account but in two different VPCs. The source cluster, from which data needs to be replicated, is configured with Plaintext, whereas the destination cluster is configured with IAM authentication. I created the IAM role and policy as recommended in the referenced post. I am seeing this error in the MirrorSourceConnector and MirrorCheckpointConnector logs:

org.apache.kafka.connect.errors.ConnectException: Could not look up partition metadata for offset backing store topic in allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if this is your first use of the topic it may have taken too long to create

MirrorSourceConnector configuration properties:

# private Plaintext bootstrap servers endpoints for source MSK cluster,,
# private IAM bootstrap servers endpoints for target MSK cluster,, required awsRoleArn="arn:aws:iam::1234567678:role/iam_role_4_msk_connect" awsDebugCreds=true;
target.cluster.producer.sasl.mechanism=AWS_MSK_IAM required awsRoleArn="arn:aws:iam::1234567678:role/iam_role_4_msk_connect" awsDebugCreds=true;
target.cluster.consumer.sasl.mechanism=AWS_MSK_IAM required awsRoleArn="arn:aws:iam::1234567678:role/iam_role_4_msk_connect" awsDebugCreds=true;

I am sure I am missing something here; any help is really appreciated. Also, is it not possible to edit the connector configuration after it has been created?


Asked 1 year ago, 2999 views
3 Answers

Check connectivity between the VPCs and the security group (SG) configuration:

  1. Set up an EC2 instance in the target VPC, assign it the security group of the target cluster, and try to telnet to port 9092 of any of the source brokers.
  2. If you get a connection timeout: a. ensure the target cluster's SG has outbound rules allowing connections to port 9092 on the source cluster's SG; b. ensure the source cluster's SG has inbound rules on port 9092 for the target cluster's SG.
  3. If you can connect from the EC2 instance and are still getting connection timeouts, check whether the SG used for the target MSK ENIs is the same as the one used for the connector ENIs in the target VPC.
  4. If they are the same, check VPC peering, TGW, or other network-related configuration between the VPCs.
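
If telnet is not installed on the test instance, the reachability check in step 1 can be approximated with a few lines of Python. This is only a sketch: the function names are made up for illustration, and the broker hostnames you pass in are whatever your source cluster reports as its plaintext bootstrap brokers.

```python
import socket
from typing import Dict, List


def can_connect(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures.
        return False


def check_brokers(brokers: List[str], port: int = 9092) -> Dict[str, bool]:
    """Check each broker hostname on the given port (9092 = plaintext listener)."""
    return {broker: can_connect(broker, port) for broker in brokers}
```

Calling `check_brokers([...])` with the source broker hostnames from an instance that uses the same security group as the connector returns a mapping of hostname to True/False. A False result only says that the TCP connection failed; it does not tell you whether a security group rule, routing, or DNS is responsible.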

Let us know the results, please

AWS
Answered 1 year ago
  • Thanks for the update. I believe you're the same Ed who wrote the blog post; thank you very much for that, as I couldn't find much information about configuring MSK Connect to work with MirrorMaker2.

    As for the issue, I noticed that I had an incorrect IAM policy attached to the role. After fixing the policy as shown below, I am now seeing the error that follows.

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["kafka-cluster:*", "kafka:*"],
          "Resource": ["*"]
        }
      ]
    }

    Error message when connecting to target cluster endpoint: [MirrorSourceConnectorWithFullAccess|worker] [AdminClient clientId=adminclient-10] Failed authentication with (An error: ( Failed to find AWS IAM Credentials [Caused by Unable to execute HTTP request: Connect to [] failed: connect timed out]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.)

    Note that I have provided the MSK IAM private endpoints in the target.cluster.bootstrap.servers list. Should this be the public endpoint list instead, since the target cluster is configured with public access?

  • Forgot to mention that we do have VPC peering in place between the VPCs of the source and target clusters, so that's not an issue. I read about setting up a private STS interface endpoint for the target cluster VPC. I did that and am still seeing the same error, as the connector is using the global STS endpoint. I am not sure how or where to configure the regional STS endpoint, as I don't recall adding the endpoint information explicitly anywhere.

  • I don't think it's working, as I am still seeing this error even after fixing the IAM role trust relationship policy, which was an issue earlier.

    [Worker-031d6f985ad5997a5] org.apache.kafka.common.errors.SaslAuthenticationException: An error: ( Failed to find AWS IAM Credentials [Caused by Unable to execute HTTP request: Connect to [] failed: connect timed out]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.
    [Worker-031d6f985ad5997a5] Caused by: Failed to find AWS IAM Credentials [Caused by Unable to execute HTTP request: Connect to [] failed: connect timed out]
    [Worker-031d6f985ad5997a5] at
    [Worker-031d6f985ad5997a5] at
    [Worker-031d6f985ad5997a5] at$createSaslToken$1(
    [Worker-031d6f985ad5997a5] at java.base/ Method)
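
The "connect timed out" in the comments above is an HTTP request for credentials, and the comments attribute it to the global STS endpoint being used instead of the regional one. A rough way to see the difference from a host in the connector's subnets is sketched below. The helper names are hypothetical, the hostname pattern assumes the standard `aws` partition, and `ca-central-1` is taken from the `awsStsRegion` value that appears in the configuration further down this thread.

```python
import socket
from typing import Optional


def sts_hostname(region: Optional[str] = None) -> str:
    """Build the STS hostname: regional if a region is given, else global.

    Assumes the standard 'aws' partition (not China or GovCloud).
    """
    return f"sts.{region}.amazonaws.com" if region else "sts.amazonaws.com"


def can_reach_https(host: str, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:443 succeeds within timeout."""
    try:
        with socket.create_connection((host, 443), timeout=timeout):
            return True
    except OSError:
        return False
```

For example, `can_reach_https(sts_hostname("ca-central-1"))` and `can_reach_https(sts_hostname())` can be compared from a subnet without internet access. If only the regional hostname is reachable, that would be consistent with a private STS interface endpoint serving the regional name while the global name still needs a route to the internet.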


I have made some progress on Kafka topic replication. I noticed that running MirrorMaker2 on an EC2 instance with the configuration below replicated topics from the source to the target MSK cluster, but the same configuration fails with MSK Connect. Note that the source cluster has no authentication configured, whereas the target cluster is configured with IAM authentication/authorization.

On a side note, I could also remove the cluster name prefix from the replicated topics by adding replication.policy.class=com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy, as mentioned at mirrormaker2-msk-migration, when running MirrorMaker2 on the EC2 instance.

Another observation: MSK Connect appears to expect the source cluster alias to be 'source'. Giving another string such as 'my-source' didn't work; the connector failed to start with an error for this field.


ERROR [MirrorSourceConnectorCustom|worker] Scheduler for MirrorSourceConnector caught exception in scheduled task: loading initial set of topic-partitions (org.apache.kafka.connect.mirror.Scheduler:102)
[Worker-0ba8c4d265a06ee31] java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1683240049825, tries=1, nextAllowedTryMs=1683240049926) timed out at 1683240049826 after 1 attempt(s)


connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector **-- this line is not used in EC2 MirrorMaker2 test since there we're using Kafka provided by pointing to this configuration**
replication.factor=3 required awsRoleArn="arn:aws:iam::111111111:role/msk-full-access-role" awsDebugCreds=true awsStsRegion="ca-central-1";
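
The configuration fragment above is truncated, so the following is only a sketch of what the IAM client settings for the target cluster typically look like, not the poster's actual file. The class names and JAAS options are the ones documented by the aws-msk-iam-auth library; the `target.cluster.` prefix, the helper name, and the idea of generating the lines from Python are assumptions made for illustration.

```python
def target_iam_properties(role_arn: str, sts_region: str,
                          prefix: str = "target.cluster.") -> str:
    """Render IAM auth client properties for the target cluster.

    The prefix is an assumption; adjust it to match how your connector
    addresses the target cluster's client settings.
    """
    jaas = (
        "software.amazon.msk.auth.iam.IAMLoginModule required "
        f'awsRoleArn="{role_arn}" awsDebugCreds=true '
        f'awsStsRegion="{sts_region}";'
    )
    settings = {
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "AWS_MSK_IAM",
        "sasl.jaas.config": jaas,
        "sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }
    # One key=value line per setting, in insertion order.
    return "\n".join(f"{prefix}{key}={value}" for key, value in settings.items())
```

Printing `target_iam_properties("arn:aws:iam::111111111:role/msk-full-access-role", "ca-central-1")` gives four `key=value` lines that can be compared against a connector configuration to spot a missing or fused setting.
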
Answered 1 year ago
  • You can use different names in MSK Connect for the MM2 cluster names. Please post the configuration (and the error trace) that was failing here; the problem could be related to the configuration.

    Another recommendation: if you are running on EC2 (meaning you are free to choose the Kafka version under MM2), you don't need to use CustomMM2ReplicationPolicy. Starting with Kafka 3, MM2 has IdentityReplicationPolicy (

  • Thank you. I tried IdentityReplicationPolicy and noticed that the target topics still have the source cluster alias as a prefix. I will check and try again to see whether it's related to the configuration. As for MSK Connect, I used the exact configuration posted above, except for the account and the actual endpoint URLs.

  • @rePost-User-7286222, please post the configuration you used with source and target names other than source and target, so that it is easier to spot a possible misconfiguration.
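
To make the prefix discussion in these comments concrete, here is a small illustration of how the two replication policies name a replicated topic. It mirrors the documented behaviour (the default policy prepends the source cluster alias and a separator, "." by default; the identity policy keeps the topic name) but it is not Kafka's code, and the function names are invented for this example.

```python
def default_policy_name(source_alias: str, topic: str, separator: str = ".") -> str:
    """Remote topic name under the default policy: alias + separator + topic."""
    return f"{source_alias}{separator}{topic}"


def identity_policy_name(source_alias: str, topic: str) -> str:
    """Remote topic name under an identity policy: the topic name is unchanged."""
    return topic
```

With a source alias of `source` and a topic called `orders`, the first function gives `source.orders` and the second gives `orders`. Seeing the prefixed form on the target cluster therefore suggests that the default policy is still in effect for that connector.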


I was also unable to get a custom replication policy to work in MSK Connect via the custom.replication.policy property. However, I was able to get around this by setting the following items to be empty in each of the three required connectors.


When I did this, the topics in the target have the same name as the source, and offset translation works as expected.

Got this idea from this SO post:

Answered 1 year ago
