I can't use governed tables with Glue

0

I'm trying to use Lake Formation governed tables, but I have encountered some issues when using them with Glue. I followed these steps:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
# Glue job: copy a catalog table into a Lake Formation governed table
# inside a single transaction, using write_dynamic_frame.from_catalog.
args = getResolvedOptions(sys.argv, ["JOB_NAME"])

sc = SparkContext()
glueContext = GlueContext(sc)
logger = glueContext.get_logger()
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

db = "db_source_zone"
tbl = "software_review"
output_db = "db_curated_zone"
output_tbl = "software_review_gov"

# The positional argument is read_only; False opens a read/write transaction.
tx_id = glueContext.start_transaction(False)

# Script generated for node AWS Glue Data Catalog
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database=db,
    table_name=tbl,
    transformation_ctx="datasource0",
)
nrows = datasource0.toDF().count()
logger.info(f'Number of rows: {nrows}')

#glueContext.purge_table(output_db, output_tbl)
#wr.lakeformation.commit_transaction(tx_id)
try:
    # The write is bound to the open transaction through "transactionId";
    # it only becomes visible to readers once the transaction is committed.
    datasink0 = glueContext.write_dynamic_frame.from_catalog(
        frame=datasource0,
        database=output_db,
        table_name=output_tbl,
        additional_options={
            "useGlueParquetWriter": True,
            "transactionId": tx_id,
            "partitionKeys": ["year_month"],
            "callDeleteObjectsOnCancel": "true",
        },
    )
    is_commited = glueContext.commit_transaction(tx_id)
except Exception:
    # Cancel the transaction so a failed write or commit does not leave it
    # open, then surface the original error to fail the job run.
    glueContext.cancel_transaction(tx_id)
    raise
logger.info(f'Commit status: {is_commited}')
# The Job method is lowercase commit(); "job. Commit()" raises AttributeError.
job.commit()
  • The job runs successfully and the commit result was True, but when I run a query in Athena, the table is empty. [Athena query] A second script I found was this one:
# Glue job: copy a catalog table into a Lake Formation governed table
# inside a single transaction, using an S3 sink obtained from getSink.
# Relies on `args` and the awsglue/pyspark imports from earlier in the file.
sc = SparkContext()
glueContext = GlueContext(sc)
logger = glueContext.get_logger()
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

db = "db_source_zone"
tbl = "software_review"
output_db = "db_curated_zone"
output_tbl = "software_review_gov"

# The positional argument is read_only; False opens a read/write transaction.
tx_id = glueContext.start_transaction(False)

# Script generated for node AWS Glue Data Catalog
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database=db,
    table_name=tbl,
    transformation_ctx="datasource0",
)
nrows = datasource0.toDF().count()
logger.info(f'Number of rows: {nrows}')

dest_path = "s3://bbs-data-lake/curated_zone/software_review/"

# getSink takes its options as keyword arguments; it has no
# "additional_options" parameter, so the sink options are passed directly
# instead of being nested inside a dict under an unknown key.
sink = glueContext.getSink(
    connection_type="s3",
    path=dest_path,
    enableUpdateCatalog=True,
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=["year_month"],
    transactionId=tx_id,
    # NOTE(review): probably redundant with setFormat("glueparquet") below;
    # kept to preserve the original intent - confirm.
    useGlueParquetWriter=True,
    callDeleteObjectsOnCancel="true",
)
sink.setFormat("glueparquet")
sink.setCatalogInfo(
    catalogDatabase=output_db, catalogTableName=output_tbl
)

try:
    sink.writeFrame(datasource0)
    glueContext.commit_transaction(tx_id)
except Exception:
    # Cancel the transaction so a failed write or commit does not leave it
    # open, then surface the original error to fail the job run.
    glueContext.cancel_transaction(tx_id)
    raise
# The Job method is lowercase commit(); "job. Commit()" raises AttributeError.
job.commit()

In this case the job does not run and returns the following error related to writing to S3: [Script error]. I tried different configurations, such as changing the Glue version and using the useGlueParquetWriter parameter, but without success. Note that if I update the governed tables with AWS Data Wrangler, it works perfectly, but in that case I cannot use Spark, only pandas.

  • The stack trace in the logs should give more information about the cause of that InvocationTargetException.

asked a year ago · 85 views
No Answers

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions