Realtime Compute for Apache Flink では、各データストリームを外部データソースのディメンションテーブルに関連付けることができます。これにより、Realtime Compute for Apache Flink で関連クエリを実行できます。このトピックでは、処理時間テンポラル結合の使用方法について説明します。
背景
処理時間テンポラル結合は、処理時間属性を使用して、ファクトテーブルの行をディメンションテーブルの対応するキーの最新バージョンに関連付けます。イベント発生時の時刻に基づいて行を関連付けるイベント時間テンポラル結合とは異なり、処理時間テンポラル結合はデータ到着時刻に基づいて行を関連付けます。
前提条件
Realtime Compute for Apache Flink は、Ververica Runtime (VVR) 8.0.10 以降を使用します。
MySQL ディメンションテーブルが作成されています。
使用上の注意
Realtime Compute for Apache Flink はフル同期中のチェックポイントをサポートしていないため、
execution.checkpointing.interval-during-backlog = 0を設定して、フル同期中のチェックポイントを無効にします。この設定は、増分データ同期中のチェックポイントには影響しません。処理時間テンポラル結合を使用するには、
table.optimizer.proctime-temporal-join-strategy = TEMPORAL_JOINを設定します。
構文
処理時間テンポラル結合の構文は、ディメンションテーブルの結合と同じです。
SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;例
テストデータ
テーブル 1 kafka_input
id (bigint)
name (varchar)
age (bigint)
1
Lee
22
2
Harry
20
3
Liban
28
テーブル 2 phoneNumber
name (varchar)
phoneNumber (bigint)
David
1390000111
Brooks
1390000222
Liban
1390000333
Lee
1390000444
テストコード
SET 'table.optimizer.proctime-temporal-join-strategy' = 'TEMPORAL_JOIN'; -- 処理時間テンポラル結合を使用します。 SET 'execution.checkpointing.interval-during-backlog' = '0'; -- フル同期中のチェックポイントを無効にします。 CREATE TEMPORARY TABLE kafka_input ( id BIGINT, name VARCHAR, age BIGINT, proc_time AS PROCTIME() ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'properties.group.id' = '<yourKafkaConsumerGroupId>', 'format' = 'csv' ); -- ディメンションテーブルを定義します。 CREATE TEMPORARY TABLE phoneNumber( name VARCHAR, phoneNumber BIGINT, PRIMARY KEY(name) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE result_infor( id BIGINT, phoneNumber BIGINT, name VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO result_infor SELECT t.id, w.phoneNumber, t.name FROM kafka_input as t JOIN phoneNumber FOR SYSTEM_TIME AS OF t.proc_time as w ON t.name = w.name;テスト結果
id (bigint)
phoneNumber (bigint)
name (varchar)
1
1390000444
Lee
3
1390000333
Liban