このトピックでは、Realtime Compute for Apache Flink におけるシステムチェックポイントとジョブスナップショットに関するよくある質問 (FAQ) に回答します。
ミニバッチが有効な場合、table.exec.state.ttl の期限が切れた後に新しいデータが更新されないのはなぜですか?
ミニバッチが有効な場合、データはバッチで計算され、状態に保存されます。状態内のデータは、以前の完全な計算結果に基づいています。存続時間 (TTL) の期限切れにより状態が消去されると、以前に蓄積された計算結果も失われます。その結果、ミニバッチの結果に基づいてデータを更新できなくなります。
逆に、ミニバッチが無効な場合、TTL により状態が期限切れになると、期限切れのキーのデータが再蓄積されて出力されます。これにより、データの更新が継続されます。ただし、データ更新の頻度が増加すると、データ処理の遅延など、他の問題が発生する可能性があります。
したがって、ビジネスシナリオに基づいてミニバッチと TTL を構成する必要があります。
次の定期的なチェックポイントの開始時刻の計算方法
次のチェックポイントの開始時刻に影響を与えるパラメーターは、チェックポイント間隔とチェックポイント間の最小休止時間の 2 つです。次のチェックポイントは、以下の両方の条件が満たされたときにトリガーされます。
チェックポイント間隔:前のチェックポイントの開始時刻と次のチェックポイントの開始時刻の間の最小時間差。
最小休止時間:前のチェックポイントの終了時刻と次のチェックポイントの開始時刻の間の最小時間差。
以下の 2 つのシナリオでこれを説明します。両方のシナリオで、チェックポイント間隔は 3 分、最小休止時間は 3 分、タイムアウトは 10 分です。
シナリオ 1:ジョブは正常に実行され、各チェックポイントは成功します。
最初のチェックポイントは 12:00:00 に開始し、12:00:02 に成功します。2 番目のチェックポイントは 12:03:00 に開始します。
シナリオ 2:ジョブが異常に実行されます。たとえば、チェックポイントがタイムアウトするか、失敗します。
最初のチェックポイントは 12:00:00 に開始し、12:00:02 に成功します。2 番目のチェックポイントは 12:03:00 に開始しますが、タイムアウトして 12:13:00 に失敗します。3 番目のチェックポイントは 12:16:00 に開始します。
チェックポイント間の最小休止時間の設定に関する詳細については、「Tuning Checkpointing」をご参照ください。
VVR 8.x と VVR 6.x で使用される GeminiStateBackend の違い
Realtime Compute for Apache Flink のコンピュートエンジンである Ververica Runtime (VVR) 6.x は、デフォルトで V3 バージョンの GeminiStateBackend を使用します。VVR 8.x は、デフォルトで V4 バージョンの GeminiStateBackend を使用します。
カテゴリ | 詳細 |
基本機能 |
|
状態の遅延読み込みパラメーター |
|
マネージドメモリ使用量の違い | 唯一の違いは Resident Set Size (RSS) メトリックにあります。
説明 マネージドメモリの詳細については、「TaskManager Memory」をご参照ください。 |
完全チェックポイントのサイズが増分チェックポイントと同じになるのは正常ですか?
Flink を使用しているときに、完全チェックポイントのサイズが増分チェックポイントと同じであることに気付いた場合は、次の手順を実行します。
増分スナップショットが正しく構成され、有効になっているか確認します。
この動作は、特定の状況では正常な場合があります。例:
データインジェスト前 (18:29 より前)、ジョブはデータを処理していません。チェックポイントには、ソースの初期状態情報のみが含まれます。他の状態データがないため、このチェックポイントは完全チェックポイントです。
18:29 に、100 万件のデータエントリが取り込まれます。データが次のチェックポイント間隔 (3 分) 内に完全に処理され、この期間中に他のデータが取り込まれないと仮定します。最初の増分チェックポイントには、これら 100 万件のデータエントリによって生成されたすべての状態情報が含まれます。
この場合、完全チェックポイントと増分チェックポイントのサイズが同じになるのは正常です。これは、最初の増分チェックポイントが、その時点から状態全体を回復できるように、完全なデータ状態を含んでいる必要があるためです。これにより、実質的に完全チェックポイントになります。
増分チェックポイントの利点は、通常、2 回目のチェックポイント以降に明らかになります。データ入力が安定しており、大規模な状態変更がない場合、後続の増分チェックポイントは小さくなることが期待されます。これは、システムが状態の増分部分のみのスナップショットを正しく作成していることを示します。サイズがまだ同じ場合は、システムの状態と動作を確認して、潜在的な問題を特定する必要があります。
Python ジョブのチェックポイントが遅い場合の対処法
原因
Python 演算子には内部キャッシュがあります。チェックポイント中に、キャッシュ内のすべてのデータを処理する必要があります。したがって、Python ユーザー定義関数 (UDF) のパフォーマンスが低い場合、チェックポイント時間が増加し、ジョブの実行に影響します。
ソリューション
キャッシュサイズを削減するには、[追加設定] セクションで以下のパラメーターを設定します。詳細については、「ジョブのカスタム実行時パラメーターを構成する方法は?」をご参照ください。
# デフォルト値は 100000 です。単位はエントリ数です。 python.fn-execution.bundle.size # デフォルト値は 1000 です。単位はミリ秒です。 python.fn-execution.bundle.timeパラメーターの詳細については、「Flink Python Configuration」をご参照ください。
ジョブにおける異常なチェックポイントのトラブルシューティング方法
例外タイプの診断
[モニタリングとアラート] タブまたは [状態管理] で、チェックポイント履歴を表示して、チェックポイントのタイムアウトや書き込み失敗などの例外の種類を特定できます。

分類、特定、および処理
シナリオ 1:頻繁なチェックポイントのタイムアウト。ジョブにバックプレッシャーがないか確認します。バックプレッシャーの根本原因を分析し、低速な演算子を特定し、リソースや構成の調整などの適切な操作を行うことができます。詳細については、「ジョブのバックプレッシャーのトラブルシューティング方法」をご参照ください。
シナリオ 2:チェックポイントの書き込み失敗。次の手順に従って TaskManager (TM) のログを見つけることができます。その後、ログを分析して原因を特定できます。
ジョブログページの [チェックポイント] タブで、[チェックポイント履歴] をクリックします。

異常なチェックポイントの左側にあるプラス記号 (+) をクリックして、演算子のチェックポイントステータスを表示します。
異常な演算子の左側にあるプラス記号 (+) をクリックします。次に、異常なサブタスクの ID をクリックして、対応する TM に移動します。

エラー: 新しいV4ステートエンジンを使用して、チェックポイントから古いステートデータを復元しています
エラー詳細
VVR 6.x から VVR 8.x にアップグレードすると、次のエラーが報告されます:
You are using the new V4 state engine to restore old state data from a checkpoint。原因
VVR 6.x と 8.x で使用される Gemini 状態バックエンドのバージョンが異なり、それらのチェックポイントには互換性がありません。
ソリューション
次のいずれかの方法で問題を解決できます。
標準フォーマットでジョブスナップショットを作成し、その状態からジョブを開始します。詳細については、「手動でジョブスナップショットを作成する」および「ジョブの開始」をご参照ください。
状態なしでジョブを再起動します。
(非推奨) レガシーバージョンの Gemini を引き続き使用します。
state.backend.gemini.engine.type: STREAMINGを構成し、変更を有効にするためにジョブを再起動する必要があります。パラメーターの構成方法の詳細については、「ジョブの実行時パラメーターを設定する方法」をご参照ください。(非推奨) VVR 6.x エンジンを引き続き使用してジョブを開始します。
エラー:java.lang.NegativeArraySizeException
エラー詳細
ジョブが List State を使用している場合、実行時に次の例外が発生します。
Caused by: java.lang.NegativeArraySizeException at com.alibaba.gemini.engine.rm.GUnPooledByteBuffer.newTempBuffer(GUnPooledByteBuffer.java:270) at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:85) at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:75) at com.alibaba.gemini.engine.pagestore.PageStoreImpl.internalGet(PageStoreImpl.java:428) at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:271) at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:112) at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:118) at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:57) at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.getOrDefault(GeminiSubKeyedListStateImpl.java:97) at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:88) at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:47) at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:60) at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:44) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:533) at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:289) at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1435)原因
List State 内の単一キーの状態データが大きすぎて 2 GB を超えています。状態データが大きくなりすぎるプロセスは次のとおりです。
通常のジョブ操作中に、List State 内の単一キーに追加された値は、たとえばウィンドウ演算子を含む List State でのマージ操作によって結合されます。これにより、状態データが継続的に蓄積されます。
状態データが特定のポイントまで蓄積されると、最初に Out-of-Memory (OOM) エラーがトリガーされます。ジョブが障害から回復した後、List State のマージプロセスにより、状態バックエンドによって要求される一時的なバイト配列のサイズが使用可能な制限を超え、この例外が発生する可能性があります。
説明RocksDBStateBackend も同様の問題に遭遇し、ArrayIndexOutOfBoundsException またはセグメンテーション違反をトリガーする可能性があります。詳細については、「The EmbeddedRocksDBStateBackend」をご参照ください。
ソリューション
ウィンドウ演算子が状態データを大きくしすぎている場合は、ウィンドウサイズを小さくすることができます。
ジョブログが非効率な場合は、ロジックを調整できます。たとえば、キーを分割できます。
エラー:org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many ongoing snapshots.
エラー詳細
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints原因
このエラーは、Kafka を sink として使用しているときに、複数の連続したチェックポイントが失敗したことが原因です。
ソリューション
execution.checkpointing.timeoutパラメーターを使用してチェックポイントのタイムアウト期間を調整し、タイムアウトによるチェックポイントの失敗を防ぐことができます。パラメーターの構成方法の詳細については、「ジョブのカスタム実行時パラメーターを設定する方法」をご参照ください。
エラー: チェックポイントの障害許容しきい値を超えました
エラー詳細
org.apache.flink.util.FlinkRuntimeException:Exceeded checkpoint tolerable failure threshold. at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)原因
構成された許容可能なチェックポイントの失敗回数が低すぎます。失敗したチェックポイントの数がこのしきい値を超えると、ジョブはフェールオーバーをトリガーします。このパラメーターが設定されていない場合、デフォルトではチェックポイントの失敗は許容されません。
ソリューション
execution.checkpointing.tolerable-failed-checkpoints: numパラメーターの `num` 値を設定して、ジョブが許容できるチェックポイントの失敗回数を調整できます。`num` 値は 0 または正の整数である必要があります。`num` が 0 の場合、チェックポイントの例外や失敗は許可されません。パラメーターの構成方法の詳細については、「ジョブのカスタム実行時パラメーターを設定する方法」をご参照ください。