このトピックでは、SQL ドラフトを開発する際に構成が必要となる可能性のある主要なパラメーターについて説明します。また、パラメーターの構成方法の例も示します。
table.exec.sink.keyed-shuffle
table.exec.sink.keyed-shuffle パラメーターは、プライマリキーを持つテーブルに書き込まれるデータの順序の乱れを解決します。これを有効にすると、ジョブでハッシュシャッフルを実行できます。これにより、同じプライマリキーを持つデータが同じオペレータータスクにルーティングされ、 disorder 問題が発生する可能性が低くなります。
使用上の注意
ハッシュシャッフルは、アップストリームオペレーターがプライマリキーフィールドの更新レコードの有効な順序を保証できる場合にのみ役立ちます。
エキスパートモードで実行されるジョブのオペレーターの並列度を変更した場合、次の並列度ルールは適用されません。
有効な値
AUTO(デフォルト): シンクオペレーターの並列度が 1 ではなく、アップストリームオペレーターの並列度と異なる場合、Realtime Compute for Apache Flink は、データがシンクオペレーターに流れるときに、プライマリキーフィールドでハッシュシャッフルを自動的に実行します。FORCE: シンクオペレーターの並列度が 1 ではない場合、Realtime Compute for Apache Flink は、データがシンクオペレーターに流れるときに、プライマリキーフィールドでハッシュシャッフルを強制的に実行します。NONE: Realtime Compute for Apache Flink は、シンクオペレーターの並列度とアップストリームオペレーターの並列度に基づいてハッシュシャッフルを実行しません。
例
AUTO
コードをコピーして SQL ストリーミングドラフトに貼り付け、ドラフトをデプロイします。シンクの並列度を 2 に明示的に設定します。
CREATE TEMPORARY TABLE s1 ( a INT, b INT, ts TIMESTAMP(3) ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'fields.ts.kind'='random','fields.ts.max-past'='5s', 'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10' ); CREATE TEMPORARY TABLE sink ( a INT, b INT, ts TIMESTAMP(3), PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector'='print', -- シンクオペレーターの並列度を 2 に設定します。 'sink.parallelism'='2' ); INSERT INTO sink SELECT * FROM s1; -- 動的テーブルオプションを構成して、シンクオペレーターの並列度を指定することもできます。 --INSERT INTO sink /*+ OPTIONS('sink.parallelism' = '2') */ SELECT * FROM s1;自動ハッシュシャッフルを有効にします。
ジョブ詳細ページに移動します。
[構成] タブで、[リソース] セクションを見つけ、[編集] をクリックします。
[並列度] を 1 に設定します。

[パラメーター] セクションで、[編集] をクリックします。
[その他の構成] フィールドで、
table.exec.sink.keyed-shuffle: AUTOを明示的に設定します。または、フィールドを空のままにします。
ジョブを開始します。
[ステータス] タブで、シンクオペレーターとアップストリームオペレーター間のデータ接続モードは HASH です。

FORCE
コードをコピーして SQL ストリーミングドラフトに貼り付け、ドラフトをデプロイします。シンクの並列度を明示的に設定しないでください。
CREATE TEMPORARY TABLE s1 ( a INT, b INT, ts TIMESTAMP(3) ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'fields.ts.kind'='random','fields.ts.max-past'='5s', 'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10' ); CREATE TEMPORARY TABLE sink ( a INT, b INT, ts TIMESTAMP(3), PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector'='print' ); INSERT INTO sink SELECT * FROM s1;FORCEハッシュシャッフルを有効にします。ジョブ詳細ページに移動します。
[構成] タブで、[リソース] セクションを見つけ、[編集] をクリックし、[並列度] を 2 に設定します。
[パラメーター] セクションで、[編集] をクリックします。
[その他の構成] フィールドに、
table.exec.sink.keyed-shuffle: FORCEを追加します。
ジョブを開始します。
[ステータス] タブで、シンクオペレーターとそのアップストリームオペレーターの両方の並列度は 2 で、データ接続モードは HASH です。

table.exec.mini-batch.size
このパラメーターは、マイクロバッチ操作のために計算オペレーターにキャッシュできる入力レコードの最大数を指定します。キャッシュされたデータレコードの数が MiniBatch サイズに達すると、計算とデータ出力がトリガーされます。このパラメーターは、table.exec.mini-batch.enabled パラメーターおよび table.exec.mini-batch.allow-latency パラメーターと一緒に使用する場合にのみ有効になります。 miniBatch 関連の最適化の詳細については、「MiniBatch Aggregation」および「MiniBatch Regular Joins」をご参照ください。
使用上の注意
ジョブが開始される前に [パラメーター] セクションでこのパラメーターを明示的に構成しない場合、miniBatch 処理モードでは管理メモリを使用してデータがキャッシュされます。次のいずれかの条件が満たされると、最終的な計算とデータ出力がトリガーされます。
計算オペレーターが
MiniBatchAssignerオペレーターから送信されたウォーターマークメッセージを受信する。管理メモリがいっぱいになる。
チェックポイントの前に。
ジョブがキャンセルされる。
有効な値
-1 (デフォルト): 管理メモリを使用してデータがキャッシュされます。
LONG 型の負の値:処理メカニズムは、デフォルト値の処理メカニズムと同じです。
LONG 型の正の値:ヒープメモリを使用してデータがキャッシュされます。キャッシュされた入力レコード数が指定された値に達すると、データ出力がトリガーされます。
例
SQL ストリーミングドラフトを作成し、次のテスト SQL 文をコピーして、ドラフトをデプロイします。
CREATE TEMPORARY TABLE s1 ( a INT, b INT, ts TIMESTAMP(3), PRIMARY KEY (a) NOT ENFORCED, WATERMARK FOR ts AS ts - INTERVAL '1' SECOND ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'fields.ts.kind'='random', 'fields.ts.max-past'='5s', 'fields.b.kind'='random', 'fields.b.min'='0', 'fields.b.max'='10' ); CREATE TEMPORARY TABLE sink ( a INT, b BIGINT, PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector'='print' ); INSERT INTO sink SELECT a, sum(b) FROM s1 GROUP BY a;MiniBatch サイズを構成します。
ジョブ詳細ページに移動します。
[構成] タブで、[パラメーター] セクションを見つけ、[編集] をクリックします。
[その他の構成] で、
table.exec.mini-batch.enabled: trueおよびtable.exec.mini-batch.allow-latency: 2sを設定します。table.exec.mini-batch.sizeを-1に設定するか、このパラメーターをスキップします。
ジョブを開始します。
[ステータス] タブで、ジョブのトポロジーには
MiniBatchAssigner、LocalGroupAggregate、およびGlobalGroupAggregateオペレーターが含まれています。
table.exec.agg.mini-batch.output-identical-enabled
このパラメーターは、状態 TTL が有効で、MinibatchGlobalAgg オペレーターと MinibatchAgg オペレーターによってデータが消費された後も集約結果が変更されない場合に、重複データをダウンストリームオペレーターに送信するかどうかを指定します。デフォルトでは、重複データは送信されません。ただし、この動作により、長期間データが受信されないため、ダウンストリームオペレーターの状態データの有効期限が切れる可能性があります。MinibatchGlobalAgg オペレーターと MinibatchAgg オペレーターに対して、このパラメーターを true に設定できます。ジョブの集約結果の変更期間が指定された状態 TTL よりも短い場合は、この設定をスキップします。詳細については、「FLINK-33936」をご参照ください。
使用上の注意
このスイッチは、Ververica Runtime (VVR) 8.0.8 以降でのみ有効です。
値を
falseからtrueに変更すると、MinibatchGlobalAggオペレーターとMinibatchAggオペレーターによって送信されるデータ量が増加する可能性があります。これにより、ダウンストリームオペレーターに負荷がかかる可能性があります。
有効な値
false(デフォルト): 状態データの TTL が有効で、MinibatchGlobalAggオペレーターとMinibatchAggオペレーターによってデータが消費された後も集約結果が変更されない場合、重複データはダウンストリームオペレーターに送信されません。true: 状態データの TTL が有効で、MinibatchGlobalAggオペレーターとMinibatchAggオペレーターによってデータが消費された後も集約結果が変更されない場合、重複データはダウンストリームオペレーターに送信されます。
例
SQL ストリーミングドラフトを作成し、次のテスト SQL 文をコピーして、ドラフトをデプロイします。
create temporary table src( a int, b string ) with ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.a.min' = '1', 'fields.a.max' = '1', 'fields.b.length' = '3' ); create temporary table snk( a int, max_length_b bigint ) with ( 'connector' = 'blackhole' ); insert into snk select a, max(CHAR_LENGTH(b)) from src group by a;MiniBatch 集約最適化を有効にします。
ジョブ詳細ページに移動します。
[構成] タブで、[パラメーター] セクションを見つけ、[編集] をクリックします。
[その他の構成] に、
table.exec.mini-batch.enabled: trueおよびtable.exec.mini-batch.allow-latency: 2sを追加します。
ジョブを開始します。
[ステータス] タブに、
MinibatchGlobalAggregateオペレーターが表示されます。オペレーターの + をクリックして、集約結果が変更されない場合、GlobalGroupAggregateオペレーターがダウンストリームオペレーターにデータを送信していないことを確認します。
ジョブを停止します。[構成] タブで、[パラメーター] セクションを見つけ、[編集] をクリックします。[その他の構成] に、
table.exec.agg.mini-batch.output-identical-enabled: trueを追加します。ジョブを開始します。
[ステータス] タブに、
MinibatchGlobalAggregateオペレーターが表示されます。オペレーターの + をクリックして、集約結果が変更されない場合、GlobalGroupAggregateオペレーターがダウンストリームオペレーターにデータを送信することを確認します。
table.exec.async-lookup.key-ordered-enabled
ルックアップ結合を含むユースケースでは、非同期モードを有効にしてスループットを向上させることができます。次の表は、ルックアップ結合における table.exec.async-lookup.output-mode パラメーターの設定と、入力ストリームが更新ストリームであるかどうかに基づいた、非同期 I/O 操作のデータ順序を示しています。
| 更新ストリーム | 非更新ストリーム |
ORDERED | 順序付きモード | 順序付きモード |
ALLOW_UNORDERED | 順序付きモード | 順序なしモード |
更新ストリームに対して table.exec.async-lookup.output-mode が ALLOW_UNORDERED に設定されている場合、データの正確性は順序付きモードによって保証されますが、スループットは低下します。この問題を解決するために、table.exec.async-lookup.key-ordered-enabled パラメーターが導入され、更新ストリームのデータの正確性と非同期 I/O 操作のスループットの両方が保証されます。ストリーム内の同じ更新キー (変更ログのプライマリキーと見なされる) を持つメッセージは、メッセージがオペレーターに入る順序に基づいて処理されます。
順序付きモード: このモードでは、ストリームの順序は変更されません。結果メッセージが送信される順序は、非同期リクエストがトリガーされる順序 (メッセージがオペレーターに入る順序) と同じです。
順序なしモード: このモードでは、非同期リクエストが完了するとすぐに結果メッセージが送信されます。ストリーム内のメッセージの順序は、メッセージが非同期 I/O オペレーターによって処理された後に変更されます。詳細については、「Async I/O | Apache Flink」をご参照ください。
シナリオ
一定期間にわたってストリーム内の同じ更新キーを持つメッセージの数が少ない。たとえば、更新キーはプライマリキーと見なされ、同じプライマリキーを持つデータは頻繁に更新されません。さらに、ディメンションテーブルが結合される場合、更新キーに基づく処理順序が必要です。この場合、table.exec.async-lookup.key-ordered-enabled パラメーターを指定することで最適化を実行できます。これにより、更新キーに基づいたデータ処理順序を確保できます。
プライマリキーを含む Change Data Capture (CDC) ストリームでは、ディメンションテーブルが結合されて、シンクへのデータ書き込み用のワイドテーブルが生成されます。シンクのプライマリキーは、ソースのプライマリキーと一致しています。さらに、ディメンションテーブルの JOIN 操作の結合キーは、プライマリキーと一致しません。結合キーはプライマリキーと見なされます。この場合、table.exec.async-lookup.key-ordered-enabled パラメーターを指定することで最適化を実行できます。これにより、システムは更新キーと見なされる CDC プライマリキーに基づいてデータをシャッフルできます。また、SHUFFLE_HASH 結合ポリシーを有効にして、このシナリオを最適化することもできます。高並列シナリオでは、この方法と比較して、table.exec.async-lookup.key-ordered-enabled パラメーターを指定することで最適化を実行すると、データがシンクに書き込まれる前に SinkMaterializer オペレーターが生成されるのを防ぐことができます。これにより、オペレーターによって引き起こされる可能性のあるパフォーマンスの問題、特にオペレーターの長時間実行中に大量の状態データが生成される問題を防ぐことができます。SinkUpsertMaterializer オペレーターの詳細については、「使用上の注意」をご参照ください。
ディメンションテーブルの JOIN 操作の結合キーは、プライマリキーと一致しません。ディメンションテーブルの結合キーはプライマリキーと見なされ、JOIN 操作の実行後にランクオペレーターを使用できます。この場合、table.exec.async-lookup.key-ordered-enabled パラメーターを指定することで最適化を実行すると、システムは更新キーと見なされる CDC プライマリキーに基づいてデータをシャッフルできます。また、SHUFFLE_HASH 結合ポリシーを有効にして、このシナリオを最適化することもできます。この方法と比較して、table.exec.async-lookup.key-ordered-enabled パラメーターを指定することで最適化を実行すると、UpdateFastRank が RetractRank にダウングレードするのを防ぐことができます。RetractRank を UpdateFastRank に変更する方法の詳細については、「TopN の実践」をご参照ください。
使用上の注意
ストリームに更新キーがない場合、データの行全体がキーとして使用されます。
短期間に同じ更新キーが頻繁に更新されると、スループットが低下します。これは、同じ更新キーを持つデータが厳密な順序で処理されるためです。
最適化前の非同期モードでのディメンションテーブルの JOIN 操作と比較して、キー順序付きモードはキー付き状態を提供します。キー順序付きモードを有効または無効にすると、状態データの互換性に影響します。
この最適化は、VVR 8.0.10 以降で
table.exec.async-lookup.output-mode='ALLOW_UNORDERED'およびtable.exec.async-lookup.key-ordered-enabled='true'構成をディメンションテーブルの JOIN 操作に追加し、入力ストリームが更新ストリームである場合にのみ有効になります。
有効な値
false(デフォルト): キー順序モードを無効にします。true: キー順序モードを有効にします。
例
次のコードをストリーミングドラフトにコピーして貼り付け、デプロイします。
create TEMPORARY table bid_source( auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR, proc_time as proctime(), WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND ) with ( 'connector' = 'kafka', -- 非 insert-only ストリームコネクター。 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); CREATE TEMPORARY TABLE users ( user_id STRING PRIMARY KEY NOT ENFORCED, -- プライマリキーを定義します。 user_name VARCHAR(255) NOT NULL, age INT NOT NULL ) WITH ( 'connector' = 'hologres', -- 非同期ルックアップ機能をサポートするコネクター。 'async' = 'true', 'dbname' = 'holo db name', -- Hologres データベースの名前。 'tablename' = 'schema_name.table_name', -- データを受信するために使用される Hologres テーブルの名前。 'username' = 'access id', -- Alibaba Cloud アカウントの AccessKey ID。 'password' = 'access key', -- Alibaba Cloud アカウントの AccessKey シークレット。 'endpoint' = 'holo vpc endpoint', -- Hologres インスタンスの仮想プライベートクラウド (VPC) エンドポイント。 ); CREATE TEMPORARY TABLE bh ( auction BIGINT, age int ) WITH ( 'connector' = 'blackhole' ); insert into bh SELECT bid_source.auction, u.age FROM bid_source JOIN users FOR SYSTEM_TIME AS OF bid_source.proc_time AS u ON bid_source.channel = u.user_id;ジョブ詳細ページに移動します。[構成] タブで、[パラメーター] セクションを見つけ、[編集] をクリックします。[その他の構成] に、
table.exec.async-lookup.output-mode='ALLOW_UNORDERED'およびtable.exec.async-lookup.key-ordered-enabled='true'を追加します。ジョブを開始します。
[ステータス] タブで、ジョブの async 属性に KEY_ORDERED:true が表示されます。

table.optimizer.window-join-enabled
このパラメーターは、ウィンドウ結合 を有効にするかどうかを制御します。有効にすると、Flink はウィンドウ結合アプローチに基づいて実行計画を最適化します。小さなウィンドウの場合、これにより状態サイズを削減し、パフォーマンスを向上させることができます。さらに、通常の結合 とは異なり、ウィンドウ結合を有効にすると、更新メッセージがダウンストリームオペレーターに送信されるのを回避できます。これは、小さなウィンドウでの結合が必要なユースケースに適しています。
ウィンドウ結合と通常の結合
ウィンドウ結合には、追加の構文 制限 があります。さらに、更新ストリームはサポートされていません。
ウィンドウ結合は通常、待ち時間が長くなります。実際の待ち時間は、ウィンドウサイズとウォーターマークの進行速度によって異なります。
イベントタイムウィンドウ結合を有効にした後、遅延データは破棄されます。通常の結合では、遅延データは破棄されません。
ウィンドウ結合を有効または無効にした後、既存のチェックポイントから回復することはできません。これは、ウィンドウ結合と通常の結合の基になる状態構造に互換性がないためです。
有効な値
false(デフォルト値): ウィンドウ結合ステートメントは、実行のために 通常の結合 に変換されます。true: 対応するステートメントは、実行のためにウィンドウ結合に変換されます。
例
SQL ストリーミングドラフトを作成し、次のコードを実行します。ここでは、
table.optimizer.window-join-enabledをtrueに、SET文を介して直接設定します:SET 'table.optimizer.window-join-enabled' = 'true'; CREATE TEMPORARY TABLE LeftTable ( id VARCHAR, row_time TIMESTAMP_LTZ(3), num INT, WATERMARK FOR row_time as row_time - INTERVAL '5' SECONDS ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE RightTable ( id VARCHAR, row_time TIMESTAMP_LTZ(3), num INT, WATERMARK FOR row_time as row_time - INTERVAL '10' SECONDS ) WITH ( 'connector'='datagen' ); EXPLAIN SELECT L.num as L_Num, L.id as L_Id, R.num as R_Num, R.id as R_Id, COALESCE(L.window_start, R.window_start) as window_start, COALESCE(L.window_end, R.window_end) as window_end FROM ( SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES)) ) L JOIN ( SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES)) ) R ON L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end;最適化された実行計画で
WindowJoinオペレーターを確認できます。== Optimized Execution Plan == Calc(select=[num AS L_Num, id AS L_Id, num0 AS R_Num, id0 AS R_Id, CASE(window_start IS NOT NULL, window_start, window_start0) AS window_start, CASE(window_end IS NOT NULL, window_end, window_end0) AS window_end]) +- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 min])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 min])], joinType=[InnerJoin], where=[(num = num0)], select=[id, num, window_start, window_end, id0, num0, window_start0, window_end0]) :- Exchange(distribution=[hash[num]]) : +- Calc(select=[id, num, window_start, window_end]) : +- WindowTableFunction(window=[TUMBLE(time_col=[row_time], size=[5 min])]) : +- WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)]) : +- TableSourceScan(table=[[vvp, default, LeftTable]], fields=[id, row_time, num]) +- Exchange(distribution=[hash[num]]) +- Calc(select=[id, num, window_start, window_end]) +- WindowTableFunction(window=[TUMBLE(time_col=[row_time], size=[5 min])]) +- WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 10000:INTERVAL SECOND)]) +- TableSourceScan(table=[[vvp, default, RightTable]], fields=[id, row_time, num])SET文のtable.optimizer.window-join-enabledの値をfalseに変更し、コードを実行します:-- 'false' に設定するか、この設定句を削除します SET 'table.optimizer.window-join-enabled' = 'false'; CREATE TEMPORARY TABLE LeftTable ( id VARCHAR, row_time TIMESTAMP_LTZ(3), num INT, WATERMARK FOR row_time as row_time - INTERVAL '5' SECONDS ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE RightTable ( id VARCHAR, row_time TIMESTAMP_LTZ(3), num INT, WATERMARK FOR row_time as row_time - INTERVAL '10' SECONDS ) WITH ( 'connector'='datagen' ); EXPLAIN SELECT L.num as L_Num, L.id as L_Id, R.num as R_Num, R.id as R_Id, COALESCE(L.window_start, R.window_start) as window_start, COALESCE(L.window_end, R.window_end) as window_end FROM ( SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES)) ) L JOIN ( SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES)) ) R ON L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end;通常の
JoinオペレーターがWindowJoinオペレーターに置き換わりました。== Optimized Execution Plan == Calc(select=[num AS L_Num, id AS L_Id, num0 AS R_Num, id0 AS R_Id, CASE(window_start IS NOT NULL, window_start, window_start0) AS window_start, CASE(window_end IS NOT NULL, window_end, window_end0) AS window_end]) +- Join(joinType=[InnerJoin], where=[((num = num0) AND (window_start = window_start0) AND (window_end = window_end0))], select=[id, num, window_start, window_end, id0, num0, window_start0, window_end0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[num, window_start, window_end]]) : +- Calc(select=[id, num, window_start, window_end]) : +- WindowTableFunction(window=[TUMBLE(time_col=[row_time], size=[5 min])]) : +- WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)]) : +- TableSourceScan(table=[[vvp, default, LeftTable]], fields=[id, row_time, num]) +- Exchange(distribution=[hash[num, window_start, window_end]]) +- Calc(select=[id, num, window_start, window_end]) +- WindowTableFunction(window=[TUMBLE(time_col=[row_time], size=[5 min])]) +- WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 10000:INTERVAL SECOND)]) +- TableSourceScan(table=[[vvp, default, RightTable]], fields=[id, row_time, num])
参照
LocalGroupAggregate オペレーターでデータ出力が長時間停止しています。データ出力が生成されません。なぜですか?