Delta Lake write from AWS Glue streaming job


It looks like writing to a Delta Lake table from a DynamicFrame does not work in a streaming job. The Glue visual interface generates a script like:

        s3 = glueContext.write_dynamic_frame.from_options(
            frame=df,
            connection_type="s3",
            format="delta",
            connection_options={
                "path": "s3://...",
                "partitionKeys": [],
            },
            format_options={"compression": "snappy"},
            transformation_ctx="s3",
        )

that fails with the following error:

An error occurred while calling o183.pyWriteDynamicFrame. : java.lang.ClassNotFoundException: Failed to load format with name delta.

The sample listed at https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format-delta-lake.html also fails, but for a different reason (https://stackoverflow.com/questions/76262252/creating-a-delta-table-in-s3-with-glue-delta-lake-creates-a-glue-catalog-table).

The only way I found to save data to a Delta Lake table is to fall back to the DataFrame writer, without attempting to register the table in the Glue catalog:

        additional_options = {
            "path": "s3://....",
            "write.parquet.compression-codec": "snappy",
        }
        df = MyDinamicFrame.toDF()
        df.write.format("delta").options(**additional_options).mode("append").save()
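In a streaming job, the DataFrame-writer workaround above is usually applied per micro-batch. A minimal sketch, assuming a hypothetical S3 path and that the job hands each micro-batch to a batch function (for example via `glueContext.forEachBatch`):

```python
# Hedged sketch: bypass write_dynamic_frame (which raises
# ClassNotFoundException for "delta" in streaming jobs) and write each
# micro-batch with the plain Spark DataFrame writer instead.
# DELTA_PATH is a placeholder, not a real location.
DELTA_PATH = "s3://your-bucket/your-delta-table/"

def write_batch_to_delta(batch_df, batch_id):
    """Append one micro-batch to a Delta table via the DataFrame API.

    batch_df is the Spark DataFrame for this micro-batch; batch_id is
    the monotonically increasing batch number supplied by the runtime.
    """
    (batch_df.write
        .format("delta")
        .option("path", DELTA_PATH)
        .mode("append")
        .save())

# In the Glue job this function would be attached to the streaming
# frame, e.g. (options shown are assumptions about your job config):
#
# glueContext.forEachBatch(
#     frame=streaming_df,
#     batch_function=write_batch_to_delta,
#     options={"windowSize": "60 seconds",
#              "checkpointLocation": checkpoint_path},
# )
```

This keeps the Delta write on the DataFrame API, which is the path that worked in my tests, while the rest of the job can stay on DynamicFrames.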

Am I missing something? I am using Glue 4.0 with the default Delta Lake version.

Asked 10 months ago · Viewed 471 times
2 answers
Accepted answer

That sounds like a defect; when using the same Delta sink in a regular, non-streaming job, the generated code looks like the second snippet you describe. I will open a ticket with the streaming team.

AWS Expert
Answered 10 months ago
  • I've run into a very similar error trying to write to an Iceberg table from a Glue streaming job.


No, it is correct. In the same visual job, add a parent data node (Kafka in my case). Once this is in place, you will see that the script changes to something like:

        s3 = glueContext.write_dynamic_frame.from_options(
            frame=df,
            connection_type="s3",
            format="delta",
            connection_options={
                "path": "s3://...",
                "partitionKeys": [],
            },
            format_options={"compression": "snappy"},
            transformation_ctx="s3",
        )

I ran the same test you did, and that is how I discovered that this version works:

        S3bucket_node3_df.write.format("delta").options(**additional_options).mode("append").save()

Answered 10 months ago
  • The streaming sink treats delta (as well as hudi and iceberg) as one of the basic formats; in any case, the bug has been reported.
