Comment utiliser le connecteur Kafka-Kinesis-Connector pour me connecter à mon cluster Amazon MSK ?

Lecture de 4 minute(s)
0

Lorsque j'essaie d'utiliser le connecteur Kafka-Kinesis-Connector pour me connecter à Amazon Managed Streaming pour Apache Kafka (Amazon MSK), je reçois un message d'erreur.

Brève description

Prérequis :

  • Vous disposez d’un abonnement AWS actif.
  • Vous disposez d'un cloud privé virtuel (VPC) visible à la fois par la machine cliente et par le cluster MSK. Le cluster MSK et le client doivent résider dans le même VPC.
  • Vous disposez d’une connexion aux serveurs Amazon MSK et Apache Zookeeper.
  • Deux sous-réseaux sont associés à votre VPC.
  • Vous avez créé des rubriques dans MSK pour envoyer et recevoir des messages depuis le serveur.

Résolution

Création de votre dossier de projet

  1. Pour télécharger le connecteur Kafka-Kinesis-Connector, clonez le projet kafka-kinesis-connector depuis le site Web de GitHub.
  2. Pour créer le fichier amazon-kinesis-kafka-connector-X.X.X.jar dans le répertoire cible, exécutez la commande mvn package :
    [ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package..
    ......
    
    [INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 28.822 s
    [INFO] Finished at: 2020-02-19T13:01:31Z[INFO] Final Memory: 26M/66M
    [INFO] ------------------------------------------------------------------------
    Le connecteur Kafka-Kinesis-Connector recherche les informations d'identification dans l'ordre suivant : variables d'environnement, propriétés du système Java et fichier de profil des informations d'identification.
  3. Pour mettre à jour votre configuration avec le paramètre DefaultAWSCredentailsProviderChain, exécutez la commande suivante :
    [ec2-user@ip-10-0-0-71 target]$ aws configure
    La commande ci-dessus garantit que la clé d'accès attachée à l'utilisateur Gestion des identités et des accès AWS (AWS IAM) dispose des autorisations minimales requises. La commande aws configure permet également de vérifier qu'une politique est disponible pour accéder à Amazon Kinesis Data Streams ou à Amazon Kinesis Data Firehose. Pour en savoir plus sur le paramétrage des informations d'identification AWS, consultez la page Fournir des informations d'identification temporaires au kit AWS SDK pour Java.
    Remarque : si vous utilisez un kit de développement Java (JDK), vous pouvez également utiliser la classe EnvironmentVariableCredentialsProvider pour fournir des informations d'identification.
  4. Si vous utilisez Kinesis Data Streams, vous devez mettre à jour votre politique pour qu'elle ressemble à l'exemple suivant :
    {
         "Version": "2012-10-17",
         "Statement": [{
              "Sid": "Stmt123",
              "Effect": "Allow",
              "Action": [
                   "kinesis:DescribeStream",
                   "kinesis:PutRecord",
                   "kinesis:PutRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:GetRecords",
                   "kinesis:ListShards",
                   "kinesis:DescribeStreamSummary",
                   "kinesis:RegisterStreamConsumer"
              ],
              "Resource": [
                   "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName"
              ]
         }]
    }
    Si vous utilisez Kinesis Data Firehose, vous devez mettre à jour votre politique pour qu'elle ressemble à l'exemple suivant :
    {
         "Version": "2012-10-17",
         "Statement": [{
              "Effect": "Allow",
              "Action": [
                   "firehose:DeleteDeliveryStream",
                   "firehose:PutRecord",
                   "firehose:PutRecordBatch",
                   "firehose:UpdateDestination"
              ],
              "Resource": [
                   "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName"
              ]
         }]
    }
    Pour en savoir plus sur les paramètres du flux de diffusion de Kinesis Data Firehose, consultez la page Paramètres des fichiers de configuration et d'informations d'identification.

Configuration du connecteur

Remarque : vous pouvez configurer le connecteur Kafka-Kinesis-Connector pour publier des messages depuis MSK. Vous pouvez publier des messages vers les destinations suivantes : Amazon Simple Storage Service (Amazon S3), Amazon Redshift ou Amazon OpenSearch Service.

  1. Si vous paramétrez Kinesis Data Streams, configurez le connecteur avec les valeurs suivantes :

    name=YOUR_CONNECTER_NAMEconnector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
    tasks.max=1
    topics=YOUR_TOPIC_NAME
    region=us-east-1
    streamName=YOUR_STREAM_NAME
    usePartitionAsHashKey=false
    flushSync=true
    # Use new Kinesis Producer for each Partition
    singleKinesisProducerPerPartition=true
    # Whether to block new records from putting onto Kinesis Producer if
    # threshold for outstanding records have reached
    pauseConsumption=true
    outstandingRecordsThreshold=500000
    # If outstanding records on producers are beyond threshold sleep for following period (in ms)
    sleepPeriod=1000
    # If outstanding records on producers are not cleared sleep for following cycle before killing the tasks
    sleepCycles=10
    # Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties
    # All kinesis producer configuration have not been exposed
    maxBufferedTime=1500
    maxConnections=1
    rateLimit=100
    ttl=60000
    metricsLevel=detailed
    metricsGranuality=shard
    metricsNameSpace=KafkaKinesisStreamsConnector
    aggregation=true

    Si vous paramétrez un autre type de flux, configurez les propriétés du flux de diffusion de Kinesis Data Firehose avec les valeurs suivantes :

    name=YOUR_CONNECTER_NAMEconnector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
    tasks.max=1
    topics=YOUR_TOPIC_NAME
    region=us-east-1
    batch=true
    batchSize=500
    batchSizeInBytes=3670016
    deliveryStream=YOUR_DELIVERY_STREAM_NAME
  2. Configurez « worker properties » en mode standalone or distributed :

    bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    #internal.value.converter=org.apache.kafka.connect.storage.StringConverter
    #internal.key.converter=org.apache.kafka.connect.storage.StringConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    internal.key.converter.schemas.enable=true
    internal.value.converter.schemas.enable=true
    offset.storage.file.filename=offset.log

    Pour en savoir plus sur le mode standalone ou distributed de Kafka-Kinesis-Connector, consultez la section Kafka Connect sur le site Web d'Apache.

  3. Copiez le fichier amazon-kinesis-kafka-connector-0.0.X.jar dans votre répertoire et exportez classpath.
    Remarque : vous pouvez également ajouter le fichier amazon-kinesis-kafka-connector-0.0.X.jar dans le répertoire JAVA_HOME/lib/ext.

  4. Pour exécuter le connecteur Kafka-Kinesis-Connector, utilisez la syntaxe de commande suivante :

    [ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/
    worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/
    
    kinesis-kafka-streams-connecter.properties

Informations connexes

Création d'un cluster Amazon MSK

AWS OFFICIEL
AWS OFFICIELA mis à jour il y a 10 mois