Ongoing service disruptions
For the most recent update on ongoing service disruptions affecting the AWS Middle East (UAE) Region (ME-CENTRAL-1), refer to the AWS Health Dashboard. For information on AWS Service migration, see How do I migrate my services to another region?
MSK Connect で実行されている MirrorMaker 2 を使用して、複数のアカウントの Amazon MSK クラスター間でデータを転送する方法を教えてください。
複数の AWS アカウントの Amazon Managed Streaming for Apache Kafka (Amazon MSK) クラスター間でデータを転送するために、MSK Connect で実行される MirrorMaker 2.0 (MM2) を使用したいと考えています。
解決方法
VPC ピアリングをセットアップする
Amazon MSK クラスターは複数の仮想プライベートクラウド (VPC) にあるため、VPC ピアリング接続を作成する必要があります。
ターゲットの Amazon MSK クラスターに関連付けられているセキュリティグループは、ターゲットクラスターのセキュリティグループからのすべてのトラフィックを許可する必要があります。ターゲットクラスターのセキュリティグループが、ソースの Amazon MSK クラスターのセキュリティグループからのすべてのトラフィックを許可することも必要です。詳細については、「セキュリティグループの更新とピアセキュリティグループの参照」を参照してください。
注:別のアカウントのセキュリティグループを参照するには、[送信元] または [送信先] フィールドにアカウント番号を含めてください。
MM2 プラグイン情報を使用してプラグインを作成する
注: MSK Connect カスタムプラグインは、.jar または .zip で終わるファイルまたはフォルダーを受け入れます。
次の手順を実行します。
-
テストフォルダを作成してフォルダを圧縮するには、次のコマンドを実行します。
mkdir mm2 zip mm2.zip mm2 -
.zip オブジェクトをターゲットアカウントの Amazon Simple Storage Service (Amazon S3) バケットにアップロードします。
aws s3 cp mm2.zip s3://mytestbucket/注: s3://mytestbucket/ を ご使用の Amazon S3 バケットの URL に置き換えてください。
Apache Kafka と MSK Connect には MirrorMaker ライブラリが組み込まれているため、この機能のために追加の.jar ファイルを追加する必要はありません。 -
ターゲットアカウントで、.zip ファイルを使用してカスタムプラグイン を作成し、そのファイルに mm2-connect-plugin という名前を付けます。
注: MSK Connect を使用する場合、コネクタの作成時にカスタムプラグインが必要です。
ターゲットアカウントに MSK Connect コネクタを作成する
次の手順を実行します。
-
Amazon MSK コンソールを開きます。
-
ナビゲーションペインの [MSK Connect] で、[コネクタ] を選択します。
-
[コネクタを作成] を選択します。
-
[既存のカスタムプラグインを使用] を選択します。
-
[カスタムプラグイン] で、作成したカスタムプラグインを選択し、[次へ] を選択します。
-
コネクタの名前を入力します。
(オプション) 説明を入力します。 -
クラスターのリストから、ターゲットクラスターを選択します。
-
次の例のような構成をコネクタ構成フィールドに追加します。
connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector tasks.max=1 clusters=primary,replica source.cluster.alias=primary target.cluster.alias=replica topics=example.* replication.factor=2 topic.creation.default.replication.factor=2 topic.creation.default.partitions=2 consumer.group.id=mm2-connector refresh.groups.interval.seconds=20 refresh.topics.interval.seconds=20 sync.topic.configs.interval.seconds=20 sync.topic.acls.interval.seconds=20 producer.enable.idempotence=true transforms=renameTopic transforms.renameTopic.type=org.apache.kafka.connect.transforms.RegexRouter transforms.renameTopic.regex=primary.(.*) transforms.renameTopic.replacement=$1 key.converter=org.apache.kafka.connect.converters.ByteArrayConverter value.converter=org.apache.kafka.connect.converters.ByteArrayConverter # Source cluster options source.cluster.bootstrap.servers= source.cluster.security.protocol=PLAINTEXT # Target cluster options target.cluster.bootstrap.servers= target.cluster.security.protocol=PLAINTEXT -
コネクタの容量を設定します。
-
[ワーカー設定] で **[MSK のデフォルト設定を使用]**を選択します。
-
[アクセス許可] で、MSK Connect に必要な権限を付与する AWS Identity and Access Management (IAM) ロールを選択します。完了後、[次へ] を選択します。
-
[セキュリティ] ページの [暗号化 - 転送時] で、[プレーンテキストトラフィック]を選択します。完了後、[次へ] を選択します。
-
オプションで、[ログ] ページでログ配信を設定します。完了後、[次へ] を選択します。
-
[確認と作成] で **[コネクタを作成]**を選択します。
注: この設定では、ソースクラスターから各トピックを複製するために、MM2 はターゲットクラスターに 2 つのトピックを作成します。例えば、ソースクラスターに exampleTopic1 というトピックがある場合、MM2 はターゲットクラスターに primary.exampleTopic1 と exampleTopic1 というトピックを作成します。次に、MM2 はメッセージを ExampleTopic1 トピックにルーティングします。
クライアントインスタンスを作成する
トピックを作成し、トピックからデータを生成または消費するためには、クライアントインスタンスを作成する必要があります。
次の手順を実行します。
-
Amazon Elastic Compute Cloud (Amazon EC2) インスタンスを起動し、そのインスタンスに接続します。
-
次のコマンドを実行して、クライアントマシンに Java をインストールします。
sudo yum -y install java-11 -
次のコマンドを実行して Apache Kafka をダウンロードします。
wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz tar -xzf kafka_2.12-2.8.1.tgz -
ソースアカウントの Amazon MSK クラスター内にトピック exampletopic1 を作成します。
kafka-installation-path/bin/kafka-topics.sh --create --bootstrap-server SourceMSKclusterBootstrapServerString --replication-factor 3 --partitions 1 --topic exampletopic1注: kafka-installlation-path は、システムに Kafka がインストールされているパスに置き換えてください。SourceMSKClusterBootstrapServerString を、ソース Amazon MSK クラスターのブートストラップサーバー文字列に置き換えてください。
-
ソースアカウントのクラスター内にデータを生成します。
Kafka-installation-path/bin/kafka-console-producer.sh --broker-list SourceMSKclusterBootstrapServerString --topic exampletopic1注: kafka-installlation-path は、システムに Kafka がインストールされているパスに置き換えてください。SourceMSKClusterBootstrapServerString を、ソース Amazon MSK クラスターのブートストラップサーバー文字列に置き換えてください。
想定される出力:>message 1 >message 2 -
ターゲットアカウントのクラスター内のトピックを一覧表示します。
Kafka-installation-path/bin/kafka-topics.sh --bootstrap-server TargetMSKclusterBootstrapServerString --list注: kafka-installlation-path は、システムに Kafka がインストールされているパスに置き換えてください。TargetMSKClusterBootstrapServerString を、ターゲット Amazon MSK クラスターのブートストラップサーバー文字列に置き換えてください。
出力例:__amazon_msk_canary __amazon_msk_connect_configs_mm2-***** __amazon_msk_connect_offsets_mm2-***** __amazon_msk_connect_status_mm2-***** __consumer_offsets exampleTopic1 primary.exampleTopic1 -
ターゲットクラスターのデータを消費します。
Kafka-installation-path/bin/kafka-console-consumer.sh --bootstrap-server TargetMSKclusterBootstrapServerString --topic exampletopic1 --from-beginning注: kafka-installlation-path は、システムに Kafka がインストールされているパスに置き換えてください。TargetMSKClusterBootstrapServerString を、ターゲット Amazon MSK クラスターのブートストラップサーバー文字列に置き換えてください。
想定される出力:>message 1 >message 2
関連情報
関連するコンテンツ
- 質問済み 1年前
