Intermittent File Not Found error in MWAA

I'm running a DAG every minute that gets exchange rates from an API, writes them to a CSV file and uploads that file to an S3 bucket.

However, I intermittently get this error:

FileNotFoundError: [Errno 2] No such file or directory: '/usr/local/airflow/exchange/exchange_rates_20240418141900.csv'

The strange thing is that it usually works and only fails occasionally. Also, if I clear the task, it sometimes succeeds on the rerun. What could the issue be?

DAG code:

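import os

import boto3
from airflow.models import Variable


def s3_file_upload(ti):
    # Local path of the CSV written by the upstream 'transform_json' task
    file_path = ti.xcom_pull(dag_id="exchange_rates_dag", task_ids='transform_json')
    file_name = os.path.basename(file_path)
    source_session = boto3.Session(
        aws_access_key_id=Variable.get('access_key_id'),
        aws_secret_access_key=Variable.get('secret_access_key')
    )

    source_s3 = source_session.resource('s3')

    s3_bucket = Variable.get('s3_bucket')
    source_bucket = source_s3.Bucket(s3_bucket)
    # upload_file reads file_path from the local disk of the worker running this task;
    # it returns None and raises an exception on failure
    source_bucket.upload_file(file_path, 'exchange_rates/{}'.format(file_name))
    print('Uploaded {} to s3://{}/exchange_rates/{}'.format(file_path, s3_bucket, file_name))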

1 Answer

Since you mention the issue is intermittent, you may already be aware that in MWAA tasks can run on any worker: task1 might run on worker1 while the next task, task2, runs on worker2, and this assignment is currently random. The path /usr/local/airflow/exchange/exchange_rates_20240418141900.csv shows that you are storing and processing the file locally on a worker node, which is not recommended: if the upload task lands on a different worker than the task that wrote the file, the file does not exist there. This would also explain why clearing the task sometimes works, since the rerun may happen to be scheduled on the worker that has the file. You should instead use centralized storage such as S3, which is accessible from every node.
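A minimal sketch of the S3-based approach, assuming the transform step has the rates as a Python dict: the CSV is built in memory and written straight to S3, and only the object key is returned. A PythonOperator pushes its callable's return value to XCom, so a downstream task can pull the key and read the object from any worker. The function names, the CSV columns and the `s3_client` parameter are illustrative assumptions; `s3_client` is expected to behave like a boto3 S3 client (`boto3.client("s3")`), whose `put_object(Bucket=..., Key=..., Body=...)` uploads bytes.

```python
import csv
import io
from typing import Any, Dict


def rates_to_csv_bytes(rates: Dict[str, float], base: str) -> bytes:
    """Serialize exchange rates to CSV entirely in memory (no local file)."""
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    # Hypothetical column layout; adapt to whatever transform_json produces
    writer.writerow(["base", "currency", "rate"])
    for currency, rate in sorted(rates.items()):
        writer.writerow([base, currency, rate])
    return buffer.getvalue().encode("utf-8")


def transform_to_s3(s3_client: Any, bucket: str, run_ts: str,
                    rates: Dict[str, float], base: str = "USD") -> str:
    """Write the CSV directly to S3 and return its key (to be pushed to XCom).

    s3_client is assumed to be a boto3 S3 client, e.g. boto3.client("s3").
    """
    key = "exchange_rates/exchange_rates_{}.csv".format(run_ts)
    s3_client.put_object(Bucket=bucket, Key=key,
                         Body=rates_to_csv_bytes(rates, base))
    # Unlike a local path, the S3 key is valid on every worker
    return key
```

In the DAG, the callable could create the client from the session built with the Airflow Variables (as in the question) and return `transform_to_s3(...)`; a downstream task would then get an S3 key, not a local path, from `ti.xcom_pull(task_ids='transform_json')`. If uploading was the only purpose of the second task, it can be dropped.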

The general idea is to build the DAG and its logic so that each task can run on any worker, regardless of whether the previous task ran on the same worker.
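If a step genuinely needs a local file (for example, a library that can only write to a path), the same principle can be met by creating and consuming the file inside a single task, so the file never has to survive a hop to another worker. A sketch, assuming a hypothetical `write_csv(path)` callable that produces the file and an `s3_client` that behaves like a boto3 S3 client, whose `upload_file(Filename, Bucket, Key)` uploads a local file:

```python
import os
import tempfile
from typing import Any, Callable


def write_and_upload(s3_client: Any, bucket: str, key: str,
                     write_csv: Callable[[str], None]) -> str:
    """Create, upload and delete the local file within one task run."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        local_path = os.path.join(tmp_dir, os.path.basename(key))
        write_csv(local_path)  # hypothetical producer of the CSV
        # The file exists on this worker because this same task just wrote it
        s3_client.upload_file(local_path, bucket, key)
    # Leaving the with-block deletes the temporary directory and the file
    return key
```

The trade-off is that writing and uploading become one task, so a retry repeats both steps.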

AWS
Answered 23 days ago
