AWS Glue ETL ジョブを実行したときに"No space left on device"というエラーが表示されないようにしたいです。
簡単な説明
AWS Glue の抽出、変換、ロード (ETL) ジョブを実行すると、次の種類のエラーが表示される場合があります。
- ステージ障害によりジョブが中止されました: ステージ158.0のタスク68は4回失敗しました。最新の失敗: ステージ 158.0 (TID 28820、10.102.100.111、エグゼキューター 17) でタスク 68.3 が失われました:org.apache.spark.memory.spark.Spark.Shuffle.Sort.shuffleExternalSorter @55f6dabb: デバイスに空き容量が残っていません
-または-
- ステージ障害によりジョブが中止されました: ResultStage 7 が許容最大回数失敗しました: 4.最新の失敗理由:org.apache.spark.shuffle.metadataFetchFailedException: シャッフル 2 の出力場所が見つかりません
Apache Spark は、AWS Glue ワーカー上のローカルディスクを使用して、spark.executor.memory 設定パラメータで定義されたヒープ領域を超えるデータをメモリからスピルします。
groupByKey()、reduceByKey()、および join() のような広範囲の変換によりシャッフルが生じる可能性があります。ジョブのソートまたはシャッフルのステージで、Spark は異なるワーカー間でデータを交換する前に、中間データをローカルディスクに書き込みます。この時点で“No space left on device“または“MetadataFetchFailedException“エラーが表示される可能性があります。Spark がこのエラーをスローするのは、エグゼキューターに十分なディスク容量が残っておらず、リカバリもできない場合です。
解決策
このような種類のエラーは通常、処理ジョブにおいてデータセットに大きなスキューがある場合に発生します。このセクションでは、一般的な監視とデバッグの異常と、それらに対処する方法を示します。
AWS Glue ジョブメトリクス と Apache Spark UI は、Apache Spark エグゼキューターのデータスキューを監視するための強力なツールです。このツールでは、実装のタイムラインを監視することで、データスキューの原因となり得る問題を簡単に特定できます。各ステージ、タスク、ジョブ、エグゼキューターの動作を詳細に理解するのに役立ちます。
非集約型のコンピューティングとストレージ
この方法では、データを AWS Glue ワーカーのローカルディスクに書き込む代わりに、大規模なシャッフル用としてストレージをスケールします。
専用のサーバーレスストレージを使用する: AWS Glue 2.0 以降では、Amazon Simple Storage Service (Amazon S3) を使用して Spark のシャッフルデータやスピルデータを保存できます。
AWS Glue 2.0 AWS Glue で Amazon S3 シャッフルを使用するには、以下のジョブパラメータを使用します。詳細については、"AWS Glue Spark シャッフルプラグインと Amazon S3"を参照してください。
- キー: --write-shuffle-files-to-s3
値: TRUE
- キー: --write-shuffle-spills-to-s3
値: TRUE
- キー: --conf
値: spark.shuffle.glue.s3ShuffleBucket=s3://custom_shuffle_bucket
注: オプションのフラグ spark.shuffle.glue.s3ShuffleBucket は、シャッフルファイルの書き込み先となる Amazon S3 バケットを指定します。custom_shuffle_bucket を S3 バケットの名前に置き換えます。
AWS Glue 3.0/4.0 Apache Spark 用のクラウドシャッフルストレージプラグインを使用するには、次のジョブパラメータを使用します。詳細については、"Cloud Shuffle Storage Plugin for Apache Spark"を参照してください。
- キー: --write-shuffle-files-to-s3
値: TRUE
- キー: --conf
値: spark.shuffle.storage.path=s3://custom_shuffle_bucket
注: オプションのフラグ spark.shuffle.storage.path は、シャッフルファイルの書き込み先となる Amazon S3 バケットを指定します。custom_shuffle_bucket を S3 バケットの名前に置き換えます。
スケールアウト
スケールアウトとは、水平スケールによってワーカーの数を増やすか、垂直スケールによってワーカータイプをアップグレードすることを指します。ただし、特にデータがいくつかのキーで大きく偏っている場合など、スケールアウトは常に機能するとは限りません。データスキューに対処するために、ソルティング技術を実装して Apache Spark アプリケーションロジックを変更することを検討してください。
入力データの削減とフィルタリング
広範囲にわたる操作中のデータシャッフルおよびネットワーク使用量を最小限に抑えるために、入力データを可能な限り事前にフィルタリングします。効果的なデータフィルタリングのためには、以下の手法を使用します。
小さいテーブルをブロードキャストする
Apache Spark でテーブルを結合すると、異なるワーカーのエグゼキューター間でのデータシャッフルや大量のデータの移動がトリガーされる可能性があります。これにより、システムのメモリが不足し、ワーカーのディスクにデータがスピルする可能性があります。ネットワークのオーバーヘッドを最小限に抑えるため、Spark ではブロードキャスト用の小さいテーブルの使用をサポートしています。これらのテーブルは、数十MB (メガバイト) 以内のサイズとなり、データの分割やシャッフルができません。Spark へのヒントを提供することで、ブロードキャストのためにより小さいテーブルを使用できます。詳細については、AWS ブログ"Optimize memory management in AWS Glue"の"Broadcast small tables"を参照してください。
AQE を使用する
Databricks のアダプティブクエリ実行 (AQE) は、Spark SQL での最適化技術です。これは、ランタイム統計を使用して、各ステージでのデータスキューと動的シャッフルパーティションを解決するために最も効率的なクエリ実装プランを選びます。AQE は AWS Glue 3.0/4.0 ではデフォルトで利用できます。
AQE は次の機能を実行します。
-
シャッフルパーティションの動的な合体
各クエリステージの後に、必要に応じてパーティションを自動的に合体させ、パーティション管理の問題を解決します。詳細については、Databricks のウェブサイトで"Dynamically coalescing shuffle partitions"("Adaptive Query Execution: Speeding Up Spark SQL at Runtime"内) を参照してください。
–パーティションが少なすぎる: ローカルディスクへのスピルの原因となる余分なデータを含むパーティションを自動的に分割します。
–パーティションが多すぎる: 必要に応じてパーティションを自動的に結合します。各パーティションのデータサイズが小さく、シャッフルブロックを読み取るのに小さいネットワークデータのフェッチが必要な場合、パーティションが結合されます。
-
結合戦略を動的に切り替える
各ステージのテーブルサイズ統計に基づいて、Sort-Merge を Broadcast Hash Join に変換します。
-
スキュー結合を動的に最適化する
シャッフルファイル統計からスキュー結合を自動的に検出して最適化します。AQE は、スキューのあるパーティションをより小さいサブパーティションに分割します。次に、これらの小さいパーティションを他のノードの対応するパーティションに結合します。詳細については、Databricks のウェブサイトで"Dynamically optimizing skew joins"("Adaptive Query Execution: Speeding Up Spark SQL at Runtime"内) を参照してください。
AQE を設定する
以下のジョブパラメータを使用して AQE を有効にします。
- キー: --conf
値: spark.sql.adaptive.enabled=true --conf spark.sql.adaptive.coalescePartitions.enabled=true --conf spark.sql.adaptive.skewJoin.enabled=true