ジョブパラメーター、リソースパラメーター、アップストリームおよびダウンストリームデータストレージパラメーターなどのパラメーター設定を調整することにより、 Realtime Compute ジョブのパフォーマンスを最適化できます。

概要

次の 3 つのタイプのパラメーターを設定することにより、ジョブのパフォーマンスを最適化できます。

  • アップストリームおよびダウンストリームデータストレージパラメーター。
  • miniBatch などのジョブパラメーター。
  • parallelism、core や heap_memory などのリソースパラメーター。

このトピックでは、前述の 3 つのタイプのパラメーターを設定する方法について説明します。 ジョブのパラメーター設定を変更または追加した後、変更した設定を適用するには、ジョブを [終了] してから [開始] するか、ジョブを [一時停止] してから [再開] する必要があります。 詳細は、「新しい設定の適用」をご参照ください。

パラメーター設定に基づいたアップストリームとダウンストリームのデータストレージの最適化

Realtime Compute では、各データレコードがソーステーブルと結果テーブルの読み取りおよび書き込み操作をトリガーできます。 これによってアップストリームおよびダウンストリームのデータストレージパフォーマンスに多大な課題が生じます。 これらの課題に対処するため、バッチサイズパラメーターを設定して、ソーステーブルから読み取られる、または一度に結果テーブルに書き込まれるデータレコードの数を指定できます。 次の表に、バッチサイズパラメーターをサポートするソーステーブルと結果テーブルを示します。

テーブル パラメーター 説明
DataHub source table batchReadSize 一度に読み取られるデータレコードの数。 オプション。 デフォルト値:10。
DataHub source table batchSize 一度に書き込まれるデータレコードの数。 オプション。 デフォルト値:300。
Log Service source table batchGetSize 一度に読み込まれる LogGroups の数。 オプション。 デフォルト値:10。
AnalyticDB for MySQL V2.0 result table batchSize 一度に書き込まれるデータレコードの数。 オプション。 デフォルト値 :1000。
ApsaraDB for RDS result table batchSize 一度に書き込まれるデータレコードの数。 オプション。 デフォルト値:50。
HybridDB for MySQL result table batchSize 一度に書き込まれるデータレコードの数。 オプション。 デフォルト値 :1000。 推奨最大値:4096。
bufferSize データ重複除去後のバッファサイズ。 このパラメーターは、プライマリキーが定義されている場合にのみ使用できます。 オプション。 推奨最大値:4096。 このパラメーターは、batchSize パラメーターが設定されている場合は必須となります。
上記のパラメーターを対応するデータストレージシステムのデータ定義言語 (DDL) ステートメントの with 句に追加することで、バッチデータの読み取りと書き込みを構成できます。 たとえば、DDL ステートメントの with 句に batchReadSize='<number>' を追加します。

ジョブパラメーター設定に基づいたパフォーマンスの最適化

miniBatch パラメーターは、GROUP BY 演算子のみを最適化するために使用できます。 Flink SQL を使用してストリームデータを処理する場合、Realtime Compute はデータレコードが届くたびに状態データを読み取ります。 これは、大量の入出力 (I/O) リソースを消費します。 miniBatch パラメーターを設定した場合、Realtime Compute は同じキーを持つデータレコードの状態データを 1 回のみ読み取り、出力には最新のデータレコードのみが含まれます。 これにより、状態データを読み取る頻度が減り、データ出力の更新が最小限に抑えられます。 miniBatch パラメーターを次のように設定します。
  • ジョブに新しいパラメーターを追加したら、ジョブを [終了] してから [開始] し、新しい設定を適用します。
  • ジョブのパラメーターを変更した後、ジョブを [一時停止] してから [再開] し、変更した設定を適用します。
# Enable window miniBatch in Realtime Compute V3.2 and later. (By default, window miniBatch is disabled in Realtime Compute V3.2 and later.)
sql.exec.mini-batch.window.enabled=true
# Exactly-once semantics.
blink.checkpoint.mode=EXACTLY_ONCE
# The checkpoint interval, in milliseconds.
blink.checkpoint.interval.ms=180000
blink.checkpoint.timeout.ms=600000
# Realtime Compute V2.0 or later uses Niagara as the state backend, and uses it to set the lifecycle (in milliseconds) of the state data.
state.backend.type=niagara
state.backend.niagara.ttl.ms=129600000
# Realtime Compute V2.0 or later enables micro-batch processing with an interval of 5 seconds.. (You cannot set this parameter when you use a window function.)
blink.microBatch.allowLatencyMs=5000
# The allowed latency for a job.
blink.miniBatch.allowLatencyMs=5000
# Enable miniBatch for the node that joins two streams.
blink.miniBatch.join.enabled=true
# The size of a batch.
blink.miniBatch.size=20000
# Enable local aggregation. This feature is enabled by default in Realtime Compute V2.0 and later, but you must manually enable it if you use Realtime Compute V1.6.4.
blink.localAgg.enabled=true
# Enable PartialFinal to resolve data hotspot issues when you run the CountDistinct function in Realtime Compute V2.0 and later.
blink.partialAgg.enabled=true
# Enable UNION ALL for optimization.
blink.forbid.unionall.as.breakpoint.in.subsection.optimization=true
# Configure garbage collection for optimization. (You cannot set this parameter when you use a Log Service source table.)
blink.job.option=-yD heartbeat.timeout=180000 -yD env.java.opts='-verbose:gc -XX:NewRatio=3 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:ParallelGCThreads=4'
# Set the time zone.
blink.job.timeZone=Asia/Shanghai

リソース構成の最適化

このセクションでは、リソース構成を最適化する方法について説明します。

  1. 問題分析
    1. 次のトポロジーに示すように、タスクノード 2 の入力キューの割合は 100% に達します。 タスクノード 2 のデータは積み上げられ、タスクノード 1 にもプレッシャーをかけます。このとき、出力キューの割合は 100% に達します。
    2. タスクノード 2 をクリックします。
    3. [サブタスクリスト] をクリックして、[キュー] の値が 100 % であるサブタスクを見つけます。
    4. [アクション] 列の [ログの表示] をクリックします。
    5. [TaskExecutor へのリンク] をクリックします。
    6. 表示される [TaskExecutor] タブで、[メトリックグラフ] をクリックして、CPU とメモリの使用状況を確認します。
  2. パフォーマンスの最適化
    1. [開発] ページで、右側のナビゲーションペインの [基本属性] をクリックします。 表示される [基本属性] で [リソース構成] をクリックします。
    2. 表示されるページで、グループまたはグループ内の演算子のパラメーターを変更します。
      • 演算子のパラメーターを変更するには、以下の手順を実行します。
        1. [グループ] ボックスで、右上隅にあるプラス (+) アイコンをクリックします。
        2. ポインターを対象の演算子ボックスの上に移動します。
        3. 演算子名の横にある [編集] アイコンをクリックします。
        4. 表示される [演算子データの変更] ダイアログボックスで、必要に応じてパラメーターを変更し、OK をクリックします。
      • グループ内の複数の演算子のパラメーターを一度に変更するには、以下の手順を実行します。
        1. ポインターを [グループ] ボックスの上に移動します。
        2. [グループ] の横にある [編集] アイコンをクリックします。
        3. 表示される [演算子データの変更] ダイアログボックスで、必要に応じてパラメーターを変更し、OK をクリックします。
    3. [構成] ページで、ポインターを右上隅の [構成] に移動し、ドロップダウンリストで [適用して閉じる] を選択します。
    • グループのリソースパラメーターを変更してもジョブのパフォーマンスが大幅に改善されない場合は、次の手順に従って問題をトラブルシューティングします。
      1. ノードにデータスキューが存在するかどうかを確認します。
      2. GROUP BY、WINDOW、JOIN などの複雑な演算子のサブタスクが正しく実行されているかどうかを確認します。
    • チェーンから演算子を削除するには、次の手順を実行します。
      1. ポインターを対象の演算子の上に移動して、演算子名の横にある [編集] アイコンをクリックします。
      2. 表示される [演算子データ] の [変更] ダイアログボックスで、chainingStrategyHEAD をクリックします。 この演算子の chainingStrategy パラメーターがすでに HEAD に設定されている場合は、次の演算子の chainingStrategy パラメーターを HEAD に設定する必要があります。 次の表は、chainingStrategy パラメーターの有効な値を示しています。
        説明
        ALWAYS 演算子をチェーンに追加します。
        NEVER 元のチェーン戦略を維持します。
        HEAD チェーンから演算子を削除します。
  3. 原則と提案
    • core:heap_memory を 1:4 に設定することを推奨します。これは、各 CPU コアが 4 GB のメモリに対応することを示します。
      • 一つの演算子用のコアの総数 = 並列処理数 × コア数
      • 一つの演算子用のヒープメモリ容量合計 = 並列処理数 × heap_memory 値
      • チェーンの [core] の値は、チェーン内の演算子の最大コア値と同じです。 チェーンのヒープメモリ容量は、チェーン内の演算子のヒープメモリ総容量と同じです。
      たとえば、演算子のコアパラメーターが 1 に設定され、演算子の heap_memory パラメーターが 3 に設定されている場合、システムは 1 CU に 4 GB のメモリをチェーンに割り当てます。 演算子のコアパラメーターが 1 に設定され、演算子の heap_memory パラメーターが 5 に設定されている場合、システムはチェーン用に 5 GB のメモリを持つ 1.25 CU を割り当てます。
    • parallelism
      • ソースノード
        ソースノードの並列処理パラメーターの値は、ソースノードのシャードの数を超えることはできません。
        • ソースノードの数は、アップストリームパーティションの数の倍数です。
        • ソースノードの数は、並列処理パラメーターの値の倍数です。 たとえば、16 個のソースノードが存在する場合、並列処理パラメーターを 16、8、4 など 16 の約数に設定できます。
      • 演算子ノード
        • 1 秒あたりの推定クエリ数 (QPS) に基づいて、演算子ノードの並列処理パラメーターを設定します。
        • QPS が低い場合は、演算子ノードの数をソースノードの並列処理の値に設定します。
        • QPS が高い場合は、演算子ノードの数がソースノードの並列処理の値を超えていることを確認してください。 たとえば、ソースノードの並列処理の値が 16 の場合、演算子ノードの数を 64、128、256 など 16 より大きい値に設定する必要があります。
      • シンクノード
        • シンクノードの並列処理パラメーターを、ダウンストリームパーティションの数の 2〜3 倍の値に設定します。
        • シンクノードの並列処理パラメーターを、ダウンストリームパーティションの数の 3 倍を超える値に設定しないでください。 そうでないと、書き込みタイムアウトまたは障害が発生します。 たとえば、16 個のダウンストリームシンクノードが存在する場合、これらのシンクノードの並列処理パラメーターを 48 以下の値に設定します。
    • core

      このパラメーターは、CU 構成を指定します。 このパラメーターは、実際の CPU 使用率に基づいて設定します。 デフォルト値は 0.1 です。 推奨値は 0.25 です。

    • heap_memory

      このパラメーターは、ヒープメモリ容量を指定します。 デフォルト値は 256 (MB 単位) です。 このパラメーターは、実際のメモリ使用量に基づいて設定します。

    • state_size

      GROUP BY 演算子を使用して、タスクノードの [state_size] パラメーターを設定できます。 [state_size] パラメーターのデフォルト値は 0 です。 GROUP BY 演算子を含むタスクノードの場合、[state_size] パラメーターを 1 に設定することで、システムが演算子が状態データにアクセスするための追加のメモリを割り当てることができます。 それ以外の場合、プロセスが失敗する可能性があります。 undefinedJOIN、OVER、または WINDOW 演算子を持つタスクノードの場合は、[state_size] パラメーターを 1 に設定する必要もあります。

新しい設定の適用

パラメーター設定が完了したら、ジョブを [中断] してから [再開] するか、ジョブを [終了] してから [開始] して、設定を有効にする必要があります。 ジョブを再起動または再開した後、[管理] > [概要] > [頂点トポロジー] を選択して、新しい設定が有効になっているか確認できます。

一部のパラメーター値のみを変更する場合は、ジョブを [終了] してから [開始] する手順は推奨しません。 ジョブが終了すると、ジョブのステータスがクリアされ、ジョブの結果が変わる可能性があります。
  • リソースパラメーター、with 句のパラメーター、またはジョブパラメーターの値を変更した後、ジョブを [中断] してから [再開] できます。
  • Flink SQL ロジックの変更、Flink SQL コードバージョンの変更、with 句へのパラメーターの追加、またはジョブパラメーターの追加後に、ジョブを [終了] して [開始] できます。
  • ジョブを [中断] して [再開] するには、次の手順を実行します。
    1. ジョブの公開 詳細については、「ジョブの公開」をご参照ください。 [リソース設定][手動設定された最新のリソースを使用] に設定します。
    2. [管理] ページで、対象のジョブを見つけて、[アクション] 列の [中断] をクリックします。
    3. [管理] ページで、対象ののジョブを見つけて、[アクション] 列の [再開] をクリックします。
    4. 表示される [再開] ダイアログボックスで、[最新設定で再開] をクリックします。
  • ジョブを [終了] してから [開始] するには、次の手順を実行します。
    1. ジョブを終了します。 詳細については、「ジョブの終了」をご参照ください。
    2. ジョブを開始します。 詳細については、「ジョブの開始」をご参照ください。

パラメーター

  • Global

    isChainingEnabled:チェーンが有効かどうかを示します。 デフォルト値:true。 [デフォルト値を使用]

  • ノード
    パラメーター 説明 変更可否
    id ノードの一意の ID。 ノード ID はシステムで生成されます。 不可
    uid ノードの UID。 UID は演算子 ID を生成するために使用されます。 このパラメーターが未設定の場合、UID はノード ID と同一になります。 不可
    pact ノードのタイプ (データソース、演算子、データシンクなど)。 不可
    name ノード名。 このパラメーターはカスタマイズできます。 可能
    slotSharingGroup サブタスクが同じスロットを共有できるかどうかを指定します。 このパラメーターにはデフォルト値を使用します。 不可
    chainingStrategy 有効値:
    • ALWAYS:演算子をチェーンに追加します。
    • NEVER:元のチェーン戦略のままです。
    • HEAD::チェーンから演算子を削除します。
    可能
    parallelism ノード内の同時並行タスク数 デフォルト値は 1 です。 必要に応じて値を増加させることができます。 可能
    core CPU 量 デフォルト値は 0.1 です。 このパラメーターは、実際の CPU 使用率に基づいて設定します。 推奨値:0.25。 可能
    heap_memory ヒープメモリ容量 (MB 単位)。 デフォルト値は 256。 このパラメーターは、実際のメモリ使用量に基づいて設定します。 可能
    direct_memory Java 仮想マシン (JVM) のオフヒープメモリ (MB 単位)。 デフォルト値は 0 です。 デフォルト値を使用することを推奨します。 可能
    native_memory Java ネイティブインターフェース (JNI) に使用される JVM オフヒープメモリ (MB 単位)。 デフォルト値:0 推奨値は 10 です。 可能
  • Chain
    Flink SQL タスクは、演算子とも呼ばれる多くのノードを含む有向非循環グラフ (DAG) です。 一部のアップストリーム演算子とダウンストリーム演算子は、実行中に組み合わせてチェーンを形成できます。 チェーンの CPU 量は、チェーン内の演算子の最大 CPU 量に等しく、チェーンのメモリ容量は、チェーン内の演算子のメモリ総容量に等しくなります。 演算子チェーンを使用すると、データ転送を大幅に低減できるため、コストを削減できます。
    • 同じ並列性を持つ演算子のみを組み合わせてチェーンを形成できます。
    • GROUP BY 演算子をチェーンに追加することはできません。