PySpark job fails on EMR on EKS virtual cluster: java.lang.ClassCastException


Hi,

We are in the process of migrating our PySpark jobs from EMR classic (EC2-based) to an EMR on EKS virtual cluster. We have come across a strange failure in one job that reads some Avro data from S3 and writes it straight back out in Parquet format. Example code:

        df = spark.read.format("avro").load(input_path)
        df \
            .withColumnRenamed("my_col", "my_new_col") \
            .repartition(60) \
            .write \
            .mode("append") \
            .partitionBy("my_new_col", "date") \
            .format("parquet") \
            .option("compression", "gzip") \
            .save(output_path)

This fails at the .save() call with the following message (we can tell from the Python traceback, not included here for brevity):

Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 17) (10.0.3.174 executor 4): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.dataReader$1 of type scala.Function1 in instance of org.apache.spark.sql.execution.datasources.FileFormat$$anon$1

We are running this with --packages org.apache.spark:spark-avro_2.12:3.1.1 in sparkSubmitParameters. The exact same code ran fine on a classic EMR cluster. Comparing the environments, both have Spark 3.1.1 and Scala 2.12.10; only the Java version differs: 1.8.0_332 (EMR classic) vs 1.8.0_302 (EMR on EKS).
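
For completeness, this is roughly how we submit the job with boto3 (a sketch: the virtual cluster ID, role ARN, and S3 paths below are placeholders, not our real values):

    import boto3

    # Placeholders throughout -- substitute your own virtual cluster ID, role ARN, and S3 paths.
    emr = boto3.client("emr-containers", region_name="us-east-1")

    response = emr.start_job_run(
        name="avro-to-parquet",
        virtualClusterId="<virtual-cluster-id>",
        executionRoleArn="arn:aws:iam::<account-id>:role/<emr-on-eks-job-role>",
        releaseLabel="emr-6.3.0-latest",
        jobDriver={
            "sparkSubmitJobDriver": {
                "entryPoint": "s3://<bucket>/scripts/avro_to_parquet.py",
                # This is the option that turned out to matter (other options omitted):
                "sparkSubmitParameters": "--packages org.apache.spark:spark-avro_2.12:3.1.1",
            }
        },
    )
    print(response["id"])  # job run ID, useful for following the driver logs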

We should also mention that we were able to run another job successfully on EMR on EKS; that job doesn't have this Avro-to-Parquet step (its input is already in Parquet format). So we suspect it has something to do with the extra org.apache.spark:spark-avro_2.12:3.1.1 package we are importing.

We searched the web for the java.lang.ClassCastException and found a couple of issues here and here, but they are not particularly helpful to us since our code is in Python.

Any hints as to what might be the cause?

Thanks and regards,

Nikos

nikos64
Asked 2 years ago · 1,234 views
1 Answer

I was able to overcome this issue after realizing that the container image used by the driver and executor pods (e.g. 755674844232.dkr.ecr.us-east-1.amazonaws.com/spark/emr-6.3.0) already has a spark-avro.jar in the /usr/lib/spark/external/lib/ directory.

I replaced the --packages org.apache.spark:spark-avro_2.12:3.1.1 option (which downloads spark-avro from the Maven Central repository) with --jars local:///usr/lib/spark/external/lib/spark-avro.jar in sparkSubmitParameters... and the script worked!
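
For anyone hitting the same error, a sketch of what the sparkSubmitJobDriver section looks like after the fix (the entry point path is a placeholder):

    # Sketch of the sparkSubmitJobDriver section after the fix; entry point is a placeholder.
    spark_submit_job_driver = {
        "entryPoint": "s3://<bucket>/scripts/avro_to_parquet.py",
        # Before (ClassCastException): "--packages org.apache.spark:spark-avro_2.12:3.1.1"
        # After (works): reference the spark-avro.jar already bundled in the EMR on EKS image
        "sparkSubmitParameters": "--jars local:///usr/lib/spark/external/lib/spark-avro.jar",
    }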

nikos64
Answered 2 years ago
