Realtime Compute for Apache Flink では、各データストリームを外部データソースのディメンションテーブルに関連付けることができます。これにより、Realtime Compute for Apache Flink で関連クエリを実行できます。
背景情報
ほとんどのコネクタでは、ディメンションテーブルでの JOIN 操作に対するキャッシュポリシーを指定できます。コネクタによってサポートされるキャッシュポリシーは異なります。詳細については、関連するコネクタのドキュメントをご参照ください。次のキャッシュポリシーがサポートされています:
None:データはキャッシュされません。これがデフォルト値です。
LRU:ディメンションテーブル内の特定のデータのみがキャッシュされます。システムがデータレコードを受信するたびに、システムはキャッシュを検索します。キャッシュ内でレコードが見つからない場合、システムは物理ディメンションテーブルでデータレコードを検索します。
ALL:ディメンションテーブルのすべてのデータがキャッシュされます。デプロイメントが実行される前に、システムはディメンションテーブルのすべてのデータをキャッシュにロードします。これにより、後続のすべてのディメンションテーブルクエリでキャッシュが検索されます。要件を満たすデータがキャッシュで見つからない場合、そのキーは存在しないと見なされます。キャッシュエントリが有効期限切れになると、システムはキャッシュ内のすべてのデータを再読み込みします。リモートテーブルのデータ量が少なく、存在しないキーが多い場合は、このパラメーターを ALL に設定することを推奨します。ソーステーブルとディメンションテーブルは、ON 句に基づいて関連付けることはできません。
必要に応じて、リアルタイム性能とデータ処理性能のバランスを考慮する必要があります。データをリアルタイムで更新したい場合は、コネクタがディメンションテーブルから直接データを読み取ることを許可できます。これにより、キャッシュデータを使用する必要がなくなります。
キャッシュポリシーを使用したい場合は、キャッシュポリシーを LRU に設定し、生存時間 (TTL) を指定して最新のデータをキャッシュできます。TTL は、数秒から数十秒などの小さい値に設定できます。これにより、指定された間隔でソーステーブルからデータをロードできます。
キャッシュポリシーが ALL の場合、メモリ不足 (OOM) エラーを防ぐために、演算子のメモリ使用量をモニターする必要があります。
キャッシュポリシーが ALL の場合、システムがディメンションテーブルから非同期でデータをロードするため、テーブルを結合する演算子のメモリを増やす必要があります。増加させるメモリサイズは、リモートテーブルの 2 倍です。
制限事項
データストリームは、現在の時点で取得されたディメンションテーブルのスナップショットとのみ関連付けることができます。
ディメンションテーブルは INNER JOIN および LEFT JOIN 操作をサポートしますが、RIGHT JOIN または FULL JOIN 操作はサポートしません。
注意事項
1 対 1 のテーブル結合を実行する場合は、結合条件にディメンションテーブル内の一意のフィールドを含む等結合が含まれていることを確認してください。
各データストリームは、現在の時刻におけるディメンションテーブルの最新データとのみ関連付けられます。これは、JOIN 操作が処理時間でのみ実行されることを意味します。したがって、JOIN 操作の実行後にディメンションテーブルのデータが追加、更新、または削除されても、関連付けられたデータは変更されません。特定のディメンションテーブルの動作の詳細については、「サポートされているコネクタ」をご参照ください。
ディメンションテーブルの JOIN 構文
SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;ディメンションテーブルの末尾に `FOR SYSTEM_TIME AS OF PROCTIME()` を追加する必要があります。これにより、現在の時刻で表示可能なディメンションテーブルの各データレコードがソースデータと関連付けられます。
ON 条件には、ディメンションテーブル内でランダムに検索できるフィールドの等価条件が含まれている必要があります。
ON 句で指定された結合条件では、ディメンションテーブルのフィールドで `CAST` などの型変換関数を使用することはできません。データの型を変換したい場合は、ソーステーブルのフィールドで変換を実行してください。
ディメンションテーブルの結合ヒント
ディメンションテーブルのヒントを使用して、ディメンションテーブルの結合戦略を設定できます。ヒント機能の詳細については、「Flink SQL ヒント」をご参照ください。ディメンションテーブルのヒントには、LOOKUP ヒントやその他の結合ヒントが含まれます。
Ververica Runtime (VVR) 8.0 以降を使用する Realtime Compute for Apache Flink のみが LOOKUP ヒントをサポートします。
VVR 8.0.8 以降を使用する Realtime Compute for Apache Flink のみで、LOOKUP ヒントを使用してシャッフル戦略を設定できます。
VVR 8.0 以降を使用する Realtime Compute for Apache Flink では、ディメンションテーブルの結合ヒントでエイリアスを指定できます。ディメンションテーブルにエイリアスが指定されている場合、結合ヒントではそのディメンションテーブルのエイリアスを使用する必要があります。
VVR 4.0 以降を使用する Realtime Compute for Apache Flink のみが、その他の結合ヒントをサポートします。
LOOKUP ヒント
Realtime Compute for Apache Flink の LOOKUP ヒント機能は、オープンソースコミュニティが提供する LOOKUP ヒント機能と一貫しています。ディメンションテーブルに対して、同期、非同期、およびリトライのルックアップ戦略を設定できます。詳細については、「LOOKUP ヒント」をご参照ください。VVR 8.0.8 以降を使用する Realtime Compute for Apache Flink では、LOOKUP ヒント機能が拡張され、'shuffle' = 'true' を設定できるようになりました。これにより、ディメンションテーブルでの JOIN 操作にシャッフル戦略を指定できます。次の表に、さまざまなシナリオでのシャッフル戦略を示します。
シナリオ | 参加ポリシー |
`'shuffle' = 'true'` オプションが設定されていない。 | エンジンのデフォルトのシャッフル戦略が使用されます。 |
`'shuffle' = 'true'` オプションが設定されておらず、ディメンションテーブルのコネクタがカスタム結合ポリシーを提供していない。 | |
`'shuffle' = 'true'` オプションが設定されており、ディメンションテーブルのコネクタがカスタム結合ポリシーを提供していない。 | デフォルトでは、SHUFFLE_HASH 戦略が使用されます。詳細については、「SHUFFLE_HASH」をご参照ください。 |
`'shuffle' = 'true'` オプションが設定されており、ディメンションテーブルのコネクタがカスタム結合ポリシーを提供している。 | ディメンションテーブルのコネクタのカスタムシャッフル戦略が使用されます。 |
Streaming data lakehouse Paimon のみがカスタムシャッフル戦略を提供します。ディメンションテーブルの結合列にすべてのバケットフィールドが含まれている場合、ディメンションテーブルはバケットに基づいてシャッフルされます。
次のサンプルコードは、ディメンションテーブルで JOIN 操作を実行する際にシャッフル戦略を設定する方法の例です:
-- dim1 ディメンションテーブルのみでディメンションテーブル結合のシャッフルポリシーを設定します。
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b
-- dim1 と dim2 の両方のディメンションテーブルでディメンションテーブル結合のシャッフルポリシーを設定します。
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true'),LOOKUP('table'='dim2', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b
-- エイリアス D1 を使用して、dim1 ディメンションテーブルでのディメンションテーブル結合のシャッフルポリシーを設定する必要があります。
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
-- エイリアスを使用して、dim1 と dim2 の両方のディメンションテーブルでディメンションテーブル結合のシャッフルポリシーを設定します。
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true'),LOOKUP('table'='D2', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.bその他の結合ヒント
ディメンションテーブルのその他の結合ヒントは、SHUFFLE_HASH、REPLICATED_SHUFFLE_HASH、SKEW を含むディメンションテーブルの結合戦略を設定するためにのみ使用されます。次の表は、ディメンションテーブルのキャッシュポリシーの設定に基づいた結合戦略の使用シナリオを示しています。
キャッシュポリシー | SHUFFLE_HASH | REPLICATED_SHUFFLE_HASH (SKEW と同等) |
None | この結合戦略の使用は推奨されません。この結合戦略を使用すると、主流データに追加のネットワークオーバーヘッドが発生します。 | この結合戦略の使用は推奨されません。この結合戦略を使用すると、主流データに追加のネットワークオーバーヘッドが発生します。 |
LRU | ディメンションテーブルのルックアップ I/O がボトルネックになる場合は、この結合戦略を使用することを推奨します。主流データが結合キーに対して時間的局所性を持つ場合、この結合戦略はキャッシュヒット率を向上させ、I/O リクエストの数を減らすことができます。これにより、全体のスループットが向上します。 重要 主流データに追加のネットワークオーバーヘッドが発生します。主流データが結合キーでスキューしており、パフォーマンスボトルネックが存在する場合は、REPLICATED_SHUFFLE_HASH 結合戦略を使用することを推奨します。 | ディメンションテーブルのルックアップ I/O がボトルネックになり、主流データが結合キーでスキューしている場合は、この結合戦略を使用することを推奨します。主流データが結合キーに対して時間的局所性を持つ場合、この結合戦略はキャッシュヒット率を向上させ、I/O リクエストの数を減らすことができます。これにより、全体のスループットが向上します。 |
ALL | ディメンションテーブルのメモリ使用量がボトルネックになる場合は、この結合戦略を使用することを推奨します。これにより、メモリ使用量を 1/並列度 の値まで削減できます。 重要 主流データに追加のネットワークオーバーヘッドが発生します。主流データが結合キーでスキューしており、パフォーマンスボトルネックが存在する場合は、REPLICATED_SHUFFLE_HASH 結合戦略を使用することを推奨します。 | ディメンションテーブルのメモリ使用量がボトルネックになり、主流データが結合キーでスキューしている場合は、この結合戦略を使用することを推奨します。これにより、メモリ使用量を バケット数/並列度 の値まで削減できます。 |
SHUFFLE_HASH
効果
SHUFFLE_HASH 結合戦略では、JOIN 操作が実行される前に、主流データを結合キーに基づいてシャッフルできます。キャッシュポリシーが LRU の場合、キャッシュヒット率が向上し、I/O リクエストの数が減少します。キャッシュポリシーが ALL の場合、メモリ使用量が削減されます。各 SHUFFLE_HASH 結合ヒントで複数のディメンションテーブルを指定できます。
制限事項
SHUFFLE_HASH 結合戦略を使用すると、メモリのオーバーヘッドは削減されます。しかし、上流のデータを結合キーに基づいてシャッフルする必要があるため、追加のネットワークオーバーヘッドが発生します。したがって、SHUFFLE_HASH 結合戦略は次のシナリオには適していません:
主流データの結合キーに深刻なデータスキューがあります。SHUFFLE_HASH 結合戦略を使用してデータを結合すると、結合演算子がデータスキューのためにパフォーマンスボトルネックを引き起こす可能性があります。これにより、ストリーミングデプロイメントでは深刻なバックプレッシャーが、バッチデプロイメントでは深刻なロングテールが発生する可能性があります。このシナリオでは、REPLICATED_SHUFFLE_HASH 結合戦略を使用することを推奨します。
ディメンションテーブルのデータ量が少なく、キャッシュポリシーが ALL の場合にテーブルのロード中にメモリボトルネックが発生しない場合、SHUFFLE_HASH 結合戦略で節約されるメモリオーバーヘッドは、SHUFFLE_HASH 結合戦略を使用することで発生する追加のネットワークオーバーヘッドと比較して、費用対効果が高くない可能性があります。
サンプルコード
-- dim1 ディメンションテーブルに対してのみ SHUFFLE_HASH 結合戦略を有効にします。 SELECT /*+ SHUFFLE_HASH(dim1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b -- dim1 と dim2 の両方のディメンションテーブルに対して SHUFFLE_HASH 結合戦略を有効にします。 SELECT /*+ SHUFFLE_HASH(dim1, dim2) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b -- ヒントで dim1 ディメンションテーブルのエイリアス D1 を使用して、SHUFFLE_HASH 結合戦略を有効にします。 SELECT /*+ SHUFFLE_HASH(D1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b -- ヒントで dim1 と dim2 のディメンションテーブルのエイリアスを使用して、SHUFFLE_HASH 結合戦略を有効にします。 SELECT /*+ SHUFFLE_HASH(D1, D2) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
REPLICATED_SHUFFLE_HASH
効果
REPLICATED_SHUFFLE_HASH の効果は、基本的に SHUFFLE_HASH の効果と同じです。ただし、REPLICATED_SHUFFLE_HASH は、同じキーを持つ主流データを指定された数の並行スレッドにランダムに分散させることで、データスキューによるパフォーマンスボトルネックを解決します。各 REPLICATED_SHUFFLE_HASH 結合ヒントで複数のディメンションテーブルを指定できます。
制限事項
table.exec.skew-join.replicate-numパラメーターを設定して、スキューデータを含むバケットの数を指定する必要があります。このパラメーターのデフォルト値は 16 です。このパラメーターの値は、ディメンションテーブルの結合演算子の並行スレッド数より大きくすることはできません。このパラメーターの設定方法の詳細については、「ジョブのカスタム実行パラメーターを設定する方法」をご参照ください。更新ストリームはサポートされていません。主流が更新ストリームであり、REPLICATED_SHUFFLE_HASH 結合戦略を使用すると、エラーが返されます。
サンプルコード
-- dim1 ディメンションテーブルに対して REPLICATED_SHUFFLE_HASH 結合戦略を有効にします。 SELECT /*+ REPLICATED_SHUFFLE_HASH(dim1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a -- ヒントで dim1 ディメンションテーブルのエイリアスを使用して、REPLICATED_SHUFFLE_HASH 結合戦略を有効にします。 SELECT /*+ REPLICATED_SHUFFLE_HASH(D1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
SKEW
効果
指定されたテーブルにデータスキューがある場合、オプティマイザーはディメンションテーブルの JOIN 操作に REPLICATED_SHUFFLE_HASH 結合戦略を使用します。SKEW は単なる糖衣構文であり、実際には下位レイヤーで REPLICATED_SHUFFLE_HASH 結合戦略が使用されます。
制限事項
各 SKEW ヒントでは、1 つのテーブルのみを指定できます。
テーブル名は、ディメンションテーブルではなく、データスキューのあるプライマリテーブルの名前でなければなりません。
更新ストリームはサポートされていません。主流が更新ストリームであり、SKEW 結合戦略を使用すると、エラーが返されます。
サンプルコード
SELECT /*+ SKEW(src) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LOOKUP ヒントのシャッフル戦略は、SHUFFLE_HASH ヒントの機能を提供します。両方のタイプのヒントを使用する場合、LOOKUP ヒントのシャッフル戦略が SHUFFLE_HASH ヒントよりも優先されます。
LOOKUP ヒントのシャッフル戦略では、データスキューの問題を解決できません。LOOKUP ヒントを REPLICATED_SHUFFLE_HASH または SKEW ヒントと一緒に使用する場合、REPLICATED_SHUFFLE_HASH または SKEW ヒントのシャッフル戦略が LOOKUP ヒントよりも優先されます。
例
テストデータ
テーブル 1 kafka_input
id (bigint)
name (varchar)
age (bigint)
1
りれい
22
2
はんめいめい
20
3
りばい
28
テーブル 2 phoneNumber
name (varchar)
phoneNumber (bigint)
杜甫
1390000111
白居易
1390000222
李白
1390000333
李雷
1390000444
テスト文
CREATE TEMPORARY TABLE kafka_input ( id BIGINT, name VARCHAR, age BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'properties.group.id' = '<yourKafkaConsumerGroupId>', 'format' = 'csv' ); CREATE TEMPORARY TABLE phoneNumber( name VARCHAR, phoneNumber BIGINT, PRIMARY KEY(name) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE result_infor( id BIGINT, phoneNumber BIGINT, name VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO result_infor SELECT t.id, w.phoneNumber, t.name FROM kafka_input as t JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w ON t.name = w.name;テスト結果
id (bigint)
phoneNumber (bigint)
name (varchar)
1
1390000444
lilei
3
1390000333
libai