AWS EMR parallel tasks and performance issue


I am trying to load 200 GB of data into DynamoDB using Spark on EMR, but I am facing performance issues.

"""
Copy and paste the following code into your Lambda function. Make sure to change
the following key parameters for the API call to match your account:

- Name (name of the Spark cluster)
- LogUri (S3 bucket to store EMR logs)
- Ec2SubnetId (the subnet to launch the cluster into)
- JobFlowRole (service role for EC2)
- ServiceRole (service role for Amazon EMR)

The following are additional parameters for the Spark job itself. Change the
bucket name and prefix for the Spark job (located at the bottom).

- s3://your-bucket-name/prefix/lambda-emr/SparkProfitCalc.jar (Spark JAR file)
- s3://your-bucket-name/prefix/fake_sales_data.csv (input data file in S3)
- s3://your-bucket-name/prefix/outputs/report_1/ (output location in S3)
"""
import json

import boto3

client = boto3.client('emr')

def lambda_handler(event, context):
    # Read the job parameters from the Lambda event.
    env = event['env']
    cb_bucket_name = event['cb-bucket-name']
    ddb_table_name = event['ddb-table-name']
    existing_table_size_in_DDB_mb = event['existing-table-size-in-DDB-mb']
    s3_location_cbexport = event['s3-location-cbexport']
    s3_location_largerecords = event['s3-location-largerecords']
    s3_location_region_largerecords = event['s3-location-region-largerecords']
    time_stamp_for_ddb_table = event['time-stamp-for-ddb-table']
    just_migrate_large_doc_to_s3 = event['just-migrate-large-doc-to-s3']
    debug_mode = event['debug-mode']

    # Log the parameters so they show up in the Lambda logs.
    print(env)
    print(cb_bucket_name)
    print(ddb_table_name)
    print(existing_table_size_in_DDB_mb)
    print(s3_location_cbexport)
    print(s3_location_largerecords)
    print(s3_location_region_largerecords)
    print(time_stamp_for_ddb_table)
    print(just_migrate_large_doc_to_s3)
    print(debug_mode)

    dataMigrationJobId = "data-migration-job-" + cb_bucket_name + "-" + env

    # Launch a transient EMR cluster that runs one Spark step and then terminates.
    response = client.run_job_flow(
        Name=dataMigrationJobId,
        LogUri='s3://cls-cb-migration-artifacts-qaint/logs/myjob',
        ReleaseLabel='emr-6.2.0',
        Instances={
            'InstanceGroups': [
                {
                    'Name': 'Master node',
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'MASTER',
                    'InstanceType': 'r5d.2xlarge',
                    'InstanceCount': 1,
                },
                {
                    'Name': 'Core nodes',
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'CORE',
                    'InstanceType': 'r5d.4xlarge',
                    'InstanceCount': 16,
                },
                {
                    'Name': 'Task nodes',
                    'Market': 'SPOT',
                    'InstanceRole': 'TASK',
                    'InstanceType': 'r5d.2xlarge',
                    'InstanceCount': 20,
                    'BidPrice': '0.6',
                },
            ],
            'Ec2KeyName': 'cbtoddb',
            'KeepJobFlowAliveWhenNoSteps': False,
            'TerminationProtected': False,
            'Ec2SubnetId': 'subnet-057dfe60cac41b111',
        },
        Applications=[{'Name': 'Spark'}],
        VisibleToAllUsers=True,
        JobFlowRole='CouchbaseRole',
        ServiceRole='EMR_DefaultRole',
        Steps=[
            {
                'Name': dataMigrationJobId,
                'ActionOnFailure': 'TERMINATE_CLUSTER',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': [
                        'spark-submit',
                        '--deploy-mode', 'cluster',
                        '--executor-memory', '4G',
                        '--driver-memory', '16G',
                        '--conf', 'spark.executor.cores=16',
                        '--conf', 'spark.dynamicAllocation.enabled=true',
                        '--conf', 'spark.dynamicAllocation.minExecutors=50',
                        '--conf', 'spark.dynamicAllocation.maxExecutors=200',
                        # No space before "=": the property name must match exactly.
                        '--conf', 'spark.sql.shuffle.partitions=4000',
                        '--conf', 'spark.sql.autoBroadcastJoinThreshold=10485760',
                        '--jars', '/usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar',
                        '--class', 'com.pearson.Migration',
                        's3://myoptimizedjarcs/v6/CBToDDB-1.0-SNAPSHOT.jar',
                        # Positional arguments passed through to the Spark job.
                        env,
                        cb_bucket_name,
                        ddb_table_name,
                        existing_table_size_in_DDB_mb,
                        s3_location_cbexport,
                        s3_location_largerecords,
                        s3_location_region_largerecords,
                        time_stamp_for_ddb_table,
                        just_migrate_large_doc_to_s3,
                        debug_mode,
                    ],
                },
            }
        ],
    )

    return response

I am using the above configuration, but performance is not good. I want to write the data within 60 minutes.
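For reference, here is a minimal sketch of the event shape the handler reads and of one way to normalize the step arguments, since the EMR API expects every entry in `Args` to be a string. The values in `SAMPLE_EVENT` are placeholders and `build_job_args` is a hypothetical helper for illustration; neither is part of the actual job.

```python
# Keys the handler reads from the Lambda event, in the order the Spark job
# receives them as positional arguments.
EVENT_KEYS = [
    "env",
    "cb-bucket-name",
    "ddb-table-name",
    "existing-table-size-in-DDB-mb",
    "s3-location-cbexport",
    "s3-location-largerecords",
    "s3-location-region-largerecords",
    "time-stamp-for-ddb-table",
    "just-migrate-large-doc-to-s3",
    "debug-mode",
]

# Placeholder values only (assumptions for illustration).
SAMPLE_EVENT = {
    "env": "qa",
    "cb-bucket-name": "example-bucket",
    "ddb-table-name": "example-table",
    "existing-table-size-in-DDB-mb": 0,
    "s3-location-cbexport": "s3://example-bucket/export/",
    "s3-location-largerecords": "s3://example-bucket/large/",
    "s3-location-region-largerecords": "us-east-1",
    "time-stamp-for-ddb-table": "2023-01-01T00:00:00Z",
    "just-migrate-large-doc-to-s3": "false",
    "debug-mode": "false",
}


def build_job_args(event):
    """Return the positional job arguments as strings, failing early on missing keys."""
    missing = [key for key in EVENT_KEYS if key not in event]
    if missing:
        raise KeyError(f"missing event keys: {missing}")
    # str() turns numeric values such as the table size into strings.
    return [str(event[key]) for key in EVENT_KEYS]
```

Note that `str()` renders Python booleans as `True`/`False`, so flags are safer passed as strings in whatever form the job expects.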

asked 5 months ago, 806 views
4 Answers

Hi,

Have you tested the DynamoDB bulk import feature (from S3) to compare it with EMR? https://aws.amazon.com/about-aws/whats-new/2022/08/amazon-dynamodb-supports-bulk-imports-amazon-s3-new-dynamodb-tables/

This blog post may also help: https://aws.amazon.com/blogs/database/amazon-dynamodb-can-now-import-amazon-s3-data-into-a-new-table/

I would personally try this first to see what performance this simpler way of ingesting large amounts of data gives. If your EMR job is required to process the data before loading it, you can still use the S3 bulk import as a performance baseline.

If you get the performance you want with S3 bulk import, you can also consider using EMR only to process the data and write the results to S3, and then load those results into DynamoDB via S3 import.
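As a rough illustration of that baseline, the sketch below starts an S3 import with the boto3 `import_table` call. As the announcement above notes, the import creates a new table. The bucket, prefix, table name, key name, input format, and compression are all assumptions to replace with your own values.

```python
def build_import_request(bucket, prefix, table_name, partition_key):
    """Build the keyword arguments for DynamoDB's ImportTable API."""
    return {
        "S3BucketSource": {"S3Bucket": bucket, "S3KeyPrefix": prefix},
        # Assumes the Spark job wrote gzip-compressed DynamoDB JSON to S3.
        "InputFormat": "DYNAMODB_JSON",
        "InputCompressionType": "GZIP",
        "TableCreationParameters": {
            "TableName": table_name,
            "AttributeDefinitions": [
                {"AttributeName": partition_key, "AttributeType": "S"}
            ],
            "KeySchema": [{"AttributeName": partition_key, "KeyType": "HASH"}],
            "BillingMode": "PAY_PER_REQUEST",
        },
    }


def start_import(request):
    """Start the import and return its ARN (requires AWS credentials)."""
    import boto3  # imported here so the request builder can be used offline

    client = boto3.client("dynamodb")
    response = client.import_table(**request)
    return response["ImportTableDescription"]["ImportArn"]
```

Calling `start_import(build_import_request("your-bucket-name", "prefix/outputs/", "your-table", "id"))` would kick off the import; its progress can then be followed with `describe_import`.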

Best,

Didier

AWS EXPERT
answered 5 months ago
EXPERT
reviewed 5 months ago
  • Hi, can this bulk import handle large documents, meaning documents larger than the DynamoDB item size limit? We are using Spark because some documents exceed the limit, so we persist them to S3. Can you help us with how to mitigate this?

    @Didier?


Hi, can this bulk import handle large documents, meaning documents larger than the DynamoDB item size limit? We are using Spark because some documents exceed the limit, so we persist them to S3. Can you help us with how to mitigate this?
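A rough sketch of the routing described here, assuming each document is a JSON-serializable dict: documents that fit within DynamoDB's 400 KB item limit are written as items, and larger ones go to S3 with only a pointer stored in the table. The function name, attribute names, and the 10% safety margin are assumptions for illustration, not the actual Spark job.

```python
import json

DDB_ITEM_LIMIT_BYTES = 400 * 1024  # DynamoDB's maximum item size is 400 KB
SAFETY_MARGIN = 0.9  # assumption: headroom for attribute names and encoding overhead


def route_document(doc_id, doc, s3_prefix):
    """Decide whether a document goes to DynamoDB directly or to S3 with a pointer."""
    # Serialized JSON size is only an approximation of the DynamoDB item size.
    payload_size = len(json.dumps(doc).encode("utf-8"))
    if payload_size <= DDB_ITEM_LIMIT_BYTES * SAFETY_MARGIN:
        return {"target": "dynamodb", "item": {"id": doc_id, "body": doc}}
    s3_key = f"{s3_prefix}/{doc_id}.json"
    # Only a small pointer item is written to DynamoDB for oversized documents.
    return {"target": "s3", "s3_key": s3_key, "item": {"id": doc_id, "s3_key": s3_key}}
```

With this split, the pointer items are small enough for any DynamoDB load path, while the oversized bodies stay in S3.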

answered 5 months ago

Hi,

Can you please suggest an appropriate way to handle this load in the given time? I would like an efficient, performance-friendly solution, and I would like to focus on getting that performance through EMR.

answered 5 months ago
  1. Please check whether your EMR cluster is using all of its available resources to run the Spark application. You can set the maximizeResourceAllocation property, available for Spark on EMR, to make use of the available resources.

  2. If your workload requires more nodes than the cluster has, resize the cluster or use **auto scaling** as needed.

  3. On the DynamoDB side, make sure the job is using the full provisioned write capacity (WCU). Also, make sure your EMR cluster and DynamoDB table are in the same Region. You can also leverage this document to further optimize the performance.
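To make points 1 and 3 more concrete, here is a small sketch. The first function returns the `spark` classification that turns on maximizeResourceAllocation and could be passed as `Configurations=` to `run_job_flow`; explicit executor settings on the spark-submit command line would normally still take precedence over it. The second estimates the sustained write capacity needed, using the rule that one WCU covers one standard write per second of an item up to 1 KB. The average item size is an assumption you would need to measure on your own data.

```python
import math


def spark_max_allocation_configurations():
    """EMR configuration that lets Spark size executors to the cluster's nodes."""
    return [
        {
            "Classification": "spark",
            "Properties": {"maximizeResourceAllocation": "true"},
        }
    ]


def required_wcu(total_gb, minutes, avg_item_kb):
    """Estimate the sustained WCU needed to write total_gb within the given minutes."""
    total_kb = total_gb * 1024 * 1024
    item_count = total_kb / avg_item_kb
    # Each write consumes 1 WCU per 1 KB of item size, rounded up.
    wcu_per_item = math.ceil(avg_item_kb)
    writes_per_second = item_count / (minutes * 60)
    return math.ceil(writes_per_second * wcu_per_item)
```

For 200 GB in 60 minutes, `required_wcu(200, 60, 2.0)` comes out at roughly 58,000 WCU sustained for the whole hour, so the table's write capacity is likely to matter as much as the cluster size.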

AWS SUPPORT ENGINEER
answered 5 months ago
