Realtime Compute for Apache Flink において、ステート管理はパフォーマンス、安定性、リソース使用率に直接影響します。ステートが利用可能なメモリを超えて増加すると、ステートバックエンドはアクセス頻度の低いデータをディスクにスピルします。ディスク上のデータへのアクセスはメモリ内データへのアクセスよりも大幅に遅く、パイプライン全体にバックプレッシャーとして伝播するレイテンシの急上昇を引き起こします。これを放置すると、デプロイメントがクラッシュする可能性があります。
ステート関連のバックプレッシャーの診断
バックプレッシャーは、Apache Flink におけるパフォーマンスボトルネックの主要な指標です。ほとんどの場合、バックプレッシャーはステートサイズが割り当てられたメモリを超えて増え続けるために発生します。ステートバックエンドは使用頻度の低いステートデータをディスクに移動させ、オペレーターが頻繁にディスクからステートを読み取ると、データ遅延が大幅に増加し、パフォーマンスボトルネックが生じます。
バックプレッシャーが大規模ステートに起因するかどうかを判断するには、デプロイメントとそのオペレーターの実行ステータスを分析します。詳細については、「診断ツール」をご参照ください。
チューニング方法
適切な TTL 値の設定によるステートサイズの制御
デプロイメントの TTL を 0 から 0 より大きい値に変更する、またはその逆を行うと、互換性の問題が発生し、StateMigrationException エラーがスローされます。
Realtime Compute for Apache Flink の開発コンソールの [デプロイメント] ページで、状態の有効期限 パラメーターを設定します。詳細については、「パラメーター」をご参照ください。

TTL が短すぎると、計算結果が不正確になる可能性があります。例えば、データが遅れて到着し、集約や結合の際に関連するステートが期限切れになっている場合、結果は誤りとなります。TTL が長すぎると、リソース消費が増加し、安定性が低下します。データ特性とビジネス要件に基づいて TTL を設定してください。例えば、日次計算で日をまたぐ最大ドリフトが 1 時間の場合、TTL を 25 時間に設定します。
JOIN_STATE_TTL を使用したストリームごとの TTL (VVR 8.0.1 以降)
Ververica Runtime (VVR) 8.0.1 以降では、JOIN_STATE_TTL ヒントを使用して、通常結合の左ストリームと右ストリームに異なる TTL 値を設定できます。これにより、不要なステートストレージを削減し、パフォーマンスを向上させます。構文の詳細については、「クエリヒント」をご参照ください。
SELECT /*+ JOIN_STATE_TTL('left_table' = '..', 'right_table' = '..') */ *
FROM left_table [LEFT | RIGHT | INNER] JOIN right_table ON ...次の表は、JOIN_STATE_TTL を実際のデプロイメントで使用した場合の影響を示しています。
| 項目 | デプロイメント詳細 | ステートサイズ |
|---|---|---|
| 変更前 | 左ストリームのデータ量は右ストリームの 20 倍から 50 倍です。パフォーマンスを維持するため、右ストリームの TTL は 10 日に設定されていますが (本来は 18 日であるべき)、データの正確性が損なわれています。ステートサイズは約 5.8 TB です。デプロイメントは最大 700 CU を消費します。 | ![]() |
| その後 | JOIN_STATE_TTL により、左ストリームの TTL は 12 時間、右ストリームの TTL は 18 日に設定され、データ整合性が確保されます。ステートサイズは約 590 GB に減少し、元の 10 分の 1 になります。デプロイメントは 200 から 300 CU を消費し、リソースを 50% から 70% 節約します。 | ![]() |
不要なステートフルオペレーターの削除
この方法は、オプティマイザーによって派生するオペレーターにのみ適用されます。SQL によって呼び出されるオペレーターは、通常、クエリロジック自体によって要求されます。
ChangelogNormalize
このオペレーターは、イベント時間に基づくテンポラル結合を除き、アップサートソーステーブルを含むほとんどのシナリオで呼び出されます。Upsert Kafka コネクタまたは同様のコネクタを使用する前に、イベント時間に基づくテンポラル結合が実行されていないことを確認してください。実行時に ChangelogNormalize オペレーターのステート関連のメトリックを監視します。ソーステーブルに多数のプライマリキーがある場合、オペレーターがキー付きステートを維持するため、ステートサイズが増加します。頻繁なプライマリキーの更新も、ステートへのアクセスと変更の頻度を増加させます。
データ同期のシナリオで Upsert Kafka コネクタを使用することは避けてください。代わりに、exactly-once 処理を保証するデータ同期ツールを使用してください。
SinkUpsertMaterializer
デフォルトでは、table.exec.sink.upsert-materialize パラメーターは auto に設定されています。これは、順序が乱れたチェンジログレコードなど、特定のシナリオで正確性を保証するために、システムが自動的に SinkUpsertMaterializer オペレーターを追加することを意味します。このオペレーターが存在することが、必ずしもレコードの順序が乱れていることを意味するわけではありません。例えば、複数のキーでデータをグループ化し、それらを単一の列にマージする場合、オプティマイザーはアップサートキーを正確に導出できないため、安全策としてこのオペレーターを追加します。
データ分布パターンが十分に理解されており、このオペレーターがなくても最終結果が正しい場合は、table.exec.sink.upsert-materialize を none に設定してパフォーマンスを向上させることができます。
Realtime Compute for Apache Flink の開発コンソールで SinkUpsertMaterializer がアクティブかどうかを確認します。存在する場合、トポロジー図では Sink オペレーターと連結して表示されます。


SinkUpsertMaterializer オペレーターが使用されておらず、計算結果が正しい場合は、'table.exec.sink.upsert-materialize'='none' を追加して不要な使用を防ぎます。設定手順については、「デプロイメント実行中のパラメーターを設定する方法」をご参照ください。
Ververica Runtime (VVR) 8.0 以降は、SQL 実行計画のインテリジェントな分析をサポートし、このような問題の特定に役立ちます。

miniBatch によるステートアクセス頻度の削減
分単位のレイテンシが許容できる場合は、miniBatch を有効にしてステートへのアクセスと更新の頻度を減らします。詳細については、「miniBatch を有効にしてスループットを向上させる」をご参照ください。
Realtime Compute for Apache Flink では、以下のオペレーターが miniBatch をサポートしています。
| オペレーター名 | 説明 |
|---|---|
| ChangelogNormalize | N/A |
| Deduplicate | table.exec.deduplicate.mini-batch.compact-changes-enable を設定して、イベント時間に基づく重複排除中にチェンジログを圧縮します。 |
| GroupAggregate / GlobalGroupAggregate / IncrementalGroupAggregate | N/A |
| RegularJoin | table.exec.stream.join.mini-batch-enabled を設定して、結合操作で miniBatch を有効にします。このパラメーターは、更新ストリームと外部結合のシナリオに適用されます。 |
実行計画の最適化
オプティマイザーは、SQL 文と構成に基づいてステートの実装を選択し、実行計画を生成します。以下の調整により、ステートサイズを大幅に削減できます。
通常結合でのプライマリキーの使用
結合キーにプライマリキーが含まれている場合、システムは
ValueState<RowData>オブジェクトを使用して、結合キーごとに最新の値のみを格納します。これにより、ストレージ効率が最大化されます。結合キーにプライマリキー以外のキーが含まれている場合、システムは
MapState<RowData, RowData>オブジェクトを使用して、各結合キーのプライマリキーごとにソーステーブルの最新レコードを格納します。プライマリキーが定義されていない場合、システムは
MapState<RowData, Integer>オブジェクトを使用して、各結合キーのデータレコード全体と出現回数を格納します。
DDL 文でプライマリキーを定義し、プライマリキーで通常結合を実行して、ストレージ効率を最適化してください。
追加専用ストリームでの重複排除に ROW_NUMBER を使用
重複排除には、FIRST_VALUE や LAST_VALUE の代わりに ROW_NUMBER 関数を使用してください。ROW_NUMBER を使用すると、Deduplicate オペレーターはキーごとに最初または最新のレコードのみを格納し、集約関数が必要とする完全なステートを維持する必要がありません。
多次元集約に FILTER 構文を使用
多次元集約には、CASE WHEN の代わりに FILTER 構文を使用してください。例えば、モバイルデバイス、デスクトップデバイス、およびすべてのデバイスでのユニークビジター数をカウントする場合です。SQL オプティマイザーは、同じキーに対する異なるフィルター引数を認識し、複数の COUNT DISTINCT 計算でステートデータを共有します。これにより、ステートへのアクセス回数が削減されます。テストによると、FILTER 構文は CASE WHEN 構文の 2 倍のパフォーマンスを提供できます。
複数ストリーム結合における結合順序の調整
Apache Flink は、データストリームを処理するためにバイナリハッシュ結合を使用します。複数のストリームを結合する場合、結合が追加されるたびにステートの冗長性が増幅されます。

小さいストリームを大きいストリームの前に結合することで、ステートの冗長性による増幅効果を緩和し、処理効率を向上させます。
ディスク読み取りの最小化
ディスクストレージへのアクセスを減らして、システムパフォーマンスを向上させ、メモリ使用率を最適化します。具体的な手法については、「ディスク読み取りの最小化」をご参照ください。
ステートフルオペレーターのリファレンス
Flink SQL は、Apache Flink のステートバックエンドとチェックポイントメカニズムに依存して、計算結果の結果整合性を保証します。オプティマイザーは、SQL 文と構成パラメーターに基づいてステートフルオペレーターを選択します。どのオペレーターがステートを維持し、どのようにクリーンアップするかを理解することは、大規模ステートのデプロイメントをチューニングするために不可欠です。
ステートフルオペレーターは 2 つのカテゴリに分類されます。
オプティマイザーによって派生するオペレーター -- SQL の構造と構成に基づいてオプティマイザーが自動的に追加します。
SQL によって呼び出されるオペレーター -- SQL 文の構文 (集約、結合、重複排除、ウィンドウ) によって直接トリガーされます。
オプティマイザーによって派生するオペレーター
オプティマイザーは、以下のステートフルオペレーターを導入する場合があります。これら 3 つすべてが、ステートのクリーンアップに TTL (Time-to-Live) を使用します。
| オペレーター名 | ステートクリーンアップメカニズム |
|---|---|
| ChangelogNormalize | Time-to-Live (TTL) |
| SinkUpsertMaterializer | |
| LookupJoin (*) |
ChangelogNormalize
ChangelogNormalize オペレーターは、プライマリキーを含むチェンジログを処理し、効率、データ整合性、およびデータの正確性を保証します。これは 2 つのシナリオで使用されます。
シナリオ 1: ソーステーブルにプライマリキーがあり、UPSERT 操作をサポートしている。
このタイプのテーブルは、アップサートソーステーブルとして知られています。プライマリキーのシーケンスを維持しながら、プライマリキーに対する UPDATE (INSERT および UPDATE_AFTER) と DELETE 操作のみを含むチェンジログストリームを生成します。例えば、Upsert Kafka コネクタはアップサートソーステーブルを作成します。カスタムソースコネクタも、getChangelogMode メソッドをオーバーライドすることで UPSERT 操作をサポートできます。
@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.upsert();
}シナリオ 2: 'table.exec.source.cdc-events-duplicate' = 'true' 構成が指定されている。
Change Data Capture (CDC) の at-least-once 処理は、重複したチェンジログレコードを生成する可能性があります。exactly-once 処理を実現するには、この構成を指定して重複を削除します。次の図は、このシナリオを示しています。
この例では、入力データはソーステーブルの DDL 文で定義されたプライマリキーに基づいてハッシュシャッフルされます。ChangelogNormalize オペレーターは ValueState オブジェクトを作成し、各プライマリキーの最新レコードを格納します。次の図は、オペレーターがステートを更新し、出力を生成する方法を示しています。2 番目の -U(2, 'Jerry', 77) レコードが到着すると、ステートに格納されている対応する値は空になります。これは、プライマリキー 2 の ADD 変更 (+I および +UA) の数が RETRACT 変更 (-D および -UB) の数と等しいことを意味します。重複レコードは破棄されます。
SinkUpsertMaterializer
SinkUpsertMaterializer オペレーターは、結果テーブルにプライマリキーがある場合に、データのマテリアライズがアップサートセマンティクスに準拠することを保証します。ストリーム処理中に、データレコードが一意性や順序性を失ってから結果テーブルに書き込まれる前に、オプティマイザーが自動的にこのオペレーターを追加します。これは、結果テーブルのプライマリキーに基づいてステートを維持します。関連するシナリオについては、「Flink SQL で順序が乱れたチェンジログイベントを処理する」をご参照ください。
LookupJoin
ルックアップ結合で 'table.optimizer.non-deterministic-update.strategy'='TRY_RESOLVE' が構成されており、オプティマイザーが潜在的な非決定論的な更新を識別した場合 (「ストリーミングにおける非決定論的な更新の影響を排除する方法」をご参照ください)、システムは LookupJoin オペレーターを追加して問題の解決を試みます。
サンプルシナリオ:
結果テーブルのプライマリキーがディメンションテーブルのキーと部分的または完全に重複しており、ディメンションテーブルが CDC または他のツールを介して更新される。
結合がディメンションテーブルのプライマリキー以外のフィールドを含む。
どちらのシナリオでも、LookupJoin オペレーターはクエリの正確性と一貫性を維持しながら、動的なデータ変更を処理します。
SQL によって呼び出されるオペレーター
これらのオペレーターは、SQL 文の構文によって直接トリガーされます。TTL またはウォーターマークの進行状況に基づいてステートをクリーンアップします。
ウィンドウベースのオペレーター (WindowAggregate、WindowDeduplicate、WindowJoin、WindowTopN) は、ウォーターマークの進行状況に基づいてステートをクリーンアップします。ウォーターマークのタイムスタンプがウィンドウの終了時間を超えると、組み込みのタイマーがステートのクリーンアップをトリガーします。
| オペレーター名 | 呼び出し方法 | ステートクリーンアップ |
|---|---|---|
| Deduplicate | ROW_NUMBER 関数を使用し、ORDER BY 句で時間属性フィールドを指定し、最初の行のみをクエリします。時間属性はイベント時間または処理時間です。 | TTL |
| RegularJoin | 等価条件に時間属性フィールドを含まない JOIN 句を使用します。 | TTL |
| GroupAggregate | GROUP BY 句と集約関数 (SUM、COUNT、MIN、MAX、FIRST_VALUE、LAST_VALUE) または DISTINCT キーワードを使用します。 | TTL |
| GlobalGroupAggregate | ローカル-グローバル集約を有効にします。 | TTL |
| IncrementalGroupAggregate | 2 段階のグループ集約クエリを使用し、ローカル-グローバル集約を有効にします。GlobalGroupAggregate と LocalGroupAggregate オペレーターは IncrementalGroupAggregate オペレーターにマージされます。 | TTL |
| Rank | ORDER BY 句に時間属性フィールドを含まない ROW_NUMBER 関数を使用します。 | TTL |
| GlobalRank | ORDER BY 句に時間属性フィールドを含まない ROW_NUMBER 関数を使用し、ローカル-グローバル集約を有効にします。 | TTL |
| IntervalJoin | 時間属性条件 (イベント時間または処理時間) を持つ JOIN 句を使用します。例:L.time between R.time + X and R.time + Y または R.time between L.time - Y and L.time - X | ウォーターマーク |
| TemporalJoin | イベント時間に基づいて内部結合または左結合を実行します。 | ウォーターマーク |
| WindowDeduplicate | 重複排除にウィンドウテーブル値関数 (TVF) を使用します。 | ウォーターマーク |
| WindowAggregate | 集約にウィンドウ TVF を使用します。 | ウォーターマーク |
| GlobalWindowAggregate | 集約にウィンドウ TVF を使用し、ローカル-グローバル集約を有効にします。 | ウォーターマーク |
| WindowJoin | 結合にウィンドウ TVF を使用します。 | ウォーターマーク |
| WindowRank | ソートにウィンドウ TVF を使用します。 | ウォーターマーク |
| GroupWindowAggregate | ウィンドウ集約のレガシー構文を使用します。 | ウォーターマーク |
リファレンス
大規模ステートのデプロイメントのパフォーマンスチューニング -- 大規模ステートに起因する問題とチューニングワークフローについて説明します。
DataStream API を使用してステートサイズを制御し、バックプレッシャーを削減する -- DataStream API を使用した柔軟なステート管理。
チェックポイントとセーブポイントのタイムアウトの診断と防止 -- チェックポイント/セーブポイントのタイムアウト問題を特定し、解決します。
起動とスケーリングの速度を向上させる -- デプロイメントの起動とスケーリング中のパフォーマンスボトルネックを解消します。

