¿Cómo utilizo Kafka-Kinesis-Connector para conectarme a mi clúster de Amazon MSK?

4 minutos de lectura
0

Cuando intento usar el Kafka-Kinesis-Connector para conectarme a Amazon Managed Streaming para Apache Kafka (Amazon MSK), recibo un mensaje de error.

Descripción breve

Requisitos previos:

  • Tiene una suscripción a AWS activa.
  • Tiene una nube virtual privada (VPC) que es visible tanto para la máquina cliente como para el clúster de MSK. El clúster de MSK y el cliente deben residir en la misma VPC.
  • Dispone de conectividad con los servidores Amazon MSK y Apache Zookeeper.
  • Hay dos subredes asociadas a su VPC.
  • Ha creado temas en MSK para enviar y recibir mensajes desde el servidor.

Resolución

Creación del archivo de proyecto

  1. Para descargar el Kafka-Kinesis-Connector, clone el proyecto kafka-kinesis-connector del sitio web de GitHub.
  2. Para crear el archivo amazon-kinesis-kafka-connector-X.X.X.jar en el directorio de destino, ejecute el comando mvn package:
    [ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package..
    ......
    
    [INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 28.822 s
    [INFO] Finished at: 2020-02-19T13:01:31Z[INFO] Final Memory: 26M/66M
    [INFO] ------------------------------------------------------------------------
    El Kafka-Kinesis-Connector busca las credenciales en el siguiente orden: variables de entorno, propiedades del sistema java y el archivo de perfil de credenciales.
  3. Para actualizar la configuración del ajuste DefaultAWSCredentailsProviderChain, ejecute el siguiente comando:
    [ec2-user@ip-10-0-0-71 target]$ aws configure
    Este comando previo garantiza que la clave de acceso adjunta al usuario de AWS Identity and Access Management (IAM) tenga los permisos mínimos requeridos. El comando aws configure también garantiza que haya una política disponible para acceder a Amazon Kinesis Data Streams o a Amazon Kinesis Data Firehose. Para obtener más información sobre la configuración de las credenciales de AWS, consulte Provide temporary credentials to the AWS SDK for Java.
    Nota: Si utiliza un kit de desarrollo de Java (JDK), también puede usar la clase EnvironmentVariableCredentialsProvider para proporcionar credenciales.
  4. Si usa Kinesis Data Streams, actualice la política para que se parezca al siguiente ejemplo:
    {
         "Version": "2012-10-17",
         "Statement": [{
              "Sid": "Stmt123",
              "Effect": "Allow",
              "Action": [
                   "kinesis:DescribeStream",
                   "kinesis:PutRecord",
                   "kinesis:PutRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:GetRecords",
                   "kinesis:ListShards",
                   "kinesis:DescribeStreamSummary",
                   "kinesis:RegisterStreamConsumer"
              ],
              "Resource": [
                   "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName"
              ]
         }]
    }
    Si utiliza Kinesis Data Firehose, actualice la política para que se parezca al siguiente ejemplo:
    {
         "Version": "2012-10-17",
         "Statement": [{
              "Effect": "Allow",
              "Action": [
                   "firehose:DeleteDeliveryStream",
                   "firehose:PutRecord",
                   "firehose:PutRecordBatch",
                   "firehose:UpdateDestination"
              ],
              "Resource": [
                   "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName"
              ]
         }]
    }
    Para obtener más información sobre la configuración de la secuencia de entrega de Kinesis Data Firehose, consulte Configuration and credential file settings.

Configuración del conector

Nota: Puede configurar el Kafka-Kinesis-Connector para publicar mensajes desde MSK. Puede publicar mensajes en los siguientes destinos: Amazon Simple Storage Service (Amazon S3), Amazon Redshift o Amazon OpenSearch Service.

  1. Si está configurando Kinesis Data Streams, configure el conector con los siguientes valores:

    name=YOUR_CONNECTER_NAMEconnector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
    tasks.max=1
    topics=YOUR_TOPIC_NAME
    region=us-east-1
    streamName=YOUR_STREAM_NAME
    usePartitionAsHashKey=false
    flushSync=true
    # Use new Kinesis Producer for each Partition
    singleKinesisProducerPerPartition=true
    # Whether to block new records from putting onto Kinesis Producer if
    # threshold for outstanding records have reached
    pauseConsumption=true
    outstandingRecordsThreshold=500000
    # If outstanding records on producers are beyond threshold sleep for following period (in ms)
    sleepPeriod=1000
    # If outstanding records on producers are not cleared sleep for following cycle before killing the tasks
    sleepCycles=10
    # Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties
    # All kinesis producer configuration have not been exposed
    maxBufferedTime=1500
    maxConnections=1
    rateLimit=100
    ttl=60000
    metricsLevel=detailed
    metricsGranuality=shard
    metricsNameSpace=KafkaKinesisStreamsConnector
    aggregation=true

    Si está configurando otro tipo de secuencia, configure las propiedades de la secuencia de entrega de Kinesis Data Firehose con los siguientes valores:

    name=YOUR_CONNECTER_NAMEconnector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
    tasks.max=1
    topics=YOUR_TOPIC_NAME
    region=us-east-1
    batch=true
    batchSize=500
    batchSizeInBytes=3670016
    deliveryStream=YOUR_DELIVERY_STREAM_NAME
  2. Configure las propiedades del trabajador para el modo independiente o distribuido:

    bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    #internal.value.converter=org.apache.kafka.connect.storage.StringConverter
    #internal.key.converter=org.apache.kafka.connect.storage.StringConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    internal.key.converter.schemas.enable=true
    internal.value.converter.schemas.enable=true
    offset.storage.file.filename=offset.log

    Para obtener más información sobre el modo independiente o el modo distribuido de Kafka-Kinesis-Connector, consulte Kafka Connect en el sitio web de Apache.

  3. Copie el archivo amazon-kinesis-kafka-connector-0.0.X.jar a su directorio y exporte classpath.
    Nota: También puede añadir el archivo amazon-kinesis-kafka-connector-0.0.X.jar al directorio JAVA_HOME/lib/ext.

  4. Para ejecutar kafka-kinesis-connector, utilice la siguiente sintaxis de comandos:

    [ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/
    worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/
    
    kinesis-kafka-streams-connecter.properties

Información relacionada

Creación de un clúster de Amazon MSK

OFICIAL DE AWS
OFICIAL DE AWSActualizada hace un año