Creating a table in the Glue Catalog with Parquet format fails, even with enableUpdateCatalog

I am trying to modify a PySpark/Glue job to write a DynamicFrame to a new table in the Glue Data Catalog, in Parquet format.

The database already exists; the table does not yet. I am following the instructions posted here: https://docs.aws.amazon.com/glue/latest/dg/update-from-job.html
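
As I understand it, the pattern described there for creating a new table is roughly the following (the database, table, path, and partition names here are placeholders from my setup):

    sink = glueContext.getSink(
        connection_type="s3",
        path="s3://mybucket/test",
        enableUpdateCatalog=True,
        updateBehavior="UPDATE_IN_DATABASE",
        partitionKeys=["dt_pk"]
    )
    sink.setFormat("glueparquet")
    sink.setCatalogInfo(
        catalogDatabase="myexistinggluedatabase",
        catalogTableName="newtable"
    )
    sink.writeFrame(my_dyf)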

I am getting the error:

File "/tmp/resource_logs_with_attributes.py", line 443, in write_transformed_data_to_s3
    format="glueparquet"
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 386, in write_dynamic_frame_from_catalog
    makeOptions(self._sc, additional_options), catalog_id)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o91.getCatalogSink.
: com.amazonaws.services.glue.model.EntityNotFoundException: Table api_logs_core not found. (Service: AWSGlue; Status Code: 400; Error Code: EntityNotFoundException; Request ID: 4ed5f8f4-e8bf-4764-afb8-425804d3bcd5; Proxy: null)
The code that triggers this:

        sink = glueContext.getSink(
            connection_type="s3",
            path="s3://mybucket/test",
            enableUpdateCatalog=True,
            format="glueparquet"
        )
        sink.setCatalogInfo(
            catalogDatabase='myexistinggluedatabase',
            catalogTableName='newtable'
        )
        newdyf = glueContext.write_dynamic_frame_from_catalog(
            frame=my_dyf,
            database='myexistinggluedatabase',
            table_name='newtable',
            format="glueparquet"
        )

I have tried variants including:

  • including the partition key array in the sink (partitionKeys=["dt_pk"])
  • using format="parquet" plus format_options={"useGlueParquetWriter": True} (see the sketch after this list)
  • including "updateBehavior": "UPDATE_IN_DATABASE" in additional_options
  • calling sink.writeFrame() with the options as documented
  • calling my_dyf.write() with connection_options plus format and format_options
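
For example, one combination I tried looks roughly like this (a sketch; the bucket, table, and partition names are placeholders, and I am not certain I have the setFormat options exactly right):

    sink = glueContext.getSink(
        connection_type="s3",
        path="s3://mybucket/test",
        enableUpdateCatalog=True,
        updateBehavior="UPDATE_IN_DATABASE",
        partitionKeys=["dt_pk"]
    )
    sink.setCatalogInfo(
        catalogDatabase="myexistinggluedatabase",
        catalogTableName="newtable"
    )
    # standard parquet format with the Glue Parquet writer enabled as a format option
    sink.setFormat("parquet", useGlueParquetWriter=True)
    sink.writeFrame(my_dyf)

None of these variants created the new table for me.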

This is otherwise working code that already does the following:

  • reads data from a source into a DynamicFrame
  • does basic transformations to a new schema
  • works in the context of a Glue Catalog
    • the existing code uses write_dynamic_frame.from_options, specifying the S3 location and Parquet format, partitioned by date, writing to a table whose schema was discovered and written to the Glue Catalog by a Glue Crawler; this works in two phases: the Glue job runs and writes Parquet files to S3, then the Glue Crawler updates the Glue Catalog (see the sketch after this list)
  • writes to a partitioned S3 data set in Parquet format
  • works with either standard Parquet or the Glue Parquet writer
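
A minimal sketch of the existing, working write (assuming write_dynamic_frame.from_options and the same placeholder names as above):

    # current approach: write Parquet files to S3; a Glue Crawler maintains the catalog table
    glueContext.write_dynamic_frame.from_options(
        frame=my_dyf,
        connection_type="s3",
        connection_options={
            "path": "s3://mybucket/existing-location",
            "partitionKeys": ["dt_pk"]
        },
        format="parquet"
    )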

My only change is that, with the same schema, partitions, and data format as before, I now want to write to a new S3 location and have the Glue job create the table and update the Data Catalog.

Asked a year ago · 131 views
No answers
