Intermittent File Not Found error in MWAA


I'm running a DAG every minute that gets exchange rates from an API, puts them in a CSV and uploads them to an S3 bucket.

However, intermittently I get this error: FileNotFoundError: [Errno 2] No such file or directory: '/usr/local/airflow/exchange/exchange_rates_20240418141900.csv'

The strange thing is that the failure is intermittent: the task usually succeeds and only sometimes fails. Clearing the failed task and letting it rerun sometimes makes it succeed. What could the issue be?

DAG code:

import os

import boto3
from airflow.models import Variable


def s3_file_upload(ti):
    # Local file path pushed to XCom by the upstream transform_json task
    file_path = ti.xcom_pull(dag_id="exchange_rates_dag", task_ids='transform_json')
    file_name = os.path.basename(file_path)
    source_session = boto3.Session(
        aws_access_key_id=Variable.get('access_key_id'),
        aws_secret_access_key=Variable.get('secret_access_key')
    )

    source_s3 = source_session.resource('s3')

    s3_bucket = Variable.get('s3_bucket')
    source_bucket = source_s3.Bucket(s3_bucket)
    # upload_file returns None and raises on failure, so there is no response to print
    source_bucket.upload_file(file_path, 'exchange_rates/{}'.format(file_name))
    print('Uploaded {} to bucket {}'.format(file_name, s3_bucket))

1 Answer

Since the issue is intermittent, the likely cause is task placement. In MWAA, each task can be scheduled on any worker, so task1 may run on worker1 while the next task, task2, runs on worker2. The path /usr/local/airflow/exchange/exchange_rates_20240418141900.csv is on the local filesystem of a worker node, so the file exists only on the worker that wrote it. When the upload task lands on a different worker, the file is not there and you get FileNotFoundError. Clearing the task can help because the rerun may land on the worker that has the file. Storing, reading, or processing files locally on a worker node is not recommended. Consider using centralized storage such as S3, which is accessible from every node.

The idea is to build the DAG logic so that every task can run on any worker, regardless of whether the previous task ran on the same worker.
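
A minimal sketch of that idea, assuming the rates are available as a {currency: rate} mapping (the question does not show the transform step, so the function names, the CSV columns, and the `rates` and `timestamp` parameters here are hypothetical). The CSV is built in memory and sent with the S3 client's `put_object`, and the task returns the S3 key instead of a local path, so a downstream task only needs the key from XCom:

```python
import csv
import io


def rates_to_csv_bytes(rates):
    """Serialize a {currency: rate} mapping to CSV bytes in memory."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(["currency", "rate"])  # hypothetical column names
    for currency, rate in sorted(rates.items()):
        writer.writerow([currency, rate])
    return buffer.getvalue().encode("utf-8")


def upload_rates(s3_client, bucket, rates, timestamp):
    """Upload the CSV straight to S3 and return the object key.

    Nothing is written to the worker's local disk, so it does not matter
    which worker runs this task or the next one.
    """
    key = "exchange_rates/exchange_rates_{}.csv".format(timestamp)
    s3_client.put_object(Bucket=bucket, Key=key, Body=rates_to_csv_bytes(rates))
    return key
```

In the DAG, `s3_client` would be something like `boto3.client('s3')` and the bucket name would come from `Variable.get('s3_bucket')` as in the question. Passing the client in as an argument also makes the function easy to exercise with a stub before deploying.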

AWS
answered 23 days ago
