Help us improve the AWS re:Post Knowledge Center by sharing your feedback in a brief survey. Your input can influence how we create and update our content to better support your AWS journey.
Amazon MWAA 環境で Queued 状態に留まっているタスクをトラブルシューティングする方法を教えてください。
Amazon Managed Workflows for Apache Airflow (Amazon MWAA) でワークフローを実行していますが、タスクが Queued 状態から変化せず、Running 状態に移行しません。
簡単な説明
次の要因で、Amazon MWAA のタスクが Queued 状態から移行できない場合があります。
- 環境が同時実行タスクの最大数に達した。
- Airflow 構成オプションが MWAA 環境で正しく設定されていない。
- ワーカー上のタスク用のメモリまたは CPU が不足している。
タスクを実行する通常のワークフローに障害が発生すると、タスクは Queued 状態に留まります。Apache Airflow ワーカーに過負荷がかかり、指定された時間内に応答しきれなくなる場合があります。この場合、12 時間後にデフォルトの可視性タイムアウトに達するまで、タスクは Amazon Simple Queue Service (Amazon SQS) キューに残ります。再試行を設定した場合は、Apache Airflow スケジューラーはタスクを再試行します。
解決策
トラブルシューティングを行う前に、環境リソースの負荷が最大に達しているのか、それともワーカーに関連する問題が発生しているのかを判断してください。Amazon CloudWatch を使用し、環境のワーカーログおよび、メトリクス CPUUtilization と MemoryUtilization を確認します。
環境が同時実行タスクの最大数に達したかどうかを確認する
Amazon MWAA プールに空きがなくなり、環境がキューにさらにタスクを追加した場合、環境は同時タスクの最大数に達します。この問題を解決するには、環境のワーカー数を増やすか、環境のクラスサイズを変更します。
環境のワーカー数を増やす必要があるかどうかを判断するには、次の手順を実行します。
- CloudWatch コンソールを開きます。
- ナビゲーションペインで [メトリクス] を選択し、[すべてのメトリクス] を選択します。
- [参照] タブを選択し、環境が置かれた AWS リージョンを選択してから、環境の名前を検索します。
- [AWS 名前空間] セクションで [MWAA < キュー] を選択します。
- QueuedTasks および RunningTasks を選択します。
- グラフでアクティビティが最も多い期間を特定し、両方のメトリクスの合計数を加算します。
注: 合計は、この期間のタスクの合計数を示します。 - 環境のデフォルト同時実行レベルを判断します。
注: 例えば、mw1.small 環境には、ワーカーごとに 5 つの同時タスクがあります。 - タスクの合計数を、同時実行タスクのデフォルトレベルで除算します。
- その数から、環境に設定した最大ワーカー数を減算します。
注: 結果が 0 より大きい場合は、ワーカーを追加し、現在の同時タスク数を満たす必要があります。
環境のワーカー数を増やしたり、環境のクラスサイズを変更したりするには、次の手順を実行します。
- Amazon MWAA コンソールを開きます。
- 該当する環境で [編集] を選択し、[次へ] を選択します。
- [環境クラス] セクションで次の操作を行います。
ステップ 9 で決定した最大ワーカー数を増やします。
さらに、最小ワーカー数を、アクティビティが最も少ない期間にワークロードが必要とする値に設定します。
注: 環境に追加できるワーカーには最大 25 個の制限があります。25 よりも多くワーカーが必要な場合は、[環境クラス] で大容量サイズを選択します。 - 環境のクラスサイズを増やす場合は、ワークロードに必要な最大および最小ワーカー数も設定してください。
ワーカー数を最適化してもワークロードには十分でない場合は、次の手順を実行します。
- Apache Airflow センサーの代わりに遅延可能なオペレーターを使用します。詳細については、Apache Airflow のウェブサイトで「遅延可能なオペレーターおよびトリガー」を参照してください。
- 実行開始時間をずらし、有向非巡回グラフ (DAG) の schedule_interval の間隔を小さくします。DAG をブロック単位でスケジュールします。
- 特定の外部関数を呼び出して監視するカスタムコードを使用する場合は、タスクを 2 つのタスクに分割します。呼び出し用に 1 つのタスクを作成し、もう一方を関数を監視するための遅延可能なオペレーターとして作成します。
Airflow 構成オプションに設定ミスがないか確認する
Airflow 構成オプションを確認するには、次の手順を実行します。
- MWAA コンソールを開きます。
- [環境] を選択し、該当する MWAA 環境を選択します。
- [Airflow 構成オプション] で core.parallelism および celery.worker_autoscale を確認します。
core.parallelism が設定されている場合は、手動で設定した core.parallelism オプションをすべて削除し、Amazon MWAA が動的に設定を行えるようにします。Amazon MWAA は、(maxWorkers * maxCeleryWorkers) / スケジューラー数 * 1.5 という式を使用して動的なデフォルト設定を計算します。Auto Scaling を使用して手動で値を設定した場合、最大負荷時に使用率が低くなり、問題が発生する可能性があります。
構成オプション celery.worker_autoscale の値をデフォルトの同時実行レベルと比較します。celery.worker_autoscale 設定オプションを変更していない場合は、デフォルトの同時実行レベルに、環境に設定した最大ワーカー数を乗算します。
**celery.worker_autoscale ** 値が意図せずデフォルト値より低くなっている場合は、CloudWatch メトリクスを使用してワーカーの CPU とメモリの使用状況を監視します。最大負荷時のリソース値が 20 ~ 60% の場合は、celery.worker_autoscale の値を増加させます。ワーカーコンテナの過剰使用を避けるために、少しずつ増やしてください。
celery.worker_autoscale 値が未設定の場合、またはデフォルト値をそのまま使用している場合は、ワーカーの CPU とメモリの使用状況を監視します。環境でのメトリクス値が高すぎる場合は、celery.worker_autoscale の値を下げます。最大負荷時の環境が 20 ~ 60% の場合は、最大値を増やすことができます。
過剰使用が原因でワーカーに障害が発生していないか確認する
MWAA ワーカーコンテナのすべての Celery ワーカーにタスクがあり、負荷が最大になると、ワーカーの過剰使用が原因で障害が発生する可能性があります。
MWAA Worker コンテナの Celery ワーカーは、現在使用されていないタスクをポーリングします。実行中のタスクとそれを定義するコードの複雑さによっては、ワーカーの過剰使用が原因でクラッシュする可能性があります。この問題は、MWAA ワーカーコンテナ上のすべての Celery ワーカーにタスクがあり、負荷が最大に達している場合に発生します。
ワーカーが過剰使用されており、機能不全に陥っていないかどうかを判断するには、次の手順を実行します。
- CloudWatch コンソールを開きます。
- ナビゲーションペインで [メトリクス] を選択し、[すべてのメトリクス] を選択します。
- [参照] タブを選択し、環境が置かれた AWS リージョンを選択してから、環境の名前を検索します。
- [AWS 名前空間] セクションで [MWAA < Queue] を選択し、ApproximateAgeOfOldestTask を選択します。
- 時間範囲を拡大し、期間には 4 ~ 6 週間を含めます。
注: ピークが 40,000 秒以上の場合は、そのタスクが Amazon SQS キューに滞留しており、ワーカーでは過剰使用による障害が発生していることを示しています。また、システムによる強制終了が原因で、Celery ワーカーは障害をイベントバッファに書き込むことができません。
CloudWatch Insights を使用し、タスクが Amazon SQS キューに滞留しているときにアラートを出すこともできます。
アラートを作成するには、次の手順を実行します。
-
CloudWatch コンソールを開きます。
-
ナビゲーションペインで [ログ] を選択し、[Logs Insights] を選択します。
-
期間を 4 ~ 6 週間の範囲で指定します。
-
[選択基準] メニューで MWAA 環境のスケジューラーロググループを選択します。
-
クエリセクションに次のクエリを入力します。
fields _@timestamp_, _@message_, _@logStream_, _@log_ | filter _@message_ like /Was the task terminated externally?/ | sort _@timestamp_ desc | limit 10000以前にキューに入ったタスクを受信した際に、スケジューラーが送信するログの例を次に示します。
[[34m**2024-01-17T11:30:18.936+0000**[0m] [34mscheduler_job_runner.py:[0m771 ERROR[0m - Executor reports task instance <TaskInstance: dag_name.task_name manual__202X-XX-XXTXX:XX:XX.758774+00:00 [queued]> finished (failed) although the task says it's queued. (Info: None) Was the task terminated externally?[0m
コンピューティングやメモリを大量に消費するワークロードを軽減する
注: 次のリストを綿密に考慮してください。すべての要因がすべてのユースケースに当てはまるわけではありません。追加のサポートが必要な場合は、AWS サポートにお問い合わせください。
環境内のコンピューティングまたはメモリを大量に消費するワークロードを減らすには、次の手順を実行します。
- DAG コードに、抽出、変換、ロード (ETL) スクリプト、データ移動の指示、AI または ML パイプライン、その他のコンピューティングまたはメモリを大量に消費するワークロードが含まれていないことを確認してください。
- DAG コードを記述する際、Apache Airflow の推奨事項に従ってください。最上位のコードを最小化し、必要なものだけをインポートするようにしてください。詳細については、Apache Airflow のウェブサイトで「ベストプラクティス」を参照してください。
- DAG コードを最適化します。センサー、フック、カスタムオペレーター、拡張オペレーター、継承オペレーターのメモリフットプリントを分析し、潜在的な問題領域を特定します。
リソースの過剰使用が解消されない場合は、次の手順を実行します。
- celery.worker_autoscale の値をデフォルト値設定よりも低くします。celery.worker_autoscale の値を数桁減らし、24 ~ 48 時間環境を監視します。最適なレベルに達するまで celery.worker_autoscale の値を継続的に減らします。
注: celery.worker_autoscale の値を小さくすると、タスクプール全体が減少するため、長時間 Queue 状態に留まる項目数が増えます。これに対処するには、最低ワーカー数も増やす必要があります。 - さらに、「環境が同時実行タスクの最大数に達したかどうかを確認する」セクションの手順を再度実行し、ワーカーあたりの同時タスク数を減らします。
関連情報
Amazon MWAA での Apache Airflow のパフォーマンスを調整する
設定リファレンス (Apache Airflow のウェブサイト)
- 言語
- 日本語

関連するコンテンツ
- 質問済み 21日前