Amazon EMR での Apache Spark ジョブで発生する、「no space left on device」というステージエラーの解決方法を教えてください。
Apache Spark アプリケーションを Amazon EMR クラスターに送信すると、そのアプリケーションを実行できず、「no space left on device」というステージエラーが発生します。
簡単な説明
次のいずれかの要因で、Apache Spark アプリケーションで no space left on device エラーが発生することがあります。
- シャッフル結合が存在するため、シャッフル処理中に大量の中間データが生成される
- 不均等データパーティションの分散とエグゼキューターのワークロード分散
- パーティションのサイズと数が適切に設定されていない
- 使用できるディスクやメモリなどのリソースが不足している
Apache Spark は、コアノードとタスクノードのローカルストレージを使用して中間 (シャッフル) データを保存します。インスタンスのディスク容量が不足すると、ジョブを実行できず、no space left on device エラーが発生します。
解決策
この記事では、「no space left on device」エラーの最も一般的な原因および、解決方法について説明します。適切な修正を実施するには、根本原因を特定する必要があります。
注: AWS コマンドラインインターフェイス (AWS CLI) コマンドの実行中にエラーが発生した場合は、「AWS CLI で発生したエラーのトラブルシューティング」を参照してください。また、AWS CLI の最新バージョンを使用していることを確認してください。
再パーティショニング
クラスター内のコアノードとタスクノードの数によっては、Spark パーティションの数を増やす必要がある場合があります。Spark パーティションをさらに追加するには、次のコマンドを実行します。
val numPartitions = 500 val newDF = df.repartition(numPartitions)
注: 500 は、ユースケースに合ったパーティションの数に置き換えてください。
Spark 構成のチューニング
パーティション管理
シャッフル中にディスクへのスピルが過剰に発生したり、パーティション間のデータ分散が不均一になったりした場合は、次のパラメータを調整します。
spark.default.parallelism=${NUM_CORES * 2} #no. of partitions in RDDs spark.sql.shuffle.partitions=${NUM_CORES * 2} #no. of shuffle partitions spark.sql.files.maxPartitionBytes=256MB #max. no. of bytes in a partition when reading files spark.sql.files.maxRecordsPerFile=10000000
次のいずれかに当てはまる場合は、並列処理とパーティション数を増やします。
- タスク期間の差異が平均期間の 3 倍を超えている
- タスクあたりのスピル量が 20% を超えている
平均パーティションサイズが 50 MB 未満の場合、または大量の小規模ファイルがある場合は、並列処理とパーティション数を減らします。
最適なパーティション数を計算するには、次の式を使用します。
Initial Partitions = Number of Cores * 2 Optimal Partitions = max( Total Input Size / Target Partition Size, Initial Partitions )
データ量に基づくチューニング
複数のサイズのデータセット用の構成パラメータを次に示します。
小規模データセット (100 GB 未満)
spark.sql.files.maxPartitionBytes=128MB spark.sql.shuffle.partitions=NUM_CORES * 2 spark.sql.files.maxRecordsPerFile=5000000
中規模データセット (100 GB ~ 1 TB)
spark.sql.files.maxPartitionBytes=256MB spark.sql.shuffle.partitions=NUM_CORES * 3 spark.sql.files.maxRecordsPerFile=10000000
大規模データセット (1 TB 超)
spark.sql.files.maxPartitionBytes=512MB spark.sql.shuffle.partitions=NUM_CORES * 4 spark.sql.files.maxRecordsPerFile=20000000
メモリとストレージの最適化
メモリとストレージを最適化するために、設定パラメータを更新します。
spark.memory.fraction=0.8 # Higher for compute-intensive jobs spark.memory.storageFraction=0.3 # Lower for shuffle-heavy workloads spark.executor.memoryOverhead=0.2 # 20% of executor memory spark.memory.offHeap.enabled=true spark.memory.offHeap.size=${EXECUTOR_MEMORY * 0.2}
Spark エグゼキューターコンテナの合計メモリ割り当てを計算するために、次の 4 つのメモリコンポーネントを組み合わせます。
- エグゼキューターのメモリ (spark.executor.memory)
- メモリオーバーヘッド (spark.executor.memoryOverhead)
- オプヒープメモリ (spark.memory.offHeap.size)
- PySpark メモリ (spark.executor.pyspark.memory)
エグゼキューターコンテナの合計メモリ = spark.executor.memory + spark.executor.memoryOverhead + spark.memory.offHeap.size + spark.executor.pyspark.memory
次の計算を行い、Spark の内部メモリ割り当てを判断します。
Storage Memory = Executor Memory * memory.fraction * memory.storageFraction Execution Memory = Executor Memory * memory.fraction * (1 - memory.storageFraction) Off-Heap Memory = Executor Memory * 0.2
ファイルおよびディスクの管理
パーティションスキューが発生したり、パーティションの数が多すぎたり少なすぎたりする場合は、ファイル管理設定を調整してください。ユースケースで別の値が必要な場合を除き、maxPartitionNum を合計コア数の 2 倍に、minPartitionNum を 1 に設定します。
# File Management spark.sql.files.minPartitionNum=${NUM_CORES} spark.sql.files.maxPartitionNum=${NUM_CORES * 4} spark.shuffle.file.buffer=64k
maxPartitionNum の値が小さすぎる場合、並列処理が制限され、すべてのスキューシナリオを防止できなくなる可能性があります。
AQE とスキューの処理
Spark の AQE (Adaptive Query Execution) は、リアルタイムの統計情報に基づきクエリプランを調整し、実行時間を最適化します。
AQE は、Amazon EMR バージョン 5.30.0 以降ではデフォルトで有効になっています。Spark の AQE では、結合戦略とシャッフルを自動的に最適化できます。さらに、動的なパーティション分割により、データスキューを効果的に処理し、負荷分散とクエリのパフォーマンスを向上させます。
# Skew Management spark.sql.adaptive.enabled=true spark.sql.adaptive.skewJoin.enabled=true spark.sql.adaptive.skewJoin.skewedPartitionFactor=10 spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB
AQE をサポートしていない以前のバージョンの Spark を使用している場合は、次のいずれかの方法でデータスキューに対処します。
- ブロードキャスト結合で spark.sql.autoBroadcastJoinThreshold のしきい値を調整します。この方法は、結合時に一方のデータセットが他方のデータセットよりも大幅に小さい場合に役立ちます。
- コード内で repartition() または coalesce() を使用すると、データ分散を改善できます。
- 大規模なテーブルやスキューのあるテーブルに SKEW ヒントを適用し、小規模なテーブルをブロードキャストします。SKEW ヒントは、テーブルにスキューデータがあることを Spark オプティマイザーに通知し、結合戦略を最適化します。
Spark SQL クエリにおける SKEW ヒントの例を次に示します。
-- Using SKEW hint in Spark SQL SELECT /*+ SKEW('t1') */ t1.key, t2.value FROM table1 t1 JOIN table2 t2 ON t1.key = t2.key -- Using MAPJOIN hint along with SKEW SELECT /*+ SKEW('t1'), MAPJOIN(t2) */ t1.key, t2.value FROM table1 t1 JOIN table2 t2 ON t1.key = t2.key
ブートストラップアクションを使用してストレージを動的にスケールアップする
Amazon CloudWatch モニタリングと Lambda オートメーションでブートストラップアクションを使用すると、Amazon EMR クラスターのストレージを自動スケーリングできます。使用可能なディスク容量が設定したしきい値を下回ると、Amazon CloudWatch は AWS Lambda 関数を起動します。この関数は、新しい Amazon Elastic Block Store (Amazon EBS) ボリュームをクラスターノードにアタッチします。次に、この関数はスクリプトを実行してボリュームをフォーマットおよびマウントし、Hadoop 分散ファイルシステム (HDFS) に統合します。
この自動化されたアプローチでは、ストレージの制約によるクラスター障害を防止できます。また、必要な場合にのみ容量を追加することで、費用対効果を維持できます。実装には、適切な Identity and Access Management (IAM)ロール、Amazon CloudWatch アラーム、AWS Lambda 設定、およびボリューム管理用のカスタムスクリプトが必要です。詳細については、「Amazon EMR クラスターでストレージを動的にスケールアップする」を参照してください。
Amazon EBS キャパシティを追加する
新しいクラスターに大容量の EBS ボリュームを使用する
Amazon EMR クラスターを起動し、大容量の EBS ボリュームを持つ Amazon Elastic Compute Cloud (Amazon EC2) インスタンスタイプを選択します。詳細については、「インスタンスのデフォルト Amazon EBS ストレージ」を参照してください。
実行中のクラスターに EBS ボリュームをさらに追加する
次の手順を実行します。
-
大容量の EBS ボリュームを使用しても問題が解決しない場合は、コアノードとタスクノードに追加の EBS ボリュームをアタッチします。
-
アタッチしたボリュームをフォーマットし、マウントします。正しいディスク番号を使用していることを確認してください。たとえば、/data ではなく、/mnt1 または /mnt2 を使用します。
-
/mnt2/yarn ディレクトリを作成し、そのディレクトリの所有権を YARN ユーザーに設定します。
sudo mkdir /mnt2/yarn sudo chown yarn:yarn /mnt2/yarn -
/mnt2/yarn ディレクトリを /etc/hadoop/conf/yarn-site.xml の yarn.nodemanager.local-dirs プロパティに追加します。
例<property> <name>yarn.nodemanager.local-dirs</name> <value>/mnt/yarn,/mnt1/yarn,/mnt2/yarn</value> </property> -
NodeManager サービスを再起動します。
Amazon EMR 4.x-5.29.0 リリースバージョンsudo stop hadoop-yarn-nodemanager sudo start hadoop-yarn-nodemanagerAmazon EMR 5.30.0 以降のリリースバージョン
sudo systemctl stop hadoop-yarn-nodemanager sudo systemctl start hadoop-yarn-nodemanager
関連情報
Amazon EMR の Spark ジョブにおけるステージエラーの問題を解決する方法を教えてください
ベストプラクティス (AWS Open Data Analytics のウェブサイト)
関連するコンテンツ
- 質問済み 8年前
