如何使用Step Functions将具有未知数量消息属性的消息转发到SQS

0

【以下的问题经过翻译处理】 我需要能够使用 Event Bridge Pipe 和 Step Functions 将消息从现有 SQS 队列转发到新的 SQS 队列。新队列中的每条消息必须看起来与原始队列中的消息相同,包括属性。这是一个临时配置。消息最终将进入 lambdas 而不是新队列。 我的问题是消息没有一致的属性。它们可能具有 0 - 9 个属性,可以任意组合。 我发现在从 Step Functions 发送消息时包含属性的示例如下所示:

"Parameters": {
       "QueueUrl": "https://sqs.<region>.amazonaws.com/<account/<destination-queue>",
       "MessageBody.$": "$.body",
       "MessageAttributes": {
         "someAttribute": {
           "DataType.$": "$.messageAttributes.someAttribute.dataType",
           "StringValue.$": "$.messageAttributes.someAttribute.stringValue"
         },
         "someAttribute2": {
           "DataType.$": "$.messageAttributes.someAttribute2.dataType",
           "StringValue.$": "$.messageAttributes.someAttribute2.stringValue"
         }

这行得通,但是如果我指定了所有潜在属性并且传入消息缺少一些属性,我会收到如下错误: 为字段指定的 JSONPath... 在输入中找不到。 如果我尝试像这样使用 jsonPath 传递属性:

"MessageAttributes.$": "$.messageAttributes"

我得到:执行状态“SendMessage”时发生错误。参数...无法用于启动任务:[Step Functions 不支持字段“stringValue”] 我考虑过使用 Choice,但是可能的属性组合太多了。 在 Step Functions 中使用 SQS 发送消息时,是否有一种简单的方法可以包含原始 json 输入中的所有属性,而无需一次将它们拼写出来?

profile picture
专家
已提问 10 个月前73 查看次数
1 回答
0

【以下的回答经过翻译处理】 你通过JSONPath设置参数的方法看起来不错(我在下面附上了一个示例),所以 MessageAttributes 应该等于 messageAttributes。你能在执行日志中验证一下吗?

更新: 阅读了您的回复后,我认为问题在于属性区分大小写,您正在发送 stringValue 而不是 StringValue。不管怎样,我编码了一个步函数,从队列A中读取并将消息保留在队列B中。希望这能帮到你。

Step Function definition

{
  "Comment": "Example",
  "StartAt": "Receive Message From Queue A",
  "States": {
    "Receive Message From Queue A": {
      "Type": "Task",
      "Parameters": {
        "QueueUrl": "REPLACE_WITH_YOUR_SQS_URL",
        "MessageAttributeNames": [
          "All"
        ]
      },
      "Resource": "arn:aws:states:::aws-sdk:sqs:receiveMessage",
      "Next": "Map"
    },
    "Map": {
      "Type": "Map",
      "ItemProcessor": {
        "ProcessorConfig": {
          "Mode": "INLINE"
        },
        "StartAt": "Send Message To Queue B",
        "States": {
          "Send Message To Queue B": {
            "Type": "Task",
            "Resource": "arn:aws:states:::sqs:sendMessage",
            "Parameters": {
              "QueueUrl": "REPLACE_WITH_YOUR_SQS_URL",
                "MessageBody.$": "$.Body",
                "MessageAttributes.$": "$.MessageAttributes"
            },
            "End": true
          }
        }
      },
      "ItemsPath": "$.Messages",
      "End": true
    }
  }
}

profile picture
专家
已回答 10 个月前

您未登录。 登录 发布回答。

一个好的回答可以清楚地解答问题和提供建设性反馈,并能促进提问者的职业发展。

回答问题的准则