I can't use governed tables with Glue

I'm trying to use Lake Formation governed tables, but I've run into issues when using them with AWS Glue. This is the first script I tried:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ["JOB_NAME"])

sc = SparkContext()
glueContext = GlueContext(sc)
logger = glueContext.get_logger()
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

db = "db_source_zone"
tbl = "software_review"
output_db = "db_curated_zone"
output_tbl = "software_review_gov"
tx_id = glueContext.start_transaction(False)

# Script generated for node AWS Glue Data Catalog
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database=db,
    table_name=tbl,
    transformation_ctx="datasource0",
)
nrows = datasource0.toDF().count()
logger.info(f'Number of rows: {nrows}')

datasink0 = glueContext.write_dynamic_frame.from_catalog(
    frame=datasource0,
    database=output_db,
    table_name=output_tbl,
    additional_options={
        "useGlueParquetWriter": True,
        "transactionId": tx_id,
        "partitionKeys": ["year_month"],
        "callDeleteObjectsOnCancel": "true",
    },
)
#glueContext.purge_table(output_db, output_tbl)
#wr.lakeformation.commit_transaction(tx_id)
is_commited = glueContext.commit_transaction(tx_id)
logger.info(f'Commit status: {is_commited}')
job.commit()

The job runs successfully and the commit result is True, but when I query the table in Athena, it is empty.

A second script I found was this one:

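import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Same setup as in the first script
args = getResolvedOptions(sys.argv, ["JOB_NAME"])

sc = SparkContext()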
glueContext = GlueContext(sc)
logger = glueContext.get_logger()
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

db = "db_source_zone"
tbl = "software_review"
output_db = "db_curated_zone"
output_tbl = "software_review_gov"
tx_id = glueContext.start_transaction(False)

# Script generated for node AWS Glue Data Catalog
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database=db,
    table_name=tbl,
    transformation_ctx="datasource0",
)
nrows = datasource0.toDF().count()
logger.info(f'Number of rows: {nrows}')

dest_path = "s3://bbs-data-lake/curated_zone/software_review/"

sink = glueContext.getSink(
    connection_type="s3",
    path=dest_path,
    enableUpdateCatalog=True,
    updateBehavior="UPDATE_IN_DATABASE",
    transactionId=tx_id,
    additional_options={
        "useGlueParquetWriter": True,
        "partitionKeys": ["year_month"],
        "callDeleteObjectsOnCancel": "true",
    },
)
sink.setFormat("glueparquet")
sink.setCatalogInfo(
    catalogDatabase=output_db, catalogTableName=output_tbl
)

try:
    sink.writeFrame(datasource0)
    glueContext.commit_transaction(tx_id)
except Exception:
    glueContext.cancel_transaction(tx_id)
    raise
job.commit()

In this case the job fails with an error related to writing to S3. I tried different configurations, such as changing the Glue version and toggling the useGlueParquetWriter parameter, but without success. Note that if I update the governed tables with AWS Data Wrangler, it works perfectly, but in that case I can only use pandas, not Spark.

  • The stack trace in the logs should give more information about the cause of that InvocationTargetException.
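
One way to make sure that stack trace ends up in the job log is to catch the exception around the write, log the full traceback, cancel the transaction, and re-raise. Below is a minimal sketch in plain Python, not a confirmed fix for the error. The names `run_with_traceback`, `write_fn`, `on_failure`, and `log` are hypothetical and only for illustration. The sketch assumes the Python exception raised by the write carries the underlying error in its message or in its chained causes.

```python
import traceback


def run_with_traceback(write_fn, on_failure, log=print):
    """Run write_fn; on error, log the full traceback, call on_failure, re-raise.

    All three parameters are hypothetical stand-ins for illustration:
    write_fn performs the write, on_failure cancels the transaction,
    and log is any callable that accepts a string.
    """
    try:
        return write_fn()
    except Exception as exc:
        # format_exception follows chained exceptions (__cause__/__context__),
        # so the error behind a wrapper exception is included in the text.
        details = "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        )
        log(f"Write failed:\n{details}")
        on_failure()
        raise
```

In the second script above, this could take the place of the bare try/except around the write, for example `run_with_traceback(lambda: sink.writeFrame(datasource0), lambda: glueContext.cancel_transaction(tx_id), log=logger.error)`, with `glueContext.commit_transaction(tx_id)` called afterwards only if no exception was raised.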
