Boto3: Adding Triggers and Jobs to a Glue Workflow


I need to add jobs and triggers to an AWS Glue workflow programmatically in Python, but I am not sure how to do it. Can anyone help?

Using the boto3 library I can create, start, and stop a workflow, but I am unable to assign jobs or triggers to it.

The link below suggests this is possible via EventBridge, but I don't have IAM access to create any policies or roles: https://docs.aws.amazon.com/glue/latest/dg/starting-workflow-eventbridge.html

The link below contains a code snippet, but I am not sure where and how to run it or which service to use: https://repost.aws/questions/QU8oIj15VUTCS7d38ocHq7nQ/how-to-create-a-glue-workflow-programmatically

I need a working, step-by-step example with a code snippet.

Asked a year ago, 320 views
Accepted answer

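To answer your question: you add jobs to a workflow with create_trigger. A trigger created with the WorkflowName parameter becomes part of that workflow, and the jobs and crawlers listed in its Actions run when the trigger fires.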
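As a minimal sketch of that idea, assuming two existing Glue jobs (the function name add_two_jobs_to_workflow and all job and workflow names are hypothetical): an ON_DEMAND trigger starts the first job when the workflow is started, and a CONDITIONAL trigger runs the second job after the first succeeds. This is plain boto3, so it runs as an ordinary Python script or notebook cell wherever your AWS credentials allow the Glue calls; no EventBridge setup is involved.

```python
def add_two_jobs_to_workflow(glue, workflow_name, first_job, second_job):
    """Create a workflow that runs first_job, then second_job, and start it.

    `glue` is a boto3 Glue client, e.g. boto3.client("glue").
    Both jobs are assumed to exist already.
    """
    glue.create_workflow(Name=workflow_name)

    # ON_DEMAND trigger: runs first_job when the workflow run starts
    glue.create_trigger(
        Name=f"{workflow_name}_start",
        WorkflowName=workflow_name,
        Type="ON_DEMAND",
        Actions=[{"JobName": first_job}],
    )

    # CONDITIONAL trigger: runs second_job once first_job has SUCCEEDED.
    # StartOnCreation=True activates it immediately; without it the trigger
    # stays deactivated until glue.start_trigger(Name=...) is called.
    glue.create_trigger(
        Name=f"{workflow_name}_{first_job}_succeeded",
        WorkflowName=workflow_name,
        Type="CONDITIONAL",
        StartOnCreation=True,
        Predicate={
            "Conditions": [
                {"LogicalOperator": "EQUALS", "JobName": first_job, "State": "SUCCEEDED"}
            ]
        },
        Actions=[{"JobName": second_job}],
    )

    # Start one run of the workflow and return its run id
    return glue.start_workflow_run(Name=workflow_name)["RunId"]
```

With a real client this would be called as add_two_jobs_to_workflow(boto3.client("glue"), "my_workflow", "job_a", "job_b").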

For my project, I created workflows and added jobs and crawlers with parameters using boto3. I'm sharing snippets of that code below for reference.

I passed a list of dicts, one per source file, containing the job names and parameters. Each entry looks like this:

{"file_name": "sales_divisions.csv", "landing_db": "sales_landing", "landing_s3_location": "s3://test-s3-landing-bucket/datalake/sales-landing", "landing_table_name": "sales_divisions", "landing_to_raw_job": "common_landing_to_raw", "raw_db": "sales_raw", "raw_s3_location": "s3://test-s3-raw-bucket/datalake/sales-raw", "raw_table_name": "sales_divisions", "raw_to_curated_job": "common_raw_to_curated", "curated_db": "sales_curated", "curated_s3_location": "s3://test-s3-curated-bucket/datalake/sales-curated", "curated_table_name": "sales_divisions"}
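The function below loops over its metadata argument and reads each item with wrkflw_list["file_name"], so it expects a list of entries like the one above rather than a single dict. As a sketch (derive_names is a hypothetical helper, and the sample entry keeps only the keys it needs), these are the names the code derives from one entry. Note that the three crawlers are only referenced by the triggers, not created, so they are assumed to exist already under these names.

```python
def derive_names(entry):
    """Return the Glue resource names create_workflows() uses for one metadata entry."""
    # "sales_divisions.csv" -> "sales_divisions_csv"; used as the workflow name
    workflow = entry["file_name"].replace(".", "_")
    return {
        "workflow": workflow,
        # Triggers created inside the workflow, in execution order
        "triggers": [
            workflow + "_start",
            workflow + "_landing_crawler_complete",
            workflow + "_landing2raw_complete",
            workflow + "_raw_crawler_complete",
            workflow + "_raw2curated_complete",
        ],
        # Crawlers the triggers start or wait on; they must already exist
        "crawlers": [
            workflow + "_landing_crawler",
            workflow + "_raw_crawler",
            workflow + "_curated_crawler",
        ],
        # Jobs the triggers start, taken from the metadata
        "jobs": [entry["landing_to_raw_job"], entry["raw_to_curated_job"]],
    }


metadata = [
    {
        "file_name": "sales_divisions.csv",
        "landing_to_raw_job": "common_landing_to_raw",
        "raw_to_curated_job": "common_raw_to_curated",
    }
]
for entry in metadata:
    print(derive_names(entry)["workflow"])  # prints sales_divisions_csv
```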

When I pass a list of such dicts, the code below creates, for each entry, a workflow containing its jobs and crawlers:


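import time

import boto3

# Glue client; region and credentials come from the standard boto3 configuration
glue = boto3.client("glue")

# Create the workflows listed in the metadata.
# metadata_dict: list of dicts shaped like the example above.
# tag_list: Glue expects tags as a dict of strings, e.g. {"project": "datalake"}.
def create_workflows(metadata_dict, tag_list):
        # Create one workflow per metadata entry
        for wrkflw_list in metadata_dict:
            # "sales_divisions.csv" -> "sales_divisions_csv", used as the workflow name
            file = wrkflw_list["file_name"].replace('.','_')
            time.sleep(0.4)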
            try:
                glue.create_workflow(
                    Name = file,
                    Description = file + " Glue workflow - landing to raw to curated",
                    MaxConcurrentRuns = 1,
                    Tags=tag_list
                    )
            except glue.exceptions.AlreadyExistsException:
                print(file, "- Already exists")
            except Exception:
                # Any other error: report it and re-raise the original exception
                print(file, "- error -- check the code")
                raise
                            
        # Create the start trigger and the conditional triggers for this workflow
        # (this still runs inside the per-workflow loop above)
            try:
                trigger_nm=file + "_start"
                response=glue.get_trigger(Name=trigger_nm)
                if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                    print(trigger_nm,"- Already exists, Not creating")
                time.sleep(0.2)
            except glue.exceptions.EntityNotFoundException as err:
                print(trigger_nm, "- does not exist. Creating trigger within workflow")
                glue.create_trigger(
                    Name=trigger_nm,
                    WorkflowName=file,
                    Type="ON_DEMAND",
                    Actions=[
                        {'CrawlerName': file + "_landing_crawler"}
                        ],
                    Tags=tag_list
                    )
                time.sleep(0.2)
            try:
                trigger_nm=file + "_landing_crawler_complete"
                response=glue.get_trigger(Name=trigger_nm)
                if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                    print(trigger_nm,"- Already exists, Not creating")
                time.sleep(0.2)
            except glue.exceptions.EntityNotFoundException as err:
                print(trigger_nm, "- does not exist. Creating trigger within workflow")
                response_add_raw_crawler = glue.create_trigger(
                    Name=trigger_nm,
                    WorkflowName=file,
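                    Type='CONDITIONAL',
                    # Activate on creation; otherwise the trigger stays deactivated until start_trigger is called
                    StartOnCreation=True,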
                    Predicate={
                        'Conditions': [
                                {
                                    'LogicalOperator': 'EQUALS',
                                    'CrawlerName': file + "_landing_crawler",
                                    'CrawlState': 'SUCCEEDED'
                                },
                            ]
                        },
                    Description=file + '-FILE INGESTION JOB EVENT TRIGGER',
                    Tags=tag_list,
                    Actions=[
                            {
                            "JobName":wrkflw_list["landing_to_raw_job"],
                            "Arguments": 
                                    {"--run_type":"NORMAL",
                                    "--file_name":wrkflw_list["file_name"],
                                    "--config_path":"file_ingestion_conf_DEV.json"
                                    }
                            }
                        ]
                    )
                time.sleep(0.2)
            try:
                trigger_nm=file + "_landing2raw_complete"
                response=glue.get_trigger(Name=trigger_nm)
                if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                    print(trigger_nm,"- Already exists, Not creating")
                time.sleep(0.2)
            except glue.exceptions.EntityNotFoundException as err:
                print(trigger_nm, "- does not exist. Creating trigger within workflow")
                response_add_raw_crawler = glue.create_trigger(
                    Name=trigger_nm,
                    WorkflowName=file,
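                    Type='CONDITIONAL',
                    # Activate on creation; otherwise the trigger stays deactivated until start_trigger is called
                    StartOnCreation=True,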
                    Predicate={
                        'Conditions': [
                                {
                                    'LogicalOperator': 'EQUALS',
                                    'JobName': wrkflw_list["landing_to_raw_job"],
                                    'State': 'SUCCEEDED'
                                },
                            ]
                        },
                    Description=file + '-FILE INGESTION JOB EVENT TRIGGER',
                    Tags=tag_list,
                    Actions=[
                            {'CrawlerName': file + "_raw_crawler"}
                        ]
                    )
                time.sleep(0.2)
            try:
                trigger_nm=file + "_raw_crawler_complete"
                response=glue.get_trigger(Name=trigger_nm)
                if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                    print(trigger_nm,"- Already exists, Not creating")
                time.sleep(0.2)
            except glue.exceptions.EntityNotFoundException as err:
                print(trigger_nm, "- does not exist. Creating trigger within workflow")
                response_add_raw_crawler = glue.create_trigger(
                    Name=trigger_nm,
                    WorkflowName=file,
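                    Type='CONDITIONAL',
                    # Activate on creation; otherwise the trigger stays deactivated until start_trigger is called
                    StartOnCreation=True,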
                    Predicate={
                        'Conditions': [
                                {
                                    'LogicalOperator': 'EQUALS',
                                    'CrawlerName': file + "_raw_crawler",
                                    'CrawlState': 'SUCCEEDED'
                                },
                            ]
                        },
                    Description=file + '-FILE INGESTION JOB EVENT TRIGGER',
                    Tags=tag_list,
                    Actions=[
                            {
                            "JobName":wrkflw_list["raw_to_curated_job"],
                            "Arguments": 
                                    {"--run_type":"NORMAL",
                                    "--file_name":wrkflw_list["file_name"],
                                    "--config_path":"file_ingestion_conf_DEV.json"
                                    }
                            }
                        ]
                    )
                time.sleep(0.2)
            try:
                trigger_nm=file + "_raw2curated_complete"
                response=glue.get_trigger(Name=trigger_nm)
                if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                    print(trigger_nm,"- Already exists, Not creating")
                time.sleep(0.2)
            except glue.exceptions.EntityNotFoundException as err:
                print(trigger_nm, "- does not exist. Creating trigger within workflow")
                response_add_raw_crawler = glue.create_trigger(
                    Name=trigger_nm,
                    WorkflowName=file,
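                    Type='CONDITIONAL',
                    # Activate on creation; otherwise the trigger stays deactivated until start_trigger is called
                    StartOnCreation=True,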
                    Predicate={
                        'Conditions': [
                                {
                                    'LogicalOperator': 'EQUALS',
                                    "JobName":wrkflw_list["raw_to_curated_job"],
                                    'State': 'SUCCEEDED'
                                },
                            ]
                        },
                    Description=file + '-FILE INGESTION JOB EVENT TRIGGER',
                    Tags=tag_list,
                    Actions=[
                            {
                            'CrawlerName': file + "_curated_crawler"
                            }
                        ]
                    )
                time.sleep(0.2)
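Each trigger in the code above repeats the same check-then-create pattern. A minimal sketch of factoring it into one helper could look like this (ensure_trigger is a hypothetical name, not part of boto3); it relies only on get_trigger, create_trigger, and the client's exceptions.EntityNotFoundException, which the original code already uses.

```python
import time


def ensure_trigger(glue, name, pause=0.2, **create_kwargs):
    """Create the Glue trigger `name` unless it already exists.

    `glue` is a boto3 Glue client; `create_kwargs` are passed straight to
    create_trigger (WorkflowName, Type, Predicate, Actions, Tags, ...).
    Returns True if a trigger was created, False if it already existed.
    """
    try:
        # get_trigger raises EntityNotFoundException when the trigger is missing
        glue.get_trigger(Name=name)
        print(name, "- Already exists, Not creating")
        return False
    except glue.exceptions.EntityNotFoundException:
        print(name, "- does not exist. Creating trigger within workflow")
        glue.create_trigger(Name=name, **create_kwargs)
        return True
    finally:
        # Short pause between API calls, as in the original code, to limit throttling
        time.sleep(pause)
```

For example, the last trigger above could become ensure_trigger(glue, file + "_raw2curated_complete", WorkflowName=file, Type="CONDITIONAL", StartOnCreation=True, Predicate=..., Actions=[{"CrawlerName": file + "_curated_crawler"}], Tags=tag_list). Errors other than EntityNotFoundException propagate unchanged, so a misconfigured trigger still fails loudly.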
AWS, answered a year ago
