Kafka-Kinesis-Connector를 사용하여 Amazon MSK 클러스터에 연결하려면 어떻게 해야 하나요?

3분 분량
0

Kafka-Kinesis-Connector를 사용하여 Amazon Managed Streaming for Apache Kafka(Amazon MSK)에 연결하려고 하면 오류 메시지가 표시됩니다.

간략한 설명


사전 요구 사항:

  • 활성 AWS 구독이 있습니다.
  • 클라이언트 시스템과 MSK 클러스터 모두에서 볼 수 있는 Virtual Private Cloud(VPC)가 있습니다. MSK 클러스터와 클라이언트는 동일한 VPC에 있어야 합니다.
  • Amazon MSK 및 Apache Zookeeper 서버에 연결되어 있습니다.
  • VPC와 연결된 서브넷이 두 개 있습니다.
  • 서버에서 메시지를 보내고 받기 위해 MSK에서 주제를 만들었습니다.

해결 방법

프로젝트 파일 빌드

  1. Kafka-Kinesis-Connector를 다운로드하려면 GitHub 웹 사이트에서 kafka-kinesis-connector 프로젝트를 복제합니다.
  2. 대상 디렉터리에서 amazon-kinesis-kafka-connector-X.X.X.jar 파일을 빌드하려면 mvn package 명령을 실행합니다.
    [ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package..
    ......
    
    [INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 28.822 s
    [INFO] Finished at: 2020-02-19T13:01:31Z[INFO] Final Memory: 26M/66M
    [INFO] ------------------------------------------------------------------------
    Kafka-Kinesis-Connector는 환경 변수, Java 시스템 속성 및 자격 증명 프로필 파일의 순서로 자격 증명을 찾습니다.
  3. 구성을 DefaultAWSCredentailsProviderChain 설정으로 업데이트하려면 다음 명령을 실행합니다.
    [ec2-user@ip-10-0-0-71 target]$ aws configure
    앞의 명령은 AWS Identity and Access Management(IAM) 사용자에게 연결된 액세스 키에 필요한 최소 권한이 있는지 확인합니다. 또한 aws configure 명령은 Amazon Kinesis Data Streams 또는 Amazon Kinesis Data Firehose에 액세스하는 데 사용할 수 있는 정책이 있는지 확인합니다. AWS 자격 증명 설정에 대한 자세한 내용은 AWS SDK for Java에 임시 자격 증명 제공을 참조하세요.
    참고: JDK(Java Development Kit)를 사용하는 경우 EnvironmentVariableCredentialsProvider 클래스를 사용하여 자격 증명을 제공할 수도 있습니다.
  4. Kinesis Data Streams를 사용하는 경우 다음 예제와 유사하게 정책을 업데이트합니다.
    {
         "Version": "2012-10-17",
         "Statement": [{
              "Sid": "Stmt123",
              "Effect": "Allow",
              "Action": [
                   "kinesis:DescribeStream",
                   "kinesis:PutRecord",
                   "kinesis:PutRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:GetRecords",
                   "kinesis:ListShards",
                   "kinesis:DescribeStreamSummary",
                   "kinesis:RegisterStreamConsumer"
              ],
              "Resource": [
                   "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName"
              ]
         }]
    }
    Kinesis Data Firehose를 사용하는 경우 다음 예제와 유사하게 정책을 업데이트합니다.
    {
         "Version": "2012-10-17",
         "Statement": [{
              "Effect": "Allow",
              "Action": [
                   "firehose:DeleteDeliveryStream",
                   "firehose:PutRecord",
                   "firehose:PutRecordBatch",
                   "firehose:UpdateDestination"
              ],
              "Resource": [
                   "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName"
              ]
         }]
    }
    Kinesis Data Firehose 전송 스트림 설정에 대한 자세한 내용은 구성 및 자격 증명 파일 설정을 참조하세요.

커넥터 구성

참고: MSK에서 메시지를 게시하도록 Kafka-Kinesis-Connector를 구성할 수 있습니다. 다음 대상에 메시지를 게시할 수 있습니다. Amazon Simple Storage Service(Amazon S3), Amazon Redshift 또는 Amazon OpenSearch Service.

  1. Kinesis Data Streams를 설정하는 경우 다음 값으로 커넥터를 구성합니다.

    name=YOUR_CONNECTER_NAMEconnector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
    tasks.max=1
    topics=YOUR_TOPIC_NAME
    region=us-east-1
    streamName=YOUR_STREAM_NAME
    usePartitionAsHashKey=false
    flushSync=true
    # Use new Kinesis Producer for each Partition
    singleKinesisProducerPerPartition=true
    # Whether to block new records from putting onto Kinesis Producer if
    # threshold for outstanding records have reached
    pauseConsumption=true
    outstandingRecordsThreshold=500000
    # If outstanding records on producers are beyond threshold sleep for following period (in ms)
    sleepPeriod=1000
    # If outstanding records on producers are not cleared sleep for following cycle before killing the tasks
    sleepCycles=10
    # Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties
    # All kinesis producer configuration have not been exposed
    maxBufferedTime=1500
    maxConnections=1
    rateLimit=100
    ttl=60000
    metricsLevel=detailed
    metricsGranuality=shard
    metricsNameSpace=KafkaKinesisStreamsConnector
    aggregation=true

    다른 유형의 스트림을 설정하는 경우 다음 값을 사용하여 Kinesis Data Firehose 전송 스트림 속성을 구성합니다.

    name=YOUR_CONNECTER_NAMEconnector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
    tasks.max=1
    topics=YOUR_TOPIC_NAME
    region=us-east-1
    batch=true
    batchSize=500
    batchSizeInBytes=3670016
    deliveryStream=YOUR_DELIVERY_STREAM_NAME
  2. 독립형 또는 분산 모드의 작업자 속성을 구성합니다.

    bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    #internal.value.converter=org.apache.kafka.connect.storage.StringConverter
    #internal.key.converter=org.apache.kafka.connect.storage.StringConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    internal.key.converter.schemas.enable=true
    internal.value.converter.schemas.enable=true
    offset.storage.file.filename=offset.log

    Kafka-Kinesis-Connector의 독립형 또는 분산 모드에 대한 자세한 내용은 Apache 웹 사이트의 Kafka Connect를 참조하세요.

  3. amazon-kinesis-kafka-connector-0.0.X.jar 파일을 디렉토리에 복사하고 classpath를 내 보냅니다.
    참고: amazon-kinesis-kafka-connector-0.0.X.jar 파일을 JAVA_HOME/lib/ext 디렉토리에 추가할 수도 있습니다.

  4. Kafka-Kinesis-Connector를 실행하려면 다음 명령 구문을 사용합니다.

    [ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/
    worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/
    
    kinesis-kafka-streams-connecter.properties

관련 정보

Amazon MSK 클러스터 생성

AWS 공식
AWS 공식업데이트됨 9달 전