Iceberg MERGE INTO doesn't work in a Glue 4.0 job run from the Docker image aws-glue-libs:glue_libs_4.0.0_image_01

I have an issue with a **Glue job** running from the Docker image amazon/aws-glue-libs:glue_libs_4.0.0_image_01 when executing "MERGE INTO" on an Iceberg table.

I followed the instructions from https://aws.amazon.com/blogs/big-data/develop-and-test-aws-glue-version-3-0-jobs-locally-using-a-docker-container/.
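
For context, the job obtains its Spark session through the standard Glue boilerplate from that post; the following is a minimal sketch, assuming the usual GlueContext pattern (the exact setup lines are not reproduced here), where `spark` is the session used in the snippets below.

    # Sketch of typical Glue job boilerplate (assumed, not copied verbatim from my script).
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext

    sc = SparkContext.getOrCreate()
    glue_context = GlueContext(sc)
    spark = glue_context.spark_session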

My code consists of the following steps:

  1. Create the table -> works:

         CREATE TABLE {my_catalog}.{glue_database}.{table_name} ...
         TBLPROPERTIES ('table_type'='ICEBERG', ....

  2. Read the table -> works:

         spark.read.format("iceberg").load(f"{glue_database}.{table_name}")

  3. Merge into the table -> ERROR:

         my_df.createOrReplaceTempView('my_view')
         merge_query = f"""
             MERGE INTO {glue_database}.{table_name} t
             USING (select * from my_view) s
             ON (t.id = s.id)
             WHEN NOT MATCHED THEN INSERT *
         """
         spark.sql(merge_query)

Steps (1) and (2) work in Docker. The issue arises only when running (3) from Docker; all three steps work perfectly when run from the AWS Glue console.

ERROR:

    An error occurred while calling o45.sql.
    : java.lang.UnsupportedOperationException: MERGE INTO TABLE is not supported temporarily.
        at org.apache.spark.sql.errors.QueryExecutionErrors$.ddlUnsupportedTemporarilyError(QueryExecutionErrors.scala:891)
        at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:895)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:72)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
        at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
        at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:72)
        at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:495)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:153)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:213)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:552)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:213)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:212)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:153)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:146)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:166)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:213)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:552)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:213)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:212)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:163)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:159)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$5(QueryExecution.scala:298)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:657)
        at org.apache.spark.sql.execution.QueryExecution.writePlans(QueryExecution.scala:298)
        at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:313)
        at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:267)
        at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:246)
        at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:222)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:102)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:99)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:750)

Additional information:

  • In both Docker and the AWS Glue console I have:
  • AWS Glue version: 4.0
  • Iceberg version: 1.0.0
  • Spark version: 3.3.0-amzn-1
  • My data is stored on S3 and Iceberg is configured for it.
  • Iceberg config in the Python script (a session-creation sketch follows this list):

    path_warehouse = f"s3://{bucket}/iceberg/warehouse"
    spark.conf.set("spark.sql.defaultCatalog", my_catalog)
    spark.conf.set(f"spark.sql.catalog.{my_catalog}", "org.apache.iceberg.spark.SparkCatalog")
    spark.conf.set(f"spark.sql.catalog.{my_catalog}.warehouse", f"{path_warehouse}/")
    spark.conf.set(f"spark.sql.catalog.{my_catalog}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    spark.conf.set(f"spark.sql.catalog.{my_catalog}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
  • From Docker I am able to create tables and read data, but the merge throws an error. From the AWS Glue console everything works.
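
For comparison, here is a minimal sketch, assuming the session is built with SparkSession.builder, of passing the same catalog settings at session-creation time; the spark.sql.extensions entry (org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions) is what the Iceberg Spark documentation lists alongside this catalog config. None of it is copied from my actual script, and names like my_catalog and the warehouse path are placeholders.

    # Minimal sketch (assumption, not my actual job code): same Iceberg settings,
    # but applied when the SparkSession is first created rather than via spark.conf.set().
    from pyspark.sql import SparkSession

    my_catalog = "glue_catalog"                          # hypothetical catalog name
    path_warehouse = "s3://my-bucket/iceberg/warehouse"  # hypothetical warehouse path

    spark = (
        SparkSession.builder
        .config("spark.sql.extensions",
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.defaultCatalog", my_catalog)
        .config(f"spark.sql.catalog.{my_catalog}", "org.apache.iceberg.spark.SparkCatalog")
        .config(f"spark.sql.catalog.{my_catalog}.warehouse", f"{path_warehouse}/")
        .config(f"spark.sql.catalog.{my_catalog}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
        .config(f"spark.sql.catalog.{my_catalog}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
        .getOrCreate()
    )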

I am running out of ideas about what the issue could be here. Please help.

  • Have you tried it with "MERGE INTO {my_catalog}.{glue_database}.{table_name} ....."?

  • Yes, I tried this (roughly the variant sketched below), but with no success. Please keep in mind that the same code works in the AWS Glue console; the issue is only with MERGE INTO when running from Docker.
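
A sketch of that fully qualified variant, assuming the same placeholder names as in the question (not a verbatim copy of what I ran):

    # Hypothetical restatement of the fully qualified MERGE suggested in the comment;
    # my_catalog, glue_database and table_name are the same placeholders as above.
    merge_query = f"""
        MERGE INTO {my_catalog}.{glue_database}.{table_name} t
        USING (SELECT * FROM my_view) s
        ON (t.id = s.id)
        WHEN NOT MATCHED THEN INSERT *
    """
    spark.sql(merge_query)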
