EMRFS write errors

After upgrading from EMR 6.11 to 6.12 (I also tried 7.0.0), I'm seeing these errors on the exact same job with the same resources. Has something changed in how EMRFS is implemented? What is the root cause of this error?

: org.apache.spark.SparkException: Job aborted due to stage failure: Authorized committer (attemptNumber=0, stage=185, partition=635) failed; but task commit success, data duplication may happen. reason=ExceptionFailure(org.apache.spark.SparkException,[TASK_WRITE_FAILED] Task failed while writing rows to s3://<<redacted_s3_path>>,[Ljava.lang.StackTraceElement;@7eb2163c,org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to s3://<<redacted_s3_path>>.
        at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:789)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:421)
        at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:141)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: One or more of the specified parts could not be found.  The part may not have been uploaded, or the specified entity tag may not match the part's entity tag. (Service: Amazon S3; Status Code: 400; Error Code: InvalidPart; Request ID: SZ2QF0DHZKKF6D9V; S3 Extended Request ID: M2ixKpwc5DmqtJ7FftTwhSZCrN7bcSAsy0m09c19HzF8pmQ/Z35NSUkcfX0HhgFdmfohsFJQ1wg=; Proxy: null), S3 Extended Request ID: M2ixKpwc5DmqtJ7FftTwhSZCrN7bcSAsy0m09c19HzF8pmQ/Z35NSUkcfX0HhgFdmfohsFJQ1wg=
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1879)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1418)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1387)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1157)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:814)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5516)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5463)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.completeMultipartUpload(AmazonS3Client.java:3667)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.call.CompleteMultipartUploadCall.perform(CompleteMultipartUploadCall.java:26)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.call.CompleteMultipartUploadCall.perform(CompleteMultipartUploadCall.java:12)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor$CallPerformer.call(GlobalS3Executor.java:114)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:141)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:196)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:191)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.completeMultipartUpload(AmazonS3LiteClient.java:170)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
        at com.sun.proxy.$Proxy47.completeMultipartUpload(Unknown Source)
        at com.amazon.ws.emr.hadoop.fs.s3.upload.dispatch.DefaultMultipartUploadDispatcher.completeUpload(DefaultMultipartUploadDispatcher.java:57)
        at com.amazon.ws.emr.hadoop.fs.s3.upload.dispatch.DefaultMultipartUploadDispatcher.complete(DefaultMultipartUploadDispatcher.java:40)
        at com.amazon.ws.emr.hadoop.fs.staging.metadata.inmemory.InMemoryStagingDirectory.completeUpload(InMemoryStagingDirectory.java:227)
        at com.amazon.ws.emr.hadoop.fs.staging.metadata.inmemory.InMemoryStagingDirectory.publish(InMemoryStagingDirectory.java:109)
        at com.amazon.ws.emr.hadoop.fs.staging.metadata.inmemory.SynchronizedStagingDirectory.publish(SynchronizedStagingDirectory.java:52)
        at com.amazon.ws.emr.hadoop.fs.staging.metadata.inmemory.InMemoryStagingMetadataStore.publishStagingDirectory(InMemoryStagingMetadataStore.java:96)
        at com.amazon.ws.emr.hadoop.fs.staging.DefaultStagingMechanism.publishStagingDirectory(DefaultStagingMechanism.java:76)
        at org.apache.hadoop.mapreduce.lib.output.FileSystemOptimizedCommitter.publishStagingDirectory(FileSystemOptimizedCommitter.java:149)
        at org.apache.hadoop.mapreduce.lib.output.FileSystemOptimizedCommitter.commitTask(FileSystemOptimizedCommitter.java:138)
        at com.amazon.emr.committer.FilterParquetOutputCommitter.commitTask(FilterParquetOutputCommitter.java:119)
        at com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter.commitTask(EmrOptimizedSparkSqlParquetOutputCommitter.java:9)
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.$anonfun$commitTask$1(SparkHadoopMapRedUtil.scala:51)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:641)
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:51)
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:78)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:233)
        at org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol.commitTask(SQLEmrOptimizedCommitProtocol.scala:129)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.$anonfun$commit$1(FileFormatDataWriter.scala:107)
        at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:641)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:107)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:405)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1575)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:411)
        ... 15 more
Dev
asked 5 months ago

1 Answer

Hello,

The above error occurs when CompleteMultipartUpload fails to assemble the previously uploaded parts into the final object. It is typically caused by the object not having been fully written (committed) while a new request is made against the same object, as described in the S3 documentation for this error.

To alleviate this issue, you can turn off speculative uploads by setting fs.s3.multipart.th.fraction.parts.completed to a high valid value such as 0.99. This disables speculative UploadPart calls in EMRFS, so only the part that actually failed during the multipart upload is retried. This change in the large-file upload logic is likely why you did not observe the error on the previous EMR version.
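
As a minimal sketch, this property could be applied through the emrfs-site configuration classification when the cluster is created or reconfigured; the 0.99 value below is just the example threshold mentioned above, not a tuned recommendation:

    [
      {
        "Classification": "emrfs-site",
        "Properties": {
          "fs.s3.multipart.th.fraction.parts.completed": "0.99"
        }
      }
    ]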

I also recommend using a larger value for fs.s3n.multipart.uploads.split.size and increasing the number of retries. This property specifies the maximum size of a part, in bytes, before EMRFS starts a new part upload when multipart uploads are enabled.

Additionally, fs.s3.maxRetries specifies how many retries EMRFS will use in its exponential backoff strategy; the default value is 15. These settings can be specified in emrfs-site.xml.
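
Continuing the same sketch, both properties can go in the same emrfs-site classification; the values below are only illustrative (256 MB parts and double the default retry count), so adjust them to your workload:

    [
      {
        "Classification": "emrfs-site",
        "Properties": {
          "fs.s3n.multipart.uploads.split.size": "268435456",
          "fs.s3.maxRetries": "30"
        }
      }
    ]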

AWS
SUPPORT ENGINEER
answered 5 months ago
