Use a processing-time temporal join to enrich a data stream with the latest snapshot of a dimension table. At query time, each incoming row is matched against the current state of the dimension table — that is, the state at the moment the row arrives in Realtime Compute for Apache Flink, not the timestamp encoded in the event itself.
This differs from an event-time temporal join, which correlates rows based on the timestamp embedded in the event.
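As a sketch of the contrast, the two variants differ only in the expression after FOR SYSTEM_TIME AS OF. The table and column names below are placeholders, not part of the example that follows:

```sql
-- Processing-time temporal join: each fact row sees the dimension table's
-- state at the moment the row is processed.
SELECT f.id, d.val
FROM fact_table AS f
JOIN dim_table FOR SYSTEM_TIME AS OF PROCTIME() AS d
ON f.k = d.k;

-- Event-time temporal join: each fact row sees the dimension version that
-- was valid at the row's embedded event timestamp.
SELECT f.id, d.val
FROM fact_table AS f
JOIN dim_table FOR SYSTEM_TIME AS OF f.event_time AS d
ON f.k = d.k;
```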
Prerequisites
Before you begin, ensure that you have:
- Realtime Compute for Apache Flink using Ververica Runtime (VVR) 8.0.10 or later
- A MySQL dimension table created and accessible
Usage notes
- Set execution.checkpointing.interval-during-backlog = 0 to disable checkpointing during full synchronization. Realtime Compute for Apache Flink does not support checkpointing during full synchronization. This setting does not affect checkpointing during incremental data synchronization.
- Set table.optimizer.proctime-temporal-join-strategy = TEMPORAL_JOIN to enable processing-time temporal joins.
Syntax
The processing-time temporal join uses the same syntax as a standard dimension table join:
```sql
SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;
```
The FOR SYSTEM_TIME AS OF PROCTIME() clause tells Flink to look up the dimension table at the processing time of each incoming row.
Example
This example joins a Kafka fact table with a MySQL dimension table to look up phone numbers by name.
Test data
Table 1: kafka_input
| id (bigint) | name (varchar) | age (bigint) |
|---|---|---|
| 1 | Lee | 22 |
| 2 | Harry | 20 |
| 3 | Liban | 28 |
Table 2: phoneNumber
| name (varchar) | phoneNumber (bigint) |
|---|---|
| David | 1390000111 |
| Brooks | 1390000222 |
| Liban | 1390000333 |
| Lee | 1390000444 |
Test code
```sql
-- Enable processing-time temporal joins.
SET 'table.optimizer.proctime-temporal-join-strategy' = 'TEMPORAL_JOIN';

-- Disable checkpointing during full synchronization.
SET 'execution.checkpointing.interval-during-backlog' = '0';

-- Define the Kafka fact table.
-- proc_time is a computed column that returns the current processing time.
CREATE TEMPORARY TABLE kafka_input (
  id BIGINT,
  name VARCHAR,
  age BIGINT,
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

-- Define the MySQL dimension table.
CREATE TEMPORARY TABLE phoneNumber (
  name VARCHAR,
  phoneNumber BIGINT,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

-- Define the output table (blackhole discards results; replace with your target sink).
CREATE TEMPORARY TABLE result_infor (
  id BIGINT,
  phoneNumber BIGINT,
  name VARCHAR
) WITH (
  'connector' = 'blackhole'
);

-- Join the Kafka stream with the MySQL dimension table at processing time.
INSERT INTO result_infor
SELECT
  t.id,
  w.phoneNumber,
  t.name
FROM kafka_input AS t
JOIN phoneNumber FOR SYSTEM_TIME AS OF t.proc_time AS w
ON t.name = w.name;
```
Replace the following placeholders with actual values:
| Placeholder | Description |
|---|---|
| <yourTopic> | Kafka topic name |
| <yourKafkaBrokers> | Kafka bootstrap server addresses |
| <yourKafkaConsumerGroupId> | Kafka consumer group ID |
| <yourHostname> | MySQL server hostname or IP address |
| <yourUsername> | MySQL username |
| <yourPassword> | MySQL password |
| <yourDatabaseName> | MySQL database name |
| <yourTableName> | MySQL table name |
Test results
| id (bigint) | phoneNumber (bigint) | name (varchar) |
|---|---|---|
| 1 | 1390000444 | Lee |
| 3 | 1390000333 | Liban |
The row with id = 2 (Harry) has no matching entry in the phoneNumber dimension table, so it is excluded from the results. Use LEFT JOIN instead of JOIN to include unmatched rows with a null phone number.
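A LEFT JOIN variant of the INSERT statement above might look like the following sketch, using the same tables as the example. Unmatched rows such as Harry's would then be emitted with a NULL phoneNumber:

```sql
INSERT INTO result_infor
SELECT
  t.id,
  w.phoneNumber,  -- NULL when no dimension row matches
  t.name
FROM kafka_input AS t
LEFT JOIN phoneNumber FOR SYSTEM_TIME AS OF t.proc_time AS w
ON t.name = w.name;
```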