跳至內容

如何使用在 MSK Connect 上執行的 MirrorMaker 2,以便在不同帳戶中的 Amazon MSK 叢集之間傳輸資料?

3 分的閱讀內容
0

我想使用在 MSK Connect 上執行的 MirrorMaker 2.0 (MM2),以便在不同 AWS 帳戶中的 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 叢集之間傳輸資料。

解決方法

建立 VPC 對等互連

當 Amazon MSK 叢集位於不同的 Amazon Virtual Private Cloud (Amazon VPC) 時,您必須建立 VPC 對等互連

與來源 Amazon MSK 叢集關聯的安全群組必須允許來自目標叢集安全群組的所有流量。目標叢集的安全群組也必須允許來自來源 Amazon MSK 叢集安全群組的所有流量。如需更多資訊,請參閱更新您的安全群組以參考對等安全群組

**注意:**若要在其他帳戶中參考安全群組,請在 Source (來源) 或 Destination (目的地) 欄位中包含帳戶號碼。

建立含 MM2 外掛程式資訊的外掛程式

**注意:**MSK Connect 自訂外掛程式接受 .jar 或 .zip 格式的檔案或資料夾。

請完成下列步驟:

  1. 若建立測試資料夾並壓縮該資料夾,請執行以下命令:

    mkdir mm2
    zip mm2.zip mm2
  2. 將 .zip 物件上傳至目標帳戶的 Amazon Simple Storage Service (Amazon S3) 儲存貯體:

    aws s3 cp mm2.zip s3://mytestbucket/

    **注意:**將 s3://mytestbucket/ 替換為您的 Amazon S3 儲存貯體網址。
    Apache Kafka 與 MSK Connect 內建 MirrorMaker 函式庫,因此您不需要為此功能新增額外的 .jar 檔案。

  3. 在目標帳戶中,使用 .zip 檔案建立自訂外掛程式,並將檔案命名為 mm2-connect-plugin
    **注意:**使用 MSK Connect 時,您必須在建立連接器時擁有自訂外掛程式。

在目標帳戶中建立 MSK Connect 連接器

請完成下列步驟:

  1. 開啟 Amazon MSK console (Amazon MSK 主控台)。

  2. 在導覽窗格中,於 MSK Connect 下,選擇 Connectors (連接器)。

  3. 選擇 Create connector (建立連接器)。

  4. 選擇 Use existing custom plugin (使用現有自訂外掛程式)。

  5. Custom plugins (自訂外掛程式) 中,選取您建立的自訂外掛程式,然後選擇 Next (下一步)。

  6. 輸入連接器名稱。
    (選用) 輸入描述。

  7. 從叢集清單中選擇目標叢集。

  8. 在連接器組態欄位中新增類似下列範例的組態:

    connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector  
    tasks.max=1  
    
    clusters=primary,replica  
    source.cluster.alias=primary  
    target.cluster.alias=replica  
    
    topics=example.*  
    replication.factor=2  
    topic.creation.default.replication.factor=2  
    topic.creation.default.partitions=2  
    consumer.group.id=mm2-connector  
    
    refresh.groups.interval.seconds=20  
    refresh.topics.interval.seconds=20  
    
    sync.topic.configs.interval.seconds=20  
    sync.topic.acls.interval.seconds=20  
    
    producer.enable.idempotence=true  
    
    transforms=renameTopic  
    transforms.renameTopic.type=org.apache.kafka.connect.transforms.RegexRouter  
    transforms.renameTopic.regex=primary.(.*)  
    transforms.renameTopic.replacement=$1  
    
    key.converter=org.apache.kafka.connect.converters.ByteArrayConverter  
    value.converter=org.apache.kafka.connect.converters.ByteArrayConverter  
    
    # Source cluster options  
    source.cluster.bootstrap.servers=  
    source.cluster.security.protocol=PLAINTEXT  
    
    # Target cluster options  
    target.cluster.bootstrap.servers=  
    target.cluster.security.protocol=PLAINTEXT
  9. 設定連接器容量。

  10. Worker configuration (工作者組態) 下,選擇 Use the MSK default configuration (使用 MSK 預設組態)。

  11. Access permissions (存取權限) 下,選擇提供 MSK Connect 所需權限的 AWS Identity and Access Management (IAM) 角色。然後,選擇 Next (下一步)。

  12. Security (安全性) 頁面中,於 Encryption - in transit (加密 - 傳輸中) 下,選擇 Plaintext traffic (純文字流量)。然後,選擇 Next (下一步)。

  13. (選用) 在 Logs (日誌) 頁面中,設定日誌傳送。然後,選擇 Next (下一步)。

  14. Review and create (檢閱並建立) 下,選擇 Create connector (建立連接器)。

**注意:**使用上述組態時,若要複寫來源叢集中的每個主題,MM2 會在目標叢集中建立兩個主題。舉例來說,如果來源叢集上有主題 exampleTopic1,則 MM2 會在目標叢集建立 primary.exampleTopic1exampleTopic1。然後 MM2 會將訊息路由至 exampleTopic1 主題。

建立用戶端執行個體

您必須建立用戶端執行個體,以建立主題並從主題產生或消耗資料。

請完成下列步驟:

  1. 啟動 Amazon Elastic Compute Cloud (Amazon EC2) 執行個體,然後連線至該執行個體。

  2. 若要在用戶端機器上安裝 Java,請執行以下命令:

    sudo yum -y install java-11
  3. 若要下載 Apache Kafka,請執行以下命令:

    wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
    tar -xzf kafka_2.12-2.8.1.tgz
  4. 在來源帳戶的 Amazon MSK 叢集中建立 exampletopic1 主題:

    kafka-installation-path/bin/kafka-topics.sh --create --bootstrap-server SourceMSKclusterBootstrapServerString --replication-factor 3 --partitions 1 --topic exampletopic1

    **注意:**將 kafka-installation-path 替換為您系統上 Kafka 的安裝位置。將 SourceMSKclusterBootstrapServerString 替換為來源 Amazon MSK 叢集的 Bootstrap 伺服器字串。

  5. 在來源帳戶叢集中產生資料:

    Kafka-installation-path/bin/kafka-console-producer.sh --broker-list SourceMSKclusterBootstrapServerString --topic exampletopic1

    **注意:**將 kafka-installation-path 替換為您系統上 Kafka 的安裝位置。將 SourceMSKclusterBootstrapServerString 替換為來源 Amazon MSK 叢集的 Bootstrap 伺服器字串。
    預期輸出:

    >message 1
    >message 2
  6. 在目標帳戶的叢集中列出主題:

    Kafka-installation-path/bin/kafka-topics.sh --bootstrap-server TargetMSKclusterBootstrapServerString --list

    **注意:**將 kafka-installation-path 替換為您系統上 Kafka 的安裝位置。將 TargetMSKclusterBootstrapServerString 替換為目標 Amazon MSK 叢集的 Bootstrap 伺服器字串。
    範例輸出:

    __amazon_msk_canary  __amazon_msk_connect_configs_mm2-*****  
    __amazon_msk_connect_offsets_mm2-*****  
    __amazon_msk_connect_status_mm2-*****  
    __consumer_offsets  
    exampleTopic1  
    primary.exampleTopic1
  7. 從目標叢集消耗資料:

    Kafka-installation-path/bin/kafka-console-consumer.sh --bootstrap-server TargetMSKclusterBootstrapServerString --topic exampletopic1 --from-beginning

    **注意:**將 kafka-installation-path 替換為您系統上 Kafka 的安裝位置。將 TargetMSKclusterBootstrapServerString 替換為目標 Amazon MSK 叢集的 Bootstrap 伺服器字串。
    預期輸出:

    >message 1
    >message 2

相關資訊

遷移至 Amazon MSK 叢集

AWS 官方已更新 2 個月前