Iceberg MERGE INTO doesn't work in a Glue 4.0 job run from the Docker image amazon/aws-glue-libs:glue_libs_4.0.0_image_01

I have an issue with a **Glue job** running from the Docker image amazon/aws-glue-libs:glue_libs_4.0.0_image_01 when executing "MERGE INTO" on an Iceberg table.

I followed the instructions from https://aws.amazon.com/blogs/big-data/develop-and-test-aws-glue-version-3-0-jobs-locally-using-a-docker-container/.

My code consists of:

  1. CREATE TABLE {my_catalog}.{glue_database}.{table_name} ... TBLPROPERTIES ('table_type'='ICEBERG', ....) -> works
  2. spark.read.format("iceberg").load(f"{glue_database}.{table_name}") .... -> works
  3. The MERGE step -> ERROR:

         my_df.createOrReplaceTempView('my_view')
         merge_query = f"""
             MERGE INTO {glue_database}.{table_name} t
             USING (select * from my_view) s
             ON (t.id = s.id)
             WHEN NOT MATCHED THEN INSERT *
         """
         spark.sql(merge_query)

(1) & (2) works in Docker. The issue arises only while running (3) from Docker; the code works perfectly for all 3 steps when running from the AWS Glue console.

ERROR:

    An error occurred while calling o45.sql.
    : java.lang.UnsupportedOperationException: MERGE INTO TABLE is not supported temporarily.
        at org.apache.spark.sql.errors.QueryExecutionErrors$.ddlUnsupportedTemporarilyError(QueryExecutionErrors.scala:891)
        at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:895)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:72)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
        at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
        at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:72)
        at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:495)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:153)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:213)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:552)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:213)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:212)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:153)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:146)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:166)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:213)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:552)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:213)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:212)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:163)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:159)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$5(QueryExecution.scala:298)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:657)
        at org.apache.spark.sql.execution.QueryExecution.writePlans(QueryExecution.scala:298)
        at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:313)
        at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:267)
        at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:246)
        at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:222)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:102)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:99)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:750)

Additional information:

  • In Docker & AWS Glue Job console I have:
  • AWS Glue version: 4.0
  • Iceberg version: 1.0.0
  • Spark version: 3.3.0-amzn-1
  • My data is stored on S3, and Iceberg is configured to use it.
  • Iceberg config in the Python script (a session-creation variant is sketched after this list):

        path_warehouse = f"s3://{bucket}/iceberg/warehouse"
        spark.conf.set("spark.sql.defaultCatalog", my_catalog)
        spark.conf.set(f"spark.sql.catalog.{my_catalog}", "org.apache.iceberg.spark.SparkCatalog")
        spark.conf.set(f"spark.sql.catalog.{my_catalog}.warehouse", f"{path_warehouse}/")
        spark.conf.set(f"spark.sql.catalog.{my_catalog}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
        spark.conf.set(f"spark.sql.catalog.{my_catalog}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
  • From Docker I am able to create tables and read data, but the merge throws the error above. From the AWS Glue console everything works.
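For comparison, here is a minimal sketch of the same settings applied at session-creation time instead of on a live session. The spark.sql.extensions entry is an assumption and is not part of the original script: Iceberg's documentation lists IcebergSparkSessionExtensions as required for MERGE INTO support, and a static setting like spark.sql.extensions must be in place before the SparkSession starts. The catalog and bucket names are placeholders.

    # A sketch, not a confirmed fix: the same catalog settings applied when the
    # session is created. The spark.sql.extensions line is an assumption (it is
    # not in the original script); Iceberg's SQL extensions provide MERGE INTO,
    # and static settings must be set before the SparkSession exists.
    from pyspark.sql import SparkSession

    my_catalog = "glue_catalog"                           # placeholder
    path_warehouse = "s3://my-bucket/iceberg/warehouse"   # placeholder bucket

    spark = (
        SparkSession.builder
        .config("spark.sql.extensions",
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.defaultCatalog", my_catalog)
        .config(f"spark.sql.catalog.{my_catalog}",
                "org.apache.iceberg.spark.SparkCatalog")
        .config(f"spark.sql.catalog.{my_catalog}.warehouse", f"{path_warehouse}/")
        .config(f"spark.sql.catalog.{my_catalog}.catalog-impl",
                "org.apache.iceberg.aws.glue.GlueCatalog")
        .config(f"spark.sql.catalog.{my_catalog}.io-impl",
                "org.apache.iceberg.aws.s3.S3FileIO")
        .getOrCreate()
    )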

I am running out of ideas as to what the issue is here; any help is appreciated.

  • Have you tried it with "MERGE INTO {my_catalog}.{glue_database}.{table_name} ....."?

  • Yes, I tried this, but with no success. Please keep in mind that the same code works in the AWS Glue console; the issue occurs with MERGE INTO only when running from Docker.
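For reference, the fully qualified variant discussed in this exchange would look like this (same placeholder names as in the sketches above):

    # Fully qualified table reference, as suggested in the comment above.
    merge_query = f"""
        MERGE INTO {my_catalog}.{glue_database}.{table_name} t
        USING (select * from my_view) s
        ON (t.id = s.id)
        WHEN NOT MATCHED THEN INSERT *
    """
    spark.sql(merge_query)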
