Creating a table in the Glue Catalog with Parquet format fails, even with enableUpdateCatalog set


I am trying to modify a PySpark/Glue job to write a DynamicFrame to a new table in the Glue Catalog, in Parquet format.

The database already exists; the table does not yet. I am following the instructions posted here: https://docs.aws.amazon.com/glue/latest/dg/update-from-job.html
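For reference, the pattern on that page that I am trying to follow looks roughly like this (the database, table, path, and partition key names below are placeholders, not my real values):

    # Sketch of the documented "create a new table from a Glue job" pattern.
    # glueContext is the GlueContext created earlier in the job,
    # last_transform is the DynamicFrame to write.
    sink = glueContext.getSink(
        connection_type="s3",
        path="s3://path/to/folder",
        enableUpdateCatalog=True,
        updateBehavior="UPDATE_IN_DATABASE",
        partitionKeys=["partition_key0"]
    )
    sink.setCatalogInfo(
        catalogDatabase="dst_db_name",
        catalogTableName="dst_tbl_name"
    )
    sink.setFormat("glueparquet")
    sink.writeFrame(last_transform)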

I am getting the error:

File "/tmp/resource_logs_with_attributes.py", line 443, in write_transformed_data_to_s3
    format="glueparquet"
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 386, in write_dynamic_frame_from_catalog
    makeOptions(self._sc, additional_options), catalog_id)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o91.getCatalogSink.
: com.amazonaws.services.glue.model.EntityNotFoundException: Table api_logs_core not found. (Service: AWSGlue; Status Code: 400; Error Code: EntityNotFoundException; Request ID: 4ed5f8f4-e8bf-4764-afb8-425804d3bcd5; Proxy: null)

The code in question (inside write_transformed_data_to_s3) is:

    sink = glueContext.getSink(
        connection_type="s3",
        path="s3://mybucket/test",
        enableUpdateCatalog=True,
        format="glueparquet"
    )
    sink.setCatalogInfo(
        catalogDatabase='myexistinggluedatabase',
        catalogTableName='newtable'
    )
    newdyf = glueContext.write_dynamic_frame_from_catalog(
        frame=my_dyf,
        database='myexistinggluedatabase',
        table_name='newtable',
        format="glueparquet"
    )

I have tried variants including:

  • including the partition key array in the sink (partitionKeys=["dt_pk"])
  • format="parquet" plus format_options={"useGlueParquetWriter": True}
  • including "updateBehavior": "UPDATE_IN_DATABASE" in additional_options
  • sink.writeFrame() with the options as documented (see the sketch after this list)
  • my_dyf.write() with connection_options and format + format_options
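The sink.writeFrame() variant, for example, looked roughly like this (same database, table, and path as in the code above; the partition key and updateBehavior are the extra options from the list):

    # One of the attempted variants: write through the sink itself
    # rather than through write_dynamic_frame_from_catalog.
    sink = glueContext.getSink(
        connection_type="s3",
        path="s3://mybucket/test",
        enableUpdateCatalog=True,
        updateBehavior="UPDATE_IN_DATABASE",
        partitionKeys=["dt_pk"]
    )
    sink.setCatalogInfo(
        catalogDatabase='myexistinggluedatabase',
        catalogTableName='newtable'
    )
    sink.setFormat("glueparquet")
    sink.writeFrame(my_dyf)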

This is otherwise functional code that has worked as follows:

  • reads data from a source into a DynamicFrame
  • does basic transformations to a new schema
  • works in the context of a Glue Catalog
    • the existing code uses write_dynamic_frame.from_options, specifying the S3 location and parquet format, partitioned by date, writing to a table whose schema was discovered and written to the Glue Catalog by a Glue Crawler; this works in two phases: the Glue Job writes Parquet files to S3, then the Glue Crawler updates the Glue Catalog (see the sketch after this list)
  • writes to a partitioned S3 data set in Parquet format
  • works with either standard Parquet or the Glue Parquet Writer
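The existing, working write looks roughly like this (the S3 path here is a placeholder for the real location):

    # Current two-phase approach: the job writes partitioned Parquet to S3,
    # and a separate Glue Crawler then updates the Glue Catalog.
    glueContext.write_dynamic_frame.from_options(
        frame=my_dyf,
        connection_type="s3",
        connection_options={
            "path": "s3://mybucket/existing-location/",
            "partitionKeys": ["dt_pk"]
        },
        format="parquet"
    )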

My only change is that, with the same schema, partitions, and data format as before, I now want to write to a new S3 location and have the Glue Job itself create the table and update the Data Catalog.
