Realtime Compute for Apache Flink では、各データストリームを外部データソースのディメンションテーブルに関連付けることができます。これにより、Realtime Compute for Apache Flink で関連クエリを実行できます。
背景情報
ほとんどのコネクタでは、ディメンションテーブルに対する JOIN 操作のキャッシュポリシーを指定できます。コネクタによって、サポートされるキャッシュポリシーは異なります。詳細については、関連するコネクタのドキュメントをご参照ください。以下のキャッシュポリシーがサポートされています。
なし: データはキャッシュされません。これはデフォルト値です。
LRU: ディメンションテーブルの特定のデータのみがキャッシュされます。システムがデータレコードを受信するたびに、システムはキャッシュを検索します。システムがキャッシュ内でレコードを見つけられない場合、システムは物理ディメンションテーブルでデータレコードを検索します。
すべて: ディメンションテーブルのすべてのデータがキャッシュされます。デプロイメントが実行される前に、システムはディメンションテーブルのすべてのデータをキャッシュにロードします。このようにして、キャッシュはディメンションテーブルの後続のすべてのクエリで検索されます。要件を満たすデータがキャッシュに見つからない場合、キーは存在しません。キャッシュエントリが期限切れになると、システムはキャッシュ内のすべてのデータをリロードします。リモートテーブルのデータ量が少なく、欠落しているキーの数が多い場合は、このパラメータを「すべて」に設定することをお勧めします。ソーステーブルとディメンションテーブルは、ON 句に基づいて関連付けることはできません。
ビジネス要件に基づいて、リアルタイムパフォーマンスとデータ処理パフォーマンスのバランスを考慮する必要があります。データをリアルタイムで更新する場合は、キャッシュデータを使用する必要なく、コネクタがディメンションテーブルから直接データを読み取ることを許可できます。
キャッシュポリシーを使用する場合は、キャッシュポリシーを LRU に設定し、有効期間 (TTL) を指定して最新のデータをキャッシュできます。TTL は、数秒から数十秒などの小さい値に設定できます。このようにして、指定された間隔でソーステーブルからデータをロードできます。
キャッシュポリシーが「すべて」の場合、メモリ不足 (OOM) エラーを防ぐために、オペレータのメモリ使用量を監視する必要があります。
キャッシュポリシーが「すべて」の場合、システムはディメンションテーブルから非同期でデータをロードするため、テーブルを結合するためのオペレータのメモリを増やす必要があります。増加したメモリサイズは、リモートテーブルの 2 倍です。
制限事項
データストリームは、現時点で取得されたディメンションテーブルのスナップショットのみに関連付けることができます。
ディメンションテーブルは INNER JOIN 操作と LEFT JOIN 操作をサポートしますが、RIGHT JOIN 操作と FULL JOIN 操作はサポートしません。
注意事項
1 対 1 のテーブル結合を実行する場合は、結合条件にディメンションテーブルの一意のフィールドを含む等結合が含まれていることを確認してください。
各データストリームは、現在の時点でディメンションテーブルの最新のデータのみに関連付けられます。これは、JOIN 操作が処理時のみ実行されることを意味します。したがって、JOIN 操作の実行後にディメンションテーブルのデータが追加、更新、または削除された場合、関連付けられたデータは変更されません。特定のディメンションテーブルの動作の詳細については、サポートされているコネクタをご参照ください。
構文
SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;ディメンションテーブルの末尾に FOR SYSTEM_TIME AS OF PROCTIME() を追加する必要があります。このようにして、現時点で表示できるディメンションテーブルの各データレコードは、ソースデータに関連付けられます。
ON 条件には、ディメンションテーブルでランダムに検索できるフィールドの等価条件が含まれている必要があります。
ON 句で指定された結合条件では、ディメンションテーブルのフィールドで CAST などの型変換関数を使用できません。データ型を変換する場合は、ソーステーブルのフィールドで変換を実行します。
ディメンションテーブルの結合ヒント
ディメンションテーブルの結合ヒントを使用して、結合戦略を指定できます。ヒント機能の詳細については、Flink の SQL ヒントをご参照ください。ディメンションテーブルの結合ヒントには、LOOKUP ヒントとその他の結合ヒントが含まれます。
Ververica Runtime (VVR) 8.0 以降を使用する Realtime Compute for Apache Flink のみ LOOKUP ヒントをサポートします。
VVR 8.0.8 以降を使用する Realtime Compute for Apache Flink のみ、LOOKUP ヒントを使用してシャッフル戦略を設定できます。
VVR 8.0 以降を使用する Realtime Compute for Apache Flink では、ディメンションテーブルの結合ヒントでエイリアスを指定できます。ディメンションテーブルにエイリアスが指定されている場合、結合ヒントではディメンションテーブルのエイリアスを使用する必要があります。
VVR 4.0 以降を使用する Realtime Compute for Apache Flink のみ、その他の結合ヒントをサポートします。
LOOKUP ヒント
Realtime Compute for Apache Flink の LOOKUP ヒント機能は、オープンソースコミュニティによって提供される LOOKUP ヒント機能と一致します。ディメンションテーブルの同期、非同期、再試行ルックアップ戦略を設定できます。詳細については、LOOKUP ヒントをご参照ください。VVR 8.0.8 以降を使用する Realtime Compute for Apache Flink では、LOOKUP ヒント機能が拡張され、'shuffle' = 'true' を設定できるようになりました。このようにして、ディメンションテーブルに対する JOIN 操作のシャッフル戦略を指定できます。次の表は、さまざまなシナリオでのシャッフル戦略を示しています。
シナリオ | JOIN 操作のシャッフル戦略 |
'shuffle' = 'true' が設定されていません。 | エンジンのデフォルトのシャッフル戦略が使用されます。 |
'shuffle' = 'true' が設定されておらず、ディメンションテーブルコネクタが JOIN 操作のカスタムシャッフル戦略を提供していません。 | |
'shuffle' = 'true' が設定されており、ディメンションテーブルコネクタが JOIN 操作のカスタムシャッフル戦略を提供していません。 | デフォルトでは、SHUFFLE_HASH 戦略が使用されます。詳細については、SHUFFLE_HASH をご参照ください。 |
'shuffle' = 'true' が設定されており、ディメンションテーブルコネクタが JOIN 操作のカスタムシャッフル戦略を提供しています。 | ディメンションテーブルコネクタのカスタムシャッフル戦略が使用されます。 |
Paimon コネクタのみがカスタムシャッフル戦略を提供します。ディメンションテーブルの結合カラムにすべてのバケットフィールドが含まれている場合、ディメンションテーブルはバケットに基づいてシャッフルされます。
次のサンプルコードは、ディメンションテーブルで JOIN 操作を実行するときにシャッフル戦略を設定する方法の例を示しています。
-- JOIN 操作を実行するディメンションテーブル dim1 のみにシャッフル戦略を設定します。
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b
-- JOIN 操作を実行するディメンションテーブル dim1 と dim2 のシャッフル戦略を設定します。
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true'),LOOKUP('table'='dim2', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b
-- ヒントでディメンションテーブル dim1 のエイリアス D1 を使用して、JOIN 操作のシャッフル戦略を設定します。
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
-- ヒントでディメンションテーブル dim1 と dim2 のエイリアスを使用して、JOIN 操作のシャッフル戦略を設定します。
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true'),LOOKUP('table'='D2', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.bその他の結合ヒント
ディメンションテーブルのその他の結合ヒントは、SHUFFLE_HASH、REPLICATED_SHUFFLE_HASH、SKEW 戦略など、ディメンションテーブルの結合戦略を設定するためにのみ使用されます。次の表は、ディメンションテーブルのキャッシュポリシーの設定に基づいた結合戦略のユースケースシナリオを示しています。
キャッシュポリシー | SHUFFLE_HASH | REPLICATED_SHUFFLE_HASH (SKEW と同等) |
なし | この結合戦略を使用しないことをお勧めします。この結合戦略を使用すると、メインストリームデータに追加のネットワークオーバーヘッドが発生します。 | この結合戦略を使用しないことをお勧めします。この結合戦略を使用すると、メインストリームデータに追加のネットワークオーバーヘッドが発生します。 |
LRU | ディメンションテーブルのルックアップ I/O がボトルネックになる場合は、この結合戦略を使用することをお勧めします。メインストリームデータに結合キーに時間的局所性がある場合、この結合戦略はキャッシュヒット率を高め、I/O リクエストの数を減らすことができます。これにより、総スループットが向上します。 重要 メインストリームデータに追加のネットワークオーバーヘッドが発生します。メインストリームデータが結合キーに偏っていて、パフォーマンスのボトルネックが存在する場合は、REPLICATED_SHUFFLE_HASH 結合戦略を使用することをお勧めします。 | ディメンションテーブルのルックアップ I/O がボトルネックになり、メインストリームデータが結合キーに偏っている場合は、この結合戦略を使用することをお勧めします。メインストリームデータに結合キーに時間的局所性がある場合、この結合戦略はキャッシュヒット率を高め、I/O リクエストの数を減らすことができます。これにより、総スループットが向上します。 |
すべて | ディメンションテーブルのメモリ使用量がボトルネックになる場合は、この結合戦略を使用することをお勧めします。このようにして、メモリ使用量を 1/並列度 の値に減らすことができます。 重要 メインストリームデータに追加のネットワークオーバーヘッドが発生します。メインストリームデータが結合キーに偏っていて、パフォーマンスのボトルネックが存在する場合は、REPLICATED_SHUFFLE_HASH 結合戦略を使用することをお勧めします。 | ディメンションテーブルのメモリ使用量がボトルネックになり、メインストリームデータが結合キーに偏っている場合は、この結合戦略を使用することをお勧めします。このようにして、メモリ使用量を バケット数/並列度 の値に減らすことができます。 |
SHUFFLE_HASH
効果
SHUFFLE_HASH結合戦略を使用すると、JOIN操作を実行する前に、結合キーに基づいてメインストリームデータをシャッフルできます。キャッシュポリシーがLRUの場合、キャッシュヒット率が向上し、I/O要求の数が減少します。キャッシュポリシーがALLの場合、メモリ使用量が削減されます。各SHUFFLE_HASH結合ヒントでは、複数のディメンションテーブルを指定できます。
制限
SHUFFLE_HASH結合戦略を使用すると、メモリのオーバーヘッドが削減されます。ただし、アップストリームデータは結合キーに基づいてシャッフルする必要があるため、追加のネットワークオーバーヘッドが発生します。したがって、SHUFFLE_HASH結合戦略は、次のシナリオには適していません。
メインストリームデータの結合キーに深刻なデータスキューがある場合。SHUFFLE_HASH結合戦略を使用してデータを結合すると、データスキューが原因で結合演算子がパフォーマンスのボトルネックになる可能性があります。これは、ストリーミングデプロイメントでの深刻なバックプレッシャー、またはバッチデプロイメントでの深刻なロングテールを引き起こす可能性があります。このシナリオでは、REPLICATED_SHUFFLE_HASH結合戦略を使用することをお勧めします。
ディメンションテーブルに少量のデータが含まれており、キャッシュポリシーがALLの場合にテーブルの読み込み中にメモリボトルネックが発生しない場合、SHUFFLE_HASH結合戦略を使用することで節約されるメモリのオーバーヘッドは、SHUFFLE_HASH結合戦略を使用することで発生する追加のネットワークオーバーヘッドと比較して、費用効果が高くない可能性があります。
サンプルコード
-- ディメンションテーブル dim1 のみに対して SHUFFLE_HASH 結合戦略を有効にします。 SELECT /*+ SHUFFLE_HASH(dim1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b -- ディメンションテーブル dim1 と dim2 に対して SHUFFLE_HASH 結合戦略を有効にします。 SELECT /*+ SHUFFLE_HASH(dim1, dim2) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b -- ヒントでディメンションテーブル dim1 のエイリアス D1 を使用して、SHUFFLE_HASH 結合戦略を有効にします。 SELECT /*+ SHUFFLE_HASH(D1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b -- ヒントでディメンションテーブル dim1 と dim2 のエイリアスを使用して、SHUFFLE_HASH 結合戦略を有効にします。 SELECT /*+ SHUFFLE_HASH(D1, D2) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
REPLICATED_SHUFFLE_HASH
効果
REPLICATED_SHUFFLE_HASH の効果は、基本的に SHUFFLE_HASH の効果と同じです。ただし、REPLICATED_SHUFFLE_HASH は、同じキーを持つメインストリームデータを指定された数の同時実行スレッドにランダムに分散させ、データの偏りによって発生するパフォーマンスのボトルネックを解消します。各 REPLICATED_SHUFFLE_HASH 結合ヒントで複数のディメンションテーブルを指定できます。
制限事項
table.exec.skew-join.replicate-numパラメーターを設定して、偏ったデータを含むバケットの数を指定する必要があります。このパラメーターのデフォルト値は 16 です。このパラメーターの値は、ディメンションテーブルの結合演算子における同時実行スレッドの数より大きくすることはできません。このパラメーターの設定方法の詳細については、「コンソール操作」をご参照ください。更新ストリームはサポートされていません。メインストリームが更新ストリームで、REPLICATED_SHUFFLE_HASH 結合戦略を使用すると、エラーが返されます。
サンプルコード
-- ディメンションテーブル dim1 に対して REPLICATED_SHUFFLE_HASH 結合戦略を有効にします。 SELECT /*+ REPLICATED_SHUFFLE_HASH(dim1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a -- ヒントでディメンションテーブル dim1 のエイリアスを使用して、REPLICATED_SHUFFLE_HASH 結合戦略を有効にします。 SELECT /*+ REPLICATED_SHUFFLE_HASH(D1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
REPLICATED_SHUFFLE_HASH
効果
指定したテーブルにデータの偏りがある場合、オプティマイザーはディメンションテーブルに対する JOIN 操作に REPLICATED_SHUFFLE_HASH 結合戦略を使用します。SKEW は単なる糖衣構文であり、実際には基盤となるレイヤーで REPLICATED_SHUFFLE_HASH 結合戦略が使用されます。
制限事項
各 SKEW ヒントでは、1 つのテーブルのみを指定できます。
テーブル名は、ディメンションテーブルではなく、データの偏りがあるプライマリテーブルの名前である必要があります。
更新ストリームはサポートされていません。メインストリームが更新ストリームであり、SKEW 結合戦略を使用すると、エラーが返されます。
サンプルコード
SELECT /*+ SKEW(src) */ // src テーブルにデータの偏りがあることを指定します。 FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LOOKUP ヒントのシャッフル戦略は、SHUFFLE_HASH ヒントの機能を提供します。2 つのタイプのヒントを使用する場合、LOOKUP ヒントのシャッフル戦略が SHUFFLE_HASH ヒントよりも優先されます。
LOOKUP ヒントのシャッフル戦略では、データスキューの問題を解決できません。LOOKUP ヒントを REPLICATED_SHUFFLE_HASH ヒントまたは SKEW ヒントと共に使用する場合、REPLICATED_SHUFFLE_HASH ヒントまたは SKEW ヒントのシャッフル戦略が LOOKUP ヒントよりも優先されます。
例
テストデータ
表 1 kafka_input
id (bigint)
name (varchar)
age (bigint)
1
lilei
22
2
hanmeimei
20
3
libai
28
表 2 phoneNumber
name (varchar)
phoneNumber (bigint)
dufu
1390000111
baijuyi
1390000222
libai
1390000333
lilei
1390000444
テストステートメント
CREATE TEMPORARY TABLE kafka_input ( id BIGINT, name VARCHAR, age BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', // 使用するトピック 'properties.bootstrap.servers' = '<yourKafkaBrokers>', // Kafka ブローカー 'properties.group.id' = '<yourKafkaConsumerGroupId>', // Kafka コンシューマーグループ ID 'format' = 'csv' ); CREATE TEMPORARY TABLE phoneNumber( name VARCHAR, phoneNumber BIGINT, PRIMARY KEY(name) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', // MySQL ホスト名 'port' = '3306', // MySQL ポート 'username' = '<yourUsername>', // MySQL ユーザー名 'password' = '<yourPassword>', // MySQL パスワード 'database-name' = '<yourDatabaseName>', // MySQL データベース名 'table-name' = '<yourTableName>' // MySQL テーブル名 ); CREATE TEMPORARY TABLE result_infor( id BIGINT, phoneNumber BIGINT, name VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO result_infor SELECT t.id, w.phoneNumber, t.name FROM kafka_input as t JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w ON t.name = w.name;テスト結果
id (bigint)
phoneNumber (bigint)
name (varchar)
1
1390000444
lilei
3
1390000333
libai