Why can't query iceberg table in glue job

0

I configure the glue job according to the official document. But it always throw error as shown below when running.

23/01/18 10:38:24 ERROR ProcessLauncher: Error from Python:Traceback (most recent call last):
  File "/tmp/test_job.py", line 16, in <module>
    AWSGlueDataCatalog_node1674017752048 = glueContext.create_dynamic_frame.from_catalog(
  File "/opt/amazon/lib/python3.7/site-packages/awsglue/dynamicframe.py", line 629, in from_catalog
    return self._glue_context.create_dynamic_frame_from_catalog(db, table_name, redshift_tmp_dir, transformation_ctx, push_down_predicate, additional_options, catalog_id, **kwargs)
  File "/opt/amazon/lib/python3.7/site-packages/awsglue/context.py", line 188, in create_dynamic_frame_from_catalog
    return source.getFrame(**kwargs)
  File "/opt/amazon/lib/python3.7/site-packages/awsglue/data_source.py", line 36, in getFrame
    jframe = self._jsource.getDynamicFrame()
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/amazon/lib/python3.7/site-packages/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o101.getDynamicFrame.
: java.lang.Exception: Unsupported dataframe format for job bookmarks
	at org.apache.spark.sql.wrapper.SparkSqlDecoratorDataSource.resolveRelation(SparkSqlDecoratorDataSource.scala:103)
	at com.amazonaws.services.glue.SparkSQLDataSource.$anonfun$getDynamicFrame$24(DataSource.scala:794)
	at com.amazonaws.services.glue.util.FileSchemeWrapper.$anonfun$executeWithQualifiedScheme$1(FileSchemeWrapper.scala:90)
	at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWith(FileSchemeWrapper.scala:83)
	at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWithQualifiedScheme(FileSchemeWrapper.scala:90)
	at com.amazonaws.services.glue.SparkSQLDataSource.getDynamicFrame(DataSource.scala:762)
	at com.amazonaws.services.glue.DataSource.getDynamicFrame(DataSource.scala:102)
	at com.amazonaws.services.glue.DataSource.getDynamicFrame$(DataSource.scala:102)
	at com.amazonaws.services.glue.AbstractSparkSQLDataSource.getDynamicFrame(DataSource.scala:726)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)

Script of glue job:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node AWS Glue Data Catalog
AWSGlueDataCatalog_node1674017752048 = glueContext.create_dynamic_frame.from_catalog(
    database="source_db",
    table_name="source_table",
    transformation_ctx="AWSGlueDataCatalog_node1674017752048",
)

# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
    frame=AWSGlueDataCatalog_node1674017752048,
    mappings=[
        ("time", "timestamp", "time", "timestamp"),
        ("name", "string", "name", "string"),
    ],
    transformation_ctx="ApplyMapping_node2",
)

# Script generated for node MySQL table
MySQLtable_node3 = glueContext.write_dynamic_frame.from_catalog(
    frame=ApplyMapping_node2,
    database="target_db",
    table_name="target_table",
    transformation_ctx="MySQLtable_node3",
)

job.commit()

Source table definition:

CREATE TABLE source_db.source_table (
  time timestamp,
  name string)
PARTITIONED BY (`name`)
LOCATION 's3://source_db/source_table'
TBLPROPERTIES (
  'table_type'='iceberg'
);
已提問 1 年前檢視次數 1211 次
2 個答案
0

Hello, The error seems to be related to Glue Job bookmark (find supported data formats in AWS Docs. https://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html). Please try disable Job bookmark on the job, remove the transformation_ctx parameter from the nodes, and remove job.init() and job.commit() functions. These two functions initialize and commit the bookmark service as stated on the previous doc link shared.

profile pictureAWS
已回答 1 年前
0

I can confirm that disabling Job Bookmarks does not help.

Of course, with Glue Interactive Jupyter Notebooks, the only way to explicitly disable Job Bookmarks is by using magics. Below is my %%configure magic showing how I tried to disable Job Notebooks:

%%configure
{
    "--datalake-formats": "iceberg",
    "--job-bookmark-option": "job-bookmark-disable",
    "--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.AwsDataCatalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.AwsDataCatalog.warehouse=s3://sandbox-mededatalake-transformed --conf spark.sql.catalog.AwsDataCatalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.AwsDataCatalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO  --conf spark.sql.catalog.AwsDataCatalog.glue.lakeformation-enabled=true --conf spark.sql.catalog.AwsDataCatalog.glue.id=***********"
}

I can tell Job Bookmarks is actually disabled for this job by going to the Job Details tab where it shows as below in the UI (greyed out):

Job bookmark

Specifies how AWS Glue processes job bookmark when the job runs. It can remember previously processed data (Enable), update state information (Pause), or ignore state information (Disable).

Disable (dropdown selection)

I have gotten rid of all references to the Job library. Literally, the only code in my notebook after my imports is below:

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
dyf = glueContext.create_dynamic_frame.from_catalog(database='<schema_name>', table_name='<table_name>')
dyf.printSchema()

And yet, I still get this useless error message:

Py4JJavaError: An error occurred while calling o96.getDynamicFrame.
: java.lang.Exception: Unsupported dataframe format for job bookmarks
	at org.apache.spark.sql.wrapper.SparkSqlDecoratorDataSource.resolveRelation(SparkSqlDecoratorDataSource.scala:103)
	at com.amazonaws.services.glue.SparkSQLDataSource.$anonfun$getDynamicFrame$24(DataSource.scala:799)
	at com.amazonaws.services.glue.util.FileSchemeWrapper.$anonfun$executeWithQualifiedScheme$1(FileSchemeWrapper.scala:102)
	at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWith(FileSchemeWrapper.scala:95)
	at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWithQualifiedScheme(FileSchemeWrapper.scala:102)
	at com.amazonaws.services.glue.SparkSQLDataSource.getDynamicFrame(DataSource.scala:767)
	at com.amazonaws.services.glue.DataSource.getDynamicFrame(DataSource.scala:104)
	at com.amazonaws.services.glue.DataSource.getDynamicFrame$(DataSource.scala:104)
	at com.amazonaws.services.glue.AbstractSparkSQLDataSource.getDynamicFrame(DataSource.scala:732)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)

I don't think it is possible to read Iceberg tables from the Glue Catalog into Glue Dynamic Frames. I think AWS advertises this but it is not actually possible. It is a great disservice to customers to purport support for Iceberg while it is not actually there. It causes a huge waste of time in investigation and diagnosis. The error messages from AWS Services are 99% useless because they invariably hide the real problem. That means what should take just a few seconds to diagnose actually takes days in support requests which usually go nowhere because they are so exhaustingly repetitive and just go in circles. (AWS support engineers can't even look at what you're working on.) It would literally take one-thousandth of the time to diagnose such problems if AWS services didn't mask the real problems with their bad error messages.

已回答 3 個月前

您尚未登入。 登入 去張貼答案。

一個好的回答可以清楚地回答問題並提供建設性的意見回饋,同時有助於提問者的專業成長。

回答問題指南