Realtime Compute for Apache Flink: Processing-time temporal join

Last Updated: Mar 26, 2026

Use a processing-time temporal join to enrich a data stream with the latest snapshot of a dimension table. Each incoming row is matched against the state of the dimension table at the moment the row arrives in Realtime Compute for Apache Flink, not at the timestamp encoded in the event itself.

This differs from an event-time temporal join, which correlates rows based on the timestamp embedded in the event.

Prerequisites

Before you begin, ensure that you have:

  • Realtime Compute for Apache Flink using Ververica Runtime (VVR) 8.0.10 or later

  • A MySQL dimension table created and accessible

Usage notes

  • Set execution.checkpointing.interval-during-backlog = 0 to disable checkpointing during full synchronization, because Realtime Compute for Apache Flink does not support checkpointing in that phase. This setting does not affect checkpointing during incremental data synchronization.

  • Set table.optimizer.proctime-temporal-join-strategy = TEMPORAL_JOIN to enable processing-time temporal joins.

Syntax

The processing-time temporal join uses the same syntax as a standard dimension table join:

SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;

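The FOR SYSTEM_TIME AS OF PROCTIME() clause tells Flink to look up the dimension table at the processing time of each incoming row. The example in this topic references a computed column that is defined as PROCTIME() on the fact table (t.proc_time) instead of calling PROCTIME() in the join clause.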

Example

This example joins a Kafka fact table with a MySQL dimension table to look up phone numbers by name.

Test data

Table 1: kafka_input

id (bigint)  name (varchar)  age (bigint)
1            Lee             22
2            Harry           20
3            Liban           28

Table 2: phoneNumber

name (varchar)  phoneNumber (bigint)
David           1390000111
Brooks          1390000222
Liban           1390000333
Lee             1390000444
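
To reproduce the test, the dimension table has to exist in MySQL with the rows listed above. The following statements are a minimal sketch of how it could be created. The table name phoneNumber and the VARCHAR(64) length are assumptions made for illustration; use whatever table name you later pass to 'table-name' in the Flink DDL.

```sql
-- Run in MySQL, not in Realtime Compute for Apache Flink.
-- Assumption: the table is named phoneNumber and names fit in 64 characters.
CREATE TABLE phoneNumber (
  name        VARCHAR(64) NOT NULL,
  phoneNumber BIGINT,
  PRIMARY KEY (name)
);

-- Load the test rows from Table 2.
INSERT INTO phoneNumber (name, phoneNumber) VALUES
  ('David',  1390000111),
  ('Brooks', 1390000222),
  ('Liban',  1390000333),
  ('Lee',    1390000444);
```

The primary key on name mirrors the PRIMARY KEY (name) NOT ENFORCED declaration used for the dimension table in the test code.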

Test code

-- Enable processing-time temporal joins.
SET 'table.optimizer.proctime-temporal-join-strategy' = 'TEMPORAL_JOIN';
-- Disable checkpointing during full synchronization.
SET 'execution.checkpointing.interval-during-backlog' = '0';

-- Define the Kafka fact table.
-- proc_time is a computed column that returns the current processing time.
CREATE TEMPORARY TABLE kafka_input (
  id        BIGINT,
  name      VARCHAR,
  age       BIGINT,
  proc_time AS PROCTIME()
) WITH (
  'connector'                    = 'kafka',
  'topic'                        = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id'          = '<yourKafkaConsumerGroupId>',
  'format'                       = 'csv'
);

-- Define the MySQL dimension table.
CREATE TEMPORARY TABLE phoneNumber (
  name        VARCHAR,
  phoneNumber BIGINT,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector'     = 'mysql',
  'hostname'      = '<yourHostname>',
  'port'          = '3306',
  'username'      = '<yourUsername>',
  'password'      = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name'    = '<yourTableName>'
);

-- Define the output table (blackhole discards results; replace with your target sink).
CREATE TEMPORARY TABLE result_infor (
  id          BIGINT,
  phoneNumber BIGINT,
  name        VARCHAR
) WITH (
  'connector' = 'blackhole'
);

-- Join the Kafka stream with the MySQL dimension table at processing time.
INSERT INTO result_infor
SELECT
  t.id,
  w.phoneNumber,
  t.name
FROM kafka_input AS t
JOIN phoneNumber FOR SYSTEM_TIME AS OF t.proc_time AS w
ON t.name = w.name;

Replace the following placeholders with actual values:

Placeholder                 Description
<yourTopic>                 Kafka topic name
<yourKafkaBrokers>          Kafka bootstrap server addresses
<yourKafkaConsumerGroupId>  Kafka consumer group ID
<yourHostname>              MySQL server hostname or IP address
<yourUsername>              MySQL username
<yourPassword>              MySQL password
<yourDatabaseName>          MySQL database name
<yourTableName>             MySQL table name

Test results

id (bigint)  phoneNumber (bigint)  name (varchar)
1            1390000444            Lee
3            1390000333            Liban

The row with id = 2 (Harry) has no matching entry in the phoneNumber dimension table, so the inner join excludes it from the results. To keep unmatched rows, use LEFT JOIN instead of JOIN; phoneNumber is then NULL for those rows.
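
The LEFT JOIN variant can be sketched as follows. It reuses the kafka_input, phoneNumber, and result_infor tables and the two SET statements from the test code, and changes only the join type.

```sql
-- Assumes the tables and SET statements from the test code are already in place.
-- Only the join type differs from the original INSERT statement.
INSERT INTO result_infor
SELECT
  t.id,
  w.phoneNumber,  -- NULL when no dimension row matches t.name
  t.name
FROM kafka_input AS t
LEFT JOIN phoneNumber FOR SYSTEM_TIME AS OF t.proc_time AS w
ON t.name = w.name;
```

With the test data above, the output should contain the two matched rows plus a row for id = 2 (Harry) whose phoneNumber is NULL.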