my task use case is
- Create sqs queue to process excels uploads + dlq
- Create producer and consumer lambda functions
- Add s3 upload event trigger to producer lambda
- Add sqs worker lambda to consumer lambda
for that i have followed below link
s3event triiger tutorial
don’t create anything manually from the aws console, every feature must be go through serverless framework
Have a look at the existing producer and lambda workflows in the serverless repository, it has all these features except s3 event triggers
for that i created aws c# template project and write cloudformation template(serverless.yml) as below
when i deploy project using sls deploy --stage dev
it throws
Received response status [FAILED] from custom resource. Message returned: Configuration is ambiguously defined. Cannot have overlapping suffixes in two rules if the prefixes are overlapping for the same event type
Error occur if i add dealPriceExcelUploadProducerTrigger function
there is no any issues on creating resources
- please help me to sort out this issue
- how can i read queue URL and db context connection string values to DealPriceExcelUploadProducer.cs from appsetting or env using c#
service: deal-price-excel-upload
frameworkVersion: '3'
custom:
region: eu-west-2
bucketName: ag-staging-buckets
# region: ${env:AWS_DEFAULT_REGION} TODO
provider:
name: aws
runtime: dotnet6
stage: ${sls:stage}
region: ${self:custom.region}
package:
individually: true
patterns:
- '**/*'
functions:
dealPriceExcelUploadProducerTrigger:
handler: src/Handler/DealPrices/DealPriceExcelUploadProducer.Handler
events:
- s3:
bucket: ${self:custom.bucketName}
event: s3:ObjectCreated:*
rules:
- prefix: uploads/deals/deal-price-excel
existing: true
resources:
Resources:
DealPriceExcelConsumerQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: DealPriceExcelConsumerQueue
RedrivePolicy:
deadLetterTargetArn: !GetAtt DealPriceExcelConsumerDLQ.Arn
maxReceiveCount: 5
UpdateReplacePolicy: Snapshot
DealPriceExcelProducerDLQ:
Type: AWS::SQS::Queue
Properties:
QueueName: DealPriceExcelProducerDLQ
UpdateReplacePolicy: Snapshot
DealPriceExcelConsumerDLQ:
Type: AWS::SQS::Queue
Properties:
QueueName: DealPriceExcelConsumerDLQ
UpdateReplacePolicy: Snapshot
DealPriceExcelConsumerQueuePolicy:
Type: AWS::SQS::QueuePolicy
Properties:
Queues:
- !Ref DealPriceExcelConsumerQueue
- !Ref DealPriceExcelConsumerDLQ
- !Ref DealPriceExcelProducerDLQ
PolicyDocument:
Statement:
- Effect: Allow
Action:
# - 'sqs:*'
- 'sqs:DeleteMessage'
- 'sqs:GetQueueAttributes'
- 'sqs:ReceiveMessage'
- 'sqs:SendMessage'
- 'logs:CreateLogGroup'
- 'logs:CreateLogStream'
- 'logs:PutLogEvents'
Resource: 'arn:aws:sqs:${self:custom.region}:${AWS::AccountId}:DealPriceExcelConsumerQueue'
- Effect: Allow
Action:
# - 'sqs:*'
- 'sqs:DeleteMessage'
- 'sqs:GetQueueAttributes'
- 'sqs:ReceiveMessage'
- 'sqs:SendMessage'
- 'logs:CreateLogGroup'
- 'logs:CreateLogStream'
- 'logs:PutLogEvents'
Resource: 'arn:aws:sqs:${self:custom.region}:${AWS::AccountId}:DealPriceExcelUploadConsumersDlq'
- Effect: Allow
Action:
# - 'sqs:*'
- 'sqs:DeleteMessage'
- 'sqs:GetQueueAttributes'
- 'sqs:ReceiveMessage'
- 'sqs:SendMessage'
- 'logs:CreateLogGroup'
- 'logs:CreateLogStream'
- 'logs:PutLogEvents'
Resource: 'arn:aws:sqs:${self:custom.region}:${AWS::AccountId}:DealPriceExcelProducerDLQ'
DealPriceExcelEventTriggerPolicy:
Type: AWS::IAM::Policy
Properties:
PolicyName: DealPriceExcelEventTriggerPolicy
PolicyDocument:
Statement:
- Effect: Allow
Action:
- logs:PutLogEvents
- logs:CreateLogGroup
- logs:CreateLogStream
Resource: 'arn:aws:logs:*:*:*'
- Effect: Allow
Action:
- s3:GetObject
Resource: 'arn:aws:s3:::${self:custom.bucketName}/*'
Roles:
- !Ref DealPriceExcelExecutionRole
DealPriceExcelExecutionRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
- sqs.amazonaws.com
- s3.amazonaws.com
Action:
- sts:AssumeRole
Policies:
- PolicyName: DealPriceExcelEventTriggerPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action:
- logs:PutLogEvents
- logs:CreateLogGroup
- logs:CreateLogStream
Resource: 'arn:aws:logs:*:*:*'
- Effect: Allow
Action:
- s3:GetObject
Resource: 'arn:aws:s3:::${self:custom.bucketName}/*'
- PolicyName: SQSPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action:
- lambda:CreateEventSourceMapping
- lambda:ListEventSourceMappings
- lambda:ListFunctions
- sqs:DeleteMessage
- sqs:GetQueueAttributes
- sqs:ReceiveMessage
- sqs:SendMessage
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
Resource: "*"
DealPriceExcelUploadProducerFunction:
Type: AWS::Lambda::Function
Properties:
Role: !GetAtt DealPriceExcelExecutionRole.Arn
Runtime: nodejs18.x
FunctionName: DealPriceExcelUploadProducerFunction
Handler: index.handler
DeadLetterConfig:
TargetArn: !GetAtt DealPriceExcelProducerDLQ.Arn
Environment:
Variables:
producerQueueUrl: !Ref DealPriceExcelConsumerQueue
producerDlQueueUrl: !Ref DealPriceExcelProducerDLQ
Code:
ZipFile: |
import { S3Client } from "@aws-sdk/client-s3";
import { SQS } from "@aws-sdk/client-sqs";
const region = process.env.AWS_REGION ??"eu-west-2";
const s3 = new S3Client({ region: region });
const sqs = new SQS({ apiVersion: "2012-11-05",region:region });
const producerQueueUrl = process.env.producerQueueUrl;
const producerDlQueueUrl = process.env.producerDlQueueUrl;
export const handler = async (event): Promise<string | undefined> => {
for (const record of event.Records) {
let retryCount = 0;
const maxRetries = 5;
while (retryCount <= maxRetries) {
try {
await ProcessEvent(record,producerQueueUrl);
retryCount = 0;
return event;
} catch (err) {
if (retryCount == maxRetries) {
AddToDlq(record);
return event;
}
retryCount++;
}
}
};
return event;
};
const ProcessEvent = async (
record,
queueUrl:string
) => {
const bucket = record.s3.bucket.name;
const key = decodeURIComponent(record.s3.object.key.replace(/\+/g, " "));
const payload = {
Bucket: bucket,
Key: key,
};
const messageBody = JSON.stringify(payload);
await sqs.sendMessage({
MessageBody: messageBody,
QueueUrl: queueUrl
});
};
const AddToDlq= async( record)=>{
await ProcessEvent(record, producerDlQueueUrl);
};
Outputs:
DealPriceExcelConsumerQueueURL:
Description: URL of DealPriceExcelConsumer Queue
Value: !GetAtt DealPriceExcelConsumerQueue.QueueUrl
DealPriceExcelConsumerQueueARN:
Description: ARN of DealPriceExcelConsumer Queue
Value: !GetAtt DealPriceExcelConsumerQueue.Arn
DealPriceExcelUploadConsumerDlqURL:
Description: URL of dead-letter queue
Value: !Ref DealPriceExcelConsumerDLQ
DealPriceExcelUploadConsumerDlqARN:
Description: ARN of DealPriceExcelUploadConsumer Dlq
Value: !GetAtt DealPriceExcelConsumerDLQ.Arn
Handler function is
using Amazon.Lambda.S3Events;
using TechneTravel.src.Entity.Dto;
using Amazon.SQS;
using Amazon.SQS.Model;
using Newtonsoft.Json;
using Amazon.Lambda.SQSEvents;
namespace TechneTravel.src.Handler.DealPrices;
public class DealPriceExcelUploadProducer
{
private readonly AmazonSQSClient _sqsClient;
public DealPriceExcelUploadProducer(AmazonSQSClient sqsClient)
{
_sqsClient = sqsClient;
}
public async Task Handler(S3Event evt, ILambdaContext context)
{
await Task.WhenAll(
evt.Records.Select(async record =>
{
await Task.Run(async () =>
{
int retryCount = 0;
int maxRetries = 5;
while (retryCount <= maxRetries)
{
try
{
// TODO add suitable env access
// DealPriceExcelProducerqueue
var queue = $"DEAL_PRICE_EXCEL_QUEUE_URL";
await ProcessEvent(record, queue, context);
retryCount = 0;
return;
}
catch (Exception exception)
{
context.Logger.LogLine($"Exception thrown in deal price ProcessExcelFile {exception.Message}");
if (retryCount == maxRetries)
{
var dlqueue = $"DEAL_PRICE_EXCEL_QUEUE_URL";
await AddToDlq(record, dlqueue, context);
return;
}
retryCount++;
}
}
});
})
);
}
async Task ProcessEvent(S3Event.S3EventNotificationRecord record, string queueUrl, ILambdaContext context)
{
var bucketName = record.S3.Bucket.Name;
var objectKey = record.S3.Object.Key;
var payload = new DealPriceSqsQueuePayload
{
BucketName = bucketName,
Key = objectKey
};
var messageBody = JsonConvert.SerializeObject(payload);
LogInformation(context, $"{messageBody}");
await SendMessage(queueUrl, messageBody, context);
}
private async Task SendMessage(
string qUrl, string messageBody, ILambdaContext context)
{
SendMessageResponse responseSendMsg =
await _sqsClient.SendMessageAsync(qUrl, messageBody);
LogInformation(context, $"Message added to queue\n {qUrl}");
LogInformation(context, $"HttpStatusCode: {responseSendMsg.HttpStatusCode}");
}
private async Task AddToDlq(S3Event.S3EventNotificationRecord record, string queueUrl, ILambdaContext context)
{
await ProcessEvent(record, queueUrl, context);
}
public async Task HandleDealPriceExcelProducerDLQ(SQSEvent evt, ILambdaContext context)
{
await Task.WhenAll(
evt.Records.Select(async record =>
{
int retryCount = 0;
int maxRetries = 5;
while (retryCount <= maxRetries)
{
try
{
// TODO add suitable env access
// DealPriceExcelProducerqueue
var queue = $"DEAL_PRICE_EXCEL_PRODUCERDlQ_URL";
await ProcessDlqEvent(record, queue, context);
await DeleteMessage(queue, record.ReceiptHandle, context);
retryCount = 0;
return;
}
catch (Exception exception)
{
context.Logger.LogLine($"Exception thrown in deal price ProcessExcelFile {exception.Message}");
if (retryCount == maxRetries)
{
await HandleDealPriceExcelProducerDLQ(evt, context);
return;
}
retryCount++;
}
}
})
);
}
async Task ProcessDlqEvent(SQSEvent.SQSMessage message, string queueUrl, ILambdaContext context)
{
LogInformation(context, $"Processed dealPriceExcelUploadProduce dlq message {message.Body}");
DealPriceSqsQueuePayload payload = JsonConvert.DeserializeObject<DealPriceSqsQueuePayload>(message.Body);
var messageBody = JsonConvert.SerializeObject(payload);
LogInformation(context, $"{messageBody}");
await SendMessage(queueUrl, messageBody, context);
}
private async Task DeleteMessage(
string qUrl, string receiptHandle, ILambdaContext context)
{
var responseSendMsg =
await _sqsClient.DeleteMessageAsync(qUrl, receiptHandle, default);
LogInformation(context, $"Message deleted to queue\n {qUrl} {receiptHandle}");
LogInformation(context, $"HttpStatusCode: {responseSendMsg.HttpStatusCode}");
}
private void LogInformation(ILambdaContext context, string message)
{
context.Logger.LogInformation($"{message}");
}
}