I can't use governed tables with Glue

I'm trying to use Lake Formation governed tables, but I've run into some issues when using them from AWS Glue. I followed these steps:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ["JOB_NAME"])

sc = SparkContext()
glueContext = GlueContext(sc)
logger = glueContext.get_logger()
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

db = "db_source_zone"
tbl = "software_review"
output_db = "db_curated_zone"
output_tbl = "software_review_gov"
tx_id = glueContext.start_transaction(False)  # read_only=False -> read/write transaction

# Script generated for node AWS Glue Data Catalog
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database=db,
    table_name=tbl,
    transformation_ctx="datasource0",
)
nrows = datasource0.toDF().count()
logger.info(f'Number of rows: {nrows}')

datasink0 = glueContext.write_dynamic_frame.from_catalog(
    frame=datasource0,
    database=output_db,
    table_name=output_tbl,
    additional_options={
        "useGlueParquetWriter": True,
        "transactionId": tx_id,
        "partitionKeys": ["year_month"],
        "callDeleteObjectsOnCancel": "true"
    }
)
#glueContext.purge_table(output_db, output_tbl)
#wr.lakeformation.commit_transaction(tx_id)
is_committed = glueContext.commit_transaction(tx_id)
logger.info(f'Commit status: {is_committed}')
job.commit()
  • The job runs successfully and I saw that the commit result was True, but when I query the table in Athena it comes back empty (screenshot: Athena query). A quick way to check what the commit actually registered is sketched below.
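One way to narrow this down is to ask Lake Formation what the governed table's manifest actually points at after the commit. A minimal standalone diagnostic sketch using boto3's lakeformation client, assuming the database and table names from the script above:

import boto3

lf = boto3.client("lakeformation")

# Governed-table reads happen inside a transaction; read-only is enough here.
tx = lf.start_transaction(TransactionType="READ_ONLY")["TransactionId"]
try:
    resp = lf.get_table_objects(
        DatabaseName="db_curated_zone",
        TableName="software_review_gov",
        TransactionId=tx,
    )
    # Each entry is a partition with the S3 objects the manifest tracks.
    for part in resp.get("Objects", []):
        print(part.get("PartitionValues"), len(part.get("Objects", [])))
finally:
    lf.commit_transaction(TransactionId=tx)

If the commit reports True but this list is empty, the write never registered its files with the transaction, which would explain the empty Athena result.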
A second script I found was this one:

# (same imports and getResolvedOptions call as in the first script)
sc = SparkContext()
glueContext = GlueContext(sc)
logger = glueContext.get_logger()
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

db = "db_source_zone"
tbl = "software_review"
output_db = "db_curated_zone"
output_tbl = "software_review_gov"
tx_id = glueContext.start_transaction(False)

# Script generated for node AWS Glue Data Catalog
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database=db,
    table_name=tbl,
    transformation_ctx="datasource0",
)
nrows = datasource0.toDF().count()
logger.info(f'Number of rows: {nrows}')

dest_path = "s3://bbs-data-lake/curated_zone/software_review/"

sink = glueContext.getSink(
    connection_type="s3",
    path=dest_path,
    enableUpdateCatalog=True,
    updateBehavior="UPDATE_IN_DATABASE",
    transactionId=tx_id,
    additional_options={
        "useGlueParquetWriter": True,
        "partitionKeys": ["year_month"],
        "callDeleteObjectsOnCancel": "true"
    }
)
sink.setFormat("glueparquet")
sink.setCatalogInfo(
    catalogDatabase=output_db, catalogTableName=output_tbl
)

try:
    sink.writeFrame(datasource0)
    glueContext.commit_transaction(tx_id)
except Exception:
    glueContext.cancel_transaction(tx_id)
    raise
job.commit()

In this case the job does not run; it fails with an error while writing to S3 (screenshot: script error). I tried different configurations, such as changing the Glue version and using the useGlueParquetWriter parameter, but without success. Note that if I update the governed tables with AWS Data Wrangler instead, it works perfectly, but then I can only use pandas, not Spark. A sketch of that working path is below.
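For reference, this is roughly what the working AWS Data Wrangler path looks like. The parameter names (table_type, transaction_id, partition_cols) are from awswrangler 2.x as I remember them, so treat this as a sketch and check them against the installed version:

import awswrangler as wr
import pandas as pd

# Toy frame standing in for the real curated data.
df = pd.DataFrame({"review": ["great tool"], "year_month": ["2022-01"]})

tx_id = wr.lakeformation.start_transaction(read_only=False)
try:
    wr.s3.to_parquet(
        df=df,
        path="s3://bbs-data-lake/curated_zone/software_review/",
        dataset=True,
        database="db_curated_zone",
        table="software_review_gov",
        table_type="GOVERNED",
        transaction_id=tx_id,
        partition_cols=["year_month"],
    )
    wr.lakeformation.commit_transaction(transaction_id=tx_id)
except Exception:
    wr.lakeformation.cancel_transaction(transaction_id=tx_id)
    raise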

  • The stack trace in the logs should give more information about the cause of that InvocationTargetException.
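For example, the except block in the second script could log the full trace before cancelling the transaction, so the root cause shows up in the driver log (a small sketch reusing the names from the script above):

import traceback

try:
    sink.writeFrame(datasource0)
    glueContext.commit_transaction(tx_id)
except Exception:
    # Surface the full stack trace (including the nested Java cause)
    # before rolling the transaction back.
    logger.error(traceback.format_exc())
    glueContext.cancel_transaction(tx_id)
    raise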

Asked a year ago · 88 views
No answers
