How do I deploy EventBridge Pipes with Amazon MSK?

I want to use EventBridge Pipes to receive records from an Amazon Managed Streaming for Apache Kafka (Amazon MSK) topic.

Short description

Amazon MSK is a fully managed streaming service that lets you build and run applications that use Apache Kafka to process streaming data. You can integrate Amazon MSK directly with Amazon EventBridge Pipes, which ingests and processes Kafka messages before passing them to targets.

For more information on Amazon MSK and steps on creating a cluster, see Getting started using Amazon MSK.

Resolution

Prerequisites

Before you begin integrating the two services, note the following:

  • Authentication: EventBridge needs permission to access the Amazon MSK cluster, retrieve records, and perform other tasks.
  • Network Configuration: EventBridge must have access to the Amazon Virtual Private Cloud (Amazon VPC) resources associated with your Amazon MSK cluster.
  • Amazon MSK execution role permissions: EventBridge requires certain permissions to manage resources that are related to your Amazon MSK topic.
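
For the execution role, the following is a minimal sketch in Python with boto3. The role name and policy name are placeholders, and the action list assumes a cluster reached through your VPC; confirm the exact permissions for your authentication mode in the EventBridge Pipes documentation before reusing it.

# A minimal sketch, assuming an existing pipe execution role named
# "my-pipe-execution-role" (placeholder). Adjust the actions to match the
# permissions that the EventBridge Pipes documentation lists for your
# authentication and networking setup.
import json
import boto3

iam = boto3.client("iam")

policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            # Lets the pipe discover the cluster and its bootstrap brokers.
            "Effect": "Allow",
            "Action": [
                "kafka:DescribeCluster",
                "kafka:DescribeClusterV2",
                "kafka:GetBootstrapBrokers",
            ],
            "Resource": "*",
        },
        {
            # Lets the pipe create and manage the elastic network interfaces
            # it needs to reach the cluster inside your VPC.
            "Effect": "Allow",
            "Action": [
                "ec2:CreateNetworkInterface",
                "ec2:DeleteNetworkInterface",
                "ec2:DescribeNetworkInterfaces",
                "ec2:DescribeSecurityGroups",
                "ec2:DescribeSubnets",
                "ec2:DescribeVpcs",
            ],
            "Resource": "*",
        },
    ],
}

iam.put_role_policy(
    RoleName="my-pipe-execution-role",          # placeholder role name
    PolicyName="pipes-msk-source-permissions",  # placeholder policy name
    PolicyDocument=json.dumps(policy),
)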

Deploy EventBridge Pipes with Amazon MSK

Follow these steps to deploy EventBridge Pipes with Amazon MSK as the source:

  1. Open the Amazon EventBridge console in the AWS Region where your Amazon MSK cluster is located.
  2. In the left navigation panel, choose Pipes.
  3. Choose Create pipe.
  4. Enter a name for the pipe. (Optional: Enter a description for the pipe.)

Select Amazon MSK as the source for the pipe

  1. In the Select source dropdown list, select Amazon MSK.
  2. Select the Amazon MSK cluster that you created previously.
  3. Enter the name of the Kafka topic used to store records in your Kafka cluster.
  4. (Optional) Provide the ID of a Kafka consumer group to join. For more information, see the section How EventBridge chooses a bootstrap broker in Amazon Managed Streaming for Apache Kafka topic as a source.
  5. (Optional) Choose the authentication method and secret key, if specified. For more information, see Amazon MSK cluster authentication.
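
If you prefer to script the lookup instead of browsing the console, the cluster ARN and bootstrap brokers that the steps above refer to can be retrieved with the boto3 kafka client. This is a sketch only; the Region is an assumption, and the cluster ARN reuses the example ARN shown later in this article.

# A minimal sketch for finding the MSK cluster ARN and bootstrap brokers.
import boto3

kafka = boto3.client("kafka", region_name="sa-east-1")  # assumed Region

# List the clusters in the Region and print their names and ARNs.
for cluster in kafka.list_clusters_v2()["ClusterInfoList"]:
    print(cluster["ClusterName"], cluster["ClusterArn"])

# Retrieve the bootstrap broker connection strings for a chosen cluster.
brokers = kafka.get_bootstrap_brokers(
    ClusterArn="arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2"
)
print(brokers)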

Apply additional settings

  1. (Optional) Configure these additional settings:
    • Batch size (optional).
    • Batch window (optional).
    • Starting position. (Latest is the default setting. Change to Trim horizon, if required.)
      • Latest - Start reading the topic from the most recent record in the partition.
      • Trim horizon - Start reading the topic from the oldest available record in the partition.
        Note: Trim horizon is the same as Earliest for Apache Kafka.
  2. After you configure the source, apply these settings:
    • (Optional) Filtering: apply an event pattern to the source's events so that the pipe processes only the matching subset.
    • (Optional) Enrichment: enhance the data from the source before it's sent to the target.
  3. Select your target from the Target service list. (Optional: Define an input transformer.)
  4. Confirm that the pipe configuration is correct. Then, choose Create pipe.
  5. When the pipe transitions to Running status, EventBridge internally polls the source for new messages and then synchronously invokes the target. The event payload contains an array of messages. Each array item contains the Apache Kafka topic and partition identifier, together with a timestamp and a base64-encoded message.
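
The console walkthrough above maps to a single CreatePipe API call. The following is a minimal sketch in Python with boto3; the pipe name, role ARN, cluster ARN, topic name, target ARN, and batch settings are placeholders, and the execution role must already have the permissions described in the prerequisites.

# A minimal sketch of the same configuration through the CreatePipe API.
import boto3

pipes = boto3.client("pipes", region_name="sa-east-1")  # assumed Region

response = pipes.create_pipe(
    Name="msk-to-lambda-pipe",  # placeholder pipe name
    RoleArn="arn:aws:iam::123456789012:role/my-pipe-execution-role",
    Source="arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
    SourceParameters={
        "ManagedStreamingKafkaParameters": {
            "TopicName": "mytopic",
            "StartingPosition": "TRIM_HORIZON",  # or "LATEST" (the default)
            "BatchSize": 100,                    # optional batch size
            "MaximumBatchingWindowInSeconds": 5, # optional batch window
        }
        # "FilterCriteria" can also be set here to filter events;
        # it's omitted in this sketch.
    },
    Target="arn:aws:lambda:sa-east-1:123456789012:function:my-target-function",
)

print(response["Arn"], response["CurrentState"])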

Example: An Amazon MSK event

[
  {
    "eventSource": "aws:kafka",
    "eventSourceArn": "arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
    "eventSourceKey": "mytopic-0",
    "topic": "mytopic",
    "partition": "0",
    "offset": 15,
    "timestamp": 1545084650987,
    "timestampType": "CREATE_TIME",
    "key": "abcDEFghiJKLmnoPQRstuVWXyz1234==",
    "value": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
    "headers": [
      {
        "headerKey": [
          104,
          101,
          97,
          100,
          101,
          114,
          86,
          97,
          108,
          117,
          101
        ]
      }
    ]
  }
]
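
Because the message key and value arrive base64-encoded, the target has to decode them. The following is a minimal sketch of a Python Lambda handler as the target; the handler name and the assumption that the value is UTF-8 text are placeholders for your own logic.

# A minimal sketch of a Lambda target that decodes the base64-encoded Kafka
# records delivered by the pipe, following the event shape shown above.
import base64

def handler(event, context):
    # The payload is an array of records, one per Kafka message in the batch.
    for record in event:
        topic = record["topic"]
        partition = record["partition"]
        offset = record["offset"]
        value = base64.b64decode(record["value"]).decode("utf-8")
        print(f"{topic}[{partition}] offset {offset}: {value}")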

Some common troubleshooting tips

  • Use the AWS-provided metrics for the pipe in the Amazon CloudWatch console for your Region to isolate activation, invocation, or throttling-related issues.
  • When you invoke a pipe, you might get two main types of errors: pipe internal errors and customer invocation errors.
    • Pipe internal errors (caused by the Pipes service):
      • An HTTP connection failure when attempting to invoke the customer target service.
      • A transient drop in availability of the Pipes service itself.
    • Customer invocation errors:
      • Insufficient permissions on the pipe to invoke the target. For more information, see permissions.
      • Logic errors in your synchronously invoked AWS Lambda function, AWS Step Functions state machine, API destination, or API Gateway endpoint.
  • Creating, deleting, and updating pipes are asynchronous operations that can result in a failure state. The pipe StateReason provides information to help troubleshoot the failure, as shown in the sketch after this list.
  • EventBridge Pipes doesn't support cross-account processing of an Amazon MSK cluster from a different account. The cluster must be in the same account and Region as the pipe.
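
To read the StateReason mentioned above, you can describe the pipe programmatically. The following is a minimal sketch with boto3; the pipe name matches the placeholder used earlier in this article.

# A minimal sketch for checking a pipe's current state and StateReason.
import boto3

pipes = boto3.client("pipes", region_name="sa-east-1")  # assumed Region

pipe = pipes.describe_pipe(Name="msk-to-lambda-pipe")  # placeholder pipe name
print("CurrentState:", pipe["CurrentState"])
print("StateReason:", pipe.get("StateReason", "n/a"))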

For more information, see Amazon EventBridge Pipes error handling and troubleshooting.

Related information

Creating an Amazon EventBridge pipe

Amazon Managed Streaming for Apache Kafka topic as a source
