Dimension table joins let you enrich a data stream with data queried from an external source. Each stream record is matched against the current snapshot of the dimension table at processing time. If dimension table data changes after the join, the result is not updated retroactively.
Limitations
- A stream can only join the dimension table snapshot taken at the current moment.
- INNER JOIN and LEFT JOIN are supported. RIGHT JOIN and FULL JOIN are not supported.
- The `ON` clause must contain equi-join conditions on fields that support random lookup in the dimension table.
- For a one-to-one join, the `ON` clause must include an equi-join condition on a unique field in the dimension table.
- The `ON` clause cannot apply type conversion functions such as `CAST` to dimension table fields. To convert data types, apply the conversion to source table fields instead.
Syntax
```sql
SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;
```
FOR SYSTEM_TIME AS OF PROCTIME() must be appended to the dimension table reference. It instructs Flink to join each stream record against the version of the dimension table visible at the current processing time.
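The processing-time semantics can be illustrated with a small Python sketch (a toy model, not Flink code): each stream record is matched against the snapshot as it exists at that moment, and later dimension updates do not rewrite earlier results.

```python
# Toy model of a processing-time temporal join. The dict stands in for
# the external dimension table; keys and values are illustrative only.
dim = {"libai": 1390000333}  # current dimension snapshot

def temporal_join(record, dim_snapshot):
    """Enrich one stream record against the current snapshot (INNER JOIN)."""
    key = record["name"]
    if key in dim_snapshot:
        return {**record, "phoneNumber": dim_snapshot[key]}
    return None  # no match -> record dropped under INNER JOIN

out1 = temporal_join({"id": 3, "name": "libai"}, dim)

# The dimension table changes afterwards...
dim["libai"] = 1390000999

# ...but the earlier join result is NOT updated retroactively:
assert out1["phoneNumber"] == 1390000333
```

A record whose key is absent from the snapshot produces no output row, which mirrors the INNER JOIN behavior shown in the example at the end of this topic.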
Cache policies
Most connectors let you configure a cache policy for dimension table joins. Choose a policy based on your data freshness requirements and available memory.
| Cache policy | Behavior | When to use |
|---|---|---|
| None (default) | No caching. Every lookup reads directly from the external data source. | When you need the freshest data on every lookup and I/O latency is acceptable. |
| LRU | A portion of the dimension table is cached in memory. On each lookup, the cache is checked first. On a miss, the system reads from the external source. Set a time-to-live (TTL) to control how long entries stay in cache. | When lookup I/O is a bottleneck and the dimension table is too large to fit entirely in memory. Set TTL to a few seconds to tens of seconds to balance freshness and performance. |
| ALL | All dimension table data is loaded into memory before the deployment runs. All subsequent lookups are served from cache. When the cache expires, all data is reloaded. The source table and dimension table cannot be associated based on the ON clause. | When the dimension table is small or many lookup keys have no match in the external source. This minimizes external I/O at the cost of higher memory usage. |
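The LRU policy can be sketched as a TTL-bounded cache placed in front of the external source (a toy Python model, not the connector implementation; class and parameter names are illustrative):

```python
import time
from collections import OrderedDict

class TtlLruCache:
    """Toy LRU cache with TTL: check the cache first; on a miss or an
    expired entry, fall back to the external source and refresh."""
    def __init__(self, capacity, ttl_seconds):
        self.capacity, self.ttl = capacity, ttl_seconds
        self.entries = OrderedDict()          # key -> (value, stored_at)

    def lookup(self, key, read_source):
        hit = self.entries.get(key)
        if hit is not None and time.monotonic() - hit[1] < self.ttl:
            self.entries.move_to_end(key)     # refresh LRU order
            return hit[0], "cache"
        value = read_source(key)              # miss or stale -> external I/O
        self.entries[key] = (value, time.monotonic())
        self.entries.move_to_end(key)
        if len(self.entries) > self.capacity:
            self.entries.popitem(last=False)  # evict least-recently-used
        return value, "source"

cache = TtlLruCache(capacity=2, ttl_seconds=10.0)
db = {"a": 1}
v1, origin1 = cache.lookup("a", db.get)   # first lookup reads the source
v2, origin2 = cache.lookup("a", db.get)   # second is served from cache
```

A short TTL trades staleness for freshness: entries older than the TTL are re-read from the source even if they are still in the cache.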
When the cache policy is ALL:
- Monitor operator memory to prevent out of memory (OOM) errors.
- Allocate at least twice the remote table size as operator memory, because Flink asynchronously loads the full table into cache.
Join hints for dimension tables
Join hints let you override the default join strategy for dimension table lookups. Two categories of hints are available: LOOKUP hints and other join hints.
LOOKUP hints require Ververica Runtime (VVR) 8.0 or later. Other join hints (SHUFFLE_HASH, REPLICATED_SHUFFLE_HASH, SKEW) require VVR 4.0 or later.
LOOKUP hints
LOOKUP hints configure lookup behavior including synchronous/asynchronous execution, retry strategies, and shuffle strategies. The full LOOKUP hint specification follows the Apache Flink LOOKUP hint interface.
In VVR 8.0 or later, you can reference a dimension table by its alias in a join hint. If the dimension table has an alias, the hint must use that alias rather than the table name.
In VVR 8.0.8 or later, LOOKUP hints are extended with a shuffle option to control how mainstream data is distributed before the join.
LOOKUP hint options:
The default values in the table below are based on the open-source Apache Flink documentation. Actual defaults in VVR may differ. Refer to the VVR release notes or connector documentation for VVR-specific values.
| Option type | Option name | Required | Value type | Default | Description |
|---|---|---|---|---|---|
| table | `table` | Yes | string | — | The table name or alias of the lookup source. |
| async | `async` | No | boolean | — | Set to `true` to enable asynchronous lookup. |
| async | `output-mode` | No | string | `ordered` | Async output ordering: `ordered` or `allow_unordered`. |
| async | `capacity` | No | integer | 100 | Buffer capacity for async lookups. |
| async | `timeout` | No | duration | 300s | Timeout from the first lookup invocation. |
| retry | `retry-predicate` | No | string | — | Retry condition. Use `lookup_miss` to retry when no matching row is found. |
| retry | `retry-strategy` | No | string | — | Retry strategy. Use `fixed_delay`. |
| retry | `fixed-delay` | No | duration | — | Delay between retries when using `fixed_delay`. |
| retry | `max-attempts` | No | integer | — | Maximum number of retry attempts. |
| shuffle | `shuffle` | No | boolean | — | Set to `true` to control how mainstream data is distributed before the join. Requires VVR 8.0.8 or later. |
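The retry options combine as follows: the predicate decides whether a lookup result warrants a retry, and the strategy decides how long to wait. A toy Python sketch of `'retry-predicate'='lookup_miss'` with `'retry-strategy'='fixed_delay'` (function and parameter names are illustrative, not Flink API):

```python
import time

def lookup_with_retry(key, read_source, max_attempts=3, fixed_delay=0.01):
    """Retry a lookup when no matching row is found (lookup_miss),
    waiting a constant delay between attempts (fixed_delay)."""
    for attempt in range(1, max_attempts + 1):
        row = read_source(key)
        if row is not None:          # predicate: a matching row was found
            return row, attempt
        if attempt < max_attempts:
            time.sleep(fixed_delay)  # constant delay between attempts
    return None, max_attempts

# Simulate a dimension row that arrives late, between attempts:
calls = {"n": 0}
def flaky_source(key):
    calls["n"] += 1
    return {"phone": 1390000333} if calls["n"] >= 2 else None

row, attempts = lookup_with_retry("libai", flaky_source)
```

This pattern is useful when the dimension table is populated slightly later than the main stream, so an initial miss may succeed on a retry.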
Shuffle strategies via LOOKUP hints
Configure 'shuffle' = 'true' in a LOOKUP hint to control how mainstream data is distributed before the join.
| Scenario | Join policy applied |
|---|---|
| `'shuffle' = 'true'` is not set | Default engine shuffle strategy. |
| `'shuffle' = 'true'` is set, and the connector has no custom join policy | SHUFFLE_HASH strategy (see SHUFFLE_HASH). |
| `'shuffle' = 'true'` is set, and the connector provides a custom join policy | Custom shuffle strategy from the connector. |
Only the streaming data lakehouse connector (Paimon) provides a custom shuffle strategy: if the join columns include all bucket fields, the dimension table join is shuffled by bucket.
Examples
All examples use FOR SYSTEM_TIME AS OF PROCTIME() to join dimension tables at processing time.
```sql
-- Apply shuffle to dim1 only
SELECT /*+ LOOKUP('table'='dim1', 'shuffle'='true') */ *
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b;

-- Apply shuffle to both dim1 and dim2
SELECT /*+ LOOKUP('table'='dim1', 'shuffle'='true'), LOOKUP('table'='dim2', 'shuffle'='true') */ *
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b;

-- Use alias D1 when the dimension table has an alias
SELECT /*+ LOOKUP('table'='D1', 'shuffle'='true') */ *
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b;

-- Use aliases for both dimension tables
SELECT /*+ LOOKUP('table'='D1', 'shuffle'='true'), LOOKUP('table'='D2', 'shuffle'='true') */ *
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b;
```
Other join hints
Other join hints configure the shuffle strategy for dimension table joins. They are used independently of the LOOKUP hint shuffle option. Three strategies are available: SHUFFLE_HASH, REPLICATED_SHUFFLE_HASH, and SKEW.
When to use each strategy:
| Cache policy | SHUFFLE_HASH | REPLICATED_SHUFFLE_HASH (equivalent to SKEW) |
|---|---|---|
| None | Not recommended. Mainstream data incurs additional network overhead with no cache benefit. | Not recommended. Same network overhead as SHUFFLE_HASH with no benefit. |
| LRU | Use when lookup I/O is the bottleneck and mainstream data has temporal locality on join keys. Improves cache hit ratio and reduces I/O requests, increasing overall throughput. Avoid if mainstream data is heavily skewed on join keys. | Use when lookup I/O is the bottleneck and mainstream data is skewed on join keys. Scatters skewed keys across concurrent threads to resolve the skew-induced performance bottleneck. |
| ALL | Use when the dimension table memory footprint is the bottleneck. Reduces memory usage per operator to 1/parallelism. Avoid if mainstream data is skewed on join keys. | Use when memory is the bottleneck and mainstream data is skewed on join keys. Reduces memory usage to (number of buckets)/parallelism. |
- The LOOKUP hint shuffle strategy provides the same capabilities as SHUFFLE_HASH. When both are configured, the LOOKUP hint shuffle strategy takes precedence over SHUFFLE_HASH.
- REPLICATED_SHUFFLE_HASH and SKEW take precedence over the LOOKUP hint shuffle strategy. These strategies resolve data skew; the LOOKUP hint shuffle strategy does not.
SHUFFLE_HASH
SHUFFLE_HASH shuffles mainstream data by join key before the join. With an LRU cache, this increases cache hit ratio and reduces I/O requests. With an ALL cache, this reduces per-operator memory usage to 1/parallelism.
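Why this reduces per-operator memory to 1/parallelism: after the hash shuffle, each parallel subtask only ever sees its own shard of the key space, so with an ALL cache each subtask needs to hold only that shard. A toy Python sketch (not Flink internals; names are illustrative):

```python
def shuffle_hash(records, parallelism, key_fn):
    """Toy SHUFFLE_HASH: route each record to subtask hash(key) % parallelism,
    so every subtask only sees (and caches) its own shard of the keys."""
    subtasks = [[] for _ in range(parallelism)]
    for rec in records:
        subtasks[hash(key_fn(rec)) % parallelism].append(rec)
    return subtasks

records = [{"k": k} for k in ["a", "b", "a", "c", "b", "a"]]
subtasks = shuffle_hash(records, parallelism=4, key_fn=lambda r: r["k"])

# Every occurrence of a key lands on exactly one subtask:
placement = {}
for i, part in enumerate(subtasks):
    for rec in part:
        placement.setdefault(rec["k"], set()).add(i)
assert all(len(s) == 1 for s in placement.values())
```

The same routing also explains the LRU benefit: repeated lookups of one key always hit the same subtask's cache instead of being spread across all subtasks.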
Limitations:
- Mainstream data incurs additional network overhead from the shuffle. If the dimension table is small with an ALL cache and has no memory bottleneck, the network overhead may outweigh the memory savings.
- If mainstream data is heavily skewed on join keys, the join operator may become a bottleneck. Use REPLICATED_SHUFFLE_HASH instead.
You can specify multiple dimension tables in a single SHUFFLE_HASH hint.
```sql
-- Apply SHUFFLE_HASH to dim1 only
SELECT /*+ SHUFFLE_HASH(dim1) */ *
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b;

-- Apply SHUFFLE_HASH to both dim1 and dim2
SELECT /*+ SHUFFLE_HASH(dim1, dim2) */ *
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b;

-- Use alias D1 when the dimension table has an alias
SELECT /*+ SHUFFLE_HASH(D1) */ *
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b;

-- Use aliases for both dimension tables
SELECT /*+ SHUFFLE_HASH(D1, D2) */ *
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b;
```
REPLICATED_SHUFFLE_HASH
REPLICATED_SHUFFLE_HASH works like SHUFFLE_HASH but also randomly distributes records with the same join key across multiple concurrent threads. This resolves performance bottlenecks caused by data skew.
With an ALL cache, REPLICATED_SHUFFLE_HASH reduces per-operator memory usage to (number of buckets)/parallelism.
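The bucketing idea can be sketched in Python (a toy model, not Flink internals; `replicate_num` mirrors the `table.exec.skew-join.replicate-num` setting described below): a skewed key is scattered over a fixed number of buckets, and each (key, bucket) pair is hashed to a subtask, so one hot key occupies at most `replicate_num` subtasks instead of a single one.

```python
import random

def replicated_shuffle_hash(records, parallelism, replicate_num, key_fn):
    """Toy REPLICATED_SHUFFLE_HASH: spread each key over replicate_num
    buckets, then hash (key, bucket) to a subtask, so a hot key no
    longer saturates a single thread."""
    assert replicate_num <= parallelism
    subtasks = [[] for _ in range(parallelism)]
    for rec in records:
        bucket = random.randrange(replicate_num)        # scatter the key
        subtasks[hash((key_fn(rec), bucket)) % parallelism].append(rec)
    return subtasks

# 1000 records, all carrying the same (skewed) join key:
records = [{"k": "hot", "i": i} for i in range(1000)]
subtasks = replicated_shuffle_hash(records, parallelism=8,
                                   replicate_num=4, key_fn=lambda r: r["k"])
# At most replicate_num subtasks share the hot key's records.
busy = [i for i, p in enumerate(subtasks) if p]
```

The trade-off: because one key now lives on several subtasks, each of those subtasks must cache the dimension rows for that key, which is why the memory saving is (number of buckets)/parallelism rather than 1/parallelism.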
Limitations:
- Configure `table.exec.skew-join.replicate-num` to set the number of buckets for skewed data. The default is 16. The value cannot exceed the parallelism of the join operator. For configuration instructions, see How do I configure custom running parameters for a job?
- Update streams are not supported. If the mainstream is an update stream, an error is returned.
You can specify multiple dimension tables in a single REPLICATED_SHUFFLE_HASH hint.
```sql
-- Apply REPLICATED_SHUFFLE_HASH to dim1
SELECT /*+ REPLICATED_SHUFFLE_HASH(dim1) */ *
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a;

-- Use alias D1 when the dimension table has an alias
SELECT /*+ REPLICATED_SHUFFLE_HASH(D1) */ *
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a;
```
SKEW
SKEW is syntactic sugar for REPLICATED_SHUFFLE_HASH. When you specify SKEW on the primary (mainstream) table, Flink applies the REPLICATED_SHUFFLE_HASH strategy to the dimension table join.
Limitations:
- Each SKEW hint can reference only one table.
- Specify the primary table with data skew, not the dimension table.
- Update streams are not supported.
```sql
SELECT /*+ SKEW(src) */ *
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a;
```
Example
This example joins a Kafka source stream with a MySQL dimension table to look up phone numbers by name.
Input data
Table kafka_input:
| id (bigint) | name (varchar) | age (bigint) |
|---|---|---|
| 1 | lilei | 22 |
| 2 | hanmeimei | 20 |
| 3 | libai | 28 |
Table phoneNumber (MySQL dimension table):
| name (varchar) | phoneNumber (bigint) |
|---|---|
| dufu | 1390000111 |
| baijuyi | 1390000222 |
| libai | 1390000333 |
| lilei | 1390000444 |
Statements
```sql
CREATE TEMPORARY TABLE kafka_input (
  id BIGINT,
  name VARCHAR,
  age BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

CREATE TEMPORARY TABLE phoneNumber (
  name VARCHAR,
  phoneNumber BIGINT,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

CREATE TEMPORARY TABLE result_infor (
  id BIGINT,
  phoneNumber BIGINT,
  name VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO result_infor
SELECT
  t.id,
  w.phoneNumber,
  t.name
FROM kafka_input AS t
JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() AS w
ON t.name = w.name;
```
Replace the placeholders with your actual values:
| Placeholder | Description |
|---|---|
| `<yourTopic>` | Kafka topic name |
| `<yourKafkaBrokers>` | Kafka broker addresses |
| `<yourKafkaConsumerGroupId>` | Kafka consumer group ID |
| `<yourHostname>` | MySQL hostname |
| `<yourUsername>` | MySQL username |
| `<yourPassword>` | MySQL password |
| `<yourDatabaseName>` | MySQL database name |
| `<yourTableName>` | MySQL table name |
Result
Because this is an INNER JOIN, only rows with matching names in both tables appear in the output.
| id (bigint) | phoneNumber (bigint) | name (varchar) |
|---|---|---|
| 1 | 1390000444 | lilei |
| 3 | 1390000333 | libai |
What's next
- Supported connectors: connector-specific behavior for dimension table joins, including supported cache policies
- Flink SQL hints: full reference for SQL hint syntax
- LOOKUP hints: async, sync, and retry configuration options