EMRFS write errors

After upgrading from EMR 6.11 to 6.12 (I also tried 7.0.0), I'm seeing these errors on the exact same job with the same resources. Has something changed in how EMRFS is implemented? What is the root cause of this error?

: org.apache.spark.SparkException: Job aborted due to stage failure: Authorized committer (attemptNumber=0, stage=185, partition=635) failed; but task commit success, data duplication may happen. reason=ExceptionFailure(org.apache.spark.SparkException,[TASK_WRITE_FAILED] Task failed while writing rows to s3://<<redacted_s3_path>>,[Ljava.lang.StackTraceElement;@7eb2163c,org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to s3://<<redacted_s3_path>>.
        at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:789)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:421)
        at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:141)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: One or more of the specified parts could not be found. The part may not have been uploaded, or the specified entity tag may not match the part's entity tag. (Service: Amazon S3; Status Code: 400; Error Code: InvalidPart; Request ID: SZ2QF0DHZKKF6D9V; S3 Extended Request ID: M2ixKpwc5DmqtJ7FftTwhSZCrN7bcSAsy0m09c19HzF8pmQ/Z35NSUkcfX0HhgFdmfohsFJQ1wg=; Proxy: null), S3 Extended Request ID: M2ixKpwc5DmqtJ7FftTwhSZCrN7bcSAsy0m09c19HzF8pmQ/Z35NSUkcfX0HhgFdmfohsFJQ1wg=
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1879)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1418)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1387)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1157)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:814)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5516)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5463)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.completeMultipartUpload(AmazonS3Client.java:3667)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.call.CompleteMultipartUploadCall.perform(CompleteMultipartUploadCall.java:26)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.call.CompleteMultipartUploadCall.perform(CompleteMultipartUploadCall.java:12)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor$CallPerformer.call(GlobalS3Executor.java:114)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:141)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:196)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:191)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.completeMultipartUpload(AmazonS3LiteClient.java:170)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
        at com.sun.proxy.$Proxy47.completeMultipartUpload(Unknown Source)
        at com.amazon.ws.emr.hadoop.fs.s3.upload.dispatch.DefaultMultipartUploadDispatcher.completeUpload(DefaultMultipartUploadDispatcher.java:57)
        at com.amazon.ws.emr.hadoop.fs.s3.upload.dispatch.DefaultMultipartUploadDispatcher.complete(DefaultMultipartUploadDispatcher.java:40)
        at com.amazon.ws.emr.hadoop.fs.staging.metadata.inmemory.InMemoryStagingDirectory.completeUpload(InMemoryStagingDirectory.java:227)
        at com.amazon.ws.emr.hadoop.fs.staging.metadata.inmemory.InMemoryStagingDirectory.publish(InMemoryStagingDirectory.java:109)
        at com.amazon.ws.emr.hadoop.fs.staging.metadata.inmemory.SynchronizedStagingDirectory.publish(SynchronizedStagingDirectory.java:52)
        at com.amazon.ws.emr.hadoop.fs.staging.metadata.inmemory.InMemoryStagingMetadataStore.publishStagingDirectory(InMemoryStagingMetadataStore.java:96)
        at com.amazon.ws.emr.hadoop.fs.staging.DefaultStagingMechanism.publishStagingDirectory(DefaultStagingMechanism.java:76)
        at org.apache.hadoop.mapreduce.lib.output.FileSystemOptimizedCommitter.publishStagingDirectory(FileSystemOptimizedCommitter.java:149)
        at org.apache.hadoop.mapreduce.lib.output.FileSystemOptimizedCommitter.commitTask(FileSystemOptimizedCommitter.java:138)
        at com.amazon.emr.committer.FilterParquetOutputCommitter.commitTask(FilterParquetOutputCommitter.java:119)
        at com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter.commitTask(EmrOptimizedSparkSqlParquetOutputCommitter.java:9)
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.$anonfun$commitTask$1(SparkHadoopMapRedUtil.scala:51)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:641)
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:51)
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:78)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:233)
        at org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol.commitTask(SQLEmrOptimizedCommitProtocol.scala:129)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.$anonfun$commit$1(FileFormatDataWriter.scala:107)
        at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:641)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:107)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:405)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1575)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:411)
        ... 15 more
Dev
asked 5 months ago · 757 views
1 Answer

Hello,

The above error occurs when CompleteMultipartUpload fails to complete the multipart upload by assembling the previously uploaded parts. It can happen when the object was not fully written (committed) and a new request is made against the same object, as described in this doc.

To alleviate this issue, you can turn off speculative uploads by setting fs.s3.multipart.th.fraction.parts.completed to a high valid value, e.g. 0.99. This disables speculative UploadPart requests in EMRFS, so only the part that actually failed during the multipart upload is retried. The logic for uploading large files to S3 changed, which is possibly why you did not observe the error in the previous version.
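
On EMR this kind of EMRFS property is commonly supplied through the emrfs-site configuration classification when creating the cluster. A minimal sketch of that (the 0.99 threshold is just the example value from above, not a tuned recommendation for your workload):

    [
      {
        "Classification": "emrfs-site",
        "Properties": {
          "fs.s3.multipart.th.fraction.parts.completed": "0.99"
        }
      }
    ]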

I also recommend using a larger value for fs.s3n.multipart.uploads.split.size and increasing the number of retries. This property specifies the maximum size of a part, in bytes, before EMRFS starts a new part upload when multipart uploads are enabled.

Also, fs.s3.maxRetries specifies how many retries EMRFS will use in its exponential backoff strategy; the default value is 15. These settings can be specified in emrfs-site.xml (or the emrfs-site configuration classification).
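
Assuming the same emrfs-site classification approach as above, the part size and retry settings could be added alongside the threshold. The 268435456-byte (256 MB) split size and 25 retries below are illustrative values rather than tested recommendations:

    [
      {
        "Classification": "emrfs-site",
        "Properties": {
          "fs.s3.multipart.th.fraction.parts.completed": "0.99",
          "fs.s3n.multipart.uploads.split.size": "268435456",
          "fs.s3.maxRetries": "25"
        }
      }
    ]

If modifying the cluster configuration is not convenient, the same keys can usually also be passed per job as Hadoop configuration (for example via Spark's spark.hadoop. property prefix), though it is worth verifying that EMRFS picks them up at that level on your EMR release.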

AWS
Support Engineer
answered 5 months ago
