I am trying to modify a PySpark/Glue job to write a DynamicFrame to a new table in the Glue Catalog, in Parquet format.
The database exists; the table does not yet. I am following the instructions posted here: https://docs.aws.amazon.com/glue/latest/dg/update-from-job.html
I am getting the error:
File "/tmp/resource_logs_with_attributes.py", line 443, in write_transformed_data_to_s3
format="glueparquet"
File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 386, in write_dynamic_frame_from_catalog
makeOptions(self._sc, additional_options), catalog_id)
File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
return f(*a, **kw)
File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o91.getCatalogSink.
: com.amazonaws.services.glue.model.EntityNotFoundException: Table api_logs_core not found. (Service: AWSGlue; Status Code: 400; Error Code: EntityNotFoundException; Request ID: 4ed5f8f4-e8bf-4764-afb8-425804d3bcd5; Proxy: null)
sink = glueContext.getSink(
    connection_type="s3",
    path="s3://mybucket/test",
    enableUpdateCatalog=True,
    format="glueparquet"
)
sink.setCatalogInfo(
    catalogDatabase='myexistinggluedatabase',
    catalogTableName='newtable'
)
newdyf = glueContext.write_dynamic_frame_from_catalog(
    frame=my_dyf,
    database='myexistinggluedatabase',
    table_name='newtable',
    format="glueparquet"
)
I have tried variants including:
- including the partition key array in the sink (partitionKeys=["dt_pk"])
- format="parquet" with format_options={"useGlueParquetWriter": True}
- including "updateBehavior": "UPDATE_IN_DATABASE" in additional_options
- sink.writeFrame() with options as documented
- my_dyf.write() with connection_options and format + format_options
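For reference, this is the full single-phase pattern from the linked documentation page as I understand it (the path, database/table names, and partition key below are placeholders for my actual values; this requires the Glue runtime, so it is a sketch, not a verified script):

```python
# Sketch of the documented pattern: getSink + setCatalogInfo + writeFrame.
# All names here are placeholders standing in for my real values.
sink = glueContext.getSink(
    connection_type="s3",
    path="s3://mybucket/test",
    enableUpdateCatalog=True,
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=["dt_pk"],
)
sink.setFormat("glueparquet")
sink.setCatalogInfo(
    catalogDatabase="myexistinggluedatabase",
    catalogTableName="newtable",
)
# As I read the docs, writeFrame() both writes the data and creates/updates
# the catalog table, so no separate write_dynamic_frame_from_catalog call
# should be needed.
sink.writeFrame(my_dyf)
```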
This is otherwise working code that:
- reads data from a source into a DynamicFrame
- does basic transformations to a new schema
- works in the context of a Glue Catalog
- uses write_dynamic_frame.from_options, specifying the S3 location and format parquet, partitioned by date, etc., targeting a table whose schema was discovered and written to the Glue Catalog by a Glue Crawler. This works in two phases: the Glue job runs and writes Parquet files to S3, then the Glue Crawler updates the Glue Catalog
- writes to a partitioned S3 data set in Parquet format
- works with either standard Parquet or the Glue Parquet Writer
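For comparison, the existing two-phase write looks roughly like this (the path and partition key are placeholders; a Glue Crawler later scans this prefix and updates the catalog, and this sketch likewise needs the Glue runtime to run):

```python
# Sketch of the current, working two-phase write. The catalog is NOT
# touched here; a Glue Crawler scans the S3 prefix afterwards and
# creates/updates the table definition.
glueContext.write_dynamic_frame.from_options(
    frame=my_dyf,
    connection_type="s3",
    connection_options={
        "path": "s3://mybucket/existing-prefix",  # placeholder path
        "partitionKeys": ["dt_pk"],
    },
    format="parquet",
)
```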
My sole change: the schema, partitions, and data format are the same as before, but I now want to write to a new S3 location and have the Glue job itself create the table and update the Data Catalog.