Issue with MWAA Airflow


In Airflow, we have a series of tasks, but to simplify, consider just two: the first task lists files from an S3 bucket, and the second picks up those files to process.

list_files_task = list_files()
transform_files_task = transform_file.expand(source_s3_key=list_files_task)
list_files_task >> transform_files_task
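For readers unfamiliar with dynamic task mapping, here is a minimal plain-Python sketch of what `.expand` does (the file names are made up; this only mimics the fan-out semantics, not actual Airflow scheduling):

```python
# Plain-Python illustration of Airflow dynamic task mapping (.expand):
# one upstream task returns a list, and the mapped task then runs once
# per element of that list.

def list_files():
    # Stand-in for the real task, which would list keys from S3.
    return ["s3://bucket/a.csv", "s3://bucket/b.csv"]

def transform_file(source_s3_key):
    # Stand-in for the per-file transform task.
    return f"processed:{source_s3_key}"

# transform_file.expand(source_s3_key=list_files_task) fans out into one
# mapped task instance per list element, roughly equivalent to:
keys = list_files()
results = [transform_file(source_s3_key=k) for k in keys]
```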

Very rarely (about once a week; the DAG runs at a 5-minute cadence), the first task completes in a second but the second task only gets picked up some 12 hours later. What could be the reason? Our tasks are not complicated, and nothing seems off about our code.

Asked 6 months ago · Viewed 89 times
1 answer

Hello,

Are you making use of XCom in Airflow? If you are using XCom, I would imagine this shouldn't happen.

The behavior you describe is related to the SQS visibility timeout, which is 12 hours.

As you may know, ApproximateAgeOfOldestMessage is the age of the oldest task on the SQS queue used by your environment; a non-zero value means there are tasks waiting on the queue. When a task is scheduled, the executor sends it to the SQS queue. When a worker picks up the task, the message is marked invisible until the worker finishes executing the task and removes it from the queue. If the task fails on the worker, or the worker itself shuts down before the task finishes, the message is not cleared from the queue. In that case there may be 'WorkerLostError' or other worker-related errors in the logs.
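To make the visibility-timeout mechanics concrete, here is a minimal pure-Python simulation (no AWS dependency; the clock is a plain integer, and the 12-hour figure matches the timeout mentioned above). A worker receives the task message, the message becomes invisible, and if the worker dies before deleting it, the message only becomes visible again once the timeout expires, which is why a lost task can reappear roughly 12 hours later:

```python
# Simulated SQS visibility-timeout semantics, using an integer clock.
VISIBILITY_TIMEOUT = 12 * 60 * 60  # seconds (matches MWAA's 12-hour timeout)

class SimulatedQueue:
    def __init__(self):
        self.messages = {}  # msg_id -> time at which the message is visible

    def send(self, msg_id, now):
        self.messages[msg_id] = now  # visible immediately

    def receive(self, now):
        # Return a visible message and hide it for the timeout window.
        for msg_id, visible_at in self.messages.items():
            if visible_at <= now:
                self.messages[msg_id] = now + VISIBILITY_TIMEOUT
                return msg_id
        return None

    def delete(self, msg_id):
        # A healthy worker deletes the message after finishing the task.
        self.messages.pop(msg_id, None)

q = SimulatedQueue()
q.send("task-1", now=0)

# A worker receives the task but crashes before deleting it.
assert q.receive(now=1) == "task-1"

# While the message is invisible, no other worker can pick it up...
assert q.receive(now=3600) is None

# ...until the 12-hour visibility timeout has elapsed.
assert q.receive(now=1 + VISIBILITY_TIMEOUT) == "task-1"
```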

Next steps:

  • Consider checking the scheduler and worker logs to see why the task was not picked up and was left stuck/waiting.
  • Consider checking the CloudWatch metric ApproximateAgeOfOldestMessage around the time the issue happens.
  • It might also be an edge case where the worker was marked for autoscaling (a downscale of workers in the environment) while the previous task was executing on that node.
AWS
Answered 17 days ago
