I want to use Amazon EventBridge Pipes to capture an event from AWS sources and connect the event to targets.
Short description
EventBridge Pipes receives event data from the following sources:
- Amazon DynamoDB stream
- Amazon Kinesis stream
- Amazon MQ
- Amazon Managed Streaming for Apache Kafka (Amazon MSK) stream
- Self-managed Apache Kafka stream
- Amazon Simple Queue Service (Amazon SQS) queue
If you send every data event from the source to the target, then you generate resource overhead and incur additional costs. The target must process all data events and implement the business logic to discard unnecessary events. It's a best practice to filter events in the pipe so that only the data events that the target needs are sent to it.
To enrich the filtered events before you send them to the target, use an API destination or the following AWS services:
- Amazon API Gateway
- AWS Lambda
- AWS Step Functions
Resolution
Note: EventBridge Pipes uses event patterns to filter events. Events from every EventBridge Pipes source have fields that contain the core message or data.
Use the filter criteria to reduce overhead
To filter events that the source sends, create an event pattern that matches on the following properties of the event:
- The Metadata property includes information in JSON format about the source that generates the data event.
- The Data property includes the message body of the data event from the source. The format of the message body varies from source to source.
Example message body that an Amazon SQS queue sent:
{
  "key": "example-key-1",
  "tag": "eb-pipe-filter",
  "object name": "sampleimage-11.png",
  "sequencer": "617f0837b476e463",
  "Order Status": "Approved"
}
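For illustration, a producer could serialize this body to a JSON string before it sends the message. The following Python sketch builds the arguments for the Boto3 SQS send_message call. The queue URL that you pass in is your own, the helper names are hypothetical, and the Name message attribute is an assumption chosen to mirror the messageAttributes field in the example event that the pipe received.

```python
import json

# Message body copied from the example above.
order = {
    "key": "example-key-1",
    "tag": "eb-pipe-filter",
    "object name": "sampleimage-11.png",
    "sequencer": "617f0837b476e463",
    "Order Status": "Approved",
}


def build_send_message_args(queue_url, body):
    """Build keyword arguments for the SQS SendMessage call (hypothetical helper)."""
    return {
        "QueueUrl": queue_url,
        # Serialize the body as JSON so that a pattern on "body" fields can match.
        "MessageBody": json.dumps(body),
        # Assumed attribute, mirroring the received event in this article.
        "MessageAttributes": {
            "Name": {"DataType": "String", "StringValue": "TestMessage"}
        },
    }


def send(queue_url, body):
    """Send the message. Requires boto3 and AWS credentials."""
    import boto3  # imported here so the rest of the sketch runs without boto3

    sqs = boto3.client("sqs")
    return sqs.send_message(**build_send_message_args(queue_url, body))
```

Calling send with your queue URL and order sends the message. build_send_message_args can be checked locally without AWS access.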
Example event that the pipe received:
{
  "messageId": "c9652226-2d8f-49e9-9f06-beeb2a6f55ce",
  "receiptHandle": "AQEBoteuDChpmsb6765tA//hqtPta1W/utSYlmWuw3fjZnVvY+dxOUdj4cEjYZtLKGMGT4LUuvWbQPHb96Wwa+X0zUJo8ZiHC7dbul62hVbFTXjB7+cOKH91RXc3YMllpnd6nB3CNAOGIig6suCpEOGcrhTJ6dC45KoacROua1PIH454Ji8AP51TkQSnhkkeOJHInLHkvrd2sfoBo037kmMKJSo3kDgU7y92jbpJphRmijnNQX6gPQYSVifF3n3ApvSBMGQIUzQb65ZRLl6Mp2VPJQYF2RRjYShdrzhe3uON9H01m9f3LGcpL3yaX6yqFahfoEe3PiT6MMXHvIAaj+dKVSUqmnnk3n7X0n4WX1uFPgyjrI5KuJ64j2lf/dkcPMQEUPorjmagECmYpMrGlkBbZw==",
  "body": {
    "key": "example-key-1",
    "tag": "eb-pipe-filter",
    "object name": "sampleimage-11.png",
    "sequencer": "617f0837b476e463",
    "Order Status": "Approved"
  },
  "attributes": {
    "ApproximateReceiveCount": "1",
    "SentTimestamp": "1690589578649",
    "SenderId": "AIDAIMWXWRRSWVPYSJEH6",
    "ApproximateFirstReceiveTimestamp": "1690589578654"
  },
  "messageAttributes": {
    "Name": {
      "stringValue": "TestMessage",
      "stringListValues": [],
      "binaryListValues": [],
      "dataType": "String"
    }
  },
  "md5OfMessageAttributes": "62bd1c2ea8c5344caa6852a77543892d",
  "md5OfBody": "388e8d9be1776f61c0be0a089807afbc",
  "eventSource": "aws:sqs",
  "eventSourceARN": "arn:aws:sqs:ap-southeast-2:123456789012:test-sqs-source-pipe",
  "awsRegion": "ap-southeast-2"
}
Note: In the preceding example event, you can't use eventSource, eventSourceARN, or awsRegion in an event pattern because EventBridge adds these fields during polling.
Review examples of event filter patterns
Event filter pattern match with metadata properties
The following event filter pattern matches events that the pipe received where the ApproximateReceiveCount attribute is 1 and the Name message attribute has the stringValue TestMessage:
{
  "attributes": {
    "ApproximateReceiveCount": ["1"]
  },
  "messageAttributes": {
    "Name": {
      "stringValue": ["TestMessage"]
    }
  }
}
Event filter pattern match with data properties
The following event filter pattern matches events that the pipe received where the Order Status data property has the value Approved:
{
  "body": {
    "Order Status": ["Approved"]
  }
}
Event filter pattern match with metadata and data properties
The following event filter pattern matches events that the pipe received where the Order Status data property has the value Approved and the SenderId metadata property has the specified value:
{
  "attributes": {
    "SenderId": ["AIDAIMWXWRRSWVPYSJEH6"]
  },
  "body": {
    "Order Status": ["Approved"]
  }
}
Event filter pattern with no matches
The following event filter pattern can't match events that the pipe received because it contains the awsRegion field:
{
  "body": {
    "Order Status": ["Approved"]
  },
  "awsRegion": ["ap-southeast-2"]
}
Note: EventBridge adds the awsRegion field during polling, so an event pattern can't match on it. Although the data property fields matched, the event filter pattern as a whole failed.
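To see why the preceding patterns match or fail, you can model the behavior locally. The following Python sketch is a simplified illustration and not the EventBridge implementation: it supports only exact-value matching, and it assumes that the fields added during polling are not visible to the filter. The names matches, pipe_filter, and SAMPLE_EVENT are hypothetical, and SAMPLE_EVENT is a trimmed copy of the example event.

```python
# Fields that, per the notes in this article, EventBridge adds during polling.
# Assumption: the filter doesn't see them.
POLLING_FIELDS = ("eventSource", "eventSourceARN", "awsRegion")

# Trimmed copy of the example event that the pipe received.
SAMPLE_EVENT = {
    "body": {"tag": "eb-pipe-filter", "Order Status": "Approved"},
    "attributes": {
        "ApproximateReceiveCount": "1",
        "SenderId": "AIDAIMWXWRRSWVPYSJEH6",
    },
    "messageAttributes": {"Name": {"stringValue": "TestMessage"}},
    "eventSource": "aws:sqs",
    "awsRegion": "ap-southeast-2",
}


def matches(pattern, event):
    """Return True if every field in the pattern matches the event.

    A nested dict in the pattern descends into the event. A list in the
    pattern holds the allowed exact values for that field.
    """
    for key, expected in pattern.items():
        if not isinstance(event, dict) or key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            if not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True


def pipe_filter(pattern, event):
    """Apply the pattern to the event without the polling fields."""
    visible = {k: v for k, v in event.items() if k not in POLLING_FIELDS}
    return matches(pattern, visible)
```

In this model, pipe_filter returns True for the pattern that combines SenderId and Order Status. It returns False for the pattern that includes awsRegion, even though the body fields match, because every field in a pattern must match.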
Review the criteria for a successful match
If you don't correctly configure the incoming message body and event filter pattern, then EventBridge Pipes drops the message.
When you create an event filter pattern for the message body of the event, the pattern must match the format that the source uses. For example, the message body for Amazon SQS can be either a plain string or valid JSON. Kinesis stream and DynamoDB records must be in a valid JSON format. Self-managed Apache Kafka streams and Amazon MQ messages use UTF-8 encoded strings, either as plain strings or in a JSON format.
For more information about the sources and their filter patterns, see Event filtering in Amazon EventBridge Pipes.
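Before you rely on a pattern that targets fields inside the message body, it can help to confirm that the bodies you send parse as JSON objects. The following Python sketch is a local pre-check that a producer or test could run. It doesn't reproduce how EventBridge Pipes parses messages, and the function name is hypothetical.

```python
import json


def is_json_object(body):
    """Return True if the raw message body parses as a JSON object."""
    try:
        return isinstance(json.loads(body), dict)
    except (TypeError, ValueError):
        # Not valid JSON (or not a string at all): treat it as a plain string.
        return False
```

For example, is_json_object('{"Order Status": "Approved"}') returns True, and is_json_object("Order approved") returns False. In this sketch, a body that fails the check is a candidate for a plain string pattern instead of a pattern on body fields.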
Create an EventBridge pipe
Complete the following steps:
- Open the EventBridge console.
- In the navigation pane, choose Pipes.
- Choose Create pipe, and then enter a name for the pipe.
- (Optional) For Description, add a description for the pipe.
- On the Build pipe tab, select the source type, and then configure the source.
- For Source, enter the parameters, and then choose Next.
- Under Sample event type, select either AWS events or Enter my own.
- Under Event pattern, enter the required event pattern to filter the events.
- (Optional) Choose Test pattern.
- (Optional) Choose Next to navigate to Enrichment.
- Choose Next to navigate to Target.
- Under Details, for Target service, select the target from the dropdown list, and then enter the target resource information.
- Configure the pipe settings.
- Choose Create pipe.
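As an alternative to the console, you can create the pipe programmatically. The following Python sketch assumes the AWS SDK for Python (Boto3) and its pipes client. The helper names are hypothetical, and you supply your own pipe name, role ARN, and target ARN. The sketch also assumes that the AWS Identity and Access Management (IAM) role already exists with permissions for the source and target.

```python
import json


def build_create_pipe_args(name, role_arn, source_arn, target_arn, pattern):
    """Build keyword arguments for the Pipes CreatePipe call (hypothetical helper)."""
    return {
        "Name": name,
        "RoleArn": role_arn,
        "Source": source_arn,
        "SourceParameters": {
            # The event pattern is passed as a JSON string.
            "FilterCriteria": {"Filters": [{"Pattern": json.dumps(pattern)}]}
        },
        "Target": target_arn,
    }


def create_pipe(name, role_arn, source_arn, target_arn, pattern):
    """Create the pipe. Requires boto3 and AWS credentials."""
    import boto3  # imported here so the rest of the sketch runs without boto3

    client = boto3.client("pipes")
    return client.create_pipe(
        **build_create_pipe_args(name, role_arn, source_arn, target_arn, pattern)
    )
```

For example, you could pass the source ARN arn:aws:sqs:ap-southeast-2:123456789012:test-sqs-source-pipe from the example event together with the pattern {"body": {"Order Status": ["Approved"]}}.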
Related information
Properly filtering Amazon SQS messages
Properly filtering Amazon Managed Streaming for Apache Kafka, self managed Apache Kafka, and Amazon MQ messages