Airflow EMR Hook failing while requesting to add a step


Hope everyone is doing well!

Here's the context of the issue I'm facing: I work at a company that still supports a really old Airflow version. Here are the versions of Airflow and some related components:

airflow=1.10.1=py36_0
python=3.6.2=0
botocore=1.12.226=py_0
awscli=1.16.236=py36_0
boto3=1.9.199=py_0
boto=2.49.0=py36_0

For the past couple of days, a DAG that adds a step to an EMR cluster has been failing with the following error:

Traceback (most recent call last):
  File "/home/conda/.conda/envs/airflow36/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/src/src/dags/data_conversion/operators/emr.py", line 69, in execute
    response = emr.add_job_flow_steps(JobFlowId=job_flow_id, Steps=steps)
  File "/home/conda/.conda/envs/airflow36/lib/python3.6/site-packages/botocore/client.py", line 357, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/home/conda/.conda/envs/airflow36/lib/python3.6/site-packages/botocore/client.py", line 661, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (ValidationException) when calling the AddJobFlowSteps operation: A job flow that is shutting down, terminated, or finished may not be modified.

And here is the affected part of the code:

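        # Imports assumed at module level (Airflow 1.10.x):
        #   import logging
        #   from airflow.contrib.hooks.emr_hook import EmrHook
        #   from airflow.exceptions import AirflowException

        # The cluster-creator task pushes the job flow ID to XCom; this custom
        # operator takes the first element of the pulled value.
        job_flow_id = context['task_instance'].xcom_pull(task_ids=self.cluster_creator_operator_name)[0]

        emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()

        logging.info('Adding steps to %s', job_flow_id)
        # Raises botocore.exceptions.ClientError (ValidationException) if the
        # cluster is already shutting down, terminated, or finished.
        response = emr.add_job_flow_steps(JobFlowId=job_flow_id, Steps=steps)

        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
            raise AirflowException('Adding steps failed: %s' % response)
        else:
            logging.info('Steps %s added to JobFlow', response['StepIds'])
            return response['StepIds']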
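The snippet passes `steps` without showing how it is built. For reference, here is a minimal sketch of one entry in the shape that `add_job_flow_steps` expects; the helper name `build_spark_step` and the S3 script path are hypothetical. The `ActionOnFailure` field is worth checking in the real DAG: `TERMINATE_CLUSTER` (or `TERMINATE_JOB_FLOW`) shuts the cluster down when that step fails, after which any later `add_job_flow_steps` call is rejected with this same ValidationException, whereas `CONTINUE` leaves the cluster running.

```python
def build_spark_step(name, script_uri, action_on_failure="CONTINUE"):
    """Build one step dict in the shape accepted by EMR AddJobFlowSteps.

    Hypothetical helper: runs `spark-submit <script_uri>` via command-runner.jar.
    """
    allowed = {"TERMINATE_JOB_FLOW", "TERMINATE_CLUSTER", "CANCEL_AND_WAIT", "CONTINUE"}
    if action_on_failure not in allowed:
        raise ValueError("unsupported ActionOnFailure: %s" % action_on_failure)
    return {
        "Name": name,
        # CONTINUE keeps the cluster alive if this step fails; the TERMINATE_*
        # values shut it down, so later add_job_flow_steps calls are rejected.
        "ActionOnFailure": action_on_failure,
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["spark-submit", "--deploy-mode", "cluster", script_uri],
        },
    }


# Hypothetical S3 path; replace with the real job script.
steps = [build_spark_step("convert-data", "s3://my-bucket/jobs/convert.py")]
```

The resulting list is what gets passed as `Steps=steps` in the call above.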

While researching, I found a Stack Overflow post (https://stackoverflow.com/questions/64634755/mrjob-im-having-a-client-error-while-using-emr) that mentions the botocore package is deprecated, and I also saw on GitHub that it no longer supports the Python version we are using.

Is this the correct analysis of the issue? I also found this link (https://stackoverflow.com/questions/65595398/mrjob-in-emr-is-running-only-1-mrstep-out-of-3-mrsteps-and-cluster-is-shutting-d), which suggests keeping the EMR cluster alive (persisting it), but I'm not sure it would help, because I suspect the Airflow classes invoke the botocore package and I wouldn't be able to override that.
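On the keep-alive idea: persisting the cluster does not require overriding botocore, because it is a field of the RunJobFlow request that creates the cluster (`Instances.KeepJobFlowAliveWhenNoSteps`). A minimal sketch, assuming the cluster is created from a dict of RunJobFlow parameters (for example the `job_flow_overrides` passed to Airflow's EMR create-job-flow operator); the helper name `with_keep_alive` and the sample values are hypothetical. The trade-off is that the cluster keeps running, and billing, until something terminates it explicitly.

```python
import copy


def with_keep_alive(job_flow_overrides):
    """Return a copy of a RunJobFlow parameter dict whose cluster stays up
    (WAITING) after its steps finish instead of auto-terminating.

    Hypothetical helper; the input dict itself is not modified.
    """
    overrides = copy.deepcopy(job_flow_overrides)
    overrides.setdefault("Instances", {})["KeepJobFlowAliveWhenNoSteps"] = True
    return overrides


# Hypothetical base configuration for illustration only.
base = {"Name": "data-conversion", "Instances": {"InstanceCount": 3}}
job_flow_overrides = with_keep_alive(base)
```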

Thanks a lot.

Asked 1 year ago, viewed 73 times
1 Answer

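It could be related to botocore, but a ValidationException means the service endpoint rejected the API call itself as invalid. Here the message states the reason directly: the target cluster is already shutting down, terminated, or finished, so it can no longer accept new steps.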
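Given that message, one way to make the failure easier to diagnose is to check the cluster state with `describe_cluster` just before adding steps. A minimal sketch, assuming `emr_client` is the boto3 EMR client returned by `EmrHook(...).get_conn()`; the helper name `ensure_cluster_accepts_steps` is hypothetical, and inside the operator you would likely raise `AirflowException` instead of `RuntimeError`.

```python
# Cluster states in which EMR still accepts new steps; TERMINATING,
# TERMINATED and TERMINATED_WITH_ERRORS correspond to the "shutting down,
# terminated, or finished" wording of the ValidationException.
STEP_ACCEPTING_STATES = {"STARTING", "BOOTSTRAPPING", "RUNNING", "WAITING"}


def ensure_cluster_accepts_steps(emr_client, job_flow_id):
    """Return the cluster state, or raise if the cluster can no longer take steps."""
    status = emr_client.describe_cluster(ClusterId=job_flow_id)["Cluster"]["Status"]
    state = status["State"]
    if state not in STEP_ACCEPTING_STATES:
        reason = status.get("StateChangeReason", {})
        raise RuntimeError(
            "Cluster %s is %s (%s: %s); it cannot accept new steps"
            % (
                job_flow_id,
                state,
                reason.get("Code", "UNKNOWN"),
                reason.get("Message", "no message"),
            )
        )
    return state
```

Calling it right before `emr.add_job_flow_steps(...)` replaces the generic error with one that includes the `StateChangeReason` code (for example `ALL_STEPS_COMPLETED` or `STEP_FAILURE`), which usually points at why the cluster shut down. It is a diagnostic aid rather than a guarantee, since the cluster could still terminate between the check and the call.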

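In general, many Airflow users rely on the AWS operators in airflow.providers.amazon.aws.operators.emr instead of calling boto3/botocore directly, so you may want to evaluate them. On Airflow 1.10.x, the equivalent EmrAddStepsOperator lives in airflow.contrib.operators.emr_add_steps_operator.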

AWS
Answered 1 month ago
