I'm running a DAG every minute that fetches exchange rates from an API, writes them to a CSV file, and uploads that file to an S3 bucket.
However, I intermittently get this error: FileNotFoundError: [Errno 2] No such file or directory: '/usr/local/airflow/exchange/exchange_rates_20240418141900.csv'
The strange thing is that most runs succeed and only some fail. Clearing the failed task sometimes makes it succeed on the rerun. What could the issue be?
DAG code:
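import os

import boto3
from airflow.models import Variable


def s3_file_upload(ti):
    # Path of the CSV written by the upstream task, passed via XCom.
    file_path = ti.xcom_pull(dag_id="exchange_rates_dag", task_ids='transform_json')
    file_name = os.path.basename(file_path)
    # Credentials and bucket name are stored as Airflow Variables.
    source_session = boto3.Session(
        aws_access_key_id=Variable.get('access_key_id'),
        aws_secret_access_key=Variable.get('secret_access_key')
    )
    source_s3 = source_session.resource('s3')
    s3_bucket = Variable.get('s3_bucket')
    source_bucket = source_s3.Bucket(s3_bucket)
    # upload_file reads file_path from the local disk of the machine running this task.
    # It returns None, so the print below always shows "None".
    response = source_bucket.upload_file(file_path, 'exchange_rates/{}'.format(file_name))
    print(response)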
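
A diagnostic I could add to narrow this down (a sketch, not a fix): a small helper, called at the end of transform_json and at the start of s3_file_upload, that reports which host the task ran on and whether the CSV is visible there. The name describe_path is hypothetical and not part of my DAG. It assumes that the two tasks might not always run on the same machine or container, which I have not confirmed.

```python
import os
import socket


def describe_path(file_path):
    """Return what the current machine can see at file_path (hypothetical helper)."""
    directory = os.path.dirname(file_path)
    dir_exists = os.path.isdir(directory)
    return {
        # Host name of the machine or container running this task.
        "host": socket.gethostname(),
        "path": file_path,
        # True only if the file is present on this machine's filesystem.
        "exists": os.path.isfile(file_path),
        "dir_exists": dir_exists,
        # List the directory only when it exists, to avoid raising a second error.
        "dir_listing": sorted(os.listdir(directory)) if dir_exists else [],
    }
```

Printing describe_path(file_path) in both tasks would show in the task logs whether the failing runs have a different "host" value in the upload task than in the transform task, or whether the file is missing even on the same host.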