Intermittent File Not Found error in MWAA

I'm running a DAG every minute that fetches exchange rates from an API, writes them to a CSV file and uploads it to an S3 bucket.

However, I intermittently get this error:

FileNotFoundError: [Errno 2] No such file or directory: '/usr/local/airflow/exchange/exchange_rates_20240418141900.csv'

The strange thing is that it is intermittent: usually it works, but sometimes it doesn't. Also, if I clear the task, it sometimes works on the retry. What could the issue be?

DAG code:

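import os

import boto3
from airflow.models import Variable


def s3_file_upload(ti):
    # Local path returned (via XCom) by the upstream 'transform_json' task
    file_path = ti.xcom_pull(dag_id="exchange_rates_dag", task_ids='transform_json')
    file_name = os.path.basename(file_path)
    source_session = boto3.Session(
        aws_access_key_id=Variable.get('access_key_id'),
        aws_secret_access_key=Variable.get('secret_access_key')
    )

    source_s3 = source_session.resource('s3')

    s3_bucket = Variable.get('s3_bucket')
    source_bucket = source_s3.Bucket(s3_bucket)
    # upload_file reads file_path from the local disk of the worker running this task
    response = source_bucket.upload_file(file_path, 'exchange_rates/{}'.format(file_name))
    # upload_file returns None; failures surface as exceptions
    print(response)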

Answer

Since you mention that the issue is intermittent, be aware that in MWAA tasks can run on any worker: task1 might run on worker1 while the next task, task2, runs on worker2, and at the moment this assignment is random. Judging by the path /usr/local/airflow/exchange/exchange_rates_20240418141900.csv, you are writing, reading and processing the file on the worker node's local disk, which is not recommended: if transform_json and s3_file_upload run on different workers, the file simply does not exist on the worker that tries to upload it. This also explains why clearing the task sometimes helps, since the retry may land on the worker that wrote the file. Consider using centralized storage such as S3, which is accessible from every node.
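
One way to follow this advice is to skip the local file entirely: build the CSV in memory and write it to S3 in the same task that fetches the rates. This is a minimal sketch, not a drop-in replacement. It assumes the rates arrive as a {currency: rate} dict and that the CSV uses hypothetical `currency` and `rate` columns. `rates_to_csv_bytes` and `upload_rates` are illustrative names, and `s3_client` is assumed to be a boto3 S3 client (for example `source_session.client('s3')` from the code in the question).

```python
import csv
import io
from datetime import datetime


def rates_to_csv_bytes(rates):
    """Serialize a {currency: rate} mapping to CSV bytes in memory.

    No file is written to the worker's local disk.
    """
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(["currency", "rate"])  # hypothetical column names
    for currency, rate in sorted(rates.items()):
        writer.writerow([currency, rate])
    return buffer.getvalue().encode("utf-8")


def upload_rates(s3_client, bucket, rates, run_time: datetime):
    """Write the rates straight to S3 and return the object key.

    s3_client is assumed to be a boto3 S3 client; only its
    put_object(Bucket=..., Key=..., Body=...) method is used.
    """
    key = "exchange_rates/exchange_rates_{}.csv".format(run_time.strftime("%Y%m%d%H%M%S"))
    s3_client.put_object(Bucket=bucket, Key=key, Body=rates_to_csv_bytes(rates))
    # Returning the key (not a local path) makes the XCom value usable on any worker
    return key
```

If this runs as the body of the task's callable, the returned S3 key is what ends up in XCom, and unlike a local path it is valid no matter which worker picks up the next task.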

The idea is to design the DAG so that each task can run on any worker, regardless of whether the previous task ran on the same worker.
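
Applied to the DAG in the question, this could mean the downstream task receives an S3 key through XCom and reads the object from S3, rather than opening a path on its own disk. This is a sketch under stated assumptions: the upstream `transform_json` task is assumed to return an S3 key (not a local path) as its XCom value, the object is assumed to be a CSV with hypothetical `currency` and `rate` columns, and `load_rates_from_s3` and `downstream_task` are illustrative names. `s3_client` is assumed to be a boto3 S3 client.

```python
import csv
import io


def load_rates_from_s3(s3_client, bucket, key):
    """Read a rates CSV from S3 into a {currency: rate} dict.

    Nothing here touches the local disk, so it behaves the same on every worker.
    s3_client is assumed to be a boto3 S3 client (get_object is used).
    """
    body = s3_client.get_object(Bucket=bucket, Key=key)["Body"].read()
    reader = csv.DictReader(io.StringIO(body.decode("utf-8")))
    return {row["currency"]: float(row["rate"]) for row in reader}


def downstream_task(ti, s3_client, bucket):
    """Hypothetical downstream callable.

    Pulls the S3 key that the upstream 'transform_json' task is assumed to
    return, instead of a local file path.
    """
    key = ti.xcom_pull(task_ids="transform_json")
    return load_rates_from_s3(s3_client, bucket, key)
```

In a real DAG you would probably create the client inside the callable (for example with `boto3.client('s3')`); it is a parameter here only so the logic can be exercised without AWS credentials.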

AWS
answered 5 days ago
