Salta al contenuto

Come posso connettermi al cluster Amazon MSK utilizzando il connettore Kafka-Kinesis?

4 minuti di lettura
0

Quando provo a utilizzare il connettore Kafka-Kinesis per connettermi a Streaming gestito da Amazon per Apache Kafka (Amazon MSK) ricevo un messaggio di errore.

Breve descrizione

Prerequisiti:

  • Hai un abbonamento AWS attivo.
  • Hai un cloud privato virtuale (VPC) visibile sia alla macchina client che al cluster MSK. Il cluster MSK e il client devono risiedere nello stesso VPC.
  • Hai la connettività ai server Amazon MSK e Apache Zookeeper.
  • Ci sono due sottoreti associate al tuo VPC.
  • Hai creato argomenti in MSK per inviare e ricevere messaggi dal server.

Risoluzione

Creazione del file di progetto

  1. Per scaricare Kafka-Kinesis-Connector, clona il progetto kafka-kinesis-connector dal sito Web di GitHub.
  2. Usa il comando mvn package per creare il file amazon-kinesis-kafka-connector-X.X.X.jar nella directory di destinazione:
    [ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package..
    ......
    
    [INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 28.822 s
    [INFO] Finished at: 2020-02-19T13:01:31Z[INFO] Final Memory: 26M/66M
    [INFO] ------------------------------------------------------------------------
    Il connettore Kafka-Kinesis cerca le credenziali nel seguente ordine: variabili di ambiente, proprietà del sistema java e file del profilo delle credenziali.
  3. Per aggiornare la configurazione all'impostazione DefaultAWSCredentailsProviderChain, esegui il comando seguente:
    [ec2-user@ip-10-0-0-71 target]$ aws configure
    Questo comando assicura che la chiave di accesso associata all'utente AWS Identity and Access Management (IAM) disponga delle autorizzazioni minime richieste. Il comandoaws configure assicura inoltre che sia disponibile una policy per accedere al flusso di dati Amazon Kinesis o ad Amazon Kinesis Data Firehose. Per ulteriori informazioni sull'impostazione delle credenziali AWS, consulta Provide temporary credentials to the AWS SDK for Java.
    Nota: se utilizzi un Java Development Kit (JDK), puoi utilizzare anche la classe EnvironmentVariableCredentialsProvider per fornire le credenziali.
  4. Se utilizzi Kinesis Data Streams, aggiorna la tua policy in modo che sia simile all'esempio seguente:
    {
         "Version": "2012-10-17",
         "Statement": [{
              "Sid": "Stmt123",
              "Effect": "Allow",
              "Action": [
                   "kinesis:DescribeStream",
                   "kinesis:PutRecord",
                   "kinesis:PutRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:GetRecords",
                   "kinesis:ListShards",
                   "kinesis:DescribeStreamSummary",
                   "kinesis:RegisterStreamConsumer"
              ],
              "Resource": [
                   "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName"
              ]
         }]
    }
    Se utilizzi Kinesis Data Firehose, aggiorna la tua policy in modo che sia simile all'esempio seguente:
    {
         "Version": "2012-10-17",
         "Statement": [{
              "Effect": "Allow",
              "Action": [
                   "firehose:DeleteDeliveryStream",
                   "firehose:PutRecord",
                   "firehose:PutRecordBatch",
                   "firehose:UpdateDestination"
              ],
              "Resource": [
                   "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName"
              ]
         }]
    }
    Per ulteriori informazioni sulle impostazioni del flusso di consegna di Kinesis Data Firehose, consulta Configuration and credential file settings.

Configurazione del connettore

Nota: puoi configurare il connettore Kafka-Kinesis per pubblicare messaggi da MSK. I messaggi possono essere pubblicati nelle seguenti destinazioni: Amazon Simple Storage Service (Amazon S3), Amazon Redshift o servizio OpenSearch di Amazon.

  1. Se stai impostando il flusso di dati Kinesis, puoi configurare il connettore con i seguenti valori:

    name=YOUR_CONNECTER_NAMEconnector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
    tasks.max=1
    topics=YOUR_TOPIC_NAME
    region=us-east-1
    streamName=YOUR_STREAM_NAME
    usePartitionAsHashKey=false
    flushSync=true
    # Use new Kinesis Producer for each Partition
    singleKinesisProducerPerPartition=true
    # Whether to block new records from putting onto Kinesis Producer if
    # threshold for outstanding records have reached
    pauseConsumption=true
    outstandingRecordsThreshold=500000
    # If outstanding records on producers are beyond threshold sleep for following period (in ms)
    sleepPeriod=1000
    # If outstanding records on producers are not cleared sleep for following cycle before killing the tasks
    sleepCycles=10
    # Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties
    # All kinesis producer configuration have not been exposed
    maxBufferedTime=1500
    maxConnections=1
    rateLimit=100
    ttl=60000
    metricsLevel=detailed
    metricsGranuality=shard
    metricsNameSpace=KafkaKinesisStreamsConnector
    aggregation=true

    Se stai configurando un tipo diverso di flusso, configura le proprietà del flusso di distribuzione Kinesis Data Firehose con i seguenti valori:

    name=YOUR_CONNECTER_NAMEconnector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
    tasks.max=1
    topics=YOUR_TOPIC_NAME
    region=us-east-1
    batch=true
    batchSize=500
    batchSizeInBytes=3670016
    deliveryStream=YOUR_DELIVERY_STREAM_NAME
  2. Configura le proprietà worker, operatore, per la modalità autonoma o distribuita:

    bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    #internal.value.converter=org.apache.kafka.connect.storage.StringConverter
    #internal.key.converter=org.apache.kafka.connect.storage.StringConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    internal.key.converter.schemas.enable=true
    internal.value.converter.schemas.enable=true
    offset.storage.file.filename=offset.log

    Per ulteriori informazioni sulla modalità autonoma o distribuita del connettore Kafka-Kinesis, consulta Kafka Connect sul sito web di Apache.

  3. Copia il file amazon-kinesis-kafka-connector-0.0.X.jar nella tua directory ed esporta il classpath.
    Nota: puoi aggiungere anche il file amazon-kinesis-kafka-connector-0.0.X.jar alla directory JAVA_HOME/lib/ext.

  4. Esegui il connettore Kafka-Kinesis utilizzando la seguente sintassi di comando:

    [ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/
    worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/
    
    kinesis-kafka-streams-connecter.properties

Informazioni correlate

Creazione di un cluster Amazon MSK

AWS UFFICIALEAggiornata 2 anni fa