Amazon Kinesis クライアントライブラリ (KCL) アプリケーションがスタックしていて、Amazon Kinesis Data Streams レコードを処理できません。
簡単な説明
KCL アプリケーションは次の理由でスタックしたりブロックされたりすることがあります。
- レコードプロセッサ (ユーザーが実装したメソッド) が、ブロック操作を実行している、または通常より時間がかかっている。
- シャードに配置されたデータレコードがない。
- レコードの取得中に KCL がスタック状態になる。
- KCL が処理をスケジュールできない、またはチェックポイントに失敗する。
KCL の問題を検出してトラブルシューティングするには、次のタスクを実行します。
- KCL メトリクスを分析します。
- KCL アプリケーションの Amazon DynamoDB テーブルを分析します。
- KCL 設定を確認してください。
- KCL 警告ログを有効にします。
- KCL デバッグログを有効にします。
解決策
KCL メトリクスを分析する
RecordProcessor.processRecords.Time メトリクスを監視します。レコードプロセッサの processRecords メソッドにかかった時間が 60 秒未満であることを確認します。processRecords メソッドがブロックされている場合、KCL は待機しなくてはなりません。レコードプロセッサがジョブを完了したら、processRecords メソッドを最適化します。
KCL アプリケーションの DynamoDB テーブルを分析する
すべての KCL アプリケーションは、KCL アプリケーションと同じ名前の DynamoDB テーブルを作成して、アプリケーションの状態を追跡します。KCL アプリケーションのトラブルシューティングを行うには、DynamoDB テーブルの列を分析します。
テーブル内のチェックポイント列が更新されない場合、processRecords メソッドロジックはスタックします。チェックポイント列と leaseCounter 列が両方とも更新されていない場合、maxLeasesPerWorker=1 パラメータにより、他のワーカーはリースを取れなくなります。processRecords メソッドのブロックを解除するには、パラメータ値を大きくします。
KCL 設定を確認する
KCL フリートの数を確認します。Kinesis Data Streams でのシャードの数を書き留めます。シャードの数が増加した場合は、KCL 内のシャードの数に応じて maxLeaseSperWorker パラメーターを増やしてください。
高度な KCL 警告ログを有効にする
レコードプロセッサがブロックされていることを確認するには、KCL 設定の logWarningForTaskAfterMillis 値をミリ秒に設定します。その後、KCL はレコードプロセッサが完了するのを待ってから、処理時間に関する警告メッセージをログに出力します。警告メッセージがログに記録されたら、JVM から連続的なスタックダンプをキャプチャして、何がブロックされているかを調べます。jstack コマンドを使用すると、スタックトレースをキャプチャできます。logWarningForTaskAfterMillis 値の詳細については、GitHub ウェブサイトの LifecycleConfig.java を参照してください。
KCL デバッグログを有効にする
KCL デバッグログを有効にすると、KCL が Kinesis Data Streams からのデータ消費を停止する原因となった問題を特定できます。また、KCL アプリケーションを再起動して、アプリケーションの他の問題をすべてクリアすることもベストプラクティスです。
KCL を再起動しても KCL がスタックしている場合は、シャードの所有権の移転によって問題が生じている可能性があります。シャードの所有権の移転による問題があることで、再現しようとしているデータのログが KCL にないという問題も生じる可能性があります。この問題を解決するには、KCL フリートのログ記録機能を有効にしてください。
ログを有効にするには、次の手順を実行します。
-
ロガーを選択します。
-
src/main/resources フォルダーに log4.properties ファイルを作成して、ログメッセージをコンソールにリダイレクトします。
log4j.appender.stdout=org.apache.log4j.ConsoleAppenderlog4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
log4j.logger.httpclient.wire=DEBUG
注: この例では、log4j を使用して Java でログをデバッグします。
-
ログメッセージをログファイルにリダイレクトします。
log4j.appender.file=org.apache.log4j.RollingFileAppenderlog4j.appender.file.File=/Users/harshdev/Desktop/logfolder/ <== Give the log location where you want to create log files
log4j.appender.file.MaxFileSize=5MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
log4j.rootLogger=DEBUG, stdout, file
-
POM ファイルに log4j 依存関係を含めます。
<dependency> <groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
関連情報
Monitoring the Kinesis Client Library with Amazon CloudWatch
Resharding, scaling, and parallel processing