スキップしてコンテンツを表示

MSK Connect で実行されている MirrorMaker 2 を使用して、複数のアカウントの Amazon MSK クラスター間でデータを転送する方法を教えてください。

所要時間2分
0

複数の AWS アカウントの Amazon Managed Streaming for Apache Kafka (Amazon MSK) クラスター間でデータを転送するために、MSK Connect で実行される MirrorMaker 2.0 (MM2) を使用したいと考えています。

解決方法

VPC ピアリングをセットアップする

Amazon MSK クラスターは複数の仮想プライベートクラウド (VPC) にあるため、VPC ピアリング接続を作成する必要があります。

ターゲットの Amazon MSK クラスターに関連付けられているセキュリティグループは、ターゲットクラスターのセキュリティグループからのすべてのトラフィックを許可する必要があります。ターゲットクラスターのセキュリティグループが、ソースの Amazon MSK クラスターのセキュリティグループからのすべてのトラフィックを許可することも必要です。詳細については、「セキュリティグループの更新とピアセキュリティグループの参照」を参照してください。

注:別のアカウントのセキュリティグループを参照するには、[送信元] または [送信先] フィールドにアカウント番号を含めてください。

MM2 プラグイン情報を使用してプラグインを作成する

注: MSK Connect カスタムプラグインは、.jar または .zip で終わるファイルまたはフォルダーを受け入れます。

次の手順を実行します。

  1. テストフォルダを作成してフォルダを圧縮するには、次のコマンドを実行します。

    mkdir mm2
    zip mm2.zip mm2
  2. .zip オブジェクトをターゲットアカウントの Amazon Simple Storage Service (Amazon S3) バケットにアップロードします。

    aws s3 cp mm2.zip s3://mytestbucket/

    注: s3://mytestbucket/ を ご使用の Amazon S3 バケットの URL に置き換えてください。
    Apache Kafka と MSK Connect には MirrorMaker ライブラリが組み込まれているため、この機能のために追加の.jar ファイルを追加する必要はありません。

  3. ターゲットアカウントで、.zip ファイルを使用してカスタムプラグイン を作成し、そのファイルに mm2-connect-plugin という名前を付けます。
    注: MSK Connect を使用する場合、コネクタの作成時にカスタムプラグインが必要です。

ターゲットアカウントに MSK Connect コネクタを作成する

次の手順を実行します。

  1. Amazon MSK コンソールを開きます。

  2. ナビゲーションペインの [MSK Connect] で、[コネクタ] を選択します。

  3. [コネクタを作成] を選択します。

  4. [既存のカスタムプラグインを使用] を選択します。

  5. [カスタムプラグイン] で、作成したカスタムプラグインを選択し、[次へ] を選択します。

  6. コネクタの名前を入力します。
    (オプション) 説明を入力します。

  7. クラスターのリストから、ターゲットクラスターを選択します。

  8. 次の例のような構成をコネクタ構成フィールドに追加します。

    connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector  
    tasks.max=1  
    
    clusters=primary,replica  
    source.cluster.alias=primary  
    target.cluster.alias=replica  
    
    topics=example.*  
    replication.factor=2  
    topic.creation.default.replication.factor=2  
    topic.creation.default.partitions=2  
    consumer.group.id=mm2-connector  
    
    refresh.groups.interval.seconds=20  
    refresh.topics.interval.seconds=20  
    
    sync.topic.configs.interval.seconds=20  
    sync.topic.acls.interval.seconds=20  
    
    producer.enable.idempotence=true  
    
    transforms=renameTopic  
    transforms.renameTopic.type=org.apache.kafka.connect.transforms.RegexRouter  
    transforms.renameTopic.regex=primary.(.*)  
    transforms.renameTopic.replacement=$1  
    
    key.converter=org.apache.kafka.connect.converters.ByteArrayConverter  
    value.converter=org.apache.kafka.connect.converters.ByteArrayConverter  
    
    # Source cluster options  
    source.cluster.bootstrap.servers=  
    source.cluster.security.protocol=PLAINTEXT  
    
    # Target cluster options  
    target.cluster.bootstrap.servers=  
    target.cluster.security.protocol=PLAINTEXT
  9. コネクタの容量を設定します。

  10. [ワーカー設定] で **[MSK のデフォルト設定を使用]**を選択します。

  11. [アクセス許可] で、MSK Connect に必要な権限を付与する AWS Identity and Access Management (IAM) ロールを選択します。完了後、[次へ] を選択します。

  12. [セキュリティ] ページの [暗号化 - 転送時] で、[プレーンテキストトラフィック]を選択します。完了後、[次へ] を選択します。

  13. オプションで、[ログ] ページでログ配信を設定します。完了後、[次へ] を選択します。

  14. [確認と作成] で **[コネクタを作成]**を選択します。

注: この設定では、ソースクラスターから各トピックを複製するために、MM2 はターゲットクラスターに 2 つのトピックを作成します。例えば、ソースクラスターに exampleTopic1 というトピックがある場合、MM2 はターゲットクラスターに primary.exampleTopic1exampleTopic1 というトピックを作成します。次に、MM2 はメッセージを ExampleTopic1 トピックにルーティングします。

クライアントインスタンスを作成する

トピックを作成し、トピックからデータを生成または消費するためには、クライアントインスタンスを作成する必要があります。

次の手順を実行します。

  1. Amazon Elastic Compute Cloud (Amazon EC2) インスタンスを起動し、そのインスタンスに接続します。

  2. 次のコマンドを実行して、クライアントマシンに Java をインストールします。

    sudo yum -y install java-11
  3. 次のコマンドを実行して Apache Kafka をダウンロードします。

    wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
    tar -xzf kafka_2.12-2.8.1.tgz
  4. ソースアカウントの Amazon MSK クラスター内にトピック exampletopic1 を作成します。

    kafka-installation-path/bin/kafka-topics.sh --create --bootstrap-server SourceMSKclusterBootstrapServerString --replication-factor 3 --partitions 1 --topic exampletopic1

    注: kafka-installlation-path は、システムに Kafka がインストールされているパスに置き換えてください。SourceMSKClusterBootstrapServerString を、ソース Amazon MSK クラスターのブートストラップサーバー文字列に置き換えてください。

  5. ソースアカウントのクラスター内にデータを生成します。

    Kafka-installation-path/bin/kafka-console-producer.sh --broker-list SourceMSKclusterBootstrapServerString --topic exampletopic1

    注: kafka-installlation-path は、システムに Kafka がインストールされているパスに置き換えてください。SourceMSKClusterBootstrapServerString を、ソース Amazon MSK クラスターのブートストラップサーバー文字列に置き換えてください。
    想定される出力:

    >message 1
    >message 2
  6. ターゲットアカウントのクラスター内のトピックを一覧表示します。

    Kafka-installation-path/bin/kafka-topics.sh --bootstrap-server TargetMSKclusterBootstrapServerString --list

    注: kafka-installlation-path は、システムに Kafka がインストールされているパスに置き換えてください。TargetMSKClusterBootstrapServerString を、ターゲット Amazon MSK クラスターのブートストラップサーバー文字列に置き換えてください。
    出力例:

    __amazon_msk_canary  __amazon_msk_connect_configs_mm2-*****  
    __amazon_msk_connect_offsets_mm2-*****  
    __amazon_msk_connect_status_mm2-*****  
    __consumer_offsets  
    exampleTopic1  
    primary.exampleTopic1
  7. ターゲットクラスターのデータを消費します。

    Kafka-installation-path/bin/kafka-console-consumer.sh --bootstrap-server TargetMSKclusterBootstrapServerString --topic exampletopic1 --from-beginning

    注: kafka-installlation-path は、システムに Kafka がインストールされているパスに置き換えてください。TargetMSKClusterBootstrapServerString を、ターゲット Amazon MSK クラスターのブートストラップサーバー文字列に置き換えてください。
    想定される出力:

    >message 1
    >message 2

関連情報

Migrate to an Amazon MSK Cluster (Amazon MSK クラスターに移行する)

AWS公式更新しました 6ヶ月前
コメントはありません

関連するコンテンツ