How to handle a null DB schema in AWS Glue when using job bookmarks


Hello,

I am using Glue PySpark for ETL. When I run a script with job bookmarks enabled, I found that if the script handles more than one table and one of those tables has no changes or updates, the whole script fails. I tried the workaround below, but it doesn't work. What is wrong with it, and are there any other workarounds? Thanks a million.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import regexp_replace, col, upper, initcap, row_number, udf, concat, date_format, coalesce, when, lit, get_json_object
from pyspark.sql.window import Window
from pyspark.sql.types import BooleanType, StringType, IntegerType
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame, DynamicFrameReader
from pyspark.sql import DataFrame 

args = getResolvedOptions(
    sys.argv,
    [
        'env',
        'role',
        'curated_bucket',
        'dataset_folder',
        'JOB_NAME'       
    ])

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
dynamic_frame_reader = DynamicFrameReader(glueContext)

env = args['env']
role = args['role']
curated_bucket = args['curated_bucket']
dataset_folder = args['dataset_folder']

########################### Handle role employee ###############################################
# Read from the customers table in the glue data catalog using a dynamic frame
node_role_employee = glueContext.create_dynamic_frame_from_catalog(
    database=role,
    table_name="employee",
    transformation_ctx="node_role_employee",   
    additional_options={"jobBookmarkKeys":["id"], "jobBookmarkKeysSortOrder":"asc", "mergeSchema": "true"}
).toDF()
node_role_employee.count()
node_role_employee.schema
node_role_employee.head(1)

if node_role_employee.schema() is not null:
    # Filter the DataFrame based on the 'op' column
    w = Window.partitionBy("employeeId").orderBy(node_role_employee["transaction_seq"].desc())
    dyfroleEmployeeRow = node_role_employee.withColumn("rownum", row_number().over(w))
    dyfroleEmployeeFiltered = dyfroleEmployeeRow.filter((dyfroleEmployeeRow["rownum"] == 1) & (col("transaction_seq").isNotNull())).drop("rownum")
    dyfroleEmployeeDel = dyfroleEmployeeFiltered.filter(col("op") != "D")

    dyfJoinEmployeeConfigCoalesce = dyfroleEmployeeDel.coalesce(1)
    # Convert from Spark DataFrame to Glue DynamicFrame
    dyfJoinEmployeeConfigConvert = DynamicFrame.fromDF(dyfJoinEmployeeConfigCoalesce, glueContext, "convert")

    # Write the data in the converted DynamicFrame to an S3 location
    glueContext.write_dynamic_frame.from_options(
        frame=dyfJoinEmployeeConfigConvert,
        connection_type="s3",
        connection_options={"path": f"s3://{curated_bucket}/{dataset_folder}/bookmark/employee/"},
        format="parquet",
        transformation_ctx="datasink1"
    )
####################################################################################################
########################### Handle role isoagent ###############################################

# Read from the customers table in the glue data catalog using a dynamic frame
node_role_agent = glueContext.create_dynamic_frame_from_catalog(
    database=role,
    table_name="isoagent",
    transformation_ctx="node_role_agent",    
    additional_options={"jobBookmarkKeys":["id"], "jobBookmarkKeysSortOrder":"asc", "mergeSchema": "true"}
).toDF()
if node_role_employee.schema() is not null:
    w = Window.partitionBy("isoAgentId").orderBy(node_role_agent["transaction_seq"].desc())
    dyfroleIsoagentRow = node_role_agent.withColumn("rownum", row_number().over(w))
    dyfroleIsoagentFiltered = dyfroleIsoagentRow.filter((dyfroleIsoagentRow["rownum"] == 1) & (col("transaction_seq").isNotNull())).drop("rownum")
    dyfroleIsoagentDel = dyfroleIsoagentFiltered.filter(col("op") != "D")

    dyfroleIsoagentCoalesce = dyfroleIsoagentDel.coalesce(1)

    # Convert from Spark DataFrame to Glue DynamicFrame
    dyfroleIsoagentConvert = DynamicFrame.fromDF(dyfroleIsoagentCoalesce, glueContext, "convert")

    # Write the data in the converted DynamicFrame to an S3 location
    glueContext.write_dynamic_frame.from_options(
        frame=dyfroleIsoagentConvert,
        connection_type="s3",
        connection_options={"path": f"s3://{curated_bucket}/{dataset_folder}/bookmark/agent/"},
        format="parquet",
        transformation_ctx="datasink2"
    )



job.commit()



asked 2 months ago · 329 views
2 Answers

It seems like you're handling ETL with AWS Glue and PySpark. The check "if node_role_employee.schema() is not null:" fails for two reasons: null does not exist in Python (the keyword is None), and schema is a property of a DataFrame, not a method, so calling schema() raises an error. In any case, inspecting the schema is not a reliable way to detect whether the bookmark returned new data. Here are some suggestions to fix the issue and improve the script:

  1. Check for data, not schema: Instead of checking whether node_role_employee.schema() is None, check whether the DataFrame actually contains rows. You can use the isEmpty() method for this (available on DataFrames in Spark 3.3+ / Glue 4.0; on older versions use df.rdd.isEmpty() or len(df.head(1)) == 0).

  2. Handle schema differences: If you're dealing with multiple tables, their schemas might differ or evolve between runs, so make sure downstream transforms only reference columns that actually exist in each incremental load.

  3. Error handling: Implement error handling to catch exceptions that might occur while processing each table, so a failure on one table does not abort the others (see the sketch below).
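For example, a minimal sketch of such a per-table guard (the helper name, the logging setup, and the process_employee/process_isoagent functions are illustrative assumptions, not part of your script) could look like this:

import logging

logger = logging.getLogger(__name__)

def safe_process(table_name, process_fn):
    # Run one table's ETL step; log and continue instead of failing the whole job
    try:
        process_fn()
    except Exception as exc:
        logger.error("Processing of table %s failed: %s", table_name, exc)

# Usage (illustrative): wrap each table's logic in its own function, then call:
# safe_process("employee", process_employee)
# safe_process("isoagent", process_isoagent)

Whether you swallow the exception or re-raise it after logging depends on whether a failure in one table should still fail the whole job run.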

Here's the modified version of your script with these suggestions incorporated:

# Your previous code...

# Handle role employee
node_role_employee = glueContext.create_dynamic_frame_from_catalog(
    database=role,
    table_name="employee",
    transformation_ctx="node_role_employee",
    additional_options={"jobBookmarkKeys": ["id"], "jobBookmarkKeysSortOrder": "asc", "mergeSchema": "true"}
).toDF()

# Check if the DataFrame has data
if not node_role_employee.isEmpty():
    # Your DataFrame operations... For example:
    w = Window.partitionBy("employeeId").orderBy(node_role_employee["transaction_seq"].desc())
    dyfroleEmployeeRow = node_role_employee.withColumn("rownum", row_number().over(w))
    dyfroleEmployeeFiltered = dyfroleEmployeeRow.filter((dyfroleEmployeeRow["rownum"] == 1) & (col("transaction_seq").isNotNull())).drop("rownum")
    dyfroleEmployeeDel = dyfroleEmployeeFiltered.filter(col("op") != "D")

    dyfJoinEmployeeConfigCoalesce = dyfroleEmployeeDel.coalesce(1)
    dyfJoinEmployeeConfigConvert = DynamicFrame.fromDF(dyfJoinEmployeeConfigCoalesce, glueContext, "convert")

    # Write the data in the converted DynamicFrame to an S3 location
    glueContext.write_dynamic_frame.from_options(
        frame=dyfJoinEmployeeConfigConvert,
        connection_type="s3",
        connection_options={"path": f"s3://{curated_bucket}/{dataset_folder}/bookmark/employee/"},
        format="parquet",
        transformation_ctx="datasink1"
    )

# Handle role isoagent
node_role_agent = glueContext.create_dynamic_frame_from_catalog(
    database=role,
    table_name="isoagent",
    transformation_ctx="node_role_agent",
    additional_options={"jobBookmarkKeys": ["id"], "jobBookmarkKeysSortOrder": "asc", "mergeSchema": "true"}
).toDF()

# Check if the DataFrame has data
if not node_role_agent.isEmpty():
    # Your DataFrame operations for the isoagent table...
    ...

# Your previous code...

job.commit()

Replace the placeholder comments with your actual DataFrame operations. With this change each table is processed independently, so the job no longer fails when one of them has no new data. Also consider adding proper error handling for robustness, as in point 3 above.
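If you end up handling many tables this way, one possible refactoring (purely illustrative; the helper name and parameters are assumptions, and it reuses the variables and imports already defined at the top of your script) is to put the read-check-write pattern into a single function so each table is processed, or skipped, on its own:

def process_table(table_name, partition_key, output_prefix, ctx_suffix):
    # Incremental read with job bookmarks enabled for this source
    df = glueContext.create_dynamic_frame_from_catalog(
        database=role,
        table_name=table_name,
        transformation_ctx=f"node_role_{ctx_suffix}",
        additional_options={"jobBookmarkKeys": ["id"], "jobBookmarkKeysSortOrder": "asc", "mergeSchema": "true"}
    ).toDF()

    # Nothing new since the last bookmark: skip this table
    if df.isEmpty():
        return

    # Keep only the latest non-deleted version of each record
    w = Window.partitionBy(partition_key).orderBy(col("transaction_seq").desc())
    latest = (df.withColumn("rownum", row_number().over(w))
                .filter((col("rownum") == 1) & (col("transaction_seq").isNotNull()))
                .drop("rownum")
                .filter(col("op") != "D"))

    # Write a single parquet output per run to the curated bucket
    converted = DynamicFrame.fromDF(latest.coalesce(1), glueContext, f"convert_{ctx_suffix}")
    glueContext.write_dynamic_frame.from_options(
        frame=converted,
        connection_type="s3",
        connection_options={"path": f"s3://{curated_bucket}/{dataset_folder}/bookmark/{output_prefix}/"},
        format="parquet",
        transformation_ctx=f"datasink_{ctx_suffix}"
    )

process_table("employee", "employeeId", "employee", "employee")
process_table("isoagent", "isoAgentId", "agent", "agent")

Each call keeps its own fixed transformation_ctx string, which is what the job bookmark state is keyed on, so the bookmarks for the two tables stay independent.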

answered 2 months ago

Thank you so much for your help. The bookmark now works fine in a script that handles a single table. However, I also want to join tables. I have the code below, and I tried adding an 'else' branch for the case where, say, the configuration table has no updates but the employee table does; the updated employee table should then join against the previously bookmarked configuration records. That attempt fails, though, and I am not sure whether AWS job bookmarks can support this.

node_role_config_3 = glueContext.create_dynamic_frame_from_catalog(
    database=role,
    table_name="configuration",
    transformation_ctx="node_role_config_3",   
    additional_options={"jobBookmarkKeys":["id"], "jobBookmarkKeysSortOrder":"asc", "mergeSchema": "true"}
).toDF()

if not node_role_config_3.isEmpty():
    w = Window.partitionBy("id").orderBy(node_role_config_3["transaction_seq"].desc())
    dyfroleConfigRow = node_role_config_3.withColumn("rownum", row_number().over(w))
    dyfroleConfigFiltered = dyfroleConfigRow.filter((dyfroleConfigRow["rownum"] == 1) & (col("transaction_seq").isNotNull())).drop("rownum")
    dyfroleConfigDel = dyfroleConfigFiltered.filter(col("op") != "D")

    dyfroleConfigSelect = dyfroleConfigDel.select(
        col("merchant_config_id"))
##################################################################################		  
node_role_employee_3 = glueContext.create_dynamic_frame_from_catalog(
    database=role,
    table_name="employee",
    transformation_ctx="node_role_employee_3",   
    additional_options={"jobBookmarkKeys":["employeeId"], "jobBookmarkKeysSortOrder":"asc", "mergeSchema": "true"}
).toDF()

if not node_role_employee_3.isEmpty():
    logger.info("sorting employee table begins")
    w = Window.partitionBy("employeeId").orderBy(node_role_employee_3["transaction_seq"].desc())
    dyfroleEmployeeRow = node_role_employee_3.withColumn("rownum", row_number().over(w))
    dyfroleEmployeeFiltered= dyfroleEmployeeRow.filter((dyfroleEmployeeRow["rownum"] == 1) & (col("transaction_seq").isNotNull())).drop("rownum")
    dyfroleEmployeeDel = dyfroleEmployeeFiltered.filter(col("op") != "D")
######################################################################################	
dyfJoinEmployeeConfig= dyfroleEmployeeSelect.join(
    dyfroleConfigSelect,
    dyfroleEmployeeSelect.employee_id == dyfroleConfigSelect.config_employee_id,
    "left"
).drop(dyfroleConfigSelect.config_employee_id)	

dyfJoinEmployeeConfigCoalesce = dyfJoinEmployeeConfig.coalesce(1)
# Convert from Spark DataFrame to Glue DynamicFrame
dyfJoinEmployeeConfigConvert = DynamicFrame.fromDF(dyfJoinEmployeeConfigCoalesce, glueContext, "convert")

# Write the data in the converted DynamicFrame to an S3 location
glueContext.write_dynamic_frame.from_options(
    frame=dyfJoinEmployeeConfigConvert,
    connection_type="s3",
    connection_options={"path": f"s3://{curated_bucket}/{dataset_folder}/bookmark3/employee_config/"},
    format="parquet",
    transformation_ctx="datasink2"
)

job.commit()
answered 2 months ago
