Intermittent File Not Found error in MWAA


I'm running a DAG every minute that gets exchange rates from an API, writes them to a CSV file, and uploads the file to an S3 bucket.

However, intermittently I get this error: FileNotFoundError: [Errno 2] No such file or directory: '/usr/local/airflow/exchange/exchange_rates_20240418141900.csv'

The strange thing is that the failure is intermittent: usually the task works, but sometimes it doesn't. Clearing the task so that it reruns also sometimes makes it succeed. What could the issue be?

DAG code:

import os

import boto3
from airflow.models import Variable


def s3_file_upload(ti):
    # Local path returned (via XCom) by the transform_json task
    file_path = ti.xcom_pull(dag_id="exchange_rates_dag", task_ids="transform_json")
    file_name = os.path.basename(file_path)
    source_session = boto3.Session(
        aws_access_key_id=Variable.get("access_key_id"),
        aws_secret_access_key=Variable.get("secret_access_key"),
    )

    source_s3 = source_session.resource("s3")

    s3_bucket = Variable.get("s3_bucket")
    source_bucket = source_s3.Bucket(s3_bucket)
    # Bucket.upload_file returns None and raises an exception on failure
    response = source_bucket.upload_file(file_path, "exchange_rates/{}".format(file_name))
    print(response)
1 Answer

Since you mention the issue is intermittent, note that MWAA can schedule each task on any of its workers: task1 may run on worker1 while the next task, task2, runs on worker2, and which worker picks up a task is not deterministic. The path /usr/local/airflow/exchange/exchange_rates_20240418141900.csv shows that you are storing and reading the file on the worker's local disk, which is not recommended. When transform_json and s3_file_upload happen to run on the same worker, the file exists and the upload succeeds; when they run on different workers, it doesn't. That is why the error is intermittent and why clearing the task sometimes fixes it. Instead, use centralized storage such as S3, which every worker can access.
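As a minimal sketch of the S3 approach, assuming the transform step holds the rates as a Python dict, the CSV can be built in memory and written straight to S3 with the boto3 client's put_object, and the returned S3 key (rather than a local path) pushed to XCom. The function name write_rates_to_s3, the dict shape, and the key prefix are hypothetical.

```python
import csv
import io
from datetime import datetime, timezone


def write_rates_to_s3(rates, s3_client, bucket, prefix="exchange_rates"):
    """Serialize exchange rates to CSV in memory and upload them to S3.

    rates: mapping of currency code -> rate (hypothetical shape; adapt it to
    whatever the real transform_json task produces).
    s3_client: a boto3 S3 client, e.g. boto3.client("s3").
    Returns the S3 key so downstream tasks can pull it via XCom.
    """
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(["currency", "rate"])
    for currency, rate in sorted(rates.items()):
        writer.writerow([currency, rate])

    # UTC timestamped key, mirroring the original local file name pattern
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
    key = f"{prefix}/exchange_rates_{stamp}.csv"

    # Nothing is written to the worker's local disk, so this task and the
    # task that later reads the key can run on any worker.
    s3_client.put_object(Bucket=bucket, Key=key, Body=buffer.getvalue().encode("utf-8"))
    return key
```

The downstream task would then pull the key from XCom and read or copy the object from S3, which works no matter which worker runs it.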

The idea is to design the DAG so that each task can run on any worker, regardless of whether the previous task ran on the same worker.
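Another way to follow this principle, sketched under the assumption that the local CSV is only an intermediate artifact, is to create, upload, and delete the file within a single task, so its path never has to cross from one worker to another. build_and_upload is a hypothetical helper; upload_file(Filename, Bucket, Key) is the standard boto3 S3 client method for managed uploads.

```python
import csv
import os
import tempfile


def build_and_upload(rows, s3_client, bucket, key):
    """Write rows to a temporary CSV and upload it within the same task.

    Because the file is created and consumed in one task, it does not matter
    which worker MWAA schedules the task on. s3_client is assumed to be a
    boto3 S3 client, e.g. boto3.client("s3").
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        path = os.path.join(tmp_dir, os.path.basename(key))
        with open(path, "w", newline="") as f:
            csv.writer(f).writerows(rows)
        s3_client.upload_file(path, bucket, key)
    # The temporary directory (and the CSV inside it) is deleted on exit,
    # so nothing is left behind on the worker's disk.
    return key
```

The trade-off is that fetching, transforming, and uploading become one task, so a failure reruns all three steps; for a small per-minute payload that is usually acceptable.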

AWS
answered 3 days ago
