
Realtime Compute for Apache Flink:Lookup join

Last Updated: Mar 26, 2026

Dimension table joins let you enrich a data stream with data queried from an external source. Each stream record is matched against the current snapshot of the dimension table at processing time. If dimension table data changes after the join, the result is not updated retroactively.

Limitations

  • A stream record can join only the snapshot of the dimension table taken at that record's processing time.

  • INNER JOIN and LEFT JOIN are supported. RIGHT JOIN and FULL JOIN are not supported.

  • The ON clause must contain equi-join conditions on fields that support random lookup in the dimension table.

  • For a one-to-one join, the ON clause must include an equi-join condition on a unique field in the dimension table.

  • The ON clause cannot apply type conversion functions such as CAST to dimension table fields. To convert data types, apply the conversion to source table fields instead.
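For example, the last restriction means a type conversion belongs on the source-table side of the condition. A minimal sketch (the tables and columns are illustrative, not from this document):

```sql
-- Hypothetical tables: orders (source stream) with a STRING user_id,
-- users (dimension table) with a BIGINT id.
-- Invalid: CAST applied to the dimension table field.
--   ON CAST(u.id AS STRING) = o.user_id
-- Valid: apply the conversion to the source table field instead.
SELECT o.order_id, u.name
FROM orders AS o
JOIN users FOR SYSTEM_TIME AS OF PROCTIME() AS u
ON CAST(o.user_id AS BIGINT) = u.id;
```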

Syntax

SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;

FOR SYSTEM_TIME AS OF PROCTIME() must be appended to the dimension table reference. It instructs Flink to join each stream record against the version of the dimension table visible at the current processing time.

Cache policies

Most connectors let you configure a cache policy for dimension table joins. Choose a policy based on your data freshness requirements and available memory.

| Cache policy | Behavior | When to use |
| --- | --- | --- |
| None (default) | No caching. Every lookup reads directly from the external data source. | When you need the freshest data on every lookup and I/O latency is acceptable. |
| LRU | A portion of the dimension table is cached in memory. On each lookup, the cache is checked first; on a miss, the system reads from the external source. Set a time-to-live (TTL) to control how long entries stay in the cache. | When lookup I/O is a bottleneck and the dimension table is too large to fit entirely in memory. Set the TTL to a few seconds to tens of seconds to balance freshness and performance. |
| ALL | All dimension table data is loaded into memory before the deployment processes data, and all lookups are served from the cache. When the cache expires, all data is reloaded; until the reload completes, source records may fail to match dimension table rows on the ON clause. | When the dimension table is small and many lookup keys have no match in the dimension table. This minimizes external I/O at the cost of higher memory usage. |
Important

When the cache policy is ALL:

  • Monitor operator memory to prevent out of memory (OOM) errors.

  • Allocate at least twice the remote table size as operator memory, because Flink asynchronously loads the full table into cache.
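Cache policies are set in the dimension table's WITH clause, and the option names vary by connector. A hedged sketch for a MySQL-style dimension table; the option names cache, cacheSize, and cacheTTLMs follow the convention used by several VVR dimension connectors, but check your connector's documentation for the exact names:

```sql
CREATE TEMPORARY TABLE dim_users (
  id BIGINT,
  name VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  -- Illustrative cache settings; exact option names are connector-specific.
  'cache' = 'LRU',          -- None, LRU, or ALL
  'cacheSize' = '100000',   -- maximum number of cached rows
  'cacheTTLMs' = '10000'    -- cache entry TTL in milliseconds
  -- plus the usual connection options: hostname, username, password, ...
);
```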

Join hints for dimension tables

Join hints let you override the default join strategy for dimension table lookups. Two categories of hints are available: LOOKUP hints and other join hints.

LOOKUP hints require Ververica Runtime (VVR) 8.0 or later. Other join hints (SHUFFLE_HASH, REPLICATED_SHUFFLE_HASH, SKEW) require VVR 4.0 or later.

LOOKUP hints

LOOKUP hints configure lookup behavior including synchronous/asynchronous execution, retry strategies, and shuffle strategies. The full LOOKUP hint specification follows the Apache Flink LOOKUP hint interface.

In VVR 8.0 or later, a join hint can reference a dimension table by its alias. If the dimension table has an alias, the hint must use that alias instead of the table name.

In VVR 8.0.8 or later, LOOKUP hints are extended with a shuffle option to control how mainstream data is distributed before the join.

LOOKUP hint options:

The default values in the table below are based on the open-source Apache Flink documentation. Actual defaults in VVR may differ. Refer to the VVR release notes or connector documentation for VVR-specific values.
| Option type | Option name | Required | Value type | Default | Description |
| --- | --- | --- | --- | --- | --- |
| table | table | Yes | string | — | The table name or alias of the lookup source. |
| async | async | No | boolean | — | Set to true to enable asynchronous lookup. |
| async | output-mode | No | string | ordered | Async output ordering: ordered or allow_unordered. |
| async | capacity | No | integer | 100 | Buffer capacity for asynchronous lookups. |
| async | timeout | No | duration | 300s | Timeout measured from the first lookup invocation. |
| retry | retry-predicate | No | string | — | Retry condition. Use lookup_miss to retry when no matching row is found. |
| retry | retry-strategy | No | string | — | Retry strategy. Use fixed_delay. |
| retry | fixed-delay | No | duration | — | Delay between retries when retry-strategy is fixed_delay. |
| retry | max-attempts | No | integer | — | Maximum number of retry attempts. |
| shuffle | shuffle | No | boolean | — | Set to true to shuffle mainstream data before the join. Requires VVR 8.0.8 or later. |
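The options above can be combined in a single hint. A sketch based on the open-source Apache Flink LOOKUP hint syntax (the table and column names are illustrative):

```sql
-- Asynchronous lookup with unordered output, plus a fixed-delay retry
-- when a lookup finds no matching row.
SELECT /*+ LOOKUP('table'='dim1',
                  'async'='true',
                  'output-mode'='allow_unordered',
                  'capacity'='200',
                  'timeout'='180s',
                  'retry-predicate'='lookup_miss',
                  'retry-strategy'='fixed_delay',
                  'fixed-delay'='1s',
                  'max-attempts'='3') */ *
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a;
```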

Shuffle strategies via LOOKUP hints

Configure 'shuffle' = 'true' in a LOOKUP hint to control how mainstream data is distributed before the join.

| Scenario | Join policy applied |
| --- | --- |
| 'shuffle' = 'true' is not set | Default engine shuffle strategy. |
| 'shuffle' = 'true' is set, and the connector has no custom join policy | SHUFFLE_HASH strategy (see SHUFFLE_HASH). |
| 'shuffle' = 'true' is set, and the connector provides a custom join policy | The connector's custom shuffle strategy. |

Currently, only the streaming data lakehouse Apache Paimon provides a custom shuffle strategy: if the join keys include all of the dimension table's bucket key fields, mainstream data is shuffled by bucket.

Examples

All examples use FOR SYSTEM_TIME AS OF PROCTIME() to join dimension tables at processing time.

-- Apply shuffle to dim1 only
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true') */ *
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b

-- Apply shuffle to both dim1 and dim2
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true'), LOOKUP('table'='dim2', 'shuffle' = 'true') */ *
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b

-- Use alias D1 when the dimension table has an alias
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true') */ *
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b

-- Use aliases for both dimension tables
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true'), LOOKUP('table'='D2', 'shuffle' = 'true') */ *
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b

Other join hints

Other join hints configure the shuffle strategy for dimension table joins. They are used independently of the LOOKUP hint shuffle option. Three strategies are available: SHUFFLE_HASH, REPLICATED_SHUFFLE_HASH, and SKEW.

When to use each strategy:

| Cache policy | SHUFFLE_HASH | REPLICATED_SHUFFLE_HASH (equivalent to SKEW) |
| --- | --- | --- |
| None | Not recommended. Mainstream data incurs additional network overhead with no cache benefit. | Not recommended. Same network overhead as SHUFFLE_HASH with no benefit. |
| LRU | Use when lookup I/O is the bottleneck and mainstream data has temporal locality on join keys. Improves the cache hit ratio and reduces I/O requests, increasing overall throughput. Avoid if mainstream data is heavily skewed on join keys. | Use when lookup I/O is the bottleneck and mainstream data is skewed on join keys. Scatters skewed keys across concurrent threads to resolve the skew-induced performance bottleneck. |
| ALL | Use when the dimension table memory footprint is the bottleneck. Reduces per-operator memory usage to 1/parallelism. Avoid if mainstream data is skewed on join keys. | Use when memory is the bottleneck and mainstream data is skewed on join keys. Reduces memory usage to (number of buckets)/parallelism. |
Important
  • The LOOKUP hint shuffle strategy provides the same capabilities as SHUFFLE_HASH. When both are configured, the LOOKUP hint shuffle strategy takes precedence over SHUFFLE_HASH.

  • REPLICATED_SHUFFLE_HASH and SKEW take precedence over the LOOKUP hint shuffle strategy. These strategies resolve data skew; the LOOKUP hint shuffle strategy does not.

SHUFFLE_HASH

SHUFFLE_HASH shuffles mainstream data by join key before the join. With an LRU cache, this increases cache hit ratio and reduces I/O requests. With an ALL cache, this reduces per-operator memory usage to 1/parallelism.

Limitations:

  • Mainstream data incurs additional network overhead from the shuffle. If the dimension table is small with an ALL cache and has no memory bottleneck, the network overhead may outweigh the memory savings.

  • If mainstream data is heavily skewed on join keys, the join operator may become a bottleneck. Use REPLICATED_SHUFFLE_HASH instead.

You can specify multiple dimension tables in a single SHUFFLE_HASH hint.

-- Apply SHUFFLE_HASH to dim1 only
SELECT /*+ SHUFFLE_HASH(dim1) */ *
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b

-- Apply SHUFFLE_HASH to both dim1 and dim2
SELECT /*+ SHUFFLE_HASH(dim1, dim2) */ *
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b

-- Use alias D1 when the dimension table has an alias
SELECT /*+ SHUFFLE_HASH(D1) */ *
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b

-- Use aliases for both dimension tables
SELECT /*+ SHUFFLE_HASH(D1, D2) */ *
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b

REPLICATED_SHUFFLE_HASH

REPLICATED_SHUFFLE_HASH works like SHUFFLE_HASH but also randomly distributes records with the same join key across multiple concurrent threads. This resolves performance bottlenecks caused by data skew.

With an ALL cache, REPLICATED_SHUFFLE_HASH reduces per-operator memory usage to (number of buckets)/parallelism.

Limitations:

  • Configure table.exec.skew-join.replicate-num to set the number of buckets for skewed data. The default is 16. The value cannot exceed the parallelism of the join operator. For configuration instructions, see How do I configure custom running parameters for a job?

  • Update streams are not supported. If the mainstream is an update stream, an error is returned.
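The bucket count above is a job-level parameter. A hedged sketch using a SQL SET statement; whether SET is available depends on how you submit the job, and you can also configure the parameter through the job's custom running parameters as described in the linked topic:

```sql
-- Increase the number of buckets for skewed keys from the default 16.
-- The value must not exceed the parallelism of the join operator.
SET 'table.exec.skew-join.replicate-num' = '32';

SELECT /*+ REPLICATED_SHUFFLE_HASH(dim1) */ *
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a;
```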

You can specify multiple dimension tables in a single REPLICATED_SHUFFLE_HASH hint.

-- Apply REPLICATED_SHUFFLE_HASH to dim1
SELECT /*+ REPLICATED_SHUFFLE_HASH(dim1) */ *
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a

-- Use alias D1 when the dimension table has an alias
SELECT /*+ REPLICATED_SHUFFLE_HASH(D1) */ *
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a

SKEW

SKEW is syntactic sugar for REPLICATED_SHUFFLE_HASH. When you specify SKEW on the primary (mainstream) table, Flink applies the REPLICATED_SHUFFLE_HASH strategy to the dimension table join.

Limitations:

  • Each SKEW hint can reference only one table.

  • Specify the primary table with data skew, not the dimension table.

  • Update streams are not supported.

SELECT /*+ SKEW(src) */ *
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a

Example

This example joins a Kafka source stream with a MySQL dimension table to look up phone numbers by name.

Input data

Table kafka_input:

| id (bigint) | name (varchar) | age (bigint) |
| --- | --- | --- |
| 1 | lilei | 22 |
| 2 | hanmeimei | 20 |
| 3 | libai | 28 |

Table phoneNumber (MySQL dimension table):

| name (varchar) | phoneNumber (bigint) |
| --- | --- |
| dufu | 1390000111 |
| baijuyi | 1390000222 |
| libai | 1390000333 |
| lilei | 1390000444 |

Statements

CREATE TEMPORARY TABLE kafka_input (
  id   BIGINT,
  name VARCHAR,
  age  BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

CREATE TEMPORARY TABLE phoneNumber(
  name VARCHAR,
  phoneNumber BIGINT,
  PRIMARY KEY(name) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

CREATE TEMPORARY TABLE result_infor(
  id BIGINT,
  phoneNumber BIGINT,
  name VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO result_infor
SELECT
  t.id,
  w.phoneNumber,
  t.name
FROM kafka_input as t
JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w
ON t.name = w.name;

Replace the placeholders with your actual values:

| Placeholder | Description |
| --- | --- |
| <yourTopic> | Kafka topic name |
| <yourKafkaBrokers> | Kafka broker addresses |
| <yourKafkaConsumerGroupId> | Kafka consumer group ID |
| <yourHostname> | MySQL hostname |
| <yourUsername> | MySQL username |
| <yourPassword> | MySQL password |
| <yourDatabaseName> | MySQL database name |
| <yourTableName> | MySQL table name |

Result

Because this is an INNER JOIN, only rows with matching names in both tables appear in the output.

| id (bigint) | phoneNumber (bigint) | name (varchar) |
| --- | --- | --- |
| 1 | 1390000444 | lilei |
| 3 | 1390000333 | libai |
