如何防止 Amazon SQS 訊息多次叫用 Lambda 函數?
我將 AWS Lambda 函數設定為在 Amazon Simple Queue Service (Amazon SQS) 佇列中處理訊息。現在,我的一些有效的 Amazon SQS 訊息被多次接收到到 maxReceiveCount。如何停止針對相同 Amazon SQS 訊息的重複 Lambda 函數叫用?
簡短描述
Lambda 支援 at-least-once 訊息傳遞。在某些情況下,重試機制可能會重複傳送同一訊息。然後,Amazon SQS 會將訊息傳送至您的無效字母佇列 (如果您已設定)。
由於下列其中一個原因,可能會針對相同的 Amazon SQS 訊息重複叫用 Lambda:
- 您的函數傳回錯誤或逾時。
- 成功完成批次處理後,在可見性逾時到期之前,Lambda 服務無法從 Amazon SQS 佇列刪除訊息。
- Lambda 服務傳送事件至函數,但未能從函數接收確認。
- 間斷性問題導致 Amazon SQS 傳回相同訊息,並由 Lambda 服務再次輪詢該訊息。
- 批次處理時段和函數持續時間的總和長於 Amazon SQS 佇列可見性逾時。SQS 可見性逾時必須至少是函數逾時和批次處理時段逾時總和的六倍。
若要解決這些問題,請在 Lambda 函數的 SQS 觸發程序中開啟報告批次處理項目失敗。然後,建立模組化函數程式碼,反覆執行批次處理、處理和刪除成功和重複訊息。該函數將成功訊息的 messageID 存放在 Amazon DynamoDB 資料表中,然後驗證是否提前處理該訊息。
**重要事項:**下列解決方案會對每個傳入訊息傳送多個請求至 DynamoDB,以減緩處理時間。這也會導致 API 呼叫的成本更高。因此,請務必為您的專案考慮此解決方案的時間和成本。若重複 Lambda 呼叫的錯誤率很低,則此解決方案的時間和成本可能會超過效益。
解決方式
首先,通過檢查訊息 ID來確認您多次收到相同的訊息。若因先前列出的任何原因而收到相同訊息的多個副本,則該訊息具有相同的 ID。於此狀況下,請依照下列步驟作業。若您收到多個具有相同內容但不同訊息 ID 的訊息,則該訊息會多次傳送至佇列。於此狀況下,請檢查您的訊息產生者。
下列步驟僅適用於 Lambda 函數的 SQS 觸發程序。其不適用於手動提取請求。
建立 DynamoDB 資料表
下列 DynamoDB 資料表會保留您的訊息 ID,則 Lambda 函數可加以比較以進行複製。
- 開啟 DynamoDB 主控台。
- 選擇 Create Table (建立資料表)。
- 在 Create DynamoDB table (建立 DynamoDB 資料表) 螢幕中,設定以下值:
對於 Table (資料表),輸入 ProcessedRecords
在 Primary key (主索引鍵) 下,對於 Partition key (分區索引鍵),輸入 Records (記錄)
將資料類型設定為 String (字串) - 根據您的使用案例視需輸入其他設定。然後選擇 Create (建立)。
建立 Lambda 函數
重要提示︰Lambda 函數程式必須為冪等。如需等冪性最佳實務和範例函數邏輯,請參閱如何將我的 Lambda 函數設為等冪?
在您建立 DynamoDB 資料表之後,建立一個 Lambda 函數。此函數會將傳入訊息與先前成功,然後保留於 DynamoDB 資料表中的訊息進行比較。若訊息先前已成功,則函數不允許處理重複項目。若新且唯一的訊息成功,則會將其傳送至資料表以供未來比較。失敗的訊息會重試,直至成功處理完畢,或是訊息的ReceiveCount 超過 maxReceiveCount 為止。
在函數中:
- 新增執行角色以允許 dynamodb:Query 和 dynamodb:PutItem 動作。
- 在 SQS 觸發程序中報告批次處理項目失敗,以識別和略過批次處理中的重複訊息。
在以下範例中,函數邏輯將訊息正文轉換為大寫。在 process_message(...) 方法下編寫:
import boto3 dynamodb_client = boto3.client('dynamodb') DDB_TABLE = 'ProcessedRecords' # Validates if the message is already processed in previous invokes. # @input string message_id # # @param message_id used to query the message from DynamoDB # @return Boolean def is_duplicate_message(message_id): return dynamodb_client.query( TableName='ProcessedRecords', Select='COUNT', KeyConditionExpression='Records = :Records', ExpressionAttributeValues={ ':Records': {'S': message_id} } )["Count"] != 0 # Processes the message body to upper case. # @input string body # # @param body to be processed # @return uppercase body def process_message(body): return body.upper() # Put the message to the DynamoDB Table. # @input string batch_item_success # # @param batch_item_success of the message to put. # @return Boolean def push_to_dynamoDB(batch_item_success): for message_id in batch_item_success: response = dynamodb_client.put_item( TableName = DDB_TABLE, Item={ 'Records': {'S':message_id} } ) return True # processor function iterating through messages in the event. # @input dict Records # # @param Records to be processed # @return Boolean def iterate_records(Records): batch_item_failures = [] batch_item_success = [] for record in Records: message_id = record["messageId"] print("Message: " + message_id) if is_duplicate_message(message_id): print("Message duplicate: " + message_id) continue try: process_message(record["body"]) batch_item_success.append(message_id) except: batch_item_failures.append({"itemIdentifier": message_id}) push_to_dynamoDB(batch_item_success) return batch_item_failures def lambda_handler(event, context): return iterate_records(event["Records"])
相關資訊
為什麼我的 Lambda 函數重試有效的 Amazon SQS 訊息,並將它們放在我的無效字母佇列中?
相關內容
- 已提問 1 年前lg...
- 已提問 9 個月前lg...
- 已提問 7 天前lg...
- AWS 官方已更新 3 年前
- AWS 官方已更新 3 年前
- AWS 官方已更新 1 年前
- AWS 官方已更新 2 年前