Delta Lake write from AWS Glue streaming job


It looks like writing to a Delta Lake table from a DynamicFrame does not work. The Glue visual editor generates a script like:

    s3 = glueContext.write_dynamic_frame.from_options(
        frame=df,
        connection_type="s3",
        format="delta",
        connection_options={
            "path": "s3://...",
            "partitionKeys": [],
        },
        format_options={"compression": "snappy"},
        transformation_ctx="s3",
    )

which fails with the following error:

An error occurred while calling o183.pyWriteDynamicFrame. : java.lang.ClassNotFoundException: Failed to load format with name delta.

The sample listed at https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format-delta-lake.html also fails, though for a different reason (see https://stackoverflow.com/questions/76262252/creating-a-delta-table-in-s3-with-glue-delta-lake-creates-a-glue-catalog-table).

The only way I found to save data to a Delta Lake table is to fall back to the DataFrame writer and not attempt to register the table in the Glue Data Catalog:

    additional_options = {
        "path": "s3://....",
        "write.parquet.compression-codec": "snappy",
    }
    df = MyDynamicFrame.toDF()
    df.write.format("delta").options(**additional_options).mode("append").save()
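For a streaming job, the same DataFrame-writer workaround can be wrapped in GlueContext.forEachBatch so that each micro-batch is appended to the Delta path. This is only a sketch of how I understand it would fit together; the process_batch name, the kafka_source_df variable, and the option values are mine, not from the generated script:

    # Sketch only: assumes the usual Glue streaming boilerplate is already in
    # place (glueContext, the Kafka source DynamicFrame kafka_source_df, and
    # the resolved job arguments in args).
    def process_batch(batch_df, batch_id):
        # Skip empty micro-batches to avoid writing empty Delta commits.
        if batch_df.count() > 0:
            additional_options = {
                "path": "s3://....",
                "write.parquet.compression-codec": "snappy",
            }
            batch_df.write.format("delta") \
                .options(**additional_options) \
                .mode("append") \
                .save()

    glueContext.forEachBatch(
        frame=kafka_source_df,
        batch_function=process_batch,
        options={
            "windowSize": "100 seconds",
            "checkpointLocation": args["TempDir"] + "/checkpoint/",
        },
    )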

Am I missing something? I am using Glue 4.0 with the default Delta Lake version.

Asked 10 months ago · 471 views

2 Answers
Accepted Answer

That sounds like a defect: when using the same Delta sink in a regular, non-streaming job, the generated code looks like the second version you describe. I will open a ticket with the streaming team.

AWS (Expert)
Answered 10 months ago
  • I've run into a very similar error trying to write to an Iceberg table from a Glue streaming job.


No, it is correct. In the same visual job, add a parent data node (Kafka in my case). Once that is in place, you will see the script change to something like:

    s3 = glueContext.write_dynamic_frame.from_options(
        frame=df,
        connection_type="s3",
        format="delta",
        connection_options={
            "path": "s3://...",
            "partitionKeys": [],
        },
        format_options={"compression": "snappy"},
        transformation_ctx="s3",
    )

I ran the same test you did, and that is how I discovered that this version works:

    S3bucket_node3_df.write.format("delta").options(**additional_options).mode("append").save()
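For anyone hitting the ClassNotFoundException above: as far as I understand, on Glue 4.0 the Delta classes are only put on the classpath when the --datalake-formats job parameter is set, and the Spark session needs the Delta extensions enabled. A sketch of the configuration I believe is required (verify against the Glue docs for your version):

    # Glue job parameter (Job details -> Job parameters):
    #   --datalake-formats  delta
    #
    # Spark conf enabling the Delta session extensions, e.g. via --conf:
    #   spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
    #   spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog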

Answered 10 months ago
  • Streaming is treating Delta (as well as Hudi and Iceberg) like one of the basic formats; in any case, the bug has been reported.
