Wie verwende ich den Kafka-Kinesis-Connector, um eine Verbindung zu meinen Amazon-MSK-Cluster herzustellen?
Wenn ich versuche, den Kafka-Kinesis-Connector zu verwenden, um eine Verbindung mit Amazon Managed Streaming für Apache Kafka (Amazon MSK) herzustellen, erhalte ich eine Fehlermeldung.
Kurzbeschreibung
Voraussetzungen:
- Sie verfügen über ein aktives AWS-Abonnement.
- Sie verfügen über eine virtuelle private Cloud (VPC), die sowohl für den Client-Computer als auch für den MSK-Cluster sichtbar ist. Der MSK-Cluster und der Client müssen sich in derselben VPC befinden.
- Sie haben Konnektivität zu Amazon-MSK- und Apache Zookeeper-Servern.
- Ihrer VPC sind zwei Subnetze zugeordnet.
- Sie haben Themen in MSK zum Senden und Empfangen von Nachrichten vom Server erstellt.
Behebung
Ihre Projektdatei erstellen
- Um den Kafka-Kinesis-Connector herunterzuladen, klonen Sie das Projekt kafka-kinesis-connector von der GitHub-Website.
- Um die Datei amazon-kinesis-kafka-connector-X.X.X.jar im Zielverzeichnis zu erstellen, verwenden Sie den Befehl mvn package:
Der Kafka-Kinesis-Connector sucht in der folgenden Reihenfolge nach Anmeldeinformationen: Umgebungsvariablen, Java-Systemeigenschaften und die Anmeldeinformationsprofildatei.[ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package.. ...... [INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 28.822 s [INFO] Finished at: 2020-02-19T13:01:31Z[INFO] Final Memory: 26M/66M [INFO] ------------------------------------------------------------------------
- Führen Sie den folgenden Befehl aus, um Ihre Konfiguration auf die Einstellung DefaultAWSCredentailsProviderChain zu aktualisieren:
Die genannten Befehle stellen sicher, dass der dem AWS Identity and Access Management (IAM)-Benutzer zugeordnete Zugriffsschlüssel über die erforderlichen Mindestberechtigungen verfügt. Der Befehl aws configure stellt außerdem sicher, dass eine Richtlinie für den Zugriff auf Amazon Kinesis Data Streams oder Amazon Kinesis Data Firehose verfügbar ist. Weitere Informationen zum Festlegen von AWS-Anmeldeinformationen finden Sie unter Bereitstellen temporärer Anmeldeinformationen für das AWS SDK für Java.[ec2-user@ip-10-0-0-71 target]$ aws configure
Hinweis: Wenn Sie ein Java Development Kit (JDK) verwenden, können Sie auch die Klasse EnvironmentVariableCredentialsProvider verwenden, um Anmeldeinformationen bereitzustellen. - Wenn Sie Kinesis Data Streams verwenden, aktualisieren Sie Ihre Richtlinie so, dass sie dem folgenden Beispiel ähnelt:
Wenn Sie Kinesis Data Firehose verwenden, aktualisieren Sie Ihre Richtlinie so, dass sie dem folgenden Beispiel ähnelt:{ "Version": "2012-10-17", "Statement": [{ "Sid": "Stmt123", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:PutRecord", "kinesis:PutRecords", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:ListShards", "kinesis:DescribeStreamSummary", "kinesis:RegisterStreamConsumer" ], "Resource": [ "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName" ] }] }
Weitere Informationen zu den Lieferstream-Einstellungen von Kinesis Data Firehose finden Sie unter Konfiguration und Einstellungen für Anmeldeinformationsdateien.{ "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Action": [ "firehose:DeleteDeliveryStream", "firehose:PutRecord", "firehose:PutRecordBatch", "firehose:UpdateDestination" ], "Resource": [ "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName" ] }] }
Den Connector konfigurieren
Hinweis: Sie können den Kafka-Kinesis-Connector so konfigurieren, dass er Nachrichten von MSK veröffentlicht. Sie können Nachrichten an die folgenden Ziele veröffentlichen: Amazon Simple Storage Service (Amazon S3), Amazon Redshift oder Amazon OpenSearch Service.
-
Wenn Sie Kinesis Data Streams einrichten, können Sie den Connector mit den folgenden Werten konfigurieren:
name=YOUR_CONNECTER_NAMEconnector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector tasks.max=1 topics=YOUR_TOPIC_NAME region=us-east-1 streamName=YOUR_STREAM_NAME usePartitionAsHashKey=false flushSync=true # Use new Kinesis Producer for each Partition singleKinesisProducerPerPartition=true # Whether to block new records from putting onto Kinesis Producer if # threshold for outstanding records have reached pauseConsumption=true outstandingRecordsThreshold=500000 # If outstanding records on producers are beyond threshold sleep for following period (in ms) sleepPeriod=1000 # If outstanding records on producers are not cleared sleep for following cycle before killing the tasks sleepCycles=10 # Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties # All kinesis producer configuration have not been exposed maxBufferedTime=1500 maxConnections=1 rateLimit=100 ttl=60000 metricsLevel=detailed metricsGranuality=shard metricsNameSpace=KafkaKinesisStreamsConnector aggregation=true
Wenn Sie einen anderen Streamtyp einrichten, konfigurieren Sie die Eigenschaften des Kinesis-Data-Firehose-Lieferstreams mit den folgenden Werten:
name=YOUR_CONNECTER_NAMEconnector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector tasks.max=1 topics=YOUR_TOPIC_NAME region=us-east-1 batch=true batchSize=500 batchSizeInBytes=3670016 deliveryStream=YOUR_DELIVERY_STREAM_NAME
-
Konfigurieren Sie die Worker-Eigenschaften entweder für den eigenständigen oder den verteilten Modus:
bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter #internal.value.converter=org.apache.kafka.connect.storage.StringConverter #internal.key.converter=org.apache.kafka.connect.storage.StringConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true internal.key.converter.schemas.enable=true internal.value.converter.schemas.enable=true offset.storage.file.filename=offset.log
Weitere Informationen zum eigenständigen oder verteilten Modus von Kafka-Kinesis-Connector finden Sie unter Kafka Connect auf der Apache-Website.
-
Kopieren Sie die Datei amazon-kinesis-kafka-connector-0.0.x.jar in Ihr Verzeichnis und exportieren Sie classpath.
Hinweis: Sie können die Datei amazon-kinesis-kafka-connector-0.0.x.jar auch zum Verzeichnis JAVA_HOME/lib/ext hinzufügen. -
Um den Kafka-Kinesis-Connector auszuführen, verwenden Sie die folgende Befehlssyntax:
[ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/ worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/ kinesis-kafka-streams-connecter.properties
Ähnliche Informationen
Relevanter Inhalt
- AWS OFFICIALAktualisiert vor einem Jahr
- AWS OFFICIALAktualisiert vor 8 Monaten
- AWS OFFICIALAktualisiert vor einem Jahr
- AWS OFFICIALAktualisiert vor einem Jahr