Flink SQL supports complex JOIN operations on dynamic tables and provides multiple query semantics and join types. When you perform JOIN operations in Flink SQL, you must avoid Cartesian products in the query results. Otherwise, the query fails because Flink SQL does not support Cartesian products. By default, the order of JOIN operations is not optimized. To improve performance, you can adjust the table order in the FROM clause: place the table with the lowest update frequency first and the table with the highest update frequency last.
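The following sketch illustrates this ordering principle. The dim_products and orders table names are hypothetical and only show how to arrange the FROM clause; they are not part of the examples later in this topic.
-- A minimal ordering sketch: dim_products is assumed to change rarely, while orders is assumed to be updated frequently.
SELECT o.order_id, p.product_name
FROM dim_products p -- The table with the lowest update frequency comes first.
JOIN orders o -- The table with the highest update frequency comes last.
ON o.product_id = p.product_id;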
Joins overview
| Join type | Description | Syntax difference |
| --- | --- | --- |
| Regular joins | Regular joins are the most commonly used joins. New records or updates to the tables to be joined are visible and affect the result of the overall JOIN operation. | The regular join syntax in streaming queries is very flexible and supports INSERT, UPDATE, and DELETE operations on input tables. However, regular joins permanently store the input data of the tables to be joined as state data, which may lead to unbounded growth in state data. To prevent excessive state data, you can configure a time-to-live (TTL) for the state data, as shown in the sketch after this table. However, this may affect the accuracy of the results. |
| Interval joins | Interval joins produce a simple Cartesian product that meets the join conditions and time limits. | At least one equivalent join condition and one time-based join condition on columns of the tables to be joined are required. You can define a time range as a condition. For example, you can use comparison operators, such as less than (<), less than or equal to (<=), greater than or equal to (>=), and greater than (>), to specify a time range. You can also use a BETWEEN ... AND clause. |
| Temporal joins | Temporal joins are used to join versioned tables based on the event time or processing time. This way, the version of a table that was valid at a specific point in time can be joined. | The tables to be joined must use the same type of time attribute (processing time or event time). Take note of the lifecycle of the join result. The join condition is usually based on a specific timestamp. |
| Lookup joins | Lookup joins are used to join dimension tables from an external data source with a fact table to enrich data streams. A JOIN operation of this type requires that one table contains a processing-time attribute and that the other table is a dimension table. | The tables to be joined must meet the following requirements: one table must contain a processing-time attribute, and the data of the other table must be obtained by using a lookup source connector. An equivalent join condition is also required to join the two tables. |
| Lateral joins | Lateral joins are used to join a table with the result table that is generated by a table-valued function (TVF). Each row of the left table is joined with all the rows of the result table generated by the TVF. | The LATERAL keyword is required. A left lateral join must use ON TRUE as the join condition. |
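The following sketch shows how such a TTL could be configured for a regular join. It assumes that the deployment accepts configuration through SET statements; table.exec.state.ttl is the standard Flink configuration key, and the table names are placeholders.
-- A minimal sketch: retain join state for at most 36 hours.
-- State older than the TTL may be cleared, so late updates can produce incomplete results.
SET 'table.exec.state.ttl' = '36 h';
SELECT o.order_id, p.product_name
FROM orders o
JOIN products p ON o.product_id = p.product_id;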
Regular joins
The following four JOIN operations are commonly used:
INNER JOIN: returns the data entries that meet the join condition in the two tables.
LEFT JOIN: returns all data entries in the left table, including the rows that do not match any row in the right table.
RIGHT JOIN: returns all data entries in the right table, including the rows that do not match any row in the left table.
FULL OUTER JOIN: returns all rows from both tables, including the rows that do not have a match in the other table.
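The following sketch compares the four join types on two hypothetical tables, t_left(k, v1) and t_right(k, v2). Only the join keyword differs between the statements.
-- Rows without a match on the other side are kept by LEFT JOIN, RIGHT JOIN, and FULL OUTER JOIN;
-- the columns from the unmatched side are padded with NULL values.
SELECT l.k, l.v1, r.v2 FROM t_left l INNER JOIN t_right r ON l.k = r.k;
SELECT l.k, l.v1, r.v2 FROM t_left l LEFT JOIN t_right r ON l.k = r.k;
SELECT l.k, l.v1, r.v2 FROM t_left l RIGHT JOIN t_right r ON l.k = r.k;
SELECT l.k, l.v1, r.v2 FROM t_left l FULL OUTER JOIN t_right r ON l.k = r.k;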
Regular joins diagram

Regular joins example
Log on to the Realtime Compute for Apache Flink console.
Find the workspace that you want to manage and click Console in the Actions column.
In the left-side navigation pane, choose Development > ETL.
Click the + icon and select New Blank Stream Draft. In the New Draft dialog, enter a name, choose an engine version, and click Create.
The following sample code shows how to use a JOIN operation to join rows of multiple tables, associating superhero nicknames with their real names.
CREATE TEMPORARY TABLE NOC (
agent_id STRING,
codename STRING
) WITH (
'connector' = 'faker', -- Faker connector.
'fields.agent_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}', -- Randomly select one of the five numbers.
'fields.codename.expression' = '#{superhero.name}', -- Faker built-in function that generates a superhero name.
'number-of-rows' = '10' -- Generate ten rows of data.
);
CREATE TEMPORARY TABLE RealNames (
agent_id STRING,
name STRING
) WITH (
'connector' = 'faker',
'fields.agent_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}',
'fields.name.expression' = '#{Name.full_name}', -- Faker built-in function that generates a random full name.
'number-of-rows' = '10'
);
SELECT
name,
codename
FROM NOC
INNER JOIN RealNames ON NOC.agent_id = RealNames.agent_id; -- Rows are joined when the value of agent_id is the same in the two tables. The value of agent_id ranges from 1 to 5.
Click Debug in the upper-right corner, select the cluster to be debugged, and then click OK. If no session clusters are available, create a session cluster. For more information, see Create a session cluster.

For more information about how to use regular joins, see Regular join statements.
Interval joins
Interval joins are used to join two sets of data based on time intervals, which are defined by a start time and an end time. Each record is assigned to an interval based on its timestamp. In most cases, interval joins are used to compare two sets of data within a specific time range.
Interval joins diagram
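At its core, an interval join is an equivalent join with an additional bounded time condition. The following sketch shows the general pattern; table_a, table_b, and the ts columns are placeholders, and the ts columns must be time attributes for Flink to execute the query as an interval join.
-- General interval join pattern: an equality condition plus a bounded time range.
SELECT *
FROM table_a a, table_b b
WHERE a.id = b.id
AND a.ts BETWEEN b.ts - INTERVAL '4' HOUR AND b.ts; -- a.ts must fall within the four hours before b.ts.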
Interval joins example
This example shows how to join tables that contain events with time context. The query keeps only the orders whose shipment time (shipment_time) is within three hours after the time when the order was placed (order_time).
Create a draft on the ETL page by referring to the instructions in the Regular joins section.
CREATE TEMPORARY TABLE orders (
id INT,
order_time AS TIMESTAMPADD(HOUR, CAST(FLOOR(RAND()*(1-5+1)+5)*(-1) AS INT), CURRENT_TIMESTAMP) -- Generate a random timestamp between 2 and 4 hours before the local time.
)
WITH (
'connector' = 'datagen', -- Datagen connector, which is used to periodically generate random data.
"rows-per-second'='10", -- The rate at which random data is generated, which is 10 rows per second.
'fields.id.kind'='sequence', -- The sequence generator.
'fields.id.start'='1', -- The sequence value starts from 1.
'fields.id.end'='100' -- The sequence value ends at 100.
);
CREATE TEMPORARY TABLE shipments (
order_id INT,
shipment_time AS TIMESTAMPADD(HOUR, CAST(FLOOR(RAND()*(1-5+1))+1 AS INT), CURRENT_TIMESTAMP) -- Generate a random timestamp between 2 hours before and 1 hour after the current time.
)
WITH (
'connector' = 'datagen',
'rows-per-second'='5',
'fields.order_id.kind'='sequence',
'fields.order_id.start'='1',
'fields.order_id.end'='100'
);
SELECT
o.id AS order_id,
o.order_time,
s.shipment_time,
TIMESTAMPDIFF(HOUR,o.order_time,s.shipment_time) AS hour_diff -- The time difference between the time when an order is placed (order_time) and the shipment time (shipment_time).
FROM orders o
JOIN shipments s ON o.id = s.order_id
WHERE
o.order_time BETWEEN s.shipment_time - INTERVAL '3' HOUR AND s.shipment_time; -- Filter the data where the time difference between the time when an order is placed and the shipment time is within three hours.
Click Debug in the upper-right corner and click OK. The following figure shows the debugging results.

For more information about how to use interval joins, see Interval join.
Temporal joins
Temporal joins are commonly used to join temporal tables. In Apache Flink, a temporal table is a dynamic table whose records change over time, and each record is associated with one or more time periods. For example, exchange rates or product prices may fluctuate over time. In this case, temporal joins can be used to map the time when a transaction occurs to the exchange rate or price that was in effect at that time for accurate calculation.
Temporal joins diagram
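The general pattern is shown in the following sketch. The versioned table (rates in this sketch) needs a primary key and an event-time attribute; the table and column names are placeholders and differ from the example that follows.
-- General event-time temporal join pattern: each row of orders is joined with the version of rates that was valid at order_time.
SELECT o.order_id, o.price * r.rate AS converted_price
FROM orders o
JOIN rates FOR SYSTEM_TIME AS OF o.order_time AS r
ON o.currency = r.currency;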
Temporal joins example
This example shows a business scenario in which orders need to be calculated based on the exchange rate in effect at the time. The exchange rate changes over different time periods.
Create a draft on the ETL page by referring to the instructions in the Regular joins section and read the simulated data for debugging.
CREATE TEMPORARY TABLE currency_rates (
`currency_code` STRING,
`eur_rate` DECIMAL(6,4),
`rate_time` TIMESTAMP(3),
WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECOND,
PRIMARY KEY (currency_code) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'currency_rates',
'properties.bootstrap.servers' = '${secret_values.kafkahost}',
'properties.auto.offset.reset' = 'earliest',
'properties.group.id' = 'currency_rates',
'key.format' = 'raw',
'value.format' = 'json'
);
CREATE TEMPORARY TABLE transactions (
`id` STRING,
`currency_code` STRING,
`total` DECIMAL(10,2),
`transaction_time` TIMESTAMP(3),
WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'transactions',
'properties.bootstrap.servers' = '${secret_values.kafkahost}',
'properties.auto.offset.reset' = 'earliest',
'properties.group.id' = 'transactions',
'key.format' = 'raw',
'key.fields' = 'id',
'value.format' = 'json'
);
SELECT
t.id,
t.total * c.eur_rate AS total_eur,
c.eur_rate,
t.total,
c.currency_code,
c.rate_time,
t.transaction_time
FROM transactions t
JOIN currency_rates FOR SYSTEM_TIME AS OF t.transaction_time AS c
ON t.currency_code = c.currency_code
;
In the upper-right corner of the SQL editor, click Debug. In the dialog box that appears, click OK. The following figure shows the debugging results.
The exchange rate changed at 20:16:11 and again at 20:35:22. A transaction occurred at 20:35:14, before the 20:35:22 rate change took effect. Therefore, the exchange rate published at 20:16:11 is used for the calculation.

Lookup joins
Lookup joins are commonly used to enrich or supplement data by retrieving data from external systems. Not all data is frequently updated, even in real-time workflows. In some cases, you may need to enrich your streaming data with static data stored in external systems. For example, product data may be stored in a relational database that Apache Flink can connect to. Flink SQL allows you to look up the referenced data and combine it with streaming data by using a lookup join. The JOIN operation requires that one table contains a processing-time attribute and that the other table is accessed through a lookup connector, such as the MySQL connector.
Lookup joins diagram
You must append FOR SYSTEM_TIME AS OF PROCTIME() to the dimension table. This way, each row of the source data is joined with the snapshot of the dimension table at the processing time of that row. As a result, if the data in the dimension table is added, updated, or deleted after the JOIN operation is performed, the rows that have already been joined remain unchanged.
The ON condition must contain equivalent conditions for fields that support random lookup in the dimension table.
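The following sketch shows the general pattern that these two rules describe. The table and column names are placeholders; dim is assumed to be backed by a lookup source connector, and the ON condition uses an equivalent condition on its lookup key.
-- General lookup join pattern: each fact row is joined with the dimension table snapshot at the row's processing time.
SELECT f.id, d.name
FROM fact_table AS f
JOIN dim FOR SYSTEM_TIME AS OF PROCTIME() AS d
ON f.dim_id = d.id;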
Lookup joins example
This example shows how to use an external connector to enrich order data, supplementing product names from static data stored externally.
Create a draft on the ETL page by referring to the instructions in the Regular joins section.
CREATE TEMPORARY TABLE orders (
order_id STRING,
product_id INT,
order_total INT
) WITH (
'connector' = 'faker', -- Faker connector
'fields.order_id.expression' = '#{Internet.uuid}', -- Generate a random universally unique identifier (UUID).
'fields.product_id.expression' = '#{number.numberBetween ''1'',''5''}', -- Generate a number from 1 to 5.
'fields.order_total.expression' = '#{number.numberBetween ''1000'',''5000''}', -- Generate a number from 1000 to 5000.
'number-of-rows' = '10' -- Number of generated data rows.
);
-- Associate the static product data by using the MySQL connector.
CREATE TEMPORARY TABLE products (
product_id INT,
product_name STRING
)
WITH(
'connector' = 'mysql',
'hostname' = '${secret_values.mysqlhost}',
'port' = '3306',
'username' = '${secret_values.username}',
'password' = '${secret_values.password}',
'database-name' = 'db2024',
'table-name' = 'products'
);
SELECT
o.order_id,
p.product_name,
o.order_total,
CASE
WHEN o.order_total > 3000 THEN 1
ELSE 0
END AS is_importance -- Add the is_importance field. If the value of the order_total field is greater than 3000, the value of this field is 1, which indicates that the order is an important order.
FROM orders o
JOIN products FOR SYSTEM_TIME AS OF PROCTIME() AS p -- The FOR SYSTEM_TIME AS OF PROCTIME() clause ensures that each row in the orders table is associated with rows that match the join conditions in the products table when a JOIN operation is performed on the orders table.
ON o.product_id = p.product_id;
The following figure shows an example of product data that is associated by using the MySQL connector.

In the upper-right corner of the SQL editor, click Debug. In the dialog box that appears, click OK. The following figure shows the debugging results.

For more information about how to use lookup joins, see JOIN statements for dimension tables.
Lateral joins
Lateral joins allow you to specify a subquery in the FROM clause and execute this subquery for each row of an outer query. This increases the flexibility of SQL queries and can improve efficiency by reducing the number of table scans. However, a LATERAL JOIN operation may degrade performance if the inner query is complex or the amount of processed data is large.
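As a minimal sketch of the TVF form described in the overview table, the following statement joins each row of a hypothetical table with the rows produced by a table-valued function. The messages table and the split_words function are placeholders; split_words is assumed to be a registered user-defined TVF.
-- Each row of messages is joined with every row that split_words produces for it.
-- ON TRUE is required for the LEFT variant so that rows without TVF output are kept.
SELECT m.id, w.word
FROM messages AS m
LEFT JOIN LATERAL TABLE(split_words(m.body)) AS w(word) ON TRUE;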
Lateral joins example
This example shows how to aggregate sales records and return the top three products by total sales quantity.
Create a draft on the ETL page by referring to the instructions in the Regular joins section.
CREATE TEMPORARY TABLE sale (
sale_id STRING,
product_id INT,
sale_num INT
)
WITH (
'connector' = 'faker', -- Faker connector
'fields.sale_id.expression' = '#{Internet.uuid}', -- Generate a random UUID.
'fields.product_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}', -- Randomly select one of the five numbers.
'fields.sale_num.expression' = '#{number.numberBetween ''1'',''10''}', -- Generate a random integer from 1 to 10.
'number-of-rows' = '50' -- Generate 50 rows of data.
);
CREATE TEMPORARY TABLE products (
product_id INT,
product_name STRING,
PRIMARY KEY(product_id) NOT ENFORCED
)
WITH(
'connector' = 'mysql',
'hostname' = '${secret_values.mysqlhost}',
'port' = '3306',
'username' = '${secret_values.username}',
'password' = '${secret_values.password}',
'database-name' = 'db2024',
'table-name' = 'products'
);
SELECT
p.product_name,
s.total_sales
FROM products p
LEFT JOIN LATERAL
(SELECT SUM(sale_num) AS total_sales FROM sale WHERE sale.product_id = p.product_id) s ON TRUE
ORDER BY total_sales DESC
LIMIT 3;
In the upper-right corner of the SQL editor, click Debug. In the dialog box that appears, click OK. The following figure shows the debugging results.

References
If you are only interested in the processing time of an event rather than the occurrence time of the event, you can use processing-time temporal join statements. For more information, see Processing-time temporal join statements.
For more information about how to use regular joins, see Regular join statements.
For more information about how to use interval joins, see Interval join.
For more information about how to use lookup joins, see JOIN statements for dimension tables.