Pyspark job fails on EMR on EKS virtual cluster: java.lang.ClassCastException


Hi,

we are in the process of migrating our PySpark jobs from EMR classic (EC2-based) to an EMR on EKS virtual cluster. We have come across a strange failure in one job that reads some Avro data from S3 and writes it straight back out in Parquet format. Example code:

        # read the Avro input from S3
        df = spark.read.format("avro").load(input_path)

        # rename the partition column, repartition, and write back out as
        # gzip-compressed Parquet partitioned by the renamed column and date
        df \
            .withColumnRenamed("my_col", "my_new_col") \
            .repartition(60) \
            .write \
            .mode("append") \
            .partitionBy("my_new_col", "date") \
            .format("parquet") \
            .option("compression", "gzip") \
            .save(output_path)

This fails at the .save() call with the following message (we can tell from the Python traceback, not included here for brevity):

Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 17) (10.0.3.174 executor 4): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.dataReader$1 of type scala.Function1 in instance of org.apache.spark.sql.execution.datasources.FileFormat$$anon$1

We are running this with --packages org.apache.spark:spark-avro_2.12:3.1.1 in sparkSubmitParameters. The exact same code ran fine on a normal EMR cluster. Comparing the environments, both have Spark 3.1.1 and Scala 2.12.10; only the Java version differs: 1.8.0_332 (EMR classic) vs 1.8.0_302 (EMR on EKS).
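For context, we submit the job through the EMR on EKS StartJobRun API; a minimal sketch of the request (here via boto3, with placeholder cluster ID, role ARN and S3 paths) looks roughly like this:

    import boto3

    # Placeholder values below (virtual cluster ID, role ARN, bucket) are illustrative only
    emr_containers = boto3.client("emr-containers", region_name="us-east-1")

    response = emr_containers.start_job_run(
        virtualClusterId="<virtual-cluster-id>",
        name="avro-to-parquet",
        executionRoleArn="arn:aws:iam::<account-id>:role/<emr-on-eks-execution-role>",
        releaseLabel="emr-6.3.0-latest",
        jobDriver={
            "sparkSubmitJobDriver": {
                "entryPoint": "s3://<bucket>/scripts/avro_to_parquet.py",
                # spark-avro is pulled from Maven Central when the job starts
                "sparkSubmitParameters": "--packages org.apache.spark:spark-avro_2.12:3.1.1",
            }
        },
    )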

We should also mention that we were able to run another job successfully on EMR on EKS; that job doesn't have this Avro-to-Parquet step (its input is already in Parquet format). So we suspect it has something to do with the extra org.apache.spark:spark-avro_2.12:3.1.1 package we are importing.

We searched the web for the java.lang.ClassCastException and found a couple of issues here and here, but they are not particularly helpful to us since our code is in Python.

Any hints as to what might be causing this?

Thanks and regards,

Nikos

nikos64
asked 2 years ago · 1217 views
1 Answer

I was able to overcome this issue after realizing that the container image used by the driver and executor pods (e.g. 755674844232.dkr.ecr.us-east-1.amazonaws.com/spark/emr-6.3.0) already contains a spark-avro.jar in the /usr/lib/spark/external/lib/ directory.

I replaced the --packages org.apache.spark:spark-avro_2.12:3.1.1 option (which downloads spark-avro from the Maven Central repository) with --jars local:///usr/lib/spark/external/lib/spark-avro.jar in sparkSubmitParameters... and the script worked!
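In other words, only the way the Avro dependency is supplied in sparkSubmitParameters changes, roughly like this:

    # Before: fetches spark-avro from the Maven Central repository at submit time
    spark_submit_parameters = "--packages org.apache.spark:spark-avro_2.12:3.1.1"

    # After: use the spark-avro jar that already ships inside the EMR on EKS container image
    spark_submit_parameters = "--jars local:///usr/lib/spark/external/lib/spark-avro.jar"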

nikos64
answered 2 years ago
