Kafka-Kinesis-Connector を使用して Amazon MSK クラスターに接続する方法を教えてください。

所要時間2分
0

Kafka-Kinesis-Connector を使用して Amazon Managed Streaming for Apache Kafka (Amazon MSK) に接続しようとすると、エラーメッセージが表示されます。

簡単な説明

前提条件:

  • 有効な AWS サブスクリプションがあること。
  • クライアントマシンと MSK クラスターの両方から見える仮想プライベートクラウド (VPC) があること。MSK クラスターとクライアントは同じ VPC に存在する必要があります。
  • Amazon MSK サーバーと Apache ZooKeeper サーバーに接続できます。
  • VPC には 2 つのサブネットが関連付けられています。
  • サーバーとの間でメッセージを送受信するためのトピックを MSK で作成しました。

解決策

プロジェクトファイルをビルド

  1. Kafka-Kinesis-Connector をダウンロードするには、GitHub Web サイトから kafka-kinesis-connector プロジェクトのクローンを作成します。
  2. ターゲットディレクトリに amazon-kinesis-kafka-connector-X.X.X.jar ファイルをビルドするには、mvn package コマンドを実行します。
    [ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package..
    ......
    
    [INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 28.822 s
    [INFO] Finished at: 2020-02-19T13:01:31Z[INFO] Final Memory: 26M/66M
    [INFO] ------------------------------------------------------------------------
    Kafka-Kinesis-Connector は、環境変数、java システムプロパティ、および認証情報プロファイルファイルの順序で認証情報を検索します。
  3. 構成を DefaultAWSCredentailsProviderChain 設定に更新するには、次のコマンドを実行します。
    [ec2-user@ip-10-0-0-71 target]$ aws configure
    上記のコマンドは、AWS Identity and Access Management (IAM) ユーザーにアタッチされたアクセスキーに最低限必要な権限があることを確認します。aws configure コマンドは、Amazon Kinesis Data Streams または Amazon Kinesis Data Firehose にアクセスするための利用可能なポリシーがあることも確認します。AWS 認証情報の設定の詳細については、「AWS SDK for Java への一時的な認証情報の提供」を参照してください。
    **注:**Java 開発キット (JDK) を使用する場合は、EnvironmentVariableCredentialsProviderクラスを使用して認証情報を提供することもできます。
  4. Kinesis Data Streams を使用している場合は、次の例のようにポリシーを更新してください。
    {
         "Version": "2012-10-17",
         "Statement": [{
              "Sid": "Stmt123",
              "Effect": "Allow",
              "Action": [
                   "kinesis:DescribeStream",
                   "kinesis:PutRecord",
                   "kinesis:PutRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:GetRecords",
                   "kinesis:ListShards",
                   "kinesis:DescribeStreamSummary",
                   "kinesis:RegisterStreamConsumer"
              ],
              "Resource": [
                   "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName"
              ]
         }]
    }
    Kinesis Data Firehose を使用している場合は、次の例のようにポリシーを更新してください。
    {
         "Version": "2012-10-17",
         "Statement": [{
              "Effect": "Allow",
              "Action": [
                   "firehose:DeleteDeliveryStream",
                   "firehose:PutRecord",
                   "firehose:PutRecordBatch",
                   "firehose:UpdateDestination"
              ],
              "Resource": [
                   "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName"
              ]
         }]
    }
    Kinesis Data Firehose 配信ストリーム設定の詳細については、「設定ファイルと認証情報ファイルの設定」を参照してください。

コネクタの設定

**注:**MSK からのメッセージを公開するように Kafka-Kinesis-Connector を設定できます。メッセージは次の宛先に公開できます。 Amazon Simple Storage Service (Amazon S3)、Amazon Redshift、または Amazon OpenSearch Service。

  1. Kinesis データストリームを設定する場合は、次の値を使用してコネクタを設定します。

    name=YOUR_CONNECTER_NAMEconnector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
    tasks.max=1
    topics=YOUR_TOPIC_NAME
    region=us-east-1
    streamName=YOUR_STREAM_NAME
    usePartitionAsHashKey=false
    flushSync=true
    # Use new Kinesis Producer for each Partition
    singleKinesisProducerPerPartition=true
    # Whether to block new records from putting onto Kinesis Producer if
    # threshold for outstanding records have reached
    pauseConsumption=true
    outstandingRecordsThreshold=500000
    # If outstanding records on producers are beyond threshold sleep for following period (in ms)
    sleepPeriod=1000
    # If outstanding records on producers are not cleared sleep for following cycle before killing the tasks
    sleepCycles=10
    # Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties
    # All kinesis producer configuration have not been exposed
    maxBufferedTime=1500
    maxConnections=1
    rateLimit=100
    ttl=60000
    metricsLevel=detailed
    metricsGranuality=shard
    metricsNameSpace=KafkaKinesisStreamsConnector
    aggregation=true

    別の種類のストリームを設定する場合は、Kinesis Data Firehose 配信ストリームのプロパティを次の値で構成します。

    name=YOUR_CONNECTER_NAMEconnector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
    tasks.max=1
    topics=YOUR_TOPIC_NAME
    region=us-east-1
    batch=true
    batchSize=500
    batchSizeInBytes=3670016
    deliveryStream=YOUR_DELIVERY_STREAM_NAME
  2. ********スタンドアロンモードまたは分散モードのワーカープロパティを設定します。

    bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    #internal.value.converter=org.apache.kafka.connect.storage.StringConverter
    #internal.key.converter=org.apache.kafka.connect.storage.StringConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    internal.key.converter.schemas.enable=true
    internal.value.converter.schemas.enable=true
    offset.storage.file.filename=offset.log

    Kafka-Kinesis-Connector のスタンドアロンモードまたは分散モードの詳細については、Apache ウェブサイトの Kafka Connect を参照してください。

  3. amazon-kinesis-kafka-connector-0.0.X.jar ファイルをディレクトリにコピーし、classpath をエクスポートします。
    **注:**amazon-kinesis-kafka-connector-0.0.X.jar ファイルを JAVA_HOME/lib/ext ディレクトリに追加することもできます。

  4. Kafka-Kinesis-Connector を実行するには、以下のコマンド構文を使用します。

    [ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/
    worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/
    
    kinesis-kafka-streams-connecter.properties

関連情報

Amazon MSK クラスターの作成

AWS公式
AWS公式更新しました 10ヶ月前
コメントはありません

関連するコンテンツ