Amazon SQS 메시지가 내 Lambda 함수를 두 번 이상 호출하지 못하게 하려면 어떻게 해야 하나요?

4분 분량
0

Amazon Simple Queue Service(Amazon SQS) 대기열의 메시지를 처리하도록 AWS Lambda 함수를 구성했습니다. 이제 유효한 Amazon SQS 메시지 중 일부가 maxReceiveCount까지 여러 번 수신됩니다. 동일한 Amazon SQS 메시지에 대한 중복 Lambda 함수 호출을 중지하려면 어떻게 해야 하나요?

간략한 설명

Lambda는 최소 1회 메시지 전송을 지원합니다. 경우에 따라 재시도 메커니즘이 동일한 메시지를 중복으로 보낼 수 있습니다. 그러면 Amazon SQS는 메시지를 배달 못한 편지 대기열로 보냅니다(사용자가 구성한 경우).

동일한 Amazon SQS 메시지에 대한 중복 Lambda 호출은 다음 이유 중 하나로 인해 발생할 수 있습니다.

  • 함수가 오류를 반환하거나 시간 초과되었습니다.
  • 가시성 시간 제한이 만료되기 전에 배치가 성공한 후 Lambda 서비스가 Amazon SQS 대기열에서 메시지를 삭제하지 못합니다.
  • Lambda 서비스가 이벤트를 함수에 전송했지만 함수로부터 승인을 받지 못했습니다.
  • 간헐적인 문제로 인해 Amazon SQS가 동일한 메시지를 반환하고 Lambda 서비스가 이를 다시 폴링했습니다.
  • 배치 기간과 함수 지속 시간의 합계가 Amazon SQS 대기열 가시성 시간 제한보다 깁니다. SQS 가시성 시간 제한은 함수 시간 제한과 배치 기간 시간 제한 합계의 6배 이상이어야 합니다.

이러한 문제를 해결하려면 Lambda 함수의 SQS 트리거에서 배치 항목 실패 보고(Report Batch item failure)를 켭니다. 그런 다음 배치를 반복하여 성공 메시지와 중복 메시지를 처리하고 삭제하는 모듈형 함수 코드를 생성합니다. 이 함수는 성공 메시지의 messageID를 Amazon DynamoDB 테이블에 저장한 다음 메시지가 이전에 처리되었는지 확인합니다.

중요: 다음 해결 방법은 수신되는 각 메시지에 대해 DynamoDB에 요청을 여러 개 보내므로 처리 시간이 느려집니다. 이로 인해 API 호출 비용도 높아집니다. 따라서 프로젝트에서 이 문제를 해결하는 데 드는 시간과 비용을 고려해야 합니다. 중복된 Lambda 호출의 오류율이 낮으면 이 해결 방법에 드는 시간과 비용이 이점보다 클 수 있습니다.

해결 방법

먼저 메시지 ID를 확인하여 동일한 메시지를 여러 번 받고 있는지 확인합니다. 앞에서 설명한 이유 중 하나로 인해 동일한 메시지의 복사본을 여러 개 받는 경우 메시지의 ID가 동일합니다. 이런 경우에는 아래 단계에 따라 다음을 수행합니다. 콘텐츠는 같지만 메시지 ID가 다른 메시지를 여러 개 받는 경우 메시지가 대기열에 두 번 이상 전송되고 있는 것입니다. 이런 경우에는 메시지 생산자를 확인합니다.

다음 단계는 Lambda 함수의 SQS 트리거에만 적용됩니다. 수동 풀 요청에는 작동하지 않습니다.

DynamoDB 테이블 생성

다음 DynamoDB 테이블에는 메시지 ID가 보관되어 Lambda 함수가 메시지 ID를 비교한 후 중복 여부를 확인할 수 있습니다.

  1. DynamoDB 콘솔을 엽니다.
  2. **테이블 생성(Create table)**을 선택합니다.
  3. DynamoDB 테이블 생성(Create DynamoDB table) 화면에서 다음과 같은 값을 설정합니다.
    **테이블(Table)**에 ProcessedRecords 입력
    기본 키(Primary key) 아래의 **파티션 키(Partition key)**에 Records 입력
    데이터 유형을 String으로 설정
  4. 사용 사례에 필요한 다른 설정을 입력합니다. 그런 다음 **생성(Create)**을 선택합니다.

Lambda 함수 생성

중요: Lambda 함수 코드는 멱등성이어야 합니다. 멱등성 모범 사례 및 예제 함수 논리에 대한 자세한 내용은 Lambda 함수를 멱등성으로 만들려면 어떻게 해야 합니까?를 참조하세요.

DynamoDB 테이블을 생성한 후 Lambda 함수를 생성합니다. 이 함수는 수신 메시지를 이전에 성공하여 DynamoDB 테이블에 보관된 메시지와 비교합니다. 메시지가 이전에 성공한 경우 함수는 중복 처리를 허용하지 않습니다. 고유한 새 메시지가 성공하면 나중에 비교할 수 있도록 테이블로 전송됩니다. 실패한 메시지는 성공적으로 처리될 때까지 또는 메시지의 ReceiveCountmaxReceiveCount를 초과할 때까지 다시 시도됩니다.

함수에서 다음을 수행합니다.

다음 예제에서는 함수 논리가 메시지 본문을 대문자로 변환합니다. process_message(...) 메서드 아래에 작성됩니다.

import boto3

dynamodb_client = boto3.client('dynamodb')

DDB_TABLE = 'ProcessedRecords'

# Validates if the message is already processed in previous invokes.
# @input string message_id
#
# @param message_id used to query the message from DynamoDB
# @return Boolean
def is_duplicate_message(message_id):
    return dynamodb_client.query(
        TableName='ProcessedRecords',
        Select='COUNT',
        KeyConditionExpression='Records = :Records',
        ExpressionAttributeValues={
            ':Records': {'S': message_id}
        }
    )["Count"] != 0
    
# Processes the message body to upper case.
# @input string body
#
# @param body to be processed
# @return uppercase body
def process_message(body):
    return body.upper()

# Put the message to the DynamoDB Table.
# @input string batch_item_success
#
# @param batch_item_success of the message to put.
# @return Boolean
def push_to_dynamoDB(batch_item_success):
    
    for message_id in batch_item_success:
        response = dynamodb_client.put_item(
                        TableName = DDB_TABLE, 
                        Item={ 'Records': {'S':message_id}
                        }
                    )
    return True

# processor function iterating through messages in the event.
# @input dict Records
#
# @param Records to be processed
# @return Boolean
def iterate_records(Records):
    
    batch_item_failures = []
    batch_item_success = []
    
    for record in Records:
        
        message_id = record["messageId"]
        print("Message: " + message_id)
        if is_duplicate_message(message_id):   
            print("Message duplicate: " + message_id)
            continue
        
        try:
            process_message(record["body"])
            batch_item_success.append(message_id) 
        except:
            batch_item_failures.append({"itemIdentifier": message_id}) 
    
    push_to_dynamoDB(batch_item_success)
    return batch_item_failures
    
def lambda_handler(event, context):
    
    return iterate_records(event["Records"])

관련 정보

Lambda 함수가 유효한 Amazon SQS 메시지를 다시 시도하여 전송 실패 대기열에 배치하는 이유는 무엇입니까?


AWS 공식
AWS 공식업데이트됨 일 년 전