hi again,
I have been testing the suggested solution (just using your AWS forums XML example data). If you add the Databricks XML library to Glue, you can let the library infer your schema (it does well in my case with two files, one containing the array and the other without it):
from pyspark.sql import SparkSession
from pyspark.sql.types import *
df = spark.read.format('xml').options(rowTag='indices').load('s3_dir')
df.show()
df.printSchema()
If you want to declare the schema in advance, it will look similar to this:
my_new_schema = StructType([
    StructField('index', ArrayType(StructType([
        StructField('indexname', StringType()),
        StructField('indexsymbol', StringType())
    ])))
])
df = spark.read.format('xml').options(rowTag='indices').load('s3_dir', schema=my_new_schema)
You would need to add the following to the Glue job parameters (Spark configuration) in order to make the library available to Glue:
spark.jars.packages = com.databricks:spark-xml_2.12:0.13.0
Bests
Hi, could you paste the error from the 4th option? I think it should be the way to go.
Another option is to use the Databricks XML library. If you specify the schema beforehand it should work.
https://github.com/databricks/spark-xml
Bests
Thanks! I forgot to mention that I'm using PySpark (I'm not a Java expert by any means). I'll add sample code to my question.
Here's the UDF code I tried:
schema = ArrayType(
    StructType(
        [
            StructField("DY_AMOUNT", IntegerType(), True),
            StructField(
                "PRODUCT_DRAGER",
                StructType([StructField("CODE", StringType(), True)]),
                True,
            ),
            StructField(
                "PRODUCT_SOORT",
                StructType([StructField("CODE", StringType(), True)]),
                True,
            ),
            StructField("PU_AMOUNT", IntegerType(), True),
            StructField("RC_EQUIVALENT_DY_AMOUNT", IntegerType(), True),
            StructField("RC_EQUIVALENT_PU_AMOUNT", IntegerType(), True),
            StructField("RC_EQUIVALENT_SCHEDULED", IntegerType(), True),
            StructField("SCHEDULED", IntegerType(), True),
        ]
    )
)

struct_to_array = udf(
    lambda order: order.array if order.array else [order.struct], schema
)

map0 = dataframe0.withColumn("ORDER_LINE", struct_to_array(col("ORDER_LINE")))
And the error:
[task-result-getter-2] scheduler.TaskSetManager (Logging.scala:logWarning(69)): Lost task 32.0 in stage 2.0 (TID 189) (172.34.232.204 executor 7): org.apache.spark.api.python.PythonException:
Traceback (most recent call last):
[...]
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 449, in toInternal
    return self.dataType.toInternal(obj)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 641, in toInternal
    raise ValueError("Unexpected tuple %r with StructType" % obj)
ValueError: Unexpected tuple 2 with StructType
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:84)
[...]
@acmanjon's solution is the most elegant way of solving the problem, with the Spark XML library handling the inference of the struct as an array. However, that method doesn't use Glue's create_dynamic_frame
to create the dataframe, meaning that Glue bookmarks don't work. So, I present the second option: the string hack.
By using ResolveChoice to cast the struct/array as a string, we can use a combination of PySpark's udf and from_json functions to take the serialized string and convert it to a uniform schema. It's not pretty or performant, but it solves the immediate issue.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import col, udf, from_json
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    ArrayType,
)
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(
    sys.argv,
    [
        "JOB_NAME",
        "source_bucket_name",
        "target_bucket_name",
    ],
)
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
source_bucket_name = args["source_bucket_name"]
target_bucket_name = args["target_bucket_name"]
datasource0 = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [f"s3://{source_bucket_name}"]},
    format="xml",
    format_options={"rowTag": "ORDER"},
    transformation_ctx="datasource0",
)
# Resolve ambiguous data fields; casting to correct value
datachoice0 = ResolveChoice.apply(
    frame=datasource0,
    choice="cast:string",
    transformation_ctx="datachoice0",
)
schema = ArrayType(
    StructType(
        [...put your schema here]
    )
)
# The column is now a string; check whether it already starts with a square bracket [
# (meaning it was an array); if not, wrap it in brackets so it parses as an array
@udf(returnType=StringType())
def struct_to_array(order):
    if order:
        return f"[{order}]" if order[:1] != "[" else order
    # Handle case where the "array" is empty
    return "[]"
map0 = datachoice0.toDF().withColumn(
    "ORDER_LINE", from_json(struct_to_array(col("ORDER_LINE")), schema)
)
fromdataframe0 = DynamicFrame.fromDF(map0, glueContext, "fromdataframe0")
[...continue with the rest of your ETL]
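To illustrate, one way the job might continue from here (the Parquet sink and target prefix below are only an assumption, not part of the original answer) is to write the normalized frame back to S3 and commit the job so bookmarks advance:
datasink0 = glueContext.write_dynamic_frame.from_options(
    frame=fromdataframe0,
    connection_type="s3",
    connection_options={"path": f"s3://{target_bucket_name}/orders/"},  # hypothetical target prefix
    format="parquet",
    transformation_ctx="datasink0",
)
# Committing the job records the bookmark state for the next run
job.commit()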
Nice, I'll give this a go! I've not really gone outside of the options given by PySpark, and I can't quite figure out how to input
spark.jars.packages = com.databricks:spark-xml_2.12:0.13.0
I tried adding it as a command line argument --conf spark.jars.packages=com.databricks:spark-xml_2.12:0.13.0, but my CDK app rejects this because --conf is a reserved parameter that shouldn't be set. How are you adding it?
The official way to go is to upload the libraries to S3 and then reference them with the --extra-jars parameter. Look at the documentation here: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
And you can look for a CloudFormation template here: https://github.com/aws-samples/amazon-deequ-glue/blob/master/src/template.yaml
Sorry but I am not an expert on CDK, hopefully you have that parameter there.
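For reference, a minimal sketch of wiring that up when defining the job with boto3 (the job name, role, buckets, and paths below are hypothetical; in CDK the same key goes into the job's default arguments):
import boto3

glue = boto3.client("glue")

# Hypothetical job definition; the spark-xml JAR is assumed to already be uploaded to S3
glue.create_job(
    Name="xml-order-job",  # hypothetical job name
    Role="MyGlueServiceRole",  # hypothetical IAM role
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://my-scripts-bucket/job.py",  # hypothetical script path
        "PythonVersion": "3",
    },
    DefaultArguments={
        "--extra-jars": "s3://my-libs-bucket/spark-xml_2.12-0.13.0.jar",  # hypothetical JAR path
    },
    GlueVersion="3.0",
)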
Thanks @acmanjon, got it working in CDK! The options are basically the same there, and it has the added bonus that I can add the JAR as an asset and it will handle uploading and referencing the JAR. However, I've hit another snag: when using Spark XML, Glue seems to lose the information it needs to handle job bookmarks, so a subsequent run of the job processes everything again. Will keep searching.