How do I filter events with EventBridge Pipes?

I want to use Amazon EventBridge pipes to capture a specific event from AWS sources and connect it to targets.

Short description

Amazon EventBridge Pipes receives event data from a variety of sources:

  • Amazon DynamoDB stream
  • Amazon Kinesis stream
  • Amazon MQ
  • Amazon Managed Streaming for Apache Kafka (Amazon MSK) stream
  • Self-managed Apache Kafka stream
  • Amazon Simple Queue Service (Amazon SQS) queue

Sending every data event from the source to the target generates resource overhead and incurs additional costs, because the target must process all of the data events and implement business logic to discard the ones that it doesn't need. To avoid this overhead, it's a best practice to send only specific data events to the target by filtering out the data events from the supported sources that aren't required.
Note: You have the option to enhance or enrich the filtered events before sending them to the target. Use an AWS service such as an API destination, Amazon API Gateway, AWS Lambda, or AWS Step Functions to enrich the events.
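
For example, the following is a minimal sketch of a Lambda enrichment function. The function and field names are hypothetical, and the sketch assumes an Amazon SQS source whose pipe invokes the function with the batch (JSON array) of filtered events and forwards the returned payload to the target.

import json

def lambda_handler(event, context):
    # Hypothetical enrichment function for an EventBridge pipe.
    enriched = []
    for record in event:  # Pipes pass the filtered events as a JSON array.
        body = record.get("body")
        # SQS message bodies can arrive as plain strings; parse JSON bodies when possible.
        if isinstance(body, str):
            try:
                body = json.loads(body)
            except json.JSONDecodeError:
                pass
        # Add an illustrative field; replace with your own enrichment logic.
        enriched.append({"original": body, "enrichedBy": "example-enrichment-function"})
    # The returned payload is what the pipe sends to the target.
    return enriched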

Resolution

Filtering in pipes is similar to filtering in an EventBridge event bus. Both use event patterns. Every EventBridge Pipes source has one or more fields that contain the core message or data. These fields are also known as message fields or data fields.
Note: The following sections cover only the native filtering feature of EventBridge Pipes.

Use the filter criteria to reduce overhead

Pipes use the following filter criteria to filter events sent by the source:

Metadata property: This is metadata information (JSON format) about the source that generated the data event.

Data property: This is the actual message body of the data event from the source. The format of this message body varies from source to source.

Example of a message body (in JSON format) sent from an SQS queue

{
    "key": "example-key-1",
    "tag": "eb-pipe-filter",
    "object name": "sampleimage-11.png",
    "sequencer": "617f0837b476e463",
    "Order Status": "Approved"
}

Example of an event received by the pipe

{
    "messageId": "c9652226-2d8f-49e9-9f06-beeb2a6f55ce",
    "receiptHandle": "AQEBoteuDChpmsb6765tA//hqtPta1W/utSYlmWuw3fjZnVvY+dxOUdj4cEjYZtLKGMGT4LUuvWbQPHb96Wwa+X0zUJo8ZiHC7dbul62hVbFTXjB7+cOKH91RXc3YMllpnd6nB3CNAOGIig6suCpEOGcrhTJ6dC45KoacROua1PIH454Ji8AP51TkQSnhkkeOJHInLHkvrd2sfoBo037kmMKJSo3kDgU7y92jbpJphRmijnNQX6gPQYSVifF3n3ApvSBMGQIUzQb65ZRLl6Mp2VPJQYF2RRjYShdrzhe3uON9H01m9f3LGcpL3yaX6yqFahfoEe3PiT6MMXHvIAaj+dKVSUqmnnk3n7X0n4WX1uFPgyjrI5KuJ64j2lf/dkcPMQEUPorjmagECmYpMrGlkBbZw==",
    "body": {
        "key": "example-key-1",
        "tag": "eb-pipe-filter",
        "object name": "sampleimage-11.png",
        "sequencer": "617f0837b476e463",
        "Order Status": "Approved"
    },
    "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1690589578649",
        "SenderId": "AIDAIMWXWRRSWVPYSJEH6",
        "ApproximateFirstReceiveTimestamp": "1690589578654"
    },
    "messageAttributes": {
        "Name": {
            "stringValue": "TestMessage",
            "stringListValues": [],
            "binaryListValues": [],
            "dataType": "String"
        }
    },
    "md5OfMessageAttributes": "62bd1c2ea8c5344caa6852a77543892d",
    "md5OfBody": "388e8d9be1776f61c0be0a089807afbc",
    "eventSource": "aws:sqs",
    "eventSourceARN": "arn:aws:sqs:ap-southeast-2:123456789012:test-sqs-source-pipe",
    "awsRegion": "ap-southeast-2"
}

Note: In this example event received by the pipe, eventSource, eventSourceARN, and awsRegion can't be used for creating an event pattern because they are added during polling.

Examples of event filter patterns

Example of an event filter pattern match with metadata properties

This event filter pattern matches events received by the pipe on the metadata properties "attributes.ApproximateReceiveCount" and "messageAttributes.Name.stringValue", with their respective values.

{
  "attributes": {
    "ApproximateReceiveCount": ["1"]
  },
  "messageAttributes": {
    "Name": {
      "stringValue": ["TestMessage"]
    }
  }
}

Example of an event filter pattern match with data properties

This event filter pattern matches events received by the pipe whose data property "Order Status" has the value "Approved".

{
  "body": {
    "Order Status": ["Approved"]
  }
}

Example of an event filter pattern match with metadata and data properties

This event filter pattern matches events received by the pipe on both the "Order Status" data property and the "SenderId" metadata property, with their respective values.

{
  "attributes": {
    "SenderId": ["AIDAIMWXWRRSWVPYSJEH6"]
  },
  "body": {
    "Order Status": ["Approved"]
  }
}

Example of an event filter pattern with no matches

This event filter pattern can't match events received by the pipe because it contains the field "awsRegion". This field is added during polling, so the match fails even though the data property matches.

{
  "body": {
    "Order Status": ["Approved"]
  },
  "awsRegion": ["ap-southeast-2"]
}

Criteria for a successful match

EventBridge Pipes drops a message if the event filter pattern isn't configured to match the format of the incoming message body.

When you create an event filter pattern for the message body of an event, make sure that the pattern matches the format that the source uses. For example, the message body for Amazon SQS can be either a plain string or a JSON object. Amazon Kinesis data stream and DynamoDB stream records must be in valid JSON format. Self-managed Apache Kafka streams and Amazon MQ messages use UTF-8 encoded strings, either as plain strings or in JSON format.
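
The following is a minimal sketch (the values are hypothetical) that contrasts the filter pattern shape for an SQS message body that's a JSON object with the shape for a body that's a plain string, and shows how each pattern is serialized as a JSON string.

import json

# If the SQS message body is a JSON object, filter on its parsed fields.
json_body_pattern = {"body": {"Order Status": ["Approved"]}}

# If the SQS message body is a plain string, filter on the whole string value.
plain_string_body_pattern = {"body": ["Approved"]}

# Filter patterns are supplied to the pipe as JSON strings.
print(json.dumps(json_body_pattern))
print(json.dumps(plain_string_body_pattern))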

For more information on the different sources and their filter patterns, see Amazon EventBridge Pipes filtering.

Test the filter's event pattern

  1. Open the Amazon EventBridge console.
  2. In the navigation pane, choose Pipes.
  3. Choose Create pipe and then enter a name for the pipe. (Optional: Add a description for the pipe.)
  4. On the Build pipe tab, choose the type of source that you want to specify for this pipe. Configure the source by selecting it from the dropdown list.
  5. Enter the necessary parameters for Source. Choose Next.
  6. Under Sample event - optional, select either AWS events (to show Sample events) or Enter my own to enter your own event.
  7. Under Event pattern, enter the required event pattern to filter the events. Then, choose Test Pattern or Next to navigate to Enrichment (optional).
  8. Choose Next to navigate to Target.
  9. Under Details, for Target service, choose the target from the dropdown list. Enter the necessary target resource information specific to the selected target type.
  10. Configure the pipe settings (such as permissions, dead-letter queue, and tags).
  11. Choose Create pipe.
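
You can also create a pipe with a filter programmatically. The following is a minimal sketch that uses the AWS SDK for Python (Boto3) to create an SQS-to-SQS pipe with a filter pattern. The pipe name, role, and target queue are placeholders that you must replace with your own resources, and the sketch assumes that the IAM role already grants the pipe access to the source and target.

import json
import boto3

pipes = boto3.client("pipes")

# Hypothetical filter pattern that matches JSON message bodies with "Order Status": "Approved".
filter_pattern = {"body": {"Order Status": ["Approved"]}}

pipes.create_pipe(
    Name="example-filtered-pipe",  # placeholder pipe name
    RoleArn="arn:aws:iam::123456789012:role/example-pipe-role",  # placeholder IAM role
    Source="arn:aws:sqs:ap-southeast-2:123456789012:test-sqs-source-pipe",
    SourceParameters={
        "FilterCriteria": {
            "Filters": [
                {"Pattern": json.dumps(filter_pattern)}  # patterns are passed as JSON strings
            ]
        }
    },
    Target="arn:aws:sqs:ap-southeast-2:123456789012:example-target-queue",  # placeholder target
)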

Related information

Properly filtering Amazon SQS messages

Properly filtering Amazon Managed Streaming for Apache Kafka, self managed Apache Kafka, and Amazon MQ messages
