Di Realtime Compute for Apache Flink, setiap aliran data dapat dikaitkan dengan tabel dimensi dari sumber data eksternal. Hal ini memungkinkan Anda melakukan kueri terkait di Realtime Compute for Apache Flink. Topik ini menjelaskan cara menggunakan temporal join berdasarkan waktu pemrosesan.
Latar Belakang
Temporal join berdasarkan waktu pemrosesan menggunakan atribut waktu pemrosesan untuk menghubungkan baris dalam tabel fakta dengan versi terbaru dari kunci yang sesuai di tabel dimensi. Tidak seperti temporal join berdasarkan waktu kejadian yang menghubungkan baris berdasarkan waktu terjadinya peristiwa, temporal join berdasarkan waktu pemrosesan menghubungkan baris berdasarkan waktu kedatangan data.
Prasyarat
Realtime Compute for Apache Flink menggunakan Ververica Runtime (VVR) 8.0.10 atau yang lebih baru.
Tabel dimensi MySQL telah dibuat.
Catatan penggunaan
Nonaktifkan pengecekan titik selama sinkronisasi penuh dengan mengonfigurasi
execution.checkpointing.interval-during-backlog = 0, karena Realtime Compute for Apache Flink tidak mendukung pengecekan titik selama sinkronisasi penuh. Konfigurasi ini tidak akan memengaruhi pengecekan titik selama sinkronisasi data inkremental.Untuk menggunakan temporal join berdasarkan waktu pemrosesan, konfigurasikan
table.optimizer.proctime-temporal-join-strategy = TEMPORAL_JOIN.
Sintaks
Sintaks untuk temporal join berdasarkan waktu pemrosesan sama dengan sintaks untuk menggabungkan tabel dimensi:
SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;Contoh
Data Uji
Tabel 1 kafka_input
id (bigint)
name (varchar)
age (bigint)
1
Lee
22
2
Harry
20
3
Liban
28
Tabel 2 phoneNumber
name (varchar)
phoneNumber (bigint)
David
1390000111
Brooks
1390000222
Liban
1390000333
Lee
1390000444
Kode Uji
SET 'table.optimizer.proctime-temporal-join-strategy' = 'TEMPORAL_JOIN'; -- Gunakan temporal join berdasarkan waktu pemrosesan. SET 'execution.checkpointing.interval-during-backlog' = '0'; -- Nonaktifkan pengecekan titik selama sinkronisasi penuh. CREATE TEMPORARY TABLE kafka_input ( id BIGINT, name VARCHAR, age BIGINT, proc_time AS PROCTIME() ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'properties.group.id' = '<yourKafkaConsumerGroupId>', 'format' = 'csv' ); -- Definisikan sebuah tabel dimensi. CREATE TEMPORARY TABLE phoneNumber( name VARCHAR, phoneNumber BIGINT, PRIMARY KEY(name) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE result_infor( id BIGINT, phoneNumber BIGINT, name VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO result_infor SELECT t.id, w.phoneNumber, t.name FROM kafka_input as t JOIN phoneNumber FOR SYSTEM_TIME AS OF t.proc_time as w ON t.name = w.name;Hasil Uji
id (bigint)
phoneNumber (bigint)
name (varchar)
1
1390000444
Lee
3
1390000333
Liban