すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:SQLデプロイメントにおけるバックプレッシャーを軽減するための状態サイズの制御

最終更新日:Jan 16, 2025

状態管理は、パフォーマンス、安定性、およびリソース使用率に影響します。不適切な状態管理は、システムクラッシュにつながる可能性があります。このトピックでは、SQLデプロイメントにおけるバックプレッシャーを軽減するために状態サイズを制御する方法について説明します。

ステートフルオペレーターの種類

SQLは、基礎となるデータ処理ロジックを処理することなくデータ操作を実行できる、ドメイン固有の宣言型言語です。 Flink SQLは、Apache Flinkの状態バックエンドとチェックポイントメカニズムを活用して、計算結果の最終的な整合性を確保します。実装レベルでは、Flink SQLはオプティマイザーを利用して、パラメーター構成とSQLステートメントに基づいてステートフルオペレーターを選択します。大量のデータに対するステートフル計算のパフォーマンスを最適化するには、基礎となるメカニズムの基本的な理解が必要です。

オプティマイザーによって導出されるステートフルオペレーター

次の表に、オプティマイザーによって導出されるステートフルオペレーターを示します。

オペレーター名

状態クリーンアップメカニズム

ChangelogNormalize

有効期限(TTL)

SinkUpsertMaterlizer

LookupJoin (*)

ChangelogNormalize

ChangelogNormalizeオペレーターは、主キーを含む変更ログを処理し、効率性、データの整合性、およびデータの正確性を確保します。このオペレーターは、以下のシナリオで使用されます。

  • ソーステーブルに主キーがあり、UPSERT操作をサポートしています。

    この場合、ソーステーブルはupsertソーステーブルとも呼ばれます。このテーブルは、主キーのシーケンスを維持しながら、主キーに対するUPDATE(INSERTおよびUPDATE_AFTERを含む)操作とDELETE操作のみを含む変更ログストリームを生成します。たとえば、Upsert Kafkaコネクターを使用してupsertソーステーブルを作成できます。また、getChangelogModeメソッドをオーバーライドして、UPSERT操作をサポートするカスタムソースコネクターを作成することもできます。

    // 変更ログモードをオーバーライドしてupsertを返す
    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.upsert();
    }
  • 'table.exec.source.cdc-events-duplicate' = 'true' 構成が指定されています。

    Change Data Capture(CDC)の少なくとも1回処理では、重複する変更ログレコードが生成される場合があります。厳密に1回処理が必要な場合は、この構成を指定して重複する変更ログレコードを削除します。サンプルシナリオ:

    image

    この例では、ソーステーブルのDDLステートメントで定義された主キーに基づいて、入力データに対してハッシュシャッフル操作が実行されます。次に、ChangelogNormalizeオペレーターは、各主キーの最新のレコードを格納するためにValueStateオブジェクトを作成します。次の図は、オペレーターが状態を更新し、出力を生成する方法を示しています。図に示すように、2番目の-U(2, 'Jerry', 77) レコードが到着すると、状態に格納されている対応する値は空になります。これは、主キー2のADD変更(+Iおよび+UA)の現在の数がRETRACT変更(-Dおよび-UB)の数と等しいことを示しています。したがって、この重複レコードは破棄されます。

    image

SinkUpsertMaterializer

SinkUpsertMaterializerオペレーターは、シンクテーブルに主キーがある場合、データのマテリアライゼーションがupsertセマンティクスに準拠することを保証します。ストリームデータ処理中に、データレコードの一意性とシーケンスがシンクテーブルに書き込まれる前に中断された場合、オプティマイザーはこのオペレーターを自動的に追加します。オペレーターは、シンクテーブルの主キーに基づいて状態を維持し、対応する制約が満たされるようにします。その他の一般的なシナリオについては、「Flink SQL で順序が正しくない変更ログイベントを処理する」をご参照ください。

LookupJoin

ルックアップ結合に 'table.optimizer.non-deterministic-update.strategy'='TRY_RESOLVE' 構成を指定し、オプティマイザーが潜在的な非決定性更新を識別した場合(ストリーミングにおける非決定性更新の影響を排除する方法 を参照)、システムは LookupJoin オペレーターを追加することで問題の解決を試みます。

サンプルシナリオ: (1) シンクテーブルの主キーがディメンションテーブルの主キーと部分的または完全に重複しており、ディメンションテーブルのデータが CDC または他のツールを使用して更新される可能性がある。 (2) 結合操作にディメンションテーブルの非主キーフィールドが含まれている。上記のシナリオでは、LookupJoin オペレーターは、クエリ結果の正確性と整合性を確保しながら、動的なデータ変更を効率的に処理できます。

SQLステートメントによって呼び出されるステートフルオペレーター

このタイプのステートフルオペレーターは、TTL値またはウォーターマークの進行状況に基づいて状態データをクリーンアップします。たとえば、WindowAggregate、WindowDeduplicate、WindowJoin、WindowTopNなどのウィンドウ計算に使用されるステートフルオペレーターは、ウォーターマークの進行状況に基づいて状態データをクリーンアップします。ウォーターマークに含まれるタイムスタンプがウィンドウの終了時刻より後の場合、組み込まれたタイマーが状態のクリーンアップをトリガーします。

オペレーター名

呼び出し方法

状態クリーンアップメカニズム

Deduplicate

ROW_NUMBER関数を使用し、ORDER BY句で時間属性フィールドを指定し、最初の行のみをクエリします。時間属性は、イベント時間または処理時間です。

TTL

RegularJoin

等価条件に時間属性フィールドが含まれていないJOIN句を使用します。

GroupAggregate

GROUP BY句を使用し、グループ化された結果に集計関数(SUM、COUNT、MIN、MAX、FIRST_VALUE、LAST_VALUEなど)を適用するか、DISTINCTキーワードを使用します。

GlobalGroupAggregate

ローカルグローバル集計を有効にします。

IncrementalGroupAggregate

2レベルのグループ集計クエリを使用し、ローカルグローバル集計を有効にします。この場合、GlobalGroupAggregateオペレーターとLocalGroupAggregateオペレーターはIncrementalGroupAggregateオペレーターにマージされます。

Rank

ROW_NUMBER関数を使用し、ORDER BY句で時間属性フィールドを指定しません。

GlobalRank

ROW_NUMBER関数を使用し、ORDER BY句で時間属性フィールドを指定せず、ローカルグローバル集計を有効にします。

IntervalJoin

条件に時間属性フィールドが含まれるJOIN句を使用します。時間属性は、イベント時間または処理時間です。例:

L.time between R.time + X and R.time + Y 
  -- または
R.time between L.time - Y and L.time - X

ウォーターマーク

TemporalJoin

イベント時間に基づいて内部結合または左結合を実行します。

WindowDeduplicate

データ重複除去にウィンドウテーブル値関数(TVF)を使用します。

WindowAggregate

データ集計にウィンドウTVFを使用します。

GlobalWindowAggregate

データ集計にウィンドウTVFを使用し、ローカルグローバル集計を有効にします。

WindowJoin

結合にウィンドウTVFを使用します。

WindowRank

データソートにウィンドウTVFを使用します。

GroupWindowAggregate

ウィンドウ集計の従来の構文を使用します。

診断ツール

バックプレッシャーは、Apache Flinkのパフォーマンスボトルネックの指標です。ほとんどの場合、バックプレッシャーは、状態サイズが継続的に増加し、割り当てられたメモリサイズを超えたために発生します。この場合、状態バックエンドは、使用頻度の低い状態データをディスクストレージに移動します。ただし、ディスクストレージ内のデータへのアクセスは、メモリ内のデータへのアクセスよりも大幅に遅くなります。オペレーターがディスクから状態データを頻繁に読み取る場合、データレイテンシが大幅に増加します。これにより、パフォーマンスのボトルネックが発生します。

バックプレッシャーが大きな状態サイズによって引き起こされているかどうかを特定するには、デプロイメントとオペレーターの実行状態を徹底的に分析する必要があります。Realtime Compute for Apache Flinkの診断ツールを使用してパフォーマンスの問題を特定する方法については、「診断ツール」をご参照ください。

チューニング方法

ステートフルオペレーターの不要な使用を避ける

このチューニング方法は、オプティマイザーによって導出されるステートフルオペレーターにのみ適用されます。ほとんどの場合、SQLステートメントによって呼び出されるステートフルオペレーターは必要です。

  • ChangelogNormalize

    このオペレーターは、イベント時間に基づくテンポラル結合を除き、upsertソーステーブルを含むほとんどのシナリオで呼び出されます。Upsert Kafkaコネクターまたは同様のコネクターを使用する前に、イベント時間に基づくテンポラル結合が実行されていないことを確認してください。デプロイメントの実行中は、ChangelogNormalizeオペレーターの状態関連メトリックを監視する必要があります。ソーステーブルに多数の主キーがある場合、オペレーターは主キー(キー付き状態とも呼ばれます)に基づいて状態を維持するため、状態サイズが増加します。主キーが頻繁に更新される場合、状態データは頻繁にアクセスおよび変更されます。データ同期などのシナリオでは、Upsert Kafkaコネクターを使用してソーステーブルを作成しないことをお勧めします。また、厳密に1回処理を保証するデータ同期ツールを使用することをお勧めします。

  • SinkUpsertMaterializer

    デフォルトでは、table.exec.sink.upsert-materialize パラメーターは auto に設定されています。これは、変更ログレコードの順序が正しくない場合など、特定のシナリオでデータの正確性を確保するために、システムが SinkUpsertMaterializer オペレーターを自動的に使用することを示します。このオペレーターの使用は、必ずしもレコードの順序が正しくないことを意味するわけではないことに注意してください。たとえば、複数のキーを使用してデータをグループ化し、キーを単一の列にマージする場合、オプティマイザーは upsert キーを正確に導出できません。したがって、データの正確性を確保するために、このオペレーターが追加されます。データ分散パターンに精通しており、このオペレーターを使用せずに最終結果が正しい場合は、table.exec.sink.upsert-materialize パラメーターを none に設定してパフォーマンスを最適化します。

    Realtime Compute for Apache Flink の開発コンソールで、デプロイメントで SinkUpsertMaterializer オペレーターが使用されているかどうかを確認できます。オペレーターが使用されている場合、次の図に示すように、トポロジ図に Sink オペレーターとともに表示されます。2 つのオペレーターはオペレーターチェーンを形成します。これにより、SinkUpsertMaterializer オペレーターの適用を直感的に監視および評価し、情報に基づいた意思決定を行うことができます。

    image.png

    image.png

    SinkUpsertMaterializer オペレーターが使用されておらず、計算結果が正しい場合は、'table.exec.sink.upsert-materialize'='none' 構成を追加して、このオペレーターの不要な使用を防ぐことをお勧めします。構成の追加方法については、「デプロイメント実行のパラメーターを構成するにはどうすればいいですか?」をご参照ください。同様の問題を特定するのに役立つように、次の図に示すように、Ververica Runtime (VVR) 8.0 以降では SQL 実行プランのインテリジェント分析がサポートされています。

    image.png

状態アクセス頻度の削減: miniBatch を有効にする

ビジネスで分単位のレイテンシが許容できる場合は、miniBatch を有効にして状態アクセスと更新の頻度を減らすことができます。詳細については、「miniBatch を有効にしてスループットを向上させる」をご参照ください。

次の表に、Realtime Compute for Apache Flink で miniBatch をサポートするステートフルオペレーターを示します。

オペレーター名

説明

ChangelogNormalize

該当なし

Deduplicate

table.exec.deduplicate.mini-batch.compact-changes-enable パラメーターを構成して、イベント時間に基づく重複除去中に変更ログを圧縮するかどうかを指定できます。

GroupAggregate

GlobalGroupAggregate

IncrementalGroupAggregate

該当なし

RegularJoin

table.exec.stream.join.mini-batch-enabled パラメーターを構成して、結合操作の miniBatch を有効にする必要があります。このパラメーターは、更新ストリームと外部結合のシナリオに適用されます。

状態サイズの削減: 適切な TTL 値を指定する

説明

デプロイメントの TTL を 0 から 0 より大きい値に変更するか、またはその逆に変更すると、互換性の問題が発生し、StateMigrationException エラーがスローされます。

状態サイズはパフォーマンスにとって重要です。デプロイメントの状態サイズを制御するには、Realtime Compute for Apache Flink の開発コンソールの [デプロイメント] ページで、ビジネス要件に基づいて [状態の有効期限] パラメーターを構成します。詳細については、「パラメーター」をご参照ください。

image.png

TTL が短すぎると、計算結果が不正確になる可能性があります。たとえば、データの到着が遅れ、集計または結合操作中に関連する状態データの有効期限が切れた場合、結果は不正確になります。TTL が長すぎると、リソース消費量が増加し、安定性が低下します。データの特性とビジネス要件に基づいて TTL を構成することをお勧めします。たとえば、日次データ計算の日ごとの最大ドリフトが 1 時間の場合、TTL を 25 時間に設定できます。

VVR 8.0.1 以降では、JOIN_STATE_TTL ヒントを使用して、通常結合の左右のストリームの状態に異なる TTL 値を指定できます。これにより、不要な状態ストレージが削減され、パフォーマンスが向上します。ヒントの使用方法については、「クエリヒント」をご参照ください。

SELECT /*+ JOIN_STATE_TTL('left_table' = '..', 'right_table' = '..') */ *
FROM left_table [LEFT | RIGHT | INNER] JOIN right_table ON ...

次の表に、JOIN_STATE_TTL ヒントを使用する前後のデプロイメントの状態サイズを示します。

項目

デプロイメントの詳細

状態サイズ

  • この通常結合では、左ストリームのデータ量は右ストリームのデータ量の 20 ~ 50 倍です。右ストリームのデータは 18 日間保持する必要がありますが、パフォーマンスを確保するために右ストリームの TTL は 10 日間に設定されています。その結果、データの正確性が損なわれます。

  • 結合操作の状態サイズは約 5.8 TB です。

  • デプロイメントは最大 700 コンピュートユニット (CU) を消費します。

22

  • JOIN_STATE_TTL ヒントを使用して、左ストリームの TTL を 12 時間に、右ストリームの TTL を 18 日間に設定します。これにより、データの整合性が確保されます。

  • 結合操作の状態サイズは約 590 GB に削減され、元のサイズの 10 分の 1 になります。

  • デプロイメントは 200 ~ 300 CU を消費し、これはリソースの 50% ~ 70% が節約されていることを示します。

23e

状態サイズの削減: 実行プランの最適化

オプティマイザーは、SQL ステートメントと指定された構成に基づいて状態実装を選択し、実行プランを生成します。

  • 主キーを使用して通常結合を最適化する

    • 結合キーに主キーが含まれている場合、システムは ValueState<RowData> オブジェクトを使用して、各結合キーの最新の値のみを格納します。これにより、ストレージ容量の使用率が最大化されます。

    • 結合キーに非主キーが含まれている場合、システムは MapState<RowData, RowData> オブジェクトを使用して、各結合キーの主キーに基づいてソーステーブルの最新のレコードを格納します。

    • 主キーが定義されていない場合、システムは MapState<RowData, Integer> オブジェクトを使用して、各結合キーに対応するデータレコード全体と、データレコードが表示される回数を格納します。

    ストレージ効率を最適化するには、結合するテーブルの DDL ステートメントで主キーを定義し、主キーに基づいて通常結合を実行することをお勧めします。

  • 追加専用ストリームの重複除去を最適化する

    ROW_NUMBER 関数を使用して FIRST_VALUE または LAST_VALUE 関数を置き換えることで、重複除去効率を高めます。ROW_NUMBER 関数を使用して最初または最新のデータレコードを取得する場合、Deduplicate オペレーターは指定されたキーの最初または最新のレコードのみを格納します。

  • 集計パフォーマンスを向上させる

    モバイルデバイス、デスクトップデバイス、およびすべてのデバイスでユニークビジター数を計算する場合など、多次元データ集計で CASE WHEN 構文の代わりに FILTER 構文を使用します。SQL オプティマイザーは、同じキーの異なるフィルター引数を認識できます。これにより、同じキーの異なる条件に基づいて複数の COUNT DISTINCT 値が計算されるときに状態データを共有できます。また、状態データにアクセスする回数も削減されます。テストでは、FILTER 構文は CASE WHEN 構文の 2 倍のパフォーマンスを提供できることが示されています。

状態サイズの削減: 複数のストリームの結合順序を調整する

Apache Flink は、バイナリハッシュ結合を使用してデータストリームを処理します。次の例では、ストリーム A と B の結合により、不要なストレージ消費が発生します。結合されるストリームの数が増えるにつれて、問題はさらに深刻になります。

image.png

この問題に対処するには、結合順序を調整します。たとえば、データ量の多いストリームの前に、データ量の少ないストリームを結合できます。これにより、状態の冗長性によって引き起こされる増幅効果が軽減され、データ処理の効率とパフォーマンスが向上します。

ディスク読み取りを最小限に抑える

ディスクストレージにアクセスする回数を減らして、システムパフォーマンスを向上させることができます。また、メモリ使用率を最適化することもできます。詳細については、「ディスク読み取りを最小限に抑える」をご参照ください。

参照情報