ICEBERG - MERGE INTO doesn't work in Glue Job 4.0 from docker image aws-glue-libs:glue_libs_4.0.0_image_01

0

I have an issue with **Glue Job **running from docker image amazon/aws-glue-libs:glue_libs_4.0.0_image_01 while "MERGE INTO" ICEBERG table.

I follow the instruction from https://aws.amazon.com/blogs/big-data/develop-and-test-aws-glue-version-3-0-jobs-locally-using-a-docker-container/.

My code contains of:

  1. CREATE TABLE {my_catalog}.{glue_database}.{table_name} ... TBLPROPERTIES ('table_type'='ICEBERG', .... -> works
  2. spark.read.format("iceberg").load(f"{glue_database}.{table_name}") .... -> works
  3. my_df.createOrReplaceTempView('my_view') merge_query = f""" MERGE INTO {glue_database}.{table_name} t USING (select * from my_view) s ON (t.id = s.id) WHEN NOT MATCHED THEN INSERT * """ spark.sql(merge_query). -> ERROR

(1) & (2) works in Docker. The issue arises only while running (3) from Docker; the code works perfectly for all 3 steps when running from the AWS Glue console.

ERROR: An error occurred while calling o45.sql. : java.lang.UnsupportedOperationException: MERGE INTO TABLE is not supported temporarily. at org.apache.spark.sql.errors.QueryExecutionErrors$.ddlUnsupportedTemporarilyError(QueryExecutionErrors.scala:891) at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:895) at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491) at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93) at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:72) at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78) at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431) at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93) at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:72) at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:495) at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:153) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:213) at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:552) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:213) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:212) at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:153) at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:146) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:166) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:213) at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:552) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:213) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:212) at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:163) at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:159) at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$5(QueryExecution.scala:298) at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:657) at org.apache.spark.sql.execution.QueryExecution.writePlans(QueryExecution.scala:298) at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:313) at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:267) at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:246) at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:107) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96) at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81) at org.apache.spark.sql.Dataset.<init>(Dataset.scala:222) at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:102) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:99) at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at py4j.ClientServerConnection.run(ClientServerConnection.java:106) at java.lang.Thread.run(Thread.java:750)

Additional information:

  • In Docker & AWS Glue Job console I have:
  • AWS Glue version: 4.0
  • Iceberg version: 1.0.0
  • Spark version: 3.3.0-amzn-1
  • My data are stored on s3 and iceberg is configured for this.
  • Iceberg config in python script.

    path_warehouse = f"s3://{bucket}/iceberg/warehouse" spark.conf.set("spark.sql.defaultCatalog", my_catalog) spark.conf.set(f"spark.sql.catalog.{my_catalog}", "org.apache.iceberg.spark.SparkCatalog") spark.conf.set(f"spark.sql.catalog.{my_catalog}.warehouse", f"{path_warehouse}/") spark.conf.set(f"spark.sql.catalog.{my_catalog}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") spark.conf.set(f"spark.sql.catalog.{my_catalog}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
  • From Docker, I am able to create tables and read data, but when trying to merge, it throws an error. From AWS Glue console everything works.

I am running out of idea what is the issue here, please help me.

  • Have you tried with "MERGE INTO {my_catalog}.{glue_database}.{table_name} ....."

  • Yes, I tried this, but with no success. Please keep in mind that the following code works in AWS Glue console. The issue is only with MERGE INTO while running from Docker.

user777
已提問 10 個月前檢視次數 167 次
沒有答案

您尚未登入。 登入 去張貼答案。

一個好的回答可以清楚地回答問題並提供建設性的意見回饋,同時有助於提問者的專業成長。

回答問題指南