AWS EMR - YARN Resource Issue

Hi everyone, I am using AWS EMR to run some ETL operations on very large datasets (millions to billions of records). I am using PySpark, reading the CSV files with spark.read.csv, and writing the results into a Postgres database. I am pasting a sample part of my codebase below (do note that this is only a small portion of the code).

import datetime
import json

from pyspark.sql.functions import col, count, countDistinct, when


def data_profiling(self, cursor, df, attribute_ids, existing_attribute_ids):
    total_rows = df.count()

    # Per-column counts: nulls, distinct values, fully duplicated rows, and
    # "junk" values that do not start with an alphanumeric character.
    null_count = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0]
    unique_count = df.agg(*(countDistinct(col(c)).alias(c) for c in df.columns)).collect()[0]
    duplicate_count = df.groupBy(df.columns).count().filter('count > 1').count()
    junk_count = df.select([count(when(~col(c).rlike("^[A-Za-z0-9]"), c)).alias(c) for c in df.columns]).collect()[0]

    # Convert the raw counts into percentages of the total row count.
    null_percentage = {c: (null_count[c] / total_rows) * 100 for c in df.columns}
    unique_percentage = {c: (unique_count[c] / total_rows) * 100 for c in df.columns}
    duplicate_percentage = (duplicate_count / total_rows) * 100 if duplicate_count > 0 else 0
    junk_percentage = {c: (junk_count[c] / total_rows) * 100 for c in df.columns}

    try:
        for column in df.columns:
            if column == "record_id":
                continue
            attribute_id = attribute_ids.get(column)

            if attribute_id is not None and attribute_id not in existing_attribute_ids:
                creation_time = datetime.datetime.now()

                null_payload = {"count": null_count[column], "percentage": null_percentage[column]}
                unique_payload = {"count": unique_count[column], "percentage": unique_percentage[column]}
                duplicate_payload = {"count": duplicate_count, "percentage": duplicate_percentage}
                junk_payload = {"count": junk_count[column], "percentage": junk_percentage[column]}

                profile_query = "INSERT INTO profiles (profile_type, payload, updated_at, created_at, attribute_id) VALUES (%s, %s, %s, %s, %s)"

                cursor.execute(profile_query, ("null", json.dumps(null_payload), creation_time, creation_time, attribute_id))
                cursor.execute(profile_query, ("unique", json.dumps(unique_payload), creation_time, creation_time, attribute_id))
                cursor.execute(profile_query, ("duplicate", json.dumps(duplicate_payload), creation_time, creation_time, attribute_id))
                cursor.execute(profile_query, ("junk", json.dumps(junk_payload), creation_time, creation_time, attribute_id))

        # `conn` is the Postgres connection created elsewhere in the codebase.
        conn.commit()
    except Exception:
        # Roll back the transaction if any insert fails.
        conn.rollback()
        raise
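
For completeness, the surrounding setup is roughly as described above (CSV in via spark.read.csv, results out to Postgres). This is only a minimal sketch, assuming psycopg2 for the `cursor`/`conn` objects, with the S3 path and connection details as placeholders:

import psycopg2
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("data-profiling").getOrCreate()

# Placeholder input path; the real job reads much larger CSV datasets.
df = spark.read.csv("s3://my-bucket/input/*.csv", header=True, inferSchema=True)

# Placeholder connection details; the actual codebase may create these differently.
conn = psycopg2.connect(host="db-host", dbname="profiling", user="etl", password="***")
cursor = conn.cursor()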

While running the code on these datasets, I'm facing the following error:

**Warning: The Spark session does not have enough YARN resources to start. The code failed because of a fatal error: Session 10 unexpectedly reached final status 'dead'. See logs:**

stdout:

stderr:

Mar 13, 2024 8:47:06 AM org.apache.spark.launcher.Log4jHotPatchOption staticJavaAgentOption
WARNING: spark.log4jHotPatch.enabled is set to true, but /usr/share/log4j-cve-2021-44228-hotpatch/jdk17/Log4jHotPatchFat.jar does not exist at the configured location

24/03/13 08:47:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/13 08:47:12 INFO DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at ip-10-0-9-124.ec2.internal/10.0.9.124:8032
24/03/13 08:47:13 INFO Configuration: resource-types.xml not found
24/03/13 08:47:13 INFO ResourceUtils: Unable to find 'resource-types.xml'.
24/03/13 08:47:13 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (5120 MB per container)
24/03/13 08:47:13 INFO Client: Will allocate AM container, with 2432 MB memory including 384 MB overhead
24/03/13 08:47:13 INFO Client: Setting up container launch context for our AM
24/03/13 08:47:13 INFO Client: Setting up the launch environment for our AM container
24/03/13 08:47:13 INFO Client: Preparing resources for our AM container
24/03/13 08:47:14 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
24/03/13 08:47:35 INFO Client: Uploading resource file:/mnt/tmp/spark-e7b2da8b-7afd-4fae-bdc4-65fff410b216/__spark_libs__16724108829559673263.zip -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/__spark_libs__16724108829559673263.zip
24/03/13 08:47:44 INFO Client: Uploading resource file:/usr/lib/livy/rsc-jars/kryo-shaded-4.0.2.jar -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/kryo-shaded-4.0.2.jar
24/03/13 08:47:44 INFO Client: Uploading resource file:/usr/lib/livy/rsc-jars/livy-api-0.7.1-incubating.jar -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/livy-api-0.7.1-incubating.jar
24/03/13 08:47:44 INFO Client: Uploading resource file:/usr/lib/livy/rsc-jars/livy-rsc-0.7.1-incubating.jar -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/livy-rsc-0.7.1-incubating.jar
24/03/13 08:47:44 INFO Client: Uploading resource file:/usr/lib/livy/rsc-jars/livy-thriftserver-session-0.7.1-incubating.jar -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/livy-thriftserver-session-0.7.1-incubating.jar
24/03/13 08:47:45 INFO Client: Uploading resource file:/usr/lib/livy/rsc-jars/minlog-1.3.0.jar -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/minlog-1.3.0.jar
24/03/13 08:47:45 INFO Client: Uploading resource file:/usr/lib/livy/rsc-jars/netty-all-4.1.17.Final.jar -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/netty-all-4.1.17.Final.jar
24/03/13 08:47:45 INFO Client: Uploading resource file:/usr/lib/livy/rsc-jars/objenesis-2.5.1.jar -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/objenesis-2.5.1.jar
24/03/13 08:47:45 INFO Client: Uploading resource file:/usr/lib/livy/repl_2.12-jars/commons-codec-1.9.jar -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/commons-codec-1.9.jar
24/03/13 08:47:45 WARN Client: Same name resource file:///usr/lib/livy/repl_2.12-jars/kryo-shaded-4.0.2.jar added multiple times to distributed cache
24/03/13 08:47:45 INFO Client: Uploading resource file:/usr/lib/livy/repl_2.12-jars/livy-core_2.12-0.7.1-incubating.jar -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/livy-core_2.12-0.7.1-incubating.jar
24/03/13 08:47:45 INFO Client: Uploading resource file:/usr/lib/livy/repl_2.12-jars/livy-repl_2.12-0.7.1-incubating.jar -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/livy-repl_2.12-0.7.1-incubating.jar
24/03/13 08:47:45 WARN Client: Same name resource file:///usr/lib/livy/repl_2.12-jars/minlog-1.3.0.jar added multiple times to distributed cache
24/03/13 08:47:45 WARN Client: Same name resource file:///usr/lib/livy/repl_2.12-jars/objenesis-2.5.1.jar added multiple times to distributed cache
24/03/13 08:47:45 INFO Client: Uploading resource file:/etc/spark/conf.dist/hive-site.xml -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/hive-site.xml
24/03/13 08:47:46 INFO Client: Uploading resource file:/etc/hudi/conf.dist/hudi-defaults.conf -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/hudi-defaults.conf
24/03/13 08:47:46 INFO Client: Uploading resource file:/usr/lib/spark/R/lib/sparkr.zip#sparkr -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/sparkr.zip
24/03/13 08:47:46 INFO Client: Uploading resource file:/usr/lib/spark/python/lib/pyspark.zip -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/pyspark.zip
24/03/13 08:47:47 INFO Client: Uploading resource file:/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/py4j-0.10.9.7-src.zip
24/03/13 08:47:48 INFO Client: Uploading resource file:/mnt/tmp/spark-e7b2da8b-7afd-4fae-bdc4-65fff410b216/__spark_conf__10224235219861396504.zip -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/spark_conf.zip
24/03/13 08:47:51 INFO SecurityManager: Changing view acls to: livy
24/03/13 08:47:51 INFO SecurityManager: Changing modify acls to: livy
24/03/13 08:47:51 INFO SecurityManager: Changing view acls groups to:
24/03/13 08:47:51 INFO SecurityManager: Changing modify acls groups to:
24/03/13 08:47:51 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: livy; groups with view permissions: EMPTY; users with modify permissions: livy; groups with modify permissions: EMPTY
24/03/13 08:47:51 INFO Client: Submitting application application_1710314844894_0011 to ResourceManager
24/03/13 08:47:52 INFO YarnClientImpl: Submitted application application_1710314844894_0011
24/03/13 08:47:52 INFO Client: Application report for application_1710314844894_0011 (state: ACCEPTED)
24/03/13 08:47:52 INFO Client:
    client token: N/A
    diagnostics: [Wed Mar 13 08:47:51 +0000 2024] Application is added to the scheduler and is not yet activated. Queue's AM resource limit exceeded.
    Details : AM Partition = <DEFAULT_PARTITION>; AM Resource Request = <memory:2432, max memory:5120, vCores:1, max vCores:8>; Queue Resource Limit for AM = <memory:5120, vCores:1>; User AM Resource Limit of the queue = <memory:5120, vCores:1>; Queue AM Resource Usage = <memory:4864, vCores:2>;
    Application host: N/A
    Application RPC port: -1
    queue: default
    start time: 1710319671958
    final status: UNDEFINED
    tracking URL: http://ip-10-0-9-124.ec2.internal:20888/proxy/application_1710314844894_0011/
    user: livy
24/03/13 08:47:52 INFO ShutdownHookManager: Shutdown hook called
24/03/13 08:47:52 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-0c342f25-f293-4bea-9139-12e36b315e71
24/03/13 08:47:52 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-e7b2da8b-7afd-4fae-bdc4-65fff410b216

YARN Diagnostics: Application application_1710314844894_0011 was killed by user livy at 10.0.9.124.

I am new to AWS, and the same code used to work on Databricks without any issues (although it was slow). What can I do to solve this issue?

vsk95
asked a month ago · 302 views
1 Answer

Hello,

It seems your configuration is hitting the yarn.scheduler.capacity.maximum-am-resource-percent limit, or the driver memory is not sized appropriately to start the Application Master (AM). Your diagnostics show the queue's AM resource limit is 5120 MB, 4864 MB of that is already in use, and the new AM requests 2432 MB, so the application is accepted but never activated. So:

  1. Increase "yarn.scheduler.capacity.maximum-am-resource-percent", which raises the resource limit available to AMs in the queue (see the sketch after this list).
  2. Decrease "spark.driver.memory", which is the amount of memory to use for the driver process. It takes the same format as JVM memory strings, with a size-unit suffix ("k", "m", "g" or "t"), e.g. 512m or 2g. In client mode, this config must not be set through SparkConf directly in your application; instead, set it through the --driver-memory command-line option or in your default properties file.
  3. Submit the job as an EMR step and check whether that runs into the same issue.
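
If it helps, here is a rough boto3 sketch of what suggestions 1-3 could look like. The region, cluster ID, S3 script path, and the 0.5/2g values are placeholders to size for your own cluster, and the same classifications can equally be supplied as JSON in the console or CLI when the cluster is created:

import boto3

emr = boto3.client("emr", region_name="us-east-1")  # placeholder region

# Suggestions 1 and 2: configuration classifications. Pass this list as the
# Configurations parameter of run_job_flow() when creating the cluster
# (the 0.5 and 2g values below are illustrative only).
configurations = [
    {
        "Classification": "capacity-scheduler",
        "Properties": {"yarn.scheduler.capacity.maximum-am-resource-percent": "0.5"},
    },
    {
        "Classification": "spark-defaults",
        "Properties": {"spark.driver.memory": "2g"},
    },
]

# Suggestion 3: submit the job as an EMR step instead of through the Livy session.
emr.add_job_flow_steps(
    JobFlowId="j-XXXXXXXXXXXXX",  # placeholder cluster ID
    Steps=[
        {
            "Name": "data-profiling",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "spark-submit",
                    "--deploy-mode", "cluster",
                    "--driver-memory", "2g",
                    "s3://my-bucket/scripts/data_profiling.py",  # placeholder script path
                ],
            },
        }
    ],
)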

If the above does not work, please share your spark-submit command and the cluster resource configuration.

AWS
SUPPORT ENGINEER
answered 25 days ago
