I'm trying to use Lake Formation governed tables, but I've run into some issues when using them with AWS Glue. I followed these steps:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Glue job: copy a source catalog table into a Lake Formation governed
# table, performing the write inside a single governed transaction.
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
logger = glueContext.get_logger()
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

db = "db_source_zone"
tbl = "software_review"
output_db = "db_curated_zone"
output_tbl = "software_review_gov"

# read_only=False: this transaction will write to the governed table.
tx_id = glueContext.start_transaction(False)

# Script generated for node AWS Glue Data Catalog
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database=db,
    table_name=tbl,
    transformation_ctx="datasource0",
)
nrows = datasource0.toDF().count()
logger.info(f'Number of rows: {nrows}')

try:
    datasink0 = glueContext.write_dynamic_frame.from_catalog(
        frame=datasource0,
        database=output_db,
        table_name=output_tbl,
        additional_options={
            "useGlueParquetWriter": True,
            "transactionId": tx_id,
            "partitionKeys": ["year_month"],
            # With cancel, Lake Formation deletes objects written under tx_id.
            "callDeleteObjectsOnCancel": "true",
        },
    )
    is_commited = glueContext.commit_transaction(tx_id)
    logger.info(f'Commit status: {is_commited}')
except Exception:
    # Roll back on any failure so the transaction is not left open and
    # partially written S3 objects are cleaned up (see option above).
    glueContext.cancel_transaction(tx_id)
    raise

# BUG FIX: was `job. Commit()` — the Glue Job API method is lowercase
# `commit()`; `Commit` raises AttributeError at runtime.
job.commit()
- The job runs successfully and I can see that the commit result was True, but when I query the table in Athena, it is empty.
A second script I found was this one:
# Second variant: write to S3 through getSink while updating the governed
# catalog table. Assumes the same imports and `args` as the first script.
sc = SparkContext()
glueContext = GlueContext(sc)
logger = glueContext.get_logger()
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

db = "db_source_zone"
tbl = "software_review"
output_db = "db_curated_zone"
output_tbl = "software_review_gov"

# read_only=False: this transaction will write to the governed table.
tx_id = glueContext.start_transaction(False)

# Script generated for node AWS Glue Data Catalog
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database=db,
    table_name=tbl,
    transformation_ctx="datasource0",
)
nrows = datasource0.toDF().count()
logger.info(f'Number of rows: {nrows}')

dest_path = "s3://bbs-data-lake/curated_zone/software_review/"

# BUG FIX: getSink takes its connection options as keyword arguments
# (**options), not wrapped in an `additional_options` dict — the nested
# dict is not a recognized parameter and is the likely cause of the S3
# write failure (InvocationTargetException).
sink = glueContext.getSink(
    connection_type="s3",
    path=dest_path,
    enableUpdateCatalog=True,
    updateBehavior="UPDATE_IN_DATABASE",
    transactionId=tx_id,
    useGlueParquetWriter=True,
    partitionKeys=["year_month"],
    callDeleteObjectsOnCancel="true",
)
sink.setFormat("glueparquet")
sink.setCatalogInfo(
    catalogDatabase=output_db, catalogTableName=output_tbl
)

try:
    sink.writeFrame(datasource0)
    glueContext.commit_transaction(tx_id)
except Exception:
    # Cancel so Lake Formation deletes any objects written under tx_id
    # (callDeleteObjectsOnCancel) and the transaction is not left open.
    glueContext.cancel_transaction(tx_id)
    raise

# BUG FIX: was `job. Commit()` — the correct Job API call is `job.commit()`.
job.commit()
In this case the job does not run and returns the following error related to writing to S3.
I tried different configurations, such as changing the Glue version and using the useGlueParquetWriter parameter, but without success.
Note that if I update the governed tables with AWS Data Wrangler it works perfectly, but in that case I cannot use Spark — only pandas.
The stack trace in the logs should give more information about the cause of that InvocationTargetException.