ジョブパラメーター、リソースパラメーター、アップストリームおよびダウンストリームデータストレージパラメーターなどのパラメーター設定を調整することにより、 Realtime Compute ジョブのパフォーマンスを最適化できます。
概要
次の 3 つのタイプのパラメーターを設定することにより、ジョブのパフォーマンスを最適化できます。
- アップストリームおよびダウンストリームデータストレージパラメーター。
- miniBatch などのジョブパラメーター。
- parallelism、core や heap_memory などのリソースパラメーター。
このトピックでは、前述の 3 つのタイプのパラメーターを設定する方法について説明します。 ジョブのパラメーター設定を変更または追加した後、変更した設定を適用するには、ジョブを [開始] するか、ジョブを してから [再開] する必要があります。 詳細は、「新しい設定の適用」をご参照ください。
してからパラメーター設定に基づいたアップストリームとダウンストリームのデータストレージの最適化
Realtime Compute では、各データレコードがソーステーブルと結果テーブルの読み取りおよび書き込み操作をトリガーできます。 これによってアップストリームおよびダウンストリームのデータストレージパフォーマンスに多大な課題が生じます。 これらの課題に対処するため、バッチサイズパラメーターを設定して、ソーステーブルから読み取られる、または一度に結果テーブルに書き込まれるデータレコードの数を指定できます。 次の表に、バッチサイズパラメーターをサポートするソーステーブルと結果テーブルを示します。
テーブル | パラメーター | 説明 | 値 |
DataHub source table | batchReadSize | 一度に読み取られるデータレコードの数。 | オプション。 デフォルト値:10。 |
DataHub source table | batchSize | 一度に書き込まれるデータレコードの数。 | オプション。 デフォルト値:300。 |
Log Service source table | batchGetSize | 一度に読み込まれる LogGroups の数。 | オプション。 デフォルト値:10。 |
AnalyticDB for MySQL V2.0 result table | batchSize | 一度に書き込まれるデータレコードの数。 | オプション。 デフォルト値 :1000。 |
ApsaraDB for RDS result table | batchSize | 一度に書き込まれるデータレコードの数。 | オプション。 デフォルト値:50。 |
HybridDB for MySQL result table | batchSize | 一度に書き込まれるデータレコードの数。 | オプション。 デフォルト値 :1000。 推奨最大値:4096。 |
bufferSize | データ重複除去後のバッファサイズ。 このパラメーターは、プライマリキーが定義されている場合にのみ使用できます。 | オプション。 推奨最大値:4096。 このパラメーターは、batchSize パラメーターが設定されている場合は必須となります。 |
batchReadSize='<number>'
を追加します。
ジョブパラメーター設定に基づいたパフォーマンスの最適化
- ジョブに新しいパラメーターを追加したら、ジョブを [開始] し、新しい設定を適用します。 してから
- ジョブのパラメーターを変更した後、ジョブを [再開] し、変更した設定を適用します。 してから
# Enable window miniBatch in Realtime Compute V3.2 and later. (By default, window miniBatch is disabled in Realtime Compute V3.2 and later.)
sql.exec.mini-batch.window.enabled=true
# Exactly-once semantics.
blink.checkpoint.mode=EXACTLY_ONCE
# The checkpoint interval, in milliseconds.
blink.checkpoint.interval.ms=180000
blink.checkpoint.timeout.ms=600000
# Realtime Compute V2.0 or later uses Niagara as the state backend, and uses it to set the lifecycle (in milliseconds) of the state data.
state.backend.type=niagara
state.backend.niagara.ttl.ms=129600000
# Realtime Compute V2.0 or later enables micro-batch processing with an interval of 5 seconds.. (You cannot set this parameter when you use a window function.)
blink.microBatch.allowLatencyMs=5000
# The allowed latency for a job.
blink.miniBatch.allowLatencyMs=5000
# Enable miniBatch for the node that joins two streams.
blink.miniBatch.join.enabled=true
# The size of a batch.
blink.miniBatch.size=20000
# Enable local aggregation. This feature is enabled by default in Realtime Compute V2.0 and later, but you must manually enable it if you use Realtime Compute V1.6.4.
blink.localAgg.enabled=true
# Enable PartialFinal to resolve data hotspot issues when you run the CountDistinct function in Realtime Compute V2.0 and later.
blink.partialAgg.enabled=true
# Enable UNION ALL for optimization.
blink.forbid.unionall.as.breakpoint.in.subsection.optimization=true
# Configure garbage collection for optimization. (You cannot set this parameter when you use a Log Service source table.)
blink.job.option=-yD heartbeat.timeout=180000 -yD env.java.opts='-verbose:gc -XX:NewRatio=3 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:ParallelGCThreads=4'
# Set the time zone.
blink.job.timeZone=Asia/Shanghai
リソース構成の最適化
このセクションでは、リソース構成を最適化する方法について説明します。
- 問題分析
- 次のトポロジーに示すように、タスクノード 2 の入力キューの割合は 100% に達します。 タスクノード 2 のデータは積み上げられ、タスクノード 1 にもプレッシャーをかけます。このとき、出力キューの割合は 100% に達します。
- タスクノード 2 をクリックします。
- [サブタスクリスト] をクリックして、[キュー] の値が
100 %
であるサブタスクを見つけます。 - [アクション] 列の [ログの表示] をクリックします。
- [TaskExecutor へのリンク] をクリックします。
- 表示される [メトリックグラフ] をクリックして、CPU とメモリの使用状況を確認します。 タブで、
- パフォーマンスの最適化
- [開発] ページで、右側のナビゲーションペインの [基本属性] をクリックします。 表示される [基本属性] で [リソース構成] をクリックします。
- 表示されるページで、グループまたはグループ内の演算子のパラメーターを変更します。
- 演算子のパラメーターを変更するには、以下の手順を実行します。
- [グループ] ボックスで、右上隅にあるプラス (+) アイコンをクリックします。
- ポインターを対象の演算子ボックスの上に移動します。
- 演算子名の横にある [編集] アイコンをクリックします。
- 表示される [演算子データの変更] ダイアログボックスで、必要に応じてパラメーターを変更し、OK をクリックします。
- グループ内の複数の演算子のパラメーターを一度に変更するには、以下の手順を実行します。
- ポインターを [グループ] ボックスの上に移動します。
- [グループ] の横にある [編集] アイコンをクリックします。
- 表示される [演算子データの変更] ダイアログボックスで、必要に応じてパラメーターを変更し、OK をクリックします。
- 演算子のパラメーターを変更するには、以下の手順を実行します。
- [構成] ページで、ポインターを右上隅の に移動し、ドロップダウンリストで [適用して閉じる] を選択します。
注- グループのリソースパラメーターを変更してもジョブのパフォーマンスが大幅に改善されない場合は、次の手順に従って問題をトラブルシューティングします。
- ノードにデータスキューが存在するかどうかを確認します。
- GROUP BY、WINDOW、JOIN などの複雑な演算子のサブタスクが正しく実行されているかどうかを確認します。
- チェーンから演算子を削除するには、次の手順を実行します。
- ポインターを対象の演算子の上に移動して、演算子名の横にある [編集] アイコンをクリックします。
- 表示される [演算子データ] の [変更] ダイアログボックスで、chainingStrategy の
HEAD
をクリックします。 この演算子の chainingStrategy パラメーターがすでにHEAD
に設定されている場合は、次の演算子の chainingStrategy パラメーターをHEAD
に設定する必要があります。 次の表は、chainingStrategy パラメーターの有効な値を示しています。値 説明 ALWAYS 演算子をチェーンに追加します。 NEVER 元のチェーン戦略を維持します。 HEAD チェーンから演算子を削除します。
- 原則と提案
-
core:heap_memory
を 1:4 に設定することを推奨します。これは、各 CPU コアが 4 GB のメモリに対応することを示します。注- 一つの演算子用のコアの総数 = 並列処理数 × コア数。
- 一つの演算子用のヒープメモリ容量合計 = 並列処理数 × heap_memory 値。
- チェーンの [core] の値は、チェーン内の演算子の最大コア値と同じです。 チェーンのヒープメモリ容量は、チェーン内の演算子のヒープメモリ総容量と同じです。
- parallelism
- ソースノード
注 ソースノードの並列処理パラメーターの値は、ソースノードのシャードの数を超えることはできません。
- ソースノードの数は、アップストリームパーティションの数の倍数です。
- ソースノードの数は、並列処理パラメーターの値の倍数です。 たとえば、16 個のソースノードが存在する場合、並列処理パラメーターを 16、8、4 など 16 の約数に設定できます。
- 演算子ノード
- 1 秒あたりの推定クエリ数 (QPS) に基づいて、演算子ノードの並列処理パラメーターを設定します。
- QPS が低い場合は、演算子ノードの数をソースノードの並列処理の値に設定します。
- QPS が高い場合は、演算子ノードの数がソースノードの並列処理の値を超えていることを確認してください。 たとえば、ソースノードの並列処理の値が 16 の場合、演算子ノードの数を 64、128、256 など 16 より大きい値に設定する必要があります。
- シンクノード
- シンクノードの並列処理パラメーターを、ダウンストリームパーティションの数の 2〜3 倍の値に設定します。
- シンクノードの並列処理パラメーターを、ダウンストリームパーティションの数の 3 倍を超える値に設定しないでください。 そうでないと、書き込みタイムアウトまたは障害が発生します。 たとえば、16 個のダウンストリームシンクノードが存在する場合、これらのシンクノードの並列処理パラメーターを 48 以下の値に設定します。
- ソースノード
- core
このパラメーターは、CU 構成を指定します。 このパラメーターは、実際の CPU 使用率に基づいて設定します。 デフォルト値は 0.1 です。 推奨値は 0.25 です。
- heap_memory
このパラメーターは、ヒープメモリ容量を指定します。 デフォルト値は 256 (MB 単位) です。 このパラメーターは、実際のメモリ使用量に基づいて設定します。
- state_size
GROUP BY 演算子を使用して、タスクノードの [state_size] パラメーターを設定できます。 [state_size] パラメーターのデフォルト値は 0 です。 GROUP BY 演算子を含むタスクノードの場合、[state_size] パラメーターを
1
に設定することで、システムが演算子が状態データにアクセスするための追加のメモリを割り当てることができます。 それ以外の場合、プロセスが失敗する可能性があります。undefinedJOIN、OVER、または WINDOW 演算子を持つタスクノードの場合は、[state_size] パラメーターを
1
に設定する必要もあります。
-
新しい設定の適用
パラメーター設定が完了したら、ジョブを [再開] するか、ジョブを してから [開始] して、設定を有効にする必要があります。 ジョブを再起動または再開した後、 を選択して、新しい設定が有効になっているか確認できます。
してから- リソースパラメーター、with 句のパラメーター、またはジョブパラメーターの値を変更した後、ジョブを [再開] できます。 してから
- Flink SQL ロジックの変更、Flink SQL コードバージョンの変更、with 句へのパラメーターの追加、またはジョブパラメーターの追加後に、ジョブを [開始] できます。 して
- ジョブを [再開] するには、次の手順を実行します。
- ジョブの公開 詳細については、「ジョブの公開」をご参照ください。 [リソース設定] を [手動設定された最新のリソースを使用] に設定します。
- [管理] ページで、対象のジョブを見つけて、[アクション] 列の [中断] をクリックします。
- [管理] ページで、対象ののジョブを見つけて、[アクション] 列の [再開] をクリックします。
- 表示される [再開] ダイアログボックスで、[最新設定で再開] をクリックします。
して - ジョブを [開始] するには、次の手順を実行します。 してから
パラメーター
- Global
isChainingEnabled:チェーンが有効かどうかを示します。 デフォルト値:true。 [デフォルト値を使用] 。
- ノード
パラメーター 説明 変更可否 id ノードの一意の ID。 ノード ID はシステムで生成されます。 不可 uid ノードの UID。 UID は演算子 ID を生成するために使用されます。 このパラメーターが未設定の場合、UID はノード ID と同一になります。 不可 pact ノードのタイプ (データソース、演算子、データシンクなど)。 不可 name ノード名。 このパラメーターはカスタマイズできます。 可能 slotSharingGroup サブタスクが同じスロットを共有できるかどうかを指定します。 このパラメーターにはデフォルト値を使用します。 不可 chainingStrategy 有効値: - ALWAYS:演算子をチェーンに追加します。
- NEVER:元のチェーン戦略のままです。
- HEAD::チェーンから演算子を削除します。
可能 parallelism ノード内の同時並行タスク数 デフォルト値は 1 です。 必要に応じて値を増加させることができます。 可能 core CPU 量 デフォルト値は 0.1 です。 このパラメーターは、実際の CPU 使用率に基づいて設定します。 推奨値:0.25。 可能 heap_memory ヒープメモリ容量 (MB 単位)。 デフォルト値は 256。 このパラメーターは、実際のメモリ使用量に基づいて設定します。 可能 direct_memory Java 仮想マシン (JVM) のオフヒープメモリ (MB 単位)。 デフォルト値は 0 です。 デフォルト値を使用することを推奨します。 可能 native_memory Java ネイティブインターフェース (JNI) に使用される JVM オフヒープメモリ (MB 単位)。 デフォルト値:0 推奨値は 10 です。 可能 - Chain
Flink SQL タスクは、演算子とも呼ばれる多くのノードを含む有向非循環グラフ (DAG) です。 一部のアップストリーム演算子とダウンストリーム演算子は、実行中に組み合わせてチェーンを形成できます。 チェーンの CPU 量は、チェーン内の演算子の最大 CPU 量に等しく、チェーンのメモリ容量は、チェーン内の演算子のメモリ総容量に等しくなります。 演算子チェーンを使用すると、データ転送を大幅に低減できるため、コストを削減できます。注
- 同じ並列性を持つ演算子のみを組み合わせてチェーンを形成できます。
- GROUP BY 演算子をチェーンに追加することはできません。