¿Cómo utilizo Kafka-Kinesis-Connector para conectarme a mi clúster de Amazon MSK?
Cuando intento usar el Kafka-Kinesis-Connector para conectarme a Amazon Managed Streaming para Apache Kafka (Amazon MSK), recibo un mensaje de error.
Descripción breve
Requisitos previos:
- Tiene una suscripción a AWS activa.
- Tiene una nube virtual privada (VPC) que es visible tanto para la máquina cliente como para el clúster de MSK. El clúster de MSK y el cliente deben residir en la misma VPC.
- Dispone de conectividad con los servidores Amazon MSK y Apache Zookeeper.
- Hay dos subredes asociadas a su VPC.
- Ha creado temas en MSK para enviar y recibir mensajes desde el servidor.
Resolución
Creación del archivo de proyecto
- Para descargar el Kafka-Kinesis-Connector, clone el proyecto kafka-kinesis-connector del sitio web de GitHub.
- Para crear el archivo amazon-kinesis-kafka-connector-X.X.X.jar en el directorio de destino, ejecute el comando mvn package:
El Kafka-Kinesis-Connector busca las credenciales en el siguiente orden: variables de entorno, propiedades del sistema java y el archivo de perfil de credenciales.[ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package.. ...... [INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 28.822 s [INFO] Finished at: 2020-02-19T13:01:31Z[INFO] Final Memory: 26M/66M [INFO] ------------------------------------------------------------------------
- Para actualizar la configuración del ajuste DefaultAWSCredentailsProviderChain, ejecute el siguiente comando:
Este comando previo garantiza que la clave de acceso adjunta al usuario de AWS Identity and Access Management (IAM) tenga los permisos mínimos requeridos. El comando aws configure también garantiza que haya una política disponible para acceder a Amazon Kinesis Data Streams o a Amazon Kinesis Data Firehose. Para obtener más información sobre la configuración de las credenciales de AWS, consulte Provide temporary credentials to the AWS SDK for Java.[ec2-user@ip-10-0-0-71 target]$ aws configure
Nota: Si utiliza un kit de desarrollo de Java (JDK), también puede usar la clase EnvironmentVariableCredentialsProvider para proporcionar credenciales. - Si usa Kinesis Data Streams, actualice la política para que se parezca al siguiente ejemplo:
Si utiliza Kinesis Data Firehose, actualice la política para que se parezca al siguiente ejemplo:{ "Version": "2012-10-17", "Statement": [{ "Sid": "Stmt123", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:PutRecord", "kinesis:PutRecords", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:ListShards", "kinesis:DescribeStreamSummary", "kinesis:RegisterStreamConsumer" ], "Resource": [ "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName" ] }] }
Para obtener más información sobre la configuración de la secuencia de entrega de Kinesis Data Firehose, consulte Configuration and credential file settings.{ "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Action": [ "firehose:DeleteDeliveryStream", "firehose:PutRecord", "firehose:PutRecordBatch", "firehose:UpdateDestination" ], "Resource": [ "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName" ] }] }
Configuración del conector
Nota: Puede configurar el Kafka-Kinesis-Connector para publicar mensajes desde MSK. Puede publicar mensajes en los siguientes destinos: Amazon Simple Storage Service (Amazon S3), Amazon Redshift o Amazon OpenSearch Service.
-
Si está configurando Kinesis Data Streams, configure el conector con los siguientes valores:
name=YOUR_CONNECTER_NAMEconnector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector tasks.max=1 topics=YOUR_TOPIC_NAME region=us-east-1 streamName=YOUR_STREAM_NAME usePartitionAsHashKey=false flushSync=true # Use new Kinesis Producer for each Partition singleKinesisProducerPerPartition=true # Whether to block new records from putting onto Kinesis Producer if # threshold for outstanding records have reached pauseConsumption=true outstandingRecordsThreshold=500000 # If outstanding records on producers are beyond threshold sleep for following period (in ms) sleepPeriod=1000 # If outstanding records on producers are not cleared sleep for following cycle before killing the tasks sleepCycles=10 # Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties # All kinesis producer configuration have not been exposed maxBufferedTime=1500 maxConnections=1 rateLimit=100 ttl=60000 metricsLevel=detailed metricsGranuality=shard metricsNameSpace=KafkaKinesisStreamsConnector aggregation=true
Si está configurando otro tipo de secuencia, configure las propiedades de la secuencia de entrega de Kinesis Data Firehose con los siguientes valores:
name=YOUR_CONNECTER_NAMEconnector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector tasks.max=1 topics=YOUR_TOPIC_NAME region=us-east-1 batch=true batchSize=500 batchSizeInBytes=3670016 deliveryStream=YOUR_DELIVERY_STREAM_NAME
-
Configure las propiedades del trabajador para el modo independiente o distribuido:
bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter #internal.value.converter=org.apache.kafka.connect.storage.StringConverter #internal.key.converter=org.apache.kafka.connect.storage.StringConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true internal.key.converter.schemas.enable=true internal.value.converter.schemas.enable=true offset.storage.file.filename=offset.log
Para obtener más información sobre el modo independiente o el modo distribuido de Kafka-Kinesis-Connector, consulte Kafka Connect en el sitio web de Apache.
-
Copie el archivo amazon-kinesis-kafka-connector-0.0.X.jar a su directorio y exporte classpath.
Nota: También puede añadir el archivo amazon-kinesis-kafka-connector-0.0.X.jar al directorio JAVA_HOME/lib/ext. -
Para ejecutar kafka-kinesis-connector, utilice la siguiente sintaxis de comandos:
[ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/ worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/ kinesis-kafka-streams-connecter.properties
Información relacionada
Contenido relevante
- Como solucionar el error: Supplied Policy document is breaching Cloudwatch Logs policy length limit.Respuesta aceptadapreguntada hace 7 díaslg...
- preguntada hace un meslg...
- preguntada hace 21 díaslg...
- preguntada hace un meslg...
- preguntada hace 10 díaslg...
- OFICIAL DE AWSActualizada hace 3 años
- OFICIAL DE AWSActualizada hace 2 años
- OFICIAL DE AWSActualizada hace 2 años
- OFICIAL DE AWSActualizada hace 2 años