Issue with MWAA Airflow


In Airflow, we have a series of tasks, but to keep it simple, consider just two: the first task lists files in an S3 bucket, and the second task processes those files.

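# list_files and transform_file are presumably @task (TaskFlow) functions defined elsewhere in the DAG
list_files_task = list_files()  # returns the list of S3 keys, passed downstream via XCom
# Dynamic task mapping: one transform_file task instance per S3 key
transform_files_task = transform_file.expand(source_s3_key=list_files_task)
list_files_task >> transform_files_task  # already implied by passing list_files_task to expand()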

Very rarely (about once a week; the DAG runs every 5 minutes), the first task completes in about a second, but the second task is not picked up until roughly 12 hours later. What could cause this? Our tasks are not complicated, and nothing seems off in our code.

Asked 6 months ago · 85 views
1 answer

Hello,

Are you using Airflow XComs? If you use XComs, I would not expect this to happen.

The behavior you describe is related to the SQS visibility timeout, which is 12 hours.

As you probably know, ApproximateAgeOfOldestMessage is the age of the oldest task on the SQS queue used by your environment; a non-zero value means there are tasks waiting on the queue. When a task is scheduled, the executor sends it to the SQS queue. When a worker picks up the task, the message becomes invisible to other workers until that worker finishes executing the task and deletes it from the queue. If the task fails on the worker, or the worker itself shuts down before the task finishes, the message is not deleted from the queue; it only becomes visible again, and can be picked up by another worker, once the visibility timeout has expired. You may see ‘WorkerLostError’ or other worker-related errors in the logs.
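The message lifecycle described above can be illustrated with a toy, in-memory model of SQS visibility-timeout semantics. This is a sketch under stated assumptions, not MWAA's or Celery's actual implementation: `ToyQueue`, `Message`, and `simulate_lost_worker` are hypothetical names, the 12-hour timeout is the value given in this answer, and the remaining workers are assumed to poll once a minute. It shows that when the worker holding a task disappears without deleting the message, no other worker can receive it until the visibility timeout expires.

```python
from dataclasses import dataclass, field
from typing import List, Optional

VISIBILITY_TIMEOUT_S = 12 * 60 * 60  # 12 hours, the value cited in this answer


@dataclass
class Message:
    body: str
    invisible_until: float = 0.0  # simulated time at which the message becomes visible again


@dataclass
class ToyQueue:
    """Hypothetical in-memory stand-in for an SQS queue; not the real SQS API."""
    visibility_timeout: float = VISIBILITY_TIMEOUT_S
    messages: List[Message] = field(default_factory=list)

    def send(self, body: str) -> None:
        # The executor enqueues a scheduled task.
        self.messages.append(Message(body))

    def receive(self, now: float) -> Optional[Message]:
        # Return the first visible message and hide it for the visibility timeout.
        for msg in self.messages:
            if msg.invisible_until <= now:
                msg.invisible_until = now + self.visibility_timeout
                return msg
        return None

    def delete(self, msg: Message) -> None:
        # A worker deletes the message only after the task has finished.
        self.messages.remove(msg)


def simulate_lost_worker(poll_interval_s: float = 60.0) -> float:
    """Return the simulated time (seconds) at which another worker gets the task."""
    q = ToyQueue()
    q.send("transform_file[0]")
    first = q.receive(now=0.0)  # worker A picks up the task...
    assert first is not None
    # ...and is lost (e.g. scaled in) before calling delete(), so the message stays hidden.
    t = 0.0
    while True:
        t += poll_interval_s  # the remaining workers keep polling the queue
        if q.receive(now=t) is not None:
            return t
```

With the default one-minute polling, `simulate_lost_worker()` returns `43200.0`: the orphaned task is handed to another worker exactly 12 hours after it was first received, which is the same order of delay as in the question.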

Next steps:

  • Check the scheduler and worker logs to find out why the task was not picked up and remained stuck/waiting.
  • Check the CloudWatch metric ApproximateAgeOfOldestMessage around the time the issue happens.
  • It might also be an edge case where a worker was marked for removal during autoscaling (a downscale of workers in the environment) while the previous task was executing on that node.
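To check the ApproximateAgeOfOldestMessage metric around a delayed run, one option is to pull its datapoints with boto3's CloudWatch `get_metric_statistics` call and look for stretches where the age stays high. The sketch below assumes datapoints in the shape that call returns (dicts with `Timestamp` and `Maximum` keys when requested with `Statistics=["Maximum"]`). The function name `stuck_periods` and the 15-minute default threshold are hypothetical choices, and the namespace and dimensions for your environment's queue metric are assumptions you would need to confirm in the documentation.

```python
from datetime import datetime, timedelta, timezone


def stuck_periods(datapoints, threshold_s=15 * 60):
    """Return (start, end, peak_age_s) for each run of consecutive datapoints above threshold_s."""
    periods = []
    current = None  # [start, end, peak] of the run currently being tracked
    # CloudWatch does not guarantee datapoint order, so sort by timestamp first.
    for dp in sorted(datapoints, key=lambda d: d["Timestamp"]):
        age = dp["Maximum"]
        if age > threshold_s:
            if current is None:
                current = [dp["Timestamp"], dp["Timestamp"], age]
            else:
                current[1] = dp["Timestamp"]
                current[2] = max(current[2], age)
        elif current is not None:
            periods.append(tuple(current))
            current = None
    if current is not None:  # a run that is still open at the last datapoint
        periods.append(tuple(current))
    return periods


# Fetching real datapoints (sketch, not executed here; Namespace and Dimensions are assumptions):
# cw = boto3.client("cloudwatch")
# resp = cw.get_metric_statistics(Namespace=..., MetricName="ApproximateAgeOfOldestMessage",
#                                 Dimensions=[...], StartTime=..., EndTime=...,
#                                 Period=300, Statistics=["Maximum"])
# print(stuck_periods(resp["Datapoints"]))

if __name__ == "__main__":
    t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
    # Hypothetical datapoints, 5 minutes apart, where a task sits on the queue for a while.
    sample = [{"Timestamp": t0 + timedelta(minutes=5 * i), "Maximum": m}
              for i, m in enumerate([0, 0, 1200, 2400, 3600, 0])]
    print(stuck_periods(sample))
```

Correlating the returned periods with the run IDs of the delayed DAG runs, and then with the worker logs for the same window, should show whether a worker was lost or scaled in while holding the task.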
AWS
answered 10 days ago
