XML interpret one struct as an array


I've been trying this for a week, but I'm starting to give up; I need some help understanding this. I have an S3 bucket full of XML files, and I am creating a pyspark ETL job to convert them to Parquet so I can query them in Athena.

Within each XML file, there is an XML tag called ORDER_LINE. This tag is supposed to contain an array of items; however, in many files there is only one item. XML has no concept of arrays, so when I pass this into my ETL job, Glue interprets the field as a Choice type in the schema, where it could be either an array or a struct type. I need to coerce this into an array type at all times.
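
To illustrate the shape of the data (element names are from my schema, values made up):

<!-- Many files: ORDER_LINE appears once, so Glue reads it as a struct -->
<ORDER>
  <ORDER_LINE><FIELD1>1</FIELD1></ORDER_LINE>
</ORDER>

<!-- Other files: ORDER_LINE repeats, so Glue reads it as an array -->
<ORDER>
  <ORDER_LINE><FIELD1>1</FIELD1></ORDER_LINE>
  <ORDER_LINE><FIELD1>2</FIELD1></ORDER_LINE>
</ORDER>

Here's a list of everything I've tried: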

  1. Using ResolveChoice to cast to an array. This doesn't work, because a struct can't be cast to an array

  2. Using ResolveChoice with "make_struct", then a Map.apply() step to map the field: if "struct" has data, transform it to [struct]. This doesn't work, and the Map docs hint that it does not support the Python map function for arrays.

  3. Converting the dynamic frame to a data frame, then using pyspark's withColumn(when(struct.isNotNull, [struct]).otherwise(array)) pattern to wrap the struct in an array, or keep the existing array, depending on which one is not null (a rough sketch of this attempt appears after this list). This doesn't work because Glue infers the schema inside the structs separately, and the fields in the structs come back in a different order; so although all the fields in the schema are the same, Spark can't combine the result because the schemas are not exactly the same.

  4. Converting to a data frame, then using a pyspark UDF to transform the data. This worked on a small dev sample set, but failed on the production dataset. The error message was extremely cryptic and I wasn't able to find the cause. Maybe this could work, but I wasn't able to fully understand how to operate on the data in pyspark.

  5. Trying to use the "withSchema" format_option when creating the dynamic frame from XML. The intention is to define the schema beforehand, but running this gives an error:

com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.util.LinkedHashMap<java.lang.Object,java.lang.Object>` out of VALUE_TRUE token
 at [Source: (String)" [...] (through reference chain: com.amazonaws.services.glue.schema.types.StructType["fields"]->java.util.ArrayList[0]->com.amazonaws.services.glue.schema.types.Field["properties"])
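
For reference, here is a rough reconstruction of what attempt 3 looked like (illustrative, not the exact job code; it uses the datasource0 frame from the snippet further down, and assumes ResolveChoice with "make_struct" has already split the Choice column into .array/.struct sub-fields):

from awsglue.transforms import ResolveChoice
from pyspark.sql.functions import array, col, when

# Split the Choice into a struct with one sub-field per candidate type
resolved = ResolveChoice.apply(
    frame=datasource0,
    specs=[("ORDER_LINE", "make_struct")],
    transformation_ctx="resolved",
)
df = resolved.toDF()
df = df.withColumn(
    "ORDER_LINE",
    when(
        col("ORDER_LINE.struct").isNotNull(),
        array(col("ORDER_LINE.struct")),  # wrap the lone struct in an array
    ).otherwise(col("ORDER_LINE.array")),
)
# Fails here: the struct produced by make_struct and the structs inside the
# existing array list their fields in different orders, so when() can't
# reconcile the two branch types.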

So my question is: how do I make Glue's XML data source always interpret a tag as an array instead of a Choice, or how do I combine the two? Even Stack Overflow failed me here, and the forum post https://forums.aws.amazon.com/thread.jspa?messageID=931586&tstart=0 went unanswered.

Here's a snippet of my pyspark code:

import sys
import json
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.gluetypes import (
    StructType,
    Field,
    StringType,
    IntegerType,
    ArrayType,
)
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(
    sys.argv,
    [
        "JOB_NAME",
        "source_bucket_name",
        "target_bucket_name",
    ],
)

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

source_bucket_name = args["source_bucket_name"]
target_bucket_name = args["target_bucket_name"]

schema = StructType(
    [
        # [fields removed as they are sensitive]
        Field(
            "ORDER_LINE",
            ArrayType(
                StructType(
                    [
                        Field("FIELD1", IntegerType(), True),
                        Field(
                            "FIELD2",
                            StructType([Field("CODE", StringType(), True)]),
                            True,
                        ),
                        Field(
                            "FIELD#",
                            StructType([Field("CODE", StringType(), True)]),
                            True,
                        ),
                        # [fields removed]
                    ]
                )
            ),
            True,
        ),
    ]
)

datasource0 = glueContext.create_dynamic_frame.from_options(
    "s3",
    {"paths": [f"s3://{source_bucket_name}"]},
    format="xml",
    format_options={
        "rowTag": "ORDER",
        "withSchema": json.dumps(schema.jsonValue()),
    },
    transformation_ctx="datasource0",
)

[more steps after this]
Eelviny
asked 3 years ago · 2016 views
3 Answers
Accepted Answer

Hi again,

I have been testing the suggested solution (just using your AWS forums XML example data). If you add the Databricks XML library to Glue, you can let the library infer your schema (it does well in my case with two files, one containing the array and the other without it):

from pyspark.sql import SparkSession
from pyspark.sql.types import *

df = spark.read.format('xml').options(rowTag='indices').load('s3_dir')

df.show()
df.printSchema()

If you want to declare the schema in advance, it will look similar to this:

my_new_schema = StructType([
    StructField('index', ArrayType(StructType([
        StructField('indexname', StringType()),
        StructField('indexsymbol', StringType())
    ])))
])
df = spark.read.format('xml').options(rowTag='indices').load('s3_dir', schema=my_new_schema)
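# Declaring 'index' as ArrayType up front means files with a single <index>
# element come back as one-element arrays, so every file shares the same schema.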

To make the library available to Glue, you would need to add the following to the Glue job parameters (Spark conf):

spark.jars.packages = com.databricks:spark-xml_2.12:0.13.0

Best,

AWS
answered 3 years ago
  • Nice, I'll give this a go! I've not really gone outside the options given by pyspark, and I can't quite figure out how to set spark.jars.packages = com.databricks:spark-xml_2.12:0.13.0. I tried adding it as a command line argument --conf spark.jars.packages=com.databricks:spark-xml_2.12:0.13.0, but my CDK app rejects this because --conf is a reserved parameter that shouldn't be set. How are you adding it?

  • The official approach is to upload the libraries to S3 and then reference them with the --extra-jars parameter.

    Look at the documentation here: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html

    And you can look for a CloudFormation template here: https://github.com/aws-samples/amazon-deequ-glue/blob/master/src/template.yaml

    Sorry, but I am not an expert on CDK; hopefully you have that parameter there.

  • Thanks @acmanjon, got it working in CDK! The options are basically the same there, with the added bonus that I can add the JAR as an asset and CDK handles uploading and referencing it. However, I've hit another snag: when using Spark XML, Glue seems to lose the information it needs to handle job bookmarks, so a subsequent run processes the whole dataset again. Will keep searching. For anyone following along, roughly what that wiring looks like is sketched below (bucket names, paths and construct IDs are placeholders, not my exact stack).
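
    As a plain Glue job parameter:

    --extra-jars s3://my-bucket/jars/spark-xml_2.12-0.13.0.jar

    And a hypothetical CDK (Python) version, passing it through the job's default arguments:

    from aws_cdk import aws_glue as glue

    # Placeholder IDs, role and paths; adapt to your own stack
    glue.CfnJob(
        self,
        "XmlToParquetJob",
        role=glue_role.role_arn,
        command=glue.CfnJob.CommandProperty(
            name="glueetl",
            python_version="3",
            script_location="s3://my-bucket/scripts/etl.py",
        ),
        default_arguments={
            "--extra-jars": "s3://my-bucket/jars/spark-xml_2.12-0.13.0.jar",
        },
    )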


Hi, could you paste the error from the 4th option? I think it should be the way to go.

Another option is to use the Databricks XML library. If you specify the schema beforehand, it should work.

https://github.com/databricks/spark-xml

Best,

AWS
answered 3 years ago
  • Thanks! I forgot to mention that I'm using pyspark (I'm not a java expert by any means). I'll add sample code to my question.

    Here's the UDF code I tried:

    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import (
        ArrayType,
        IntegerType,
        StringType,
        StructField,
        StructType,
    )

    schema = ArrayType(
        StructType(
            [
                StructField("DY_AMOUNT", IntegerType(), True),
                StructField(
                    "PRODUCT_DRAGER",
                    StructType([StructField("CODE", StringType(), True)]),
                    True,
                ),
                StructField(
                    "PRODUCT_SOORT",
                    StructType([StructField("CODE", StringType(), True)]),
                    True,
                ),
                StructField("PU_AMOUNT", IntegerType(), True),
                StructField("RC_EQUIVALENT_DY_AMOUNT", IntegerType(), True),
                StructField("RC_EQUIVALENT_PU_AMOUNT", IntegerType(), True),
                StructField("RC_EQUIVALENT_SCHEDULED", IntegerType(), True),
                StructField("SCHEDULED", IntegerType(), True),
            ]
        )
    )
    
    struct_to_array = udf(
        lambda order: order.array if order.array else [order.struct], schema
    )
    
    map0 = dataframe0.withColumn("ORDER_LINE", struct_to_array(col("ORDER_LINE")))
    
  • And the error:

    [task-result-getter-2] scheduler.TaskSetManager (Logging.scala:logWarning(69)): Lost task 32.0 in stage 2.0 (TID 189) (172.34.232.204 executor 7): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      [...]
      File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 449, in toInternal
        return self.dataType.toInternal(obj)
      File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 641, in toInternal
        raise ValueError("Unexpected tuple %r with StructType" % obj)
    ValueError: Unexpected tuple 2 with StructType
    
    	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
    	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:84)
    	[...]
    

@acmanjon's solution is the most elegant way of solving the problem, with the Spark XML library handling the inference of the struct as an array. However, that method doesn't use Glue's create_dynamic_frame to create the data frame, which means Glue job bookmarks don't work. So I present the second option: the string hack. By using ResolveChoice to cast the struct/array to a string, we can use a combination of pyspark's UDF and from_json functions to take the serialized string and convert it back to a uniform schema. It's not pretty or performant, but it solves the immediate issue.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import col, udf, from_json
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    ArrayType,
)
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(
    sys.argv,
    [
        "JOB_NAME",
        "source_bucket_name",
        "target_bucket_name",
    ],
)

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

source_bucket_name = args["source_bucket_name"]
target_bucket_name = args["target_bucket_name"]

datasource0 = glueContext.create_dynamic_frame.from_options(
    "s3",
    {"paths": [f"s3://{source_bucket_name}"]},
    format="xml",
    format_options={"rowTag": "ORDER"},
    transformation_ctx="datasource0",
)

# Resolve the ambiguous Choice fields by casting everything to a JSON string
datachoice0 = ResolveChoice.apply(
    frame=datasource0,
    choice="cast:string",
    transformation_ctx="datachoice0",
)

schema = ArrayType(
    StructType(
        [...put your schema here]
    )
)
# In string form, check whether the value starts with a square bracket
# (indicating an array); if not, wrap it in brackets
@udf(returnType=StringType())
def struct_to_array(order):
    if order:
        return f"[{order}]" if order[:1] != "[" else order
    # Handle case where "array" is empty
    return "[]"

map0 = datachoice0.toDF().withColumn(
    "ORDER_LINE", from_json(struct_to_array(col("ORDER_LINE")), schema)
)
fromdataframe0 = DynamicFrame.fromDF(map0, glueContext, "fromdataframe0")

[...continue with the rest of your ETL]
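
For completeness, the tail end might look something like this (a minimal sketch, not the exact job; the orders/ prefix is made up):

# Write the now-uniform frame out as Parquet for Athena
glueContext.write_dynamic_frame.from_options(
    frame=fromdataframe0,
    connection_type="s3",
    connection_options={"path": f"s3://{target_bucket_name}/orders/"},
    format="parquet",
    transformation_ctx="datasink0",
)
# Commit so the job bookmark records this run
job.commit()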
Eelviny
answered 3 years ago
