Glue ETL job: JSON to Parquet, how to sort a Parquet column


From https://aws.amazon.com/blogs/big-data/top-10-performance-tuning-tips-for-amazon-athena/:

One way to optimize the number of blocks to be skipped is to identify and sort by a commonly filtered column before writing your ORC or Parquet files. This ensures that the range between the min and max of values within the block are as small as possible within each block. This gives it a better chance to be pruned and also reduces data scanned further.

I have an ETL job that takes JSON and converts it to Parquet. Before I write the Parquet out, I was wondering how to sort it by a certain column.

How could one do that in a Glue ETL job, after create_dynamic_frame_from_options but before write_dynamic_frame?

  • Are you using the visual editor, or do you have a script?

asked 8 months ago · 375 views
1 Answer

Hi Kevin,

If you are using the visual editor, there's no out-of-the-box transformation to order your data, so you have to create your own. The simplest way is to add a custom SQL Query transform.

Click (+) to add a node, select "SQL Query", then write a simple query such as "SELECT * FROM ... ORDER BY <column>".

[Screenshot: Glue Custom SQL transformation]

This generates the following script, which runs a Spark SQL query:

from awsglue.dynamicframe import DynamicFrame

# glueContext and spark come from the job's standard boilerplate
# (spark = glueContext.spark_session).
def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
    # Register each input DynamicFrame as a temp view, run the SQL query,
    # and return the result as a new DynamicFrame.
    for alias, frame in mapping.items():
        frame.toDF().createOrReplaceTempView(alias)
    result = spark.sql(query)
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)

# Script generated for node SQL Query
SqlQuery0 = """
select * from myDataSource order by <myDataSourceColumn>
"""
SQLQuery_node1692843137953 = sparkSqlQuery(
    glueContext,
    query=SqlQuery0,
    mapping={"myDataSource": ChangeSchema_node2},
    transformation_ctx="SQLQuery_node1692843137953",
)
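
In the generated job, this sorted node then feeds your Parquet target. As a rough sketch (the sink node name and S3 path below are placeholders, not taken from your job), the write step could look like:

# Hypothetical sink node: writes the sorted DynamicFrame to S3 as Parquet
ParquetSink_node = glueContext.write_dynamic_frame.from_options(
    frame=SQLQuery_node1692843137953,
    connection_type="s3",
    format="glueparquet",
    connection_options={"path": "s3://your-bucket/your-prefix/"},
    transformation_ctx="ParquetSink_node",
)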

Additionally, if you are writing your own script, you can convert your DynamicFrame to a Spark DataFrame and then sort the data using the Spark API [1]:

sorted_df = myframe.toDF().orderBy(["mycolumn"])
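
If you then need to hand the sorted data back to a Glue writer, you can wrap it in a DynamicFrame again. A small sketch, assuming glueContext is the GlueContext from your job:

from awsglue.dynamicframe import DynamicFrame

# Wrap the sorted Spark DataFrame back into a DynamicFrame so it can be
# passed to glueContext.write_dynamic_frame as usual
sorted_dyf = DynamicFrame.fromDF(sorted_df, glueContext, "sorted_dyf")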

Hope this helps; if you have further questions, please let me know.

Reference: [1] https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.orderBy.html

answered 8 months ago
  • i = glueContext.create_dynamic_frame_from_options(
        connection_type="s3",
        connection_options={
            "paths": [input_loc],
            "recurse": True,
            "compressionType": "gzip",
            "groupFiles": "inPartition",
            "groupSize": "104857600",
        },
        format="json",
    )
    

    I plan on loading the JSON with the group settings above ^

    If I sort the dynamic frame by a column:

    sorted_df = i.toDF().orderBy(["col"])

    and then write it out to Parquet, will each Parquet file only be sorted by the column within that file? I would instead like the column to be sorted "across" the Parquet files, if that makes sense (see the sketch after these comments).

  • Something like "Z-ordering"?
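
On the "sorted across the files" point: Spark's orderBy is a total sort, i.e. it range-partitions the data on the sort key before sorting, so as long as you don't repartition again before writing, each output Parquet file covers a distinct, non-overlapping range of the column. A sketch of two options; the column name "col", the partition count, and output_loc are placeholders:

df = i.toDF()

# Option 1: orderBy gives a total ordering; each output file covers a
# non-overlapping range of "col"
globally_sorted = df.orderBy("col")
globally_sorted.write.mode("overwrite").parquet(output_loc)

# Option 2: same layout, with explicit control over the number of output files
ranged = df.repartitionByRange(10, "col").sortWithinPartitions("col")
ranged.write.mode("overwrite").parquet(output_loc)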
