このトピックでは、Realtime Compute for Apache Flink デプロイメントのチェックポイントまたはセーブポイントに関するよくある質問への回答を提供します。
miniBatch が有効になっている場合、状態データの TTL の有効期限が切れた後に新しいデータが更新されません。なぜですか?
miniBatch が有効になっている場合、データはバッチで計算され、状態に保存されます。状態のデータは、以前の完全データ計算結果に基づいています。状態データの存続時間(TTL)の有効期限が切れたために状態がクリアされると、累積された以前の計算結果もクリアされます。その結果、バッチで計算されたデータに基づいて状態データを更新できません。
miniBatch が無効になっている場合、状態データの TTL の有効期限が切れると、有効期限が切れたキーのデータが累積的に再計算されます。したがって、状態データは常に更新できます。データ更新頻度の増加は、データ処理の遅延などの他の問題を引き起こします。
ビジネス要件に基づいて miniBatch と TTL を構成する必要があります。
次のチェックポイントの開始時刻を計算するにはどうすればよいですか?
現在の間隔と最小間隔は、次のチェックポイントの開始時刻に影響を与える可能性があります。ある時点で次の両方の条件が満たされると、次のチェックポイントがトリガーされます。
現在の間隔は、
<最後の開始時刻、次の開始時刻>間の最小時間差です。最小間隔は、
<最後の終了時刻、次の開始時刻>間の最小時間差です。
次のシナリオでは、チェックポイント試行間の最小間隔は 3 分、最小間隔は 3 分、タイムアウト期間は 10 分です。
シナリオ 1:デプロイメントは想定どおりに実行されます。各チェックポイントは成功します。
最初のチェックポイントは 12:00:00 にトリガーされ、12:00:02 に成功します。2 番目のチェックポイントは 12:03:00 にトリガーされます。
シナリオ 2:デプロイメントが異常です。(チェックポイントがタイムアウトになるか、特定の理由で失敗します。この例では、チェックポイントがタイムアウトになります。)
最初のチェックポイントは 12:00:00 にトリガーされ、12:00:02 に成功します。2 番目のチェックポイントは 12:03:00 にトリガーされますが、チェックポイントがタイムアウトになったため 12:13:00 に失敗します。3 番目のチェックポイントは 12:16:00 にトリガーされます。
チェックポイントの最小間隔の設定の詳細については、「チェックポイントの調整」をご参照ください。
VVR 8.X の GeminiStateBackend と VVR 6.X の GeminiStateBackend の違いは何ですか?
デフォルトでは、Realtime Compute for Apache Flink の Ververica Runtime(VVR)6.X では GeminiStateBackend V3 が使用され、Realtime Compute for Apache Flink の VVR 8.X では GeminiStateBackend V4 が使用されます。
項目 | 説明 |
基本機能 |
|
状態の遅延読み込み構成 |
|
マネージドメモリ | 2 つのバージョンは、マネージドメモリに関して常駐セットサイズ(RSS)メトリックのみが異なります。
説明 マネージドメモリの詳細については、「TaskManager メモリの設定」をご参照ください。 |
増分チェックポイントのサイズが完全チェックポイントのサイズと同じ場合はどうすればよいですか?
Realtime Compute for Apache Flink を使用しているときに増分チェックポイントのサイズが完全チェックポイントのサイズと同じ場合は、次の操作を実行して問題が発生しているかどうかを確認します。
増分セーブポイントが正しく構成され、有効になっているかどうかを確認します。
デプロイメントが特別なシナリオで実行されているかどうかを確認します。特別なシナリオでは、増分チェックポイントのサイズが完全チェックポイントのサイズと同じになることが想定されます。例:
18:29 にデプロイメントにデータが挿入される前は、デプロイメントはデータを処理しません。この場合、チェックポイントにはデータソースの初期化された状態データのみが含まれます。状態データが存在しないため、チェックポイントは完全チェックポイントです。
合計 1,000,000 件のデータレコードが 18:29 にデプロイメントに挿入されます。データレコードが現在のチェックポイント間隔内で完全に処理され、その間隔中に他のデータがデプロイメントに挿入されない場合、その間隔中に生成される最初の増分チェックポイントには、1,000,000 件のデータレコードのすべての状態データが含まれます。チェックポイント間隔は 3 分です。
この場合、増分チェックポイントのサイズは完全チェックポイントのサイズと同じです。最初の増分チェックポイントには、増分チェックポイントから状態全体を復元できるように、完全データの状態が含まれている必要があります。したがって、最初の増分チェックポイントは実際には完全チェックポイントです。
ほとんどの場合、データ入力が安定していて、大規模な状態変更が発生しない場合、2 番目以降の増分チェックポイントのサイズは完全チェックポイントのサイズとは異なります。これは、システムが想定どおりに状態データの増分データのセーブポイントのみを作成していることを示しています。2 番目以降のチェックポイントのサイズが完全チェックポイントのサイズと同じ場合は、システムの状態と動作を確認して、システムの問題が発生しているかどうかを判断します。
Python デプロイメントのチェックポイントの作成速度が遅い場合はどうすればよいですか?
原因
Python オペレーターにはキャッシュが含まれています。システムがチェックポイントを作成するとき、システムはキャッシュ内のすべてのデータを処理する必要があります。 Python ユーザー定義関数(UDF)のパフォーマンスが低い場合、チェックポイントの作成に必要な時間が長くなります。これは Python デプロイメントの実行に影響します。
解決策
キャッシュできるデータ量を減らします。次の構成を [その他の構成] タブの [パラメーター] セクションにある [その他の構成] フィールドに追加できます。詳細については、「デプロイメント実行のカスタムパラメーターを構成するにはどうすればよいですか?」をご参照ください。
python.fn-execution.bundle.size: バンドルに含めることができる要素の最大数。デフォルト値:100000。 python.fn-execution.bundle.time: デフォルト値:1000。単位:ミリ秒。パラメーターの詳細については、「Flink Python 構成」をご参照ください。
デプロイメントのチェックポイント中にエラーが発生した場合はどうすればよいですか?どうすればよいですか?
エラータイプの特定
[アラーム] タブまたは [状態] タブでチェックポイントの履歴情報を表示して、チェックポイントのタイムアウトや書き込みエラーなどのエラータイプを特定できます。

さまざまなタイプのエラーのトラブルシューティング
シナリオ 1:チェックポイントのタイムアウトが頻繁に発生する場合は、デプロイメントにバックプレッシャーがあるかどうかを確認する必要があります。デプロイメントにバックプレッシャーがある場合は、バックプレッシャーの根本原因を分析し、低速のオペレーターを見つけ、関連するリソースまたは構成を調整します。バックプレッシャーの問題のトラブルシューティング方法の詳細については、「バックプレッシャーの問題をトラブルシューティングするにはどうすればよいですか?」をご参照ください。
シナリオ 2:チェックポイント中に書き込みエラーが発生した場合は、次の手順を実行して TaskManager ログを表示し、ログに基づいてエラーをトラブルシューティングする必要があります。
デプロイメントの [ログ] タブの [チェックポイント] サブタブで、[チェックポイント履歴] をクリックします。

異常なチェックポイントの左側のプラス記号(+)をクリックして、異常なチェックポイントに関連するオペレーターの詳細を表示します。
異常なオペレーターの左側のプラス記号(+)をクリックし、異常なサブタスクの [ID] 列の値をクリックして、関連する TaskManager に移動します。

エラーメッセージ「You are using the new V4 state engine to restore old state data from a checkpoint」が表示された場合はどうすればよいですか?エラーメッセージ「古いチェックポイントのステートデータの復元に新しい V4 ステートエンジンを使用しています」が表示された場合はどうすればよいですか?
エラーの詳細
VVR バージョンを 6.X から 8.X にアップグレードすると、エラーメッセージ「
You are using the new V4 state engine to restore old state data from a checkpoint」が表示されます。原因
VVR 6.X で使用される GeminiStateBackend バージョンは、VVR 8.X で使用される GeminiStateBackend バージョンと一致しません。したがって、2 つのバージョンのチェックポイントは互換性がありません。
解決策
次のいずれかの方法を使用して問題を解決します。
デプロイメントの標準形式でセーブポイントを作成し、セーブポイントの状態からデプロイメントを開始します。詳細については、「セーブポイントを手動で作成する」および「デプロイメントを開始する」をご参照ください。
状態なしでデプロイメントを再起動します。
(推奨しません)GeminiStateBackend V3 を使用します。この場合、
state.backend.gemini.engine.type: STREAMINGを構成し、デプロイメントを再起動する必要があります。パラメーターの構成方法の詳細については、「デプロイメント実行のパラメーターを構成するにはどうすればよいですか?」をご参照ください。(推奨しません)VVR 6.X エンジンを使用してデプロイメントを開始します。
エラーメッセージ「java.lang.NegativeArraySizeException」が表示された場合はどうすればよいですか?
エラーの詳細
デプロイメントがリスト状態を使用している場合、デプロイメントの実行中に次の例外が発生します。
Caused by: java.lang.NegativeArraySizeException at com.alibaba.gemini.engine.rm.GUnPooledByteBuffer.newTempBuffer(GUnPooledByteBuffer.java:270) at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:85) at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:75) at com.alibaba.gemini.engine.pagestore.PageStoreImpl.internalGet(PageStoreImpl.java:428) at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:271) at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:112) at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:118) at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:57) at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.getOrDefault(GeminiSubKeyedListStateImpl.java:97) at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:88) at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:47) at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:60) at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:44) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:533) at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:289) at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1435)原因
リスト状態の単一キーの状態データのサイズが 2 GB を超えています。大量の状態データは、次のプロセスで生成されます。
デプロイメントが想定どおりに実行されると、リスト状態のキーの値に追加された値は、マージプロセスで結合されます。たとえば、ウィンドウオペレーターのリスト状態を使用するデプロイメントでは、状態データが継続的に累積されます。
状態データのサイズが特定のしきい値まで累積されると、メモリ不足(OOM)エラーがトリガーされます。デプロイメントが障害から回復した後、リスト状態のマージプロセスにより、状態バックエンドによって要求される一時バイト配列のサイズが制限を超えます。その結果、この例外が発生します。
説明RocksDBStateBackend を使用している場合にも、この問題が発生し、「ArrayIndexOutOfBoundsException」または「Segmentation fault」というエラーメッセージが表示されることがあります。詳細については、「EmbeddedRocksDBStateBackend」をご参照ください。
解決策
ウィンドウオペレーターが大量の状態データを生成する場合は、ウィンドウサイズを小さくすることをお勧めします。
デプロイメントロジックが過度に大量の状態データを生成する場合は、デプロイメントロジックを変更することをお勧めします。たとえば、キーを分割できます。
エラーメッセージ「org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many ongoing snapshots.」が表示された場合はどうすればよいですか?
エラーの詳細
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints原因
このエラーは、Kafka がシンクとして使用されている場合に複数の連続したチェックポイントエラーによって発生します。
解決策
execution.checkpointing.timeoutパラメーターを設定してチェックポイントのタイムアウト期間を変更し、タイムアウトが原因でチェックポイントが失敗しないようにします。パラメーターの構成方法の詳細については、「デプロイメント実行のカスタムパラメーターを構成するにはどうすればよいですか?」をご参照ください。
エラーメッセージ「Exceeded checkpoint tolerable failure threshold」が表示された場合はどうすればよいですか?
エラーの詳細
org.apache.flink.util.FlinkRuntimeException:Exceeded checkpoint tolerable failure threshold. at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)原因
許容できるチェックポイントエラーの数が小さい値に設定されています。チェックポイントエラーの数が制限を超えると、デプロイメントはフェールオーバーをトリガーします。許容できるチェックポイントエラーの数は指定されていません。デフォルトでは、チェックポイントエラーは許容されません。
解決策
execution.checkpointing.tolerable-failed-checkpoints: numパラメーターを指定して、デプロイメントで許容できるチェックポイントエラーの数を調整します。このパラメーターは 0 または正の整数に設定する必要があります。パラメーターが 0 に設定されている場合、チェックポイントの例外またはエラーは許容されません。パラメーターの構成方法の詳細については、「デプロイメント実行のカスタムパラメーターを構成するにはどうすればよいですか?」をご参照ください。