Wie verwende ich den Kafka-Kinesis-Connector, um eine Verbindung zu meinen Amazon-MSK-Cluster herzustellen?

Lesedauer: 4 Minute
0

Wenn ich versuche, den Kafka-Kinesis-Connector zu verwenden, um eine Verbindung mit Amazon Managed Streaming für Apache Kafka (Amazon MSK) herzustellen, erhalte ich eine Fehlermeldung.

Kurzbeschreibung

Voraussetzungen:

  • Sie verfügen über ein aktives AWS-Abonnement.
  • Sie verfügen über eine virtuelle private Cloud (VPC), die sowohl für den Client-Computer als auch für den MSK-Cluster sichtbar ist. Der MSK-Cluster und der Client müssen sich in derselben VPC befinden.
  • Sie haben Konnektivität zu Amazon-MSK- und Apache Zookeeper-Servern.
  • Ihrer VPC sind zwei Subnetze zugeordnet.
  • Sie haben Themen in MSK zum Senden und Empfangen von Nachrichten vom Server erstellt.

Behebung

Ihre Projektdatei erstellen

  1. Um den Kafka-Kinesis-Connector herunterzuladen, klonen Sie das Projekt kafka-kinesis-connector von der GitHub-Website.
  2. Um die Datei amazon-kinesis-kafka-connector-X.X.X.jar im Zielverzeichnis zu erstellen, verwenden Sie den Befehl mvn package:
    [ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package..
    ......
    
    [INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 28.822 s
    [INFO] Finished at: 2020-02-19T13:01:31Z[INFO] Final Memory: 26M/66M
    [INFO] ------------------------------------------------------------------------
    Der Kafka-Kinesis-Connector sucht in der folgenden Reihenfolge nach Anmeldeinformationen: Umgebungsvariablen, Java-Systemeigenschaften und die Anmeldeinformationsprofildatei.
  3. Führen Sie den folgenden Befehl aus, um Ihre Konfiguration auf die Einstellung DefaultAWSCredentailsProviderChain zu aktualisieren:
    [ec2-user@ip-10-0-0-71 target]$ aws configure
    Die genannten Befehle stellen sicher, dass der dem AWS Identity and Access Management (IAM)-Benutzer zugeordnete Zugriffsschlüssel über die erforderlichen Mindestberechtigungen verfügt. Der Befehl aws configure stellt außerdem sicher, dass eine Richtlinie für den Zugriff auf Amazon Kinesis Data Streams oder Amazon Kinesis Data Firehose verfügbar ist. Weitere Informationen zum Festlegen von AWS-Anmeldeinformationen finden Sie unter Bereitstellen temporärer Anmeldeinformationen für das AWS SDK für Java.
    Hinweis: Wenn Sie ein Java Development Kit (JDK) verwenden, können Sie auch die Klasse EnvironmentVariableCredentialsProvider verwenden, um Anmeldeinformationen bereitzustellen.
  4. Wenn Sie Kinesis Data Streams verwenden, aktualisieren Sie Ihre Richtlinie so, dass sie dem folgenden Beispiel ähnelt:
    {
         "Version": "2012-10-17",
         "Statement": [{
              "Sid": "Stmt123",
              "Effect": "Allow",
              "Action": [
                   "kinesis:DescribeStream",
                   "kinesis:PutRecord",
                   "kinesis:PutRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:GetRecords",
                   "kinesis:ListShards",
                   "kinesis:DescribeStreamSummary",
                   "kinesis:RegisterStreamConsumer"
              ],
              "Resource": [
                   "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName"
              ]
         }]
    }
    Wenn Sie Kinesis Data Firehose verwenden, aktualisieren Sie Ihre Richtlinie so, dass sie dem folgenden Beispiel ähnelt:
    {
         "Version": "2012-10-17",
         "Statement": [{
              "Effect": "Allow",
              "Action": [
                   "firehose:DeleteDeliveryStream",
                   "firehose:PutRecord",
                   "firehose:PutRecordBatch",
                   "firehose:UpdateDestination"
              ],
              "Resource": [
                   "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName"
              ]
         }]
    }
    Weitere Informationen zu den Lieferstream-Einstellungen von Kinesis Data Firehose finden Sie unter Konfiguration und Einstellungen für Anmeldeinformationsdateien.

Den Connector konfigurieren

Hinweis: Sie können den Kafka-Kinesis-Connector so konfigurieren, dass er Nachrichten von MSK veröffentlicht. Sie können Nachrichten an die folgenden Ziele veröffentlichen: Amazon Simple Storage Service (Amazon S3), Amazon Redshift oder Amazon OpenSearch Service.

  1. Wenn Sie Kinesis Data Streams einrichten, können Sie den Connector mit den folgenden Werten konfigurieren:

    name=YOUR_CONNECTER_NAMEconnector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
    tasks.max=1
    topics=YOUR_TOPIC_NAME
    region=us-east-1
    streamName=YOUR_STREAM_NAME
    usePartitionAsHashKey=false
    flushSync=true
    # Use new Kinesis Producer for each Partition
    singleKinesisProducerPerPartition=true
    # Whether to block new records from putting onto Kinesis Producer if
    # threshold for outstanding records have reached
    pauseConsumption=true
    outstandingRecordsThreshold=500000
    # If outstanding records on producers are beyond threshold sleep for following period (in ms)
    sleepPeriod=1000
    # If outstanding records on producers are not cleared sleep for following cycle before killing the tasks
    sleepCycles=10
    # Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties
    # All kinesis producer configuration have not been exposed
    maxBufferedTime=1500
    maxConnections=1
    rateLimit=100
    ttl=60000
    metricsLevel=detailed
    metricsGranuality=shard
    metricsNameSpace=KafkaKinesisStreamsConnector
    aggregation=true

    Wenn Sie einen anderen Streamtyp einrichten, konfigurieren Sie die Eigenschaften des Kinesis-Data-Firehose-Lieferstreams mit den folgenden Werten:

    name=YOUR_CONNECTER_NAMEconnector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
    tasks.max=1
    topics=YOUR_TOPIC_NAME
    region=us-east-1
    batch=true
    batchSize=500
    batchSizeInBytes=3670016
    deliveryStream=YOUR_DELIVERY_STREAM_NAME
  2. Konfigurieren Sie die Worker-Eigenschaften entweder für den eigenständigen oder den verteilten Modus:

    bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    #internal.value.converter=org.apache.kafka.connect.storage.StringConverter
    #internal.key.converter=org.apache.kafka.connect.storage.StringConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    internal.key.converter.schemas.enable=true
    internal.value.converter.schemas.enable=true
    offset.storage.file.filename=offset.log

    Weitere Informationen zum eigenständigen oder verteilten Modus von Kafka-Kinesis-Connector finden Sie unter Kafka Connect auf der Apache-Website.

  3. Kopieren Sie die Datei amazon-kinesis-kafka-connector-0.0.x.jar in Ihr Verzeichnis und exportieren Sie classpath.
    Hinweis: Sie können die Datei amazon-kinesis-kafka-connector-0.0.x.jar auch zum Verzeichnis JAVA_HOME/lib/ext hinzufügen.

  4. Um den Kafka-Kinesis-Connector auszuführen, verwenden Sie die folgende Befehlssyntax:

    [ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/
    worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/
    
    kinesis-kafka-streams-connecter.properties

Ähnliche Informationen

Einen Amazon MSK-Cluster erstellen

AWS OFFICIAL
AWS OFFICIALAktualisiert vor einem Jahr