本ページでは、構文、設定、関数の観点から Flink SQL コードのパフォーマンスを最適化する方法について説明します。

Group By 関数の最適化

  • microBatch または miniBatch の有効化によるスループットの向上

    microBatch ポリシーと miniBatch ポリシーはどちらもマイクロバッチ処理に使用されます。具体的には、レコードごとに個別のタスクを実行するのではなく、1 つのタスクで複数のレコードを処理します。 いずれかのポリシーを有効にすると、データキャッシュがトリガー条件を満たしたときに Realtime Compute がデータを処理します。 これにより、Realtime Compute が状態データにアクセスする頻度が減少し、スループットが向上し、データ出力が減少します。

    microBatch と miniBatch のポリシーは、トリガーの点で互いに異なります。 miniBatch ポリシーは、各タスクに登録されているタイマースレッドを使用してマイクロバッチ処理をトリガーします。 そのため、スレッドスケジューリングのオーバーヘッドを伴います。 miniBatch ポリシーのアップグレードバージョンである microBatch ポリシーは、特定の間隔でデータソースに挿入されるイベントメッセージを使用してマイクロバッチ処理をトリガーします。 microBatch ポリシーは miniBatch ポリシーよりも優れており、データのシリアル化効率を高め、バックプレッシャーを低減し、スループットを高め、遅延を短縮します。

    • シナリオ
      マイクロバッチ処理は、遅延の増大を犠牲にして、より高いスループットを実現します。 非常に低い遅延を必要とするシナリオには適用されません。 ただし、データ集計シナリオでは、ジョブのパフォーマンスを向上させるためにマイクロバッチ処理を有効にすることを推奨します。
      microBatch を有効にすることで、データが 2 つのフェーズで集計されるときのデータジッタの問題点も解決されます。
    • 有効化の方法
      デフォルトでは、microBatch と miniBatch は無効になっています。 有効化にするには、以下のパラメーターを設定します。
      # Enable window miniBatch in Realtime Compute V3.2 and later. (By default, window miniBatch is disabled in Realtime Compute V3.2 and later.)
      sql.exec.mini-batch.window.enabled=true
      # The interval for generating batch data. You must specify this parameter when you enable microBatch. We recommend that you set this parameter to the same value as that of blink.miniBatch.allowLatencyMs.
      blink.microBatch.allowLatencyMs=5000
      # When you enable microBatch, you must reserve the following two miniBatch settings:
      blink.miniBatch.allowLatencyMs=5000
      # The maximum number of data records that can be cached for each batch. You must set this parameter to avoid the out of memory (OOM) error.
      blink.miniBatch.size=20000
  • LocalGlobal の有効化による、データホットスポットの一般的な問題の解決

    LocalGlobal ポリシーは、集計プロセスをローカル集計とグローバル集計の 2 つのフェーズに分割します。 MapReduce の結合フェーズと削減フェーズに似ています。 ローカル集計フェーズでは、Realtime Compute がデータのマイクロバッチを各入力ノード (localAgg) でローカルに集計し、各バッチ (アキュムレータ) のアキュムレータ値を生成します。 グローバル集計フェーズでは、Realtime Compute はアキュムレータ値 (マージ) をマージして、最終結果 (globalAgg) を取得します。

    LocalGlobal ポリシーは、ローカル集計によるデータスキューを排除し、グローバル集計におけるデータホットスポットの問題を解決することができます。 その結果、パフォーマンスが向上します。 下図に、LocalGlobal ポリシーがデータスキューの問題を解決する方法を示します。
    • シナリオ

      LocalGlobal を有効にして、SUM、COUNT、MAX、MIN、AVG などの一般的な集計関数のパフォーマンスを向上させ、これらの関数を実行する際のデータホットスポットの問題を解決することができます。
      LocalGlobal を有効にするには、ユーザー定義の集計関数 (UDAF) を定義して、merge メソッドを実装する必要があります。
    • 有効化の方法

      Realtime Compute V2.0 以降では、デフォルトで LocalGlobal が有効になっています。 blink.localAgg.enabled パラメーターを true に設定すると、LocalGlobal が有効になります。 このパラメーターは、microBatch または miniBatch が有効な場合にのみ有効です。
    • 検証

      LocalGlobal が有効かどうかを確認するには、GlobalGroupAggregate または LocalGroupAggregate ノードが最終トポロジーに存在するかどうかを確認します。
  • CountDistinct 関数の実行時に PartialFinal を有効化することによるデータホットスポット問題の解決

    LocalGlobal ポリシーは、SUM、COUNT、MAX、MIN、AVG などの一般的な集計関数のパフォーマンスを効果的に改善することができます。 ただし、CountDistinct 関数のパフォーマンス向上には効果的ではありません。 ローカル集計では重複する個別のキーを効果的に削除できないことが原因です。 その結果、グローバル集計フェーズでは、依然として大量のデータが積み上げられます。

    V2.2.0 より前の Realtime Compute では、CountDistinct 関数を実行するときに個別のキーでデータを分散するレイヤーを追加して、集計プロセスを 2 つのフェーズに分割してデータホットスポットの問題を解決できるようにする必要があります。 Realtime Compute V2.2.0 以降には、データを自動的に分散して集計プロセスを分割するための PartialFinal ポリシーが用意されています。 下図に、LocalGlobal と PartialFinal の違いを示します。
    • シナリオ

      PartialFinal ポリシーは、CountDistinct 関数を実行したときに集計パフォーマンスが要件を満たせないシナリオに適用されます。
      • UDAF を含む Flink SQL コードで PartialFinal を有効にすることはできません。
      • データ量が多い場合にのみ、PartialFinal を有効にすることを推奨します。 これは、PartialFinal 機能が自動的にデータを 2 つの集計層に分散させ、追加のネットワークシャッフルを導入するためです。 データ量が多くない場合は、リソースの無駄になります。
    • 有効化の方法

      デフォルトでは、PartialFinal は無効になっています。 PartialFinal を有効にするには、blink.partialAgg.enabled パラメータを true に設定します。
    • 検証

      PartialFinal が有効になっているかどうかを確認するには、最終的なトポロジーに拡張可能なノードが存在するかどうか、または集計層の数が 1 から 2 に変更されているかどうかを確認します。
  • CountDistinct 関数の実行時に agg フィルター構文をを使用したジョブパフォーマンスの改善
    このメソッドは、Realtime Compute V2.2.2 以降でサポートされています。
    統計ジョブは、すべてのチャネルの UV、モバイルクライアントの UV、PC クライアントの UV など、さまざまな次元のユニークビジター (UV) を記録します。 多次元統計分析を実装するには、大文字と小文字を区別する agg 構文の の代わりに、フィルター付きの agg 構文の使用を推奨します。 その理由は、Realtime Compute の SQL オプティマイザーがフィルターパラメーターを分析するためです。 Realtime Compute は、状態データを共有することにより、異なるフィルター条件で同じフィールドに対して CountDistinct 関数を実行することができます。 これにより、状態データに対する読み取りおよび書き込み操作が削減されます。 パフォーマンステストの結果は、大文字小文字を区別する agg 構文と比較して、フィルター付きの agg 構文は、ジョブのパフォーマンスが 2 倍に向上することを示しています。
    • シナリオ

      大文字小文字を区別する agg 構文をフィルター付きの agg 構文で置き換えることを推奨します。 これにより、異なるフィルター条件を使用して同じフィールドで CountDistinct 関数を実行すると、ジョブのパフォーマンスが特に向上します。
    • 元のステートメント
      COUNT(distinct visitor_id) as UV1 , COUNT(distinct case when is_wireless='y' then visitor_id else null end) as UV2
    • 最適化されたステートメント
      COUNT(distinct visitor_id) as UV1 , COUNT(distinct visitor_id) filter (where is_wireless='y') as UV2

TopN アルゴリズムの最適化

  • TopN アルゴリズム
    TopN の入力ストリームが静的ストリーム (ソースなど) である場合、TopN は AppendRank という 1 つのアルゴリズムのみをサポートします。 TopN の入力ストリームが動的ストリーム (集計関数または結合関数によって処理されるストリームなど) の場合、TopN は 3 つのアルゴリズム、UpdateFastRank、UnaryUpdateRank、および RetractRank (降順) をサポートします。 採用されたアルゴリズムの名前は、トポロジーのノード名に含まれています。
    • UpdateFastRank は最適なアルゴリズムです。 このアルゴリズムを使用するには、次の 2 つの条件を満たす必要があります。1. 入力ストリームには、主キー情報が含まれている必要があります。 2. ORDER BY 句のフィールドまたは関数の値は、ソートの逆順で単調に更新されます。 たとえば、ORDER BY 句を次のように定義します。ORDER BY COUNT DESC、ORDER BY COUNT DISTINCT DESC、または ORDER BY SUM (positive) DESC。
      ORDER BY SUM (positive) DESCの場合、正の値をフィルタリングするための条件を指定する必要があります。 このフィルター条件は、SUM 関数の値が正であることをオプティマイザーに通知します。 UpdateFastRank アルゴリズムを使用します。 このアルゴリズムは、Realtime Compute V2.2.2 以降でサポートされています。 サンプルコードは以下のとおりです。
      SELECT cate_id, seller_id, stat_date, pay_ord_amt # The rownum field is not included. This reduces the amount of output data to be written to the result table.
      FROM (SELECT*
            ROW_NUMBER ()OVER(PARTITIONBY cate_id, stat_date # Ensure that the stat_date field is included. Otherwise, the data may be disordered when the state data expires.
      ORDER BY pay_ord_amt DESC)AS rownum## Sort data by the sum of the input data.
        FROM (SELECT cate_id, seller_id, stat_date, # Note: The values involved in the SUM function are positive, so the result of the SUM function is monotonically increasing. That's why TopN can apply optimization algorithms.
      sum(total_fee) filter (where total_fee >=0) as pay_ord_amt
          FROM WHERE total_fee >=0 GROUP BY cate_name, seller_id, stat_date)WHERE rownum <=100))
    • UnaryUpdateRank は、 UpdateFastRank に次ぐパフォーマンスを発揮します。 このアルゴリズムを使用するには、入力ストリームに主キー情報が含まれていることをご確認ください。 たとえば、"ORDER BY AVG" のように ORDER BY 句を定義します。
    • RetractRank は、パフォーマンスの面では最も低くなります。 このアルゴリズムを本番環境で使用することは推奨しません。 入力ストリームをご確認ください。 入力ストリームに主キー情報が含まれている場合は、UnaryUpdateRank または UpdateFastRank を使用してジョブのパフォーマンスを最適化します。
  • 最適化方法
    • rownum フィールドの除外

      TopN の出力に rownum を含めないでください。 最終的にフロントエンドに表示されるときに結果をソートすることを推奨します。 結果テーブルに書き込まれるデータの量を大幅に削減できます。 詳細については、「TopN 文」をご参照ください。
    • TopN のキャッシュサイズの増加

      TopN は、状態データへのアクセスの効率を向上させる状態キャッシュを提供します。 これにより、パフォーマンスが向上します。 以下の式は、TopN キャッシュのヒット率を計算するために使用されます。
      cache_hit = cache_size×parallelism / top_n / partition_key_num
      Top100 を例にとります。 キャッシュに 10,000 レコードが含まれ、並列処理が 50 であると想定します。 PARTITION BY 関数のキーの数が 100,000 の場合、キャッシュヒット率は 5% (10000 × 50/100/100000 = 5%) になります。 ヒット率は低く、大量のリクエストがディスク状態データにアクセスしていることを示しています。 これにより、パフォーマンスが大幅に低下します。 PARTITION BY 関数のキーの数が多い場合は、TopN のキャッシュサイズとヒープメモリを増やすことができます。 詳細については、「マニュアル設定によるパフォーマンスの最適化」をご参照ください。
      ## In this example, if you increase the cache size of TopN from the default value 10,000 to 200,000, the cache hit rate may reach 100% (200000 × 50/100/100000 = 100%).
      blink.topn.cache.size=200000
    • PARTITION BY 関数に時間フィールドを含める

      たとえば、日次ランキングの場合、ステートメントに日付フィールドを含める必要があります。 そうしないと、状態データの有効期限が切れた際に TopN の結果が乱れる可能性があります。

重複排除のパフォーマンスを最適化

Realtime Compute V3.2.1 以降は、効率的な重複排除をサポートしています。
Realtime Compute の入力ストリームには重複データが含まれる場合があり、効率的な重複排除が頻繁に要求されます。 Realtime Compute には、効率的に重複データを削除するためのポリシーが 2 種類 (Deduplicate Keep FirstRow および Deduplicate Keep LastRow) 用意されています。
  • 構文
    Flink SQL は重複排除の構文を提供していません。 指定された主キーの下で最初または最後の重複レコードを予約し、必要に応じて残りの重複レコードを破棄するには、SQL の ROW_NUMBER() ウィンドウ 関数を使用する必要があります。 この意味で、重複排除は特別な TopN 機能です。
    SELECT *
    FROM (
       SELECT *,
        ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]
         ORDER BY timeAttributeCol [asc|desc]) AS rownum
       FROM table_name)
    WHERE rownum = 1
    要素名 説明
    ROW_NUMBER() 行番号を計算します。 OVER 句を含むウィンドウ関数です。 値は 1 から始まります。
    PARTITION BY col1[, col2..] オプションです。 重複レコードの主キーを格納するためのパーティション列を指定します。
    ORDER BY timeAttributeCol [asc|desc]) 時間属性(proctime または rowtime) に基づいてレコードをソートするための列を指定します。 レコードは、時間属性の昇順 (Deduplicate Keep FirstRow) または降順 (Deduplicate Keep LastRow) でソートすることができます。
    rownum 行数を計算します。 この要素は次のように設定します。rownum = 1 または rownum <= 1
    前述の構文によると、重複排除には 2 つのステップが含まれます。
    1. ROW_NUMBER() ウィンドウ関数を使用して、指定した時間属性でデータをソートし、データにランクを付けます。
      • 時間属性が proctime の場合、Realtime Compute は、レコードが Realtime Compute によって処理された時間に基づいて、重複するレコードを削除します。 この場合、ランクは毎回異なる場合があります。
      • 時間属性が rowtime の場合、Realtime Compute は、レコードが Realtime Compute に書き込まれた時間に基づいて重複レコードを削除します。 この場合、ランクは毎回同じままです。
    2. 指定された主キーの下の最初のレコードを予約し、残りの重複レコードを削除します。
      時間属性の昇順または降順でレコードをソートすることができます。
      • Deduplicate Keep FirstRow:Realtime Compute は、時間属性の昇順でレコードをソートし、指定された主キーの下に最初のレコードを予約します。
      • Deduplicate Keep LastRow:Realtime Compute は、時間属性の降順でレコードをソートし、指定された主キーの下の最初のレコードを予約します。
  • Deduplicate Keep FirstRow
    Deduplicate Keep FirstRow ポリシーを選択した場合、Realtime Compute は指定された主キーの下の最初のレコードを予約し、残りの重複レコードを破棄します。 この場合、状態データには主キー情報のみが格納され、状態データへのアクセス効率が大幅に向上します。 サンプルコードは次のとおりです。
    SELECT *
    FROM (
      SELECT *,
        ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum
      FROM T
    )
    WHERE rowNum = 1
    上記のコードは、b フィールドに基づいてテーブル T の重複レコードを削除し、システム時刻に基づいて、指定された主キーの下の最初のレコードを予約します。 この例では、proctime 属性は、Realtime Computeでレコードが処理されるシステム時間を示しています。 Realtime Compute は、この属性に基づいてテーブル T のレコードをソートします。 システム時刻に基づいて重複レコードを削除するには、proctime 属性を宣言する代わりに、PROCTIME() 関数を呼び出すこともできます。
  • Deduplicate Keep LastRow
    Deduplicate Keep LastRow ポリシーを選択すると、Realtime Compute は指定された主キーの下の最後のレコードを予約し、残りの重複レコードを破棄します。 このポリシーは、パフォーマンスの点で LAST_VALUE 関数よりわずかに優れています。 Deduplicate Keep LastRow のサンプルコードは次のとおりです。
    SELECT *
    FROM (
      SELECT *,
        ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) as rowNum
      FROM T
    )
    WHERE rowNum = 1
    上記のコードは、b および d フィールドに基づいてテーブル T の重複レコードを削除し、レコードが Realtime Compute に書き込まれた時間に基づいて、指定された主キーの下の最後のレコードを予約します。 この例では、rowtime 属性は、レコードが Realtime Compute に書き込まれたときのイベント時間を示しています。 Realtime Compute は、この属性に基づいてテーブル T のレコードをソートします。

効率的な組み込み関数の使用

  • 組み込み関数を使用したユーザー定義拡張機能 (UDX) を置き換え

    Realtime Compute の組み込み関数は継続的に最適化されています。 組み込み関数を使用して、可能な限り UDX を置き換えることを推奨します。 Realtime Compute V2.0 は、以下の側面で組み込み関数を最適化します。
    • シリアル化と逆シリアル化の効率を向上させます。
    • バイトレベルでの操作が行えます。
  • KEYVALUE 関数で 1 文字の区切り文字を使用する

    KEYVALUE 関数のシグネチャは、KEYVALUE(content, keyValueSplit, keySplit, keyName) です。 keyValueSplit および KeySplit がコロン (:) またはコンマ (,) などの単一文字の区切り文字である場合は、Realtime Compute は最適化されたアルゴリズムを使用します。 Realtime Compute は、コンテンツ全体をセグメント化する代わりに、バイナリデータの中から必要な KeyName 値を直接検索します。 これにより、ジョブのパフォーマンスが約 30% 向上します。
  • 複数のキーと値のペアが存在する場合は、MULTI_KEYVALUE 関数を使用する
    この方法は、Realtime Compute V2.2.2 以降でサポートされています。
    クエリには、同じコンテンツに対する複数の KEYVALUE 関数が含まれる場合があります。 コンテンツに 10 個のキーと値のペアが含まれていると想定します。 これらの 10 個の値をすべて抽出してフィールドとして使用するには、10 個の KEYVALUE 関数を記述してコンテンツを 10 回解析する必要があります。 この場合、テーブル値関数 MULTI_KEYVALUE を使用することを推奨します。この関数では、コンテンツに対して 1 つの SPLIT 解析のみが必要となります。 これにより、ジョブのパフォーマンスが 50% から 100% 向上します。
  • LIKE 演算子は注意して使用する
    • 指定したコンテンツで始まるレコードを照合するには、LIKE 'xxx%' を使用します。
    • 指定したコンテンツで終わるレコードを照合するには、LIKE '%xxx' を使用します。
    • 指定されたコンテンツを含むレコードを照合するには、LIKE '%xxx%' を使用します。
    • 指定されたコンテンツと同じレコードを照合するには、LIKE 'xxx' を使用します。これはstr = 'xxx' と同じです。
    • アンダースコア (_) を照合するには、LIKE '%seller/id%' ESCAPE '/ を使用します。 アンダースコア (_) は、SQL では単一文字のワイルドカードであり、任意の文字と一致できるため、エスケープされています。 LIKE '%seller_id%' を使用すると、seller_idseller#idsellerxidseller1id など、多くの結果が返されます。 結果は不十分な場合があります。
  • 正規表現の使用を避ける

    正規表現の実行には時間がかかる場合があり、足算、引算、乗算、除算などの他の演算と比較して、100 倍の計算リソースが必要になる場合があります。 特定の状況で正規表現を実行すると、ジョブが無限ループに陥る可能性があります。 したがって、可能な限り LIKE 演算子を使用します。 一般的な正規表現の詳細については、対応するリンクをクリックしてください。

ネットワーク送信の最適化

一般的なパーティショナーポリシーは次のとおりです。
  • キーグループ/ハッシュ:指定されたキーに基づいてデータを配布します。
  • リバランス:ラウンドロビンスケジューリングを通じて各チャネルにデータを分散します。
  • 動的リバランス:出力チャネルの負荷ステータスに基づいて、負荷の少ないチャネルにデータを動的に分散します。
  • フォワード:プロセスがチェーンされていない場合のリバランスに似ています。 キーとチャネルがチェーンされると、Realtime Compute は指定されたキーの下のデータを対応するチャネルに配布します。
  • リスケール:入力チャネルと出力チャネル間で、一対多または多対一のモードでデータを分散します。
  • 動的リバランスを使用してリバランスを置き換える

    動的リバランスを使用すると、Realtime Compute は、各サブパーティションのバッファー済みデータの量に基づいて、負荷の低いサブパーティションにデータを書き込み、動的なロードバランシングを実現することができます。 静的リバランスポリシーと比較して、動的リバランスは負荷を分散し、出力計算ノードの計算容量が不均衡な場合に全体的なジョブパフォーマンスを向上させることができます。 リバランスを使用したときに出力ノードの負荷が不均衡であることがわかった場合は、動的リバランスの使用をご検討ください。 動的リバランスを使用するには、task.dynamic.rebalance.enabled パラメーターを true に設定します。 デフォルト値は false です。
  • リスケールを使用してリバランスを置き換える

    リスケール は Realtime Compute V2.2.2 以降でサポートされています。
    5 つの並列入力ノードと 10 の並列出力ノードがあると想定します。 リバランスを使用する場合、各入力ノードはラウンドロビンスケジューリングを通じて 10 個すべての出力ノードにデータを分散します。 リスケールを使用する場合、各入力ノードは、ラウンドロビンスケジューリングを通じて 2 つの出力ノードにデータを分散するだけで済みます。 これにより、チャネル数が減少し、各サブパーティションのバッファリング速度が向上するため、ネットワーク効率が向上します。 入力データが偶数で、並列入力ノードと並列出力ノードの数が同じ場合、リスケールを使用してリバランスを置き換えることができます。 リスケールを使用するには、enable.rescale.shuffling パラメーターを true に設定します。 デフォルト値は false です。

推奨設定

要約すると、次のジョブ設定を使用することを推奨します。
# Exactly-once semantics.
blink.checkpoint.mode=EXACTLY_ONCE
# The checkpoint interval, in milliseconds.
blink.checkpoint.interval.ms=180000
blink.checkpoint.timeout.ms=600000
# Realtime Compute V2.X uses Niagara as the state data backend, and uses it to set the lifecycle (in milliseconds) of the state data.
state.backend.type=niagara
state.backend.niagara.ttl.ms=129600000
# Realtime Compute V2.X enables micro-batch processing with an interval of 5 seconds. (You cannot set this parameter when you use a window function.)
blink.microBatch.allowLatencyMs=5000
# The allowed latency for a job.
blink.miniBatch.allowLatencyMs=5000
# The size of a batch.
blink.miniBatch.size=20000
# Enable local aggregation. This feature is enabled by default in Realtime Compute V2.X, but you must manually enable it if you use Realtime Compute V1.6.4.
blink.localAgg.enabled=true
# Enable PartialFinal to resolve data hotspot issues when you run the CountDistinct function in Realtime Compute V2.X.
blink.partialAgg.enabled=true
# Enable UNION ALL for optimization.
blink.forbid.unionall.as.breakpoint.in.subsection.optimization=true
# Enable OBJECT REUSE for optimization.
#blink.object.reuse=true
# Configure garbage collection for optimization. (You cannot set this parameter when you use a Log Service source table.)
blink.job.option=-yD heartbeat.timeout=180000 -yD env.java.opts='-verbose:gc -XX:NewRatio=3 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:ParallelGCThreads=4'
# Set the time zone.
blink.job.timeZone=Asia/Shanghai