Problems with Glue writing Iceberg outputs to S3


How do I solve the following error in my PySpark Glue job?

Error Category: UNCLASSIFIED_ERROR; Failed Line Number: 882; An error occurred while calling o599.createOrReplace. Job aborted due to stage failure: ShuffleMapStage 43 (createOrReplace at NativeMethodAccessorImpl.java:0) has failed the maximum allowable number of times: 4. Most recent failure reason:. Note: This run was executed with Flex execution. Check the logs if run failed due to executor termination.

My settings are:

  • Type: Spark
  • Glue version: Glue 4.0 - supports Spark 3.3, Scala 2, Python 3
  • Language: Python 3
  • Worker type: G.2X (8 vCPU and 32 GB RAM)

And I apply these parameters:

  • --write-shuffle-files-to-s3 TRUE
  • --conf spark.shuffle.glue.s3ShuffleBucket=s3://my_bucket/my_job/
  • --datalake-formats iceberg
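
For completeness, these are passed as Glue job parameters; roughly the equivalent when defining the job through the Glue API would look like the rough boto3 sketch below (the job name is a placeholder, the values are the ones listed above):

import boto3

# Rough sketch only: these mirror the job parameters listed above and would be
# passed as DefaultArguments when creating or updating the job via the Glue API.
default_arguments = {
    "--write-shuffle-files-to-s3": "TRUE",
    "--conf": "spark.shuffle.glue.s3ShuffleBucket=s3://my_bucket/my_job/",
    "--datalake-formats": "iceberg",
}
glue = boto3.client("glue")
# e.g. glue.create_job(..., DefaultArguments=default_arguments, ...)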

Do you need more information from my side?

  • Check the reason for the task failures in the logs or the Spark UI; it's possible that the Flex nodes were simply reclaimed and some tasks failed multiple times.

asked 5 months ago · 262 views
2 Answers

Hello Thomas,

From your description, it looks like the job is failing at the transformation stage, since the error reports a failure at ShuffleMapStage. As you probably know, a Spark ETL job distributes tasks among worker nodes, and any transformation that requires a shuffle of data between those nodes, such as join() or coalesce(), forces data to be moved across the network; this movement can sometimes fail because of the volume and complexity of the transfer.

I can see you are using G.2X, which is well suited for memory-intensive operations; however, you should plan the worker type and number of workers based on your use case. I would encourage you to look at the Glue job metrics of your job run (see the Glue documentation on viewing job metrics) and check

  1. glue.{driver,.....}.system.cpuSystemLoad
  2. glue.{driver,.....}.jvm.heap.usage

to check the overall CPU consumption and memory consumption.
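
If it helps, the same metrics can also be pulled from CloudWatch programmatically. Here is a rough sketch (assuming job metrics are enabled for the job; the job name and time window are placeholders):

import datetime
import boto3

cloudwatch = boto3.client("cloudwatch")

# Glue job metrics are published under the "Glue" CloudWatch namespace.
# "my_glue_job" and the 3-hour window below are placeholders.
response = cloudwatch.get_metric_statistics(
    Namespace="Glue",
    MetricName="glue.driver.jvm.heap.usage",
    Dimensions=[
        {"Name": "JobName", "Value": "my_glue_job"},
        {"Name": "JobRunId", "Value": "ALL"},
        {"Name": "Type", "Value": "gauge"},
    ],
    StartTime=datetime.datetime.utcnow() - datetime.timedelta(hours=3),
    EndTime=datetime.datetime.utcnow(),
    Period=300,
    Statistics=["Average"],
)
for point in sorted(response["Datapoints"], key=lambda p: p["Timestamp"]):
    print(point["Timestamp"], point["Average"])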

If CPU and memory are evenly distributed across your worker nodes, that is a good sign. However, if they sit near the maximum for long stretches of the execution, for example CPU load around 0.9 or memory usage constantly above 50%, then you need to scale: either horizontally, by increasing the number of workers, or vertically, by moving to a larger worker type such as G.4X. I would recommend looking at Monitoring jobs for DPU capacity planning for more information on this.

Depending on your script, some optimisations may also be needed at the code level. For example, if you are using join() in your code, you can apply techniques such as broadcast hints to avoid large shuffles; a small sketch follows below.
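
For illustration, here is a minimal sketch of a broadcast join hint (the session and DataFrames are placeholders, not taken from your job):

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

# Placeholder session and DataFrames purely for illustration.
spark = SparkSession.builder.appName("broadcast-hint-example").getOrCreate()
large_df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["id", "value"])
small_df = spark.createDataFrame([(1, "x"), (2, "y")], ["id", "label"])

# broadcast() hints Spark to ship the small table to every executor,
# turning a shuffle join into a broadcast hash join.
joined = large_df.join(broadcast(small_df), on="id", how="inner")
joined.show()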

I hope the above information serves you well. Please feel free to open a Support case with us if you need more assistance with issues such as this.

AWS
answered 5 months ago
  • Thank you for your detailed answer! I am adding a comment with images as a new answer, because I cannot add images here.


Thanks for your detailed answer, Shrivathsa!

My issue is that the monitoring plots do not look suspicious to me at all. Or am I overlooking something critical?

(Screenshots: Glue Job Run and Glue Job Run Monitoring)

In the meantime I have deactivated Flex execution and changed to the following configuration:

import pyspark
import awsglue.context

spark_config = pyspark.conf.SparkConf()
# Write shuffle data to S3 via the Glue Spark shuffle plugin instead of local disk
spark_config.set("spark.shuffle.storage.path", f"s3://{shuffle_data_bucket}/")
spark_config.set("spark.shuffle.sort.io.plugin.class", "com.amazonaws.spark.shuffle.io.cloud.ChopperPlugin")
...
spark_context = pyspark.context.SparkContext(conf=spark_config)
glue_context = awsglue.context.GlueContext(spark_context)

but it does not change the behaviour. There is also no useful hint in the job's error logs. To me it looks like a shuffle object that was written in the first part of the job can later not be read properly.
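
To at least rule out that the shuffle objects are never written, I can list the shuffle prefix (a rough sketch; the bucket and prefix are placeholders corresponding to my configuration):

import boto3

s3 = boto3.client("s3")

# Placeholder bucket/prefix matching the configured shuffle path.
paginator = s3.get_paginator("list_objects_v2")
total_objects, total_bytes = 0, 0
for page in paginator.paginate(Bucket="my_bucket", Prefix="my_job/"):
    for obj in page.get("Contents", []):
        total_objects += 1
        total_bytes += obj["Size"]

print(f"{total_objects} shuffle objects, {total_bytes / 1024 ** 3:.2f} GiB in total")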

answered 5 months ago
  • Thanks for providing the Glue metrics for your job run. The important graphs to look at here are CPU utilisation and data shuffle across executors. You can see that your shuffle operations are costly: the amount of data shuffled across executors is alarmingly high in your case, and that is potentially problematic for your job.

    You also need to be aware of a concept called over-parallelism, which I suspect is the problem in your case. If you have many memory-intensive transformations that require a lot of shuffling across executors, it is sometimes better to opt for fewer workers instead.

    From your job I can see you have allocated 100 workers. I would like to stress that more workers do not always mean faster execution or better performance; they often add another layer of complexity to your job. Instead of 100 G.2X workers, I would suggest 50 G.4X workers, or even 25 G.8X, all of which correspond to the same number of DPUs and the same cost (since 1 G.1X = 1 DPU, 1 G.2X = 2 DPU, and so on, and you are charged per DPU-hour; see the quick check at the end of this comment).

    When you change from 100 G.2X workers to 50 G.4X workers, you reduce the need to shuffle data across nodes, effectively trading horizontal scaling for vertical scaling, and your job should run a lot more smoothly.

    I hope this helps!
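
    As a quick sanity check of the DPU equivalence mentioned above (worker-to-DPU ratios as documented for Glue; the worker counts are the ones discussed here):

    # DPUs per worker type: G.1X = 1, G.2X = 2, G.4X = 4, G.8X = 8.
    DPU_PER_WORKER = {"G.1X": 1, "G.2X": 2, "G.4X": 4, "G.8X": 8}

    for worker_type, num_workers in [("G.2X", 100), ("G.4X", 50), ("G.8X", 25)]:
        dpus = DPU_PER_WORKER[worker_type] * num_workers
        print(f"{num_workers:>3} x {worker_type} = {dpus} DPU")

    # All three configurations come out to 200 DPU, i.e. the same DPU-hour cost.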
