Check connectivity between the VPCs and the security group configuration:
- Set up an EC2 instance in the target VPC, assign it the security group of the target cluster, and try to telnet to port 9092 of any of the source brokers.
- If you get a connection timeout: a. ensure the target cluster SG has outbound rules allowing connections to port 9092 on the SG of the source cluster; b. ensure the source cluster SG has inbound rules on port 9092 for the target cluster SG.
- If you can connect from the EC2 instance but the connector still gets a connection timeout, check whether the SG used for the target MSK ENIs is the same as the one used for the connector ENI in the target VPC.
- If they are the same, check VPC peering, TGW, or other network-related configuration between the VPCs.
Please let us know the results.
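If telnet is not installed on the test instance, the same check can be sketched in Python. This is a minimal sketch, not an official tool: the function name `probe_port` and the placeholder broker hostname are assumptions for illustration. It separates a timeout (typically security groups or routing) from a refused connection (host reachable, nothing listening on the port).

```python
import socket


def probe_port(host: str, port: int = 9092, timeout: float = 5.0) -> str:
    """Try a TCP connection and classify the outcome.

    Returns "open", "timeout", "refused", or "error: <detail>".
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return "open"
    except socket.timeout:
        # No answer at all: usually a security group or routing problem.
        return "timeout"
    except ConnectionRefusedError:
        # The host answered, but nothing is listening on that port.
        return "refused"
    except OSError as exc:
        # DNS failures and other socket errors end up here.
        return f"error: {exc}"


# Example (hypothetical hostname, replace with one of the source brokers):
# print(probe_port("b-1.my-source-kafka.example.amazonaws.com", 9092))
```

Run it from the EC2 instance that carries the target cluster's security group, once per source broker.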
I have made some progress on Kafka topic replication. Running MirrorMaker2 on an EC2 instance with the configuration below replicates topics from the source to the target MSK cluster, but the same configuration fails with MSK Connect. Note that the source cluster has no authentication configured, whereas the target cluster uses IAM authentication/authorization.
On a side note, when running MirrorMaker2 on the EC2 instance I could also remove the cluster name prefix from the replicated topics by adding replication.policy.class=com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy, as mentioned at mirrormaker2-msk-migration.
Another observation: MSK Connect appears to expect the source cluster alias to be 'source'. Using another string such as 'my-source' didn't seem to work; the connector failed to start with an error for this field.
Error:
ERROR [MirrorSourceConnectorCustom|worker] Scheduler for MirrorSourceConnector caught exception in scheduled task: loading initial set of topic-partitions (org.apache.kafka.connect.mirror.Scheduler:102)
[Worker-0ba8c4d265a06ee31] java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1683240049825, tries=1, nextAllowedTryMs=1683240049926) timed out at 1683240049826 after 1 attempt(s)
Configuration:
# Note: connector.class is not used in the EC2 MirrorMaker2 test, which runs the Kafka-provided connect-mirror-maker.sh against this configuration.
connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
replication.factor=3
target.cluster.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::111111111:role/msk-full-access-role" awsDebugCreds=true awsStsRegion="ca-central-1";
offset-sync.topic.replication.factor=3
sync.topic.acls.enabled=false
tasks.max=2
source->target.emit.checkpoints.enabled=true
source->target.enabled=true
source.cluster.alias=source
target.cluster.security.protocol=SASL_SSL
clusters=source,target
topics.exclude=.*[-.]internal,.*.replica,__.*,.*-config,.*-status,.*-offset
source->target.emit.heartbeats.enabled=true
target.cluster.sasl.mechanism=AWS_MSK_IAM
topics=my-topic-1,my-topic-2,my-topic-4,my-topic-3
refresh.topics.enabled=true
target.cluster.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
groups=.*
source.cluster.bootstrap.servers=b-1.my-source-kafka.fhfghfgh.c3.kafka.ca-central-1.amazonaws.com:9092,b-2.my-source-kafka.fhfghfgh.c3.kafka.ca-central-1.amazonaws.com:9092,b-3.my-source-kafka.fhfghfgh.c3.kafka.ca-central-1.amazonaws.com:9092
target.cluster.alias=target
target.cluster.bootstrap.servers=b-3.my-target-kafka.fhfghfgh.c3.kafka.ca-central-1.amazonaws.com:9098,b-1.my-target-kafka.fhfghfgh.c3.kafka.ca-central-1.amazonaws.com:9098,b-2.my-target-kafka.fhfghfgh.c3.kafka.ca-central-1.amazonaws.com:9098
heartbeat.topic.replication.factor=3
sync.topic.configs.enabled=true
checkpoint.topic.replication.factor=3
source.cluster.security.protocol=PLAINTEXT
offset.flush.timeout.ms=50000
buffer.memory=100
You can use different names in MSK Connect for the MM2 cluster aliases. Please post the configuration (and the error trace) that was failing; the problem could be related to the configuration.
Another recommendation: if you are running on EC2 (meaning you are free to choose the Kafka version that runs MM2), you don't need to use CustomMM2ReplicationPolicy. Starting with Kafka 3, MM2 has IdentityReplicationPolicy (https://github.com/apache/kafka/blob/trunk/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java).
Thank you. I tried IdentityReplicationPolicy and noticed that the target topics still have the source cluster alias as a prefix. I will check and try again to see whether it is related to the configuration. As for MSK Connect, I used the exact configuration posted above, except for the account and the actual endpoint URLs.
@rePost-User-7286222, please post the configuration you used with source and target names different from source and target, so it will be easy to spot a possible misconfiguration.
I was also not able to get a custom replication policy to work in MSK Connect via the custom.replication.policy property. However, I was able to work around this by setting the following properties to empty values in each of the three required connectors.
replication.policy.separator=
source.cluster.alias=
target.cluster.alias=
When I did this, the topics in the target have the same name as the source, and offset translation works as expected.
Got this idea from this SO post: https://stackoverflow.com/questions/59390555/is-it-possible-to-replicate-kafka-topics-without-alias-prefix-with-mirrormaker2
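To see why the empty values have this effect: MM2's default replication policy names a remote topic by concatenating the source cluster alias, the separator, and the original topic name. The Python sketch below only imitates that naming rule for illustration; it is not MM2 code, and the function name `remote_topic_name` is made up for this example.

```python
def remote_topic_name(source_alias: str, separator: str, topic: str) -> str:
    """Imitate the default MM2 naming rule: alias + separator + topic."""
    return f"{source_alias}{separator}{topic}"


# With the default separator "." and alias "source", the topic gets a prefix.
print(remote_topic_name("source", ".", "my-topic-1"))  # source.my-topic-1

# With an empty alias and an empty separator, the name is unchanged.
print(remote_topic_name("", "", "my-topic-1"))  # my-topic-1
```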
Thanks for the update. I believe you're the same Ed who wrote the blog post; thank you very much for that, as I couldn't find much information about configuring MSK Connect to work with MirrorMaker2.
Coming to the issue: I noticed that I had an incorrect IAM policy attached to the role. After fixing the policy as shown below, I am now seeing the error that follows it.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:*", "kafka:*" ], "Resource": [ "*" ] } ] }
Error message when connecting to target cluster endpoint: [MirrorSourceConnectorWithFullAccess|worker] [AdminClient clientId=adminclient-10] Failed authentication with b-2.fgfhfghfhh.tryrtyrty.c3.kafka.ca-central-1.amazonaws.com/INTERNAL_IP (An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Failed to find AWS IAM Credentials [Caused by aws_msk_iam_auth_shadow.com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to sts.amazonaws.com:443 [sts.amazonaws.com/52.119.198.216] failed: connect timed out]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.)
Note that I have provided the MSK IAM private endpoints in the target.cluster.bootstrap.servers list. Should this be the public endpoint list instead, since the target cluster is configured with public access?
I forgot to mention that we do have VPC peering in place between the VPCs of the source and target clusters, so that's not the issue. I read about setting up a private STS interface endpoint for the target cluster VPC; I did that and still see the same error, because the client is using the global STS endpoint. I am not sure how or where to configure the regional STS endpoint, as I don't recall adding the endpoint information explicitly anywhere.
I don't think it's working, as I am still seeing this error even after fixing the IAM role trust relationship policy, which was an issue earlier.
[Worker-031d6f985ad5997a5] org.apache.kafka.common.errors.SaslAuthenticationException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Failed to find AWS IAM Credentials [Caused by aws_msk_iam_auth_shadow.com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to sts.amazonaws.com:443 [sts.amazonaws.com/209.54.180.124] failed: connect timed out]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state. [Worker-031d6f985ad5997a5] Caused by: javax.security.sasl.SaslException: Failed to find AWS IAM Credentials [Caused by aws_msk_iam_auth_shadow.com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to sts.amazonaws.com:443 [sts.amazonaws.com] failed: connect timed out] [Worker-031d6f985ad5997a5] at software.amazon.msk.auth.iam.internals.IAMSaslClient.generateClientMessage(IAMSaslClient.java:149) [Worker-031d6f985ad5997a5] at software.amazon.msk.auth.iam.internals.IAMSaslClient.evaluateChallenge(IAMSaslClient.java:96) [Worker-031d6f985ad5997a5] at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslToken$1(SaslClientAuthenticator.java:524) [Worker-031d6f985ad5997a5] at java.base/java.security.AccessController.doPrivileged(Native Method)
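The traces above show a connect timeout to the global endpoint sts.amazonaws.com on port 443. As a diagnostic only, the Python sketch below checks from an instance in the connector's subnets whether the global and the regional STS hostnames accept TCP connections. It is a minimal sketch under assumptions: the helper names are made up for this example, and the regional hostname pattern sts.<region>.amazonaws.com applies to the standard AWS partition. It does not change which endpoint the connector uses.

```python
import socket

GLOBAL_STS_HOST = "sts.amazonaws.com"


def regional_sts_host(region: str) -> str:
    """Build the regional STS hostname (standard AWS partition assumed)."""
    return f"sts.{region}.amazonaws.com"


def tcp_reachable(host: str, port: int = 443, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers timeouts, refused connections, and DNS failures.
        return False


def check_sts(region: str, timeout: float = 5.0) -> dict:
    """Check both the global and the regional STS endpoints on port 443."""
    hosts = (GLOBAL_STS_HOST, regional_sts_host(region))
    return {host: tcp_reachable(host, 443, timeout) for host in hosts}


# Example, run from an instance in the same subnets as the connector:
# print(check_sts("ca-central-1"))
```

If only the regional hostname is reachable, that is consistent with a private STS interface endpoint being in place while the client still calls the global endpoint, which is what the error messages suggest.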