Why can't query iceberg table in glue job

0

I configure the glue job according to the official document. But it always throw error as shown below when running.

23/01/18 10:38:24 ERROR ProcessLauncher: Error from Python:Traceback (most recent call last):
  File "/tmp/test_job.py", line 16, in <module>
    AWSGlueDataCatalog_node1674017752048 = glueContext.create_dynamic_frame.from_catalog(
  File "/opt/amazon/lib/python3.7/site-packages/awsglue/dynamicframe.py", line 629, in from_catalog
    return self._glue_context.create_dynamic_frame_from_catalog(db, table_name, redshift_tmp_dir, transformation_ctx, push_down_predicate, additional_options, catalog_id, **kwargs)
  File "/opt/amazon/lib/python3.7/site-packages/awsglue/context.py", line 188, in create_dynamic_frame_from_catalog
    return source.getFrame(**kwargs)
  File "/opt/amazon/lib/python3.7/site-packages/awsglue/data_source.py", line 36, in getFrame
    jframe = self._jsource.getDynamicFrame()
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/amazon/lib/python3.7/site-packages/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o101.getDynamicFrame.
: java.lang.Exception: Unsupported dataframe format for job bookmarks
	at org.apache.spark.sql.wrapper.SparkSqlDecoratorDataSource.resolveRelation(SparkSqlDecoratorDataSource.scala:103)
	at com.amazonaws.services.glue.SparkSQLDataSource.$anonfun$getDynamicFrame$24(DataSource.scala:794)
	at com.amazonaws.services.glue.util.FileSchemeWrapper.$anonfun$executeWithQualifiedScheme$1(FileSchemeWrapper.scala:90)
	at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWith(FileSchemeWrapper.scala:83)
	at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWithQualifiedScheme(FileSchemeWrapper.scala:90)
	at com.amazonaws.services.glue.SparkSQLDataSource.getDynamicFrame(DataSource.scala:762)
	at com.amazonaws.services.glue.DataSource.getDynamicFrame(DataSource.scala:102)
	at com.amazonaws.services.glue.DataSource.getDynamicFrame$(DataSource.scala:102)
	at com.amazonaws.services.glue.AbstractSparkSQLDataSource.getDynamicFrame(DataSource.scala:726)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)

Script of glue job:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node AWS Glue Data Catalog
AWSGlueDataCatalog_node1674017752048 = glueContext.create_dynamic_frame.from_catalog(
    database="source_db",
    table_name="source_table",
    transformation_ctx="AWSGlueDataCatalog_node1674017752048",
)

# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
    frame=AWSGlueDataCatalog_node1674017752048,
    mappings=[
        ("time", "timestamp", "time", "timestamp"),
        ("name", "string", "name", "string"),
    ],
    transformation_ctx="ApplyMapping_node2",
)

# Script generated for node MySQL table
MySQLtable_node3 = glueContext.write_dynamic_frame.from_catalog(
    frame=ApplyMapping_node2,
    database="target_db",
    table_name="target_table",
    transformation_ctx="MySQLtable_node3",
)

job.commit()

Source table definition:

CREATE TABLE source_db.source_table (
  time timestamp,
  name string)
PARTITIONED BY (`name`)
LOCATION 's3://source_db/source_table'
TBLPROPERTIES (
  'table_type'='iceberg'
);
asked a year ago1096 views
2 Answers
0

Hello, The error seems to be related to Glue Job bookmark (find supported data formats in AWS Docs. https://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html). Please try disable Job bookmark on the job, remove the transformation_ctx parameter from the nodes, and remove job.init() and job.commit() functions. These two functions initialize and commit the bookmark service as stated on the previous doc link shared.

profile pictureAWS
answered a year ago
0

I can confirm that disabling Job Bookmarks does not help.

Of course, with Glue Interactive Jupyter Notebooks, the only way to explicitly disable Job Bookmarks is by using magics. Below is my %%configure magic showing how I tried to disable Job Notebooks:

%%configure
{
    "--datalake-formats": "iceberg",
    "--job-bookmark-option": "job-bookmark-disable",
    "--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.AwsDataCatalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.AwsDataCatalog.warehouse=s3://sandbox-mededatalake-transformed --conf spark.sql.catalog.AwsDataCatalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.AwsDataCatalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO  --conf spark.sql.catalog.AwsDataCatalog.glue.lakeformation-enabled=true --conf spark.sql.catalog.AwsDataCatalog.glue.id=***********"
}

I can tell Job Bookmarks is actually disabled for this job by going to the Job Details tab where it shows as below in the UI (greyed out):

Job bookmark

Specifies how AWS Glue processes job bookmark when the job runs. It can remember previously processed data (Enable), update state information (Pause), or ignore state information (Disable).

Disable (dropdown selection)

I have gotten rid of all references to the Job library. Literally, the only code in my notebook after my imports is below:

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
dyf = glueContext.create_dynamic_frame.from_catalog(database='<schema_name>', table_name='<table_name>')
dyf.printSchema()

And yet, I still get this useless error message:

Py4JJavaError: An error occurred while calling o96.getDynamicFrame.
: java.lang.Exception: Unsupported dataframe format for job bookmarks
	at org.apache.spark.sql.wrapper.SparkSqlDecoratorDataSource.resolveRelation(SparkSqlDecoratorDataSource.scala:103)
	at com.amazonaws.services.glue.SparkSQLDataSource.$anonfun$getDynamicFrame$24(DataSource.scala:799)
	at com.amazonaws.services.glue.util.FileSchemeWrapper.$anonfun$executeWithQualifiedScheme$1(FileSchemeWrapper.scala:102)
	at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWith(FileSchemeWrapper.scala:95)
	at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWithQualifiedScheme(FileSchemeWrapper.scala:102)
	at com.amazonaws.services.glue.SparkSQLDataSource.getDynamicFrame(DataSource.scala:767)
	at com.amazonaws.services.glue.DataSource.getDynamicFrame(DataSource.scala:104)
	at com.amazonaws.services.glue.DataSource.getDynamicFrame$(DataSource.scala:104)
	at com.amazonaws.services.glue.AbstractSparkSQLDataSource.getDynamicFrame(DataSource.scala:732)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)

I don't think it is possible to read Iceberg tables from the Glue Catalog into Glue Dynamic Frames. I think AWS advertises this but it is not actually possible. It is a great disservice to customers to purport support for Iceberg while it is not actually there. It causes a huge waste of time in investigation and diagnosis. The error messages from AWS Services are 99% useless because they invariably hide the real problem. That means what should take just a few seconds to diagnose actually takes days in support requests which usually go nowhere because they are so exhaustingly repetitive and just go in circles. (AWS support engineers can't even look at what you're working on.) It would literally take one-thousandth of the time to diagnose such problems if AWS services didn't mask the real problems with their bad error messages.

answered 2 months ago

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions