Comment puis-je empêcher un message Amazon SQS d'appeler ma fonction Lambda plusieurs fois ?
J'ai configuré ma fonction AWS Lambda pour traiter les messages dans une file d'attente Amazon Simple Queue Service (Amazon SQS). À présent, certains de mes messages Amazon SQS valides sont reçus plusieurs fois, jusqu'à la limite maxReceiveCount. Comment puis-je arrêter les appels de fonctions Lambda en double pour le même message Amazon SQS ?
Brève description
Lambda prend en charge la distribution des messages au moins une fois. Dans certains cas, le mécanisme de nouvelle tentative peut envoyer des doublons du même message. Amazon SQS envoie ensuite les messages à votre file d'attente de lettres mortes si elle est configurée.
Les appels Lambda en double pour le même message Amazon SQS peuvent intervenir pour l'une des raisons suivantes :
- Votre fonction renvoie une erreur ou un délai d'expiration.
- Le service Lambda ne parvient pas à supprimer un message de la file d'attente Amazon SQS à la suite d'un traitement en lot réussi avant l'expiration du délai de visibilité.
- Le service Lambda a envoyé l'événement à cette fonction, mais n'a pas reçu d'accusé de réception de celle-ci.
- Un problème intermittent a provoqué le renvoi du même message par Amazon SQS, qui a été à nouveau interrogé par le service Lambda.
- La somme de la fenêtre de lot et de la durée de la fonction est supérieure à votre délai de visibilité de la file d'attente Amazon SQS. Le délai de visibilité SQS doit être au moins six fois supérieur au délai total de la fonction et de celui de la fenêtre de lot.
Pour résoudre ces problèmes, activez l'option Signaler l'échec d'élément de lot dans le déclencheur SQS de votre fonction Lambda. Créez ensuite un code de fonction modulaire qui parcourt le lot, traite et supprime les messages reçus et les messages en double. La fonction stocke le messageID des messages reçus dans une table Amazon DynamoDB et vérifie ensuite que le message a été traité plus tôt.
Important : La résolution suivante réduit le temps de traitement en envoyant plusieurs demandes à DynamoDB pour chaque message entrant. Cela entraîne également une augmentation des coûts liés aux appels d'API. Par conséquent, assurez-vous de prendre en compte le temps et le coût de cette résolution pour votre projet. Si votre taux d'erreur pour les appels Lambda dupliqués est faible, le temps et le coût de cette résolution peuvent l'emporter sur les avantages.
Solution
Tout d'abord, confirmez que vous recevez le même message plusieurs fois en vérifiant l'identifiant du message. Si vous recevez plusieurs copies du même message pour l'une des raisons énumérées précédemment, les messages ont le même identifiant. Dans ce cas, suivez les étapes ci-dessous. Si vous recevez plusieurs messages avec le même contenu mais des ID de message différents, le message est envoyé à la file d'attente plusieurs fois. Dans ce cas, vérifiez le producteur de votre message.
Les étapes suivantes s'appliquent uniquement au déclencheur SQS d'une fonction Lambda. Ils ne fonctionnent pas pour les requêtes pull manuelles.
Créer une table DynamoDB
Le tableau DynamoDB suivant contient les identifiants de vos messages afin qu'une fonction Lambda puisse les comparer en vue de les dupliquer.
- Ouvrez la console DynamoDB.
- Sélectionnez Créer une table.
- Dans l'écran Créer une table DynamoDB, définissez les valeurs suivantes :
Pour Table, saisissez ProcessedRecords
Sous Clé primaire, pour Clé de partition, saisissez Registres
Définissez le type de données sur String - Saisissez d'autres paramètres selon les besoins de votre cas d'utilisation. Ensuite, choisissez Créer.
Créer une fonction Lambda
Important : le code de la fonction Lambda doit être idempotent. Pour connaître les bonnes pratiques en matière d'idempotence et accéder à un exemple de logique de fonction, veuillez consulter la section Comment rendre ma fonction Lambda idempotente ?
Après avoir créé une table DynamoDB, créez une fonction Lambda. Cette fonction compare les messages entrants avec les messages qui étaient auparavant réussis puis conservés dans votre table DynamoDB. Si un message a déjà été envoyé avec succès, la fonction n'autorise pas le traitement des doublons. Si de nouveaux messages uniques sont réussis, ils sont envoyés au tableau pour une comparaison ultérieure. Les messages qui ont échoué sont réessayés jusqu'à ce qu'ils soient traités avec succès, ou jusqu'à ce que le ReceiveCount d'un message dépasse le maxReceiveCount.
Dans votre fonction :
- Ajoutez un rôle d'exécution pour autoriser les actions dynamodb:Query et dynamodb:PutItem.
- Signalez les échecs d'éléments de lot dans le déclencheur SQS pour identifier et ignorer les messages en double dans le lot.
Dans l'exemple suivant, la logique de fonction convertit le corps du message en majuscules. Le message est écrit sous la méthode process_message (...) :
import boto3 dynamodb_client = boto3.client('dynamodb') DDB_TABLE = 'ProcessedRecords' # Validates if the message is already processed in previous invokes. # @input string message_id # # @param message_id used to query the message from DynamoDB # @return Boolean def is_duplicate_message(message_id): return dynamodb_client.query( TableName='ProcessedRecords', Select='COUNT', KeyConditionExpression='Records = :Records', ExpressionAttributeValues={ ':Records': {'S': message_id} } )["Count"] != 0 # Processes the message body to upper case. # @input string body # # @param body to be processed # @return uppercase body def process_message(body): return body.upper() # Put the message to the DynamoDB Table. # @input string batch_item_success # # @param batch_item_success of the message to put. # @return Boolean def push_to_dynamoDB(batch_item_success): for message_id in batch_item_success: response = dynamodb_client.put_item( TableName = DDB_TABLE, Item={ 'Records': {'S':message_id} } ) return True # processor function iterating through messages in the event. # @input dict Records # # @param Records to be processed # @return Boolean def iterate_records(Records): batch_item_failures = [] batch_item_success = [] for record in Records: message_id = record["messageId"] print("Message: " + message_id) if is_duplicate_message(message_id): print("Message duplicate: " + message_id) continue try: process_message(record["body"]) batch_item_success.append(message_id) except: batch_item_failures.append({"itemIdentifier": message_id}) push_to_dynamoDB(batch_item_success) return batch_item_failures def lambda_handler(event, context): return iterate_records(event["Records"])
Informations connexes
Contenus pertinents
- demandé il y a 2 moislg...
- demandé il y a 2 anslg...
- Réponse acceptéedemandé il y a un anlg...
- AWS OFFICIELA mis à jour il y a un an
- AWS OFFICIELA mis à jour il y a 3 ans
- AWS OFFICIELA mis à jour il y a 3 ans
- AWS OFFICIELA mis à jour il y a 2 ans