I can't use governed tables with Glue

I'm trying to use Lake Formation governed tables, but I've run into some issues when using them from AWS Glue. I followed these steps:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ["JOB_NAME"])

sc = SparkContext()
glueContext = GlueContext(sc)
logger = glueContext.get_logger()
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

db = "db_source_zone"
tbl = "software_review"
output_db = "db_curated_zone"
output_tbl = "software_review_gov"
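# start_transaction(False) opens a read/write Lake Formation transaction;
# writes made under it should only become visible to queries after
# commit_transaction succeeds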
tx_id = glueContext.start_transaction(False)

# Script generated for node AWS Glue Data Catalog
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database=db,
    table_name=tbl,
    transformation_ctx="datasource0",
)
nrows = datasource0.toDF().count()
logger.info(f'Number of rows: {nrows}')

datasink0 = glueContext.write_dynamic_frame.from_catalog(
    frame=datasource0,
    database=output_db,
    table_name=output_tbl,
    additional_options={
        "useGlueParquetWriter": True,
        "transactionId": tx_id,
        "partitionKeys": ["year_month"],
        "callDeleteObjectsOnCancel": "true"
    }
)
#glueContext.purge_table(output_db, output_tbl)
#wr.lakeformation.commit_transaction(tx_id)
is_committed = glueContext.commit_transaction(tx_id)
logger.info(f'Commit status: {is_committed}')
job.commit()
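
For reference, the final state of the transaction can also be checked outside of Glue with boto3; a minimal sketch, assuming the role has the relevant Lake Formation permissions:

import boto3

# Sketch: confirm what state the transaction ended in. describe_transaction
# reports ACTIVE, COMMIT_IN_PROGRESS, COMMITTED or ABORTED in
# TransactionDescription.TransactionStatus.
lf = boto3.client("lakeformation")
desc = lf.describe_transaction(TransactionId=tx_id)
print(desc["TransactionDescription"]["TransactionStatus"])  # expect COMMITTED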
  • The job runs successfully and I saw that the commit result was True, but when I query the table in Athena, it comes back empty (screenshot: Athena query).

A second script I found was this one:
# (same imports and getResolvedOptions call as in the first script)
sc = SparkContext()
glueContext = GlueContext(sc)
logger = glueContext.get_logger()
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

db = "db_source_zone"
tbl = "software_review"
output_db = "db_curated_zone"
output_tbl = "software_review_gov"
tx_id = glueContext.start_transaction(False)

# Script generated for node AWS Glue Data Catalog
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database=db,
    table_name=tbl,
    transformation_ctx="datasource0",
)
nrows = datasource0.toDF().count()
logger.info(f'Number of rows: {nrows}')

dest_path = "s3://bbs-data-lake/curated_zone/software_review/"

sink = glueContext.getSink(
    connection_type="s3", path=dest_path,
    enableUpdateCatalog=True,
    updateBehavior="UPDATE_IN_DATABASE",
    transactionId=tx_id,
    additional_options={
        "useGlueParquetWriter": True,
        "partitionKeys": ["year_month"],
        "callDeleteObjectsOnCancel": "true"
    }
)
sink.setFormat("glueparquet")
sink.setCatalogInfo(
    catalogDatabase=output_db, catalogTableName=output_tbl
)

try:
    sink.writeFrame(datasource0)
    glueContext.commit_transaction(tx_id)
except Exception:
    glueContext.cancel_transaction(tx_id)
    raise
job.commit()

In this case the job does not run; it fails with an error related to writing to S3 (screenshot: Script error). I tried different configurations, like changing the Glue version and using the useGlueParquetWriter parameter, but without success. Note that if I update the governed tables with AWS Data Wrangler, it works perfectly, but then I can only use pandas, not Spark.
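
For comparison, the Data Wrangler path that does work looks roughly like this (a sketch from memory against awswrangler 2.x; the source path here is a placeholder):

import awswrangler as wr

# Sketch: awswrangler writes to a GOVERNED table and handles the Lake
# Formation transaction internally (no explicit start/commit needed).
df = wr.s3.read_parquet("s3://bbs-data-lake/source_zone/software_review/")  # placeholder path
wr.s3.to_parquet(
    df=df,
    path="s3://bbs-data-lake/curated_zone/software_review/",
    dataset=True,
    database="db_curated_zone",
    table="software_review_gov",
    table_type="GOVERNED",
    partition_cols=["year_month"],
)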

  • The stack trace in the logs should give more information about the cause of that InvocationTargetException.
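
    A sketch for pulling the full trace with boto3, assuming continuous logging to the default /aws-glue/jobs/error log group (the stream name placeholder is the job run ID):

    import boto3

    # Sketch: read the driver's error log stream for one job run. The log
    # group and stream naming assume Glue's default continuous-logging setup.
    logs = boto3.client("logs")
    resp = logs.get_log_events(
        logGroupName="/aws-glue/jobs/error",
        logStreamName="<job_run_id>",  # hypothetical placeholder for the run ID
        startFromHead=False,
    )
    for event in resp["events"]:
        print(event["message"])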

Asked a year ago · 88 views
No answers
