
Realtime Compute for Apache Flink:Get started with JOIN operations of Flink SQL

Last Updated: Oct 28, 2025

Flink SQL supports complex JOIN operations on dynamic tables and provides multiple query semantics and join types. When you perform JOIN operations in Flink SQL, you must avoid Cartesian products in the query results. Otherwise, the query fails because Flink SQL does not support Cartesian products. By default, the order of JOIN operations is not optimized. To improve performance, adjust the table order in the FROM clause so that the table with the lowest update frequency comes first and the table with the highest update frequency comes last.

Joins overview

  • Regular joins

    Description: Regular joins are the most commonly used joins. New records in, and updates to, the joined tables are visible and affect the overall join result.

    Syntax difference: The regular join syntax in streaming queries is flexible and supports INSERT, UPDATE, and DELETE operations on input tables. However, regular joins permanently store the input data of the joined tables as state data, which may grow without bound. To prevent excessive state data, you can configure a time-to-live (TTL) for the state data. However, this may affect the accuracy of the results.

  • Interval joins

    Description: Interval joins produce the subset of the Cartesian product that meets both the join conditions and the time constraints.

    Syntax difference: At least one equality join condition and one time-based join condition on the joined tables are required. You can define the time range with comparison operators, such as less than (<), less than or equal to (<=), greater than or equal to (>=), and greater than (>), with a BETWEEN condition, or with an equality check on time attributes of the same type (processing time or event time) in the joined tables.

  • Temporal joins

    Description: Temporal joins join a table against a versioned table based on event time or processing time, so that each row is matched with the version of the data that was valid at that point in time.

    Syntax difference: The joined tables must use time attributes of the same type (processing time or event time). Take note of the lifecycle of the join result. The join condition is usually based on a specific timestamp.

  • Lookup joins

    Description: Lookup joins join dimension tables from an external data source with a fact table to enrich data streams.

    Syntax difference: One table must contain a processing-time attribute, and the data of the other table must be obtained by using a lookup source connector. An equality join condition is also required to join the two tables.

  • Lateral joins

    Description: Lateral joins join a table with the result table generated by a table-valued function (TVF). Each row of the left table is joined with all rows of the result table generated by the TVF.

    Syntax difference: The ON clause must always contain a fixed TRUE join condition. Example: JOIN LATERAL TABLE(table_func(order_id)) t(res) ON TRUE.

Regular joins

The following four JOIN operations are commonly used:

  • INNER JOIN: returns only the rows that meet the join condition in both tables.

  • LEFT JOIN: returns all rows from the left table, including rows that have no match in the right table.

  • RIGHT JOIN: returns all rows from the right table, including rows that have no match in the left table.

  • FULL OUTER JOIN: returns all rows from both tables, including rows that have no match on the other side.
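The four join types can be illustrated with a minimal Python sketch. This is not Flink code; the tables and values are made up for illustration, with integer keys standing in for the join column:

```python
# Toy in-memory tables keyed by agent_id (hypothetical data).
left = {1: "Superman", 2: "Batman", 4: "Flash"}
right = {1: "Clark Kent", 2: "Bruce Wayne", 3: "Diana Prince"}

def inner_join(l, r):
    # Only keys present in both tables.
    return {k: (l[k], r[k]) for k in l if k in r}

def left_join(l, r):
    # All left keys; an unmatched right side becomes None (NULL in SQL).
    return {k: (l[k], r.get(k)) for k in l}

def right_join(l, r):
    # All right keys; an unmatched left side becomes None.
    return {k: (l.get(k), r[k]) for k in r}

def full_outer_join(l, r):
    # Union of keys from both tables.
    return {k: (l.get(k), r.get(k)) for k in set(l) | set(r)}

print(sorted(inner_join(left, right)))       # keys matched in both tables
print(sorted(left_join(left, right)))        # all left keys
print(sorted(full_outer_join(left, right)))  # union of all keys
```

The NULL-padding behavior of the outer joins is what distinguishes them from INNER JOIN: unmatched rows survive, with None filling the missing side.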

Regular joins diagram

image

Regular joins example

  1. Log on to the Realtime Compute for Apache Flink console.

  2. Find the workspace that you want to manage and click Console in the Actions column.

  3. In the left-side navigation pane, choose Development > ETL.

  4. Click the + icon and select New Blank Stream Draft. In the New Draft dialog, enter a name, choose an engine version, and click Create.

    The following sample code shows how to use an INNER JOIN to associate superhero code names with real names.
    CREATE TEMPORARY TABLE NOC (
      agent_id STRING,
      codename STRING
    )
    WITH (
      'connector' = 'faker',   -- Faker connector.
      'fields.agent_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}',  -- Randomly select one of the numbers 1 to 5.
      'fields.codename.expression' = '#{superhero.name}',   -- Faker built-in function that generates a superhero name.
      'number-of-rows' = '10'   -- Generate ten rows of data.
    );
    
    CREATE TEMPORARY TABLE RealNames (
      agent_id STRING,
      name     STRING
    )
    WITH (
      'connector' = 'faker',
      'fields.agent_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}',
      'fields.name.expression' = '#{Name.full_name}',  -- Faker built-in function that generates a random full name.
      'number-of-rows' = '10'
    );
    
    SELECT
        name,
        codename
    FROM NOC
    INNER JOIN RealNames ON NOC.agent_id = RealNames.agent_id;  -- Rows with matching agent_id values (from 1 to 5) in both tables are joined, producing the name and codename pairs.

  5. Click Debug in the upper-right corner, select the cluster to be debugged, and then click OK. If no session clusters are available, create a session cluster. For more information, see Create a session cluster.

    image

For more information about how to use regular joins, see Regular join statements.

Interval joins

Interval joins are used to join two sets of data in which each record carries a timestamp. Each record is assigned to a time interval, defined by a start time and an end time, based on its timestamp. In most cases, interval joins are used to correlate two data sets within a specific time range.
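The matching rule can be sketched in plain Python. This is a simplified model, not Flink code; integer hours stand in for event timestamps, and the three-hour window mirrors the order/shipment scenario used in the SQL example of this section:

```python
# Each record is (key, timestamp). An interval join keeps pairs whose keys match
# and whose timestamps fall within the allowed window of each other.
orders = [(1, 10), (2, 11), (3, 12)]       # (order_id, order_hour) - made-up data
shipments = [(1, 12), (2, 16), (3, 13)]    # (order_id, shipment_hour)

def interval_join(left, right, window):
    out = []
    for lk, lt in left:
        for rk, rt in right:
            # Equivalent of: order_time BETWEEN shipment_time - window AND shipment_time
            if lk == rk and rt - window <= lt <= rt:
                out.append((lk, lt, rt))
    return out

# Order 2 shipped 5 hours after it was placed, so it falls outside the window.
print(interval_join(orders, shipments, 3))
```

Because the time condition bounds how far apart matching records can be, Flink can expire state for timestamps that can no longer match, which is why interval joins avoid the unbounded state growth of regular joins.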

Interval joins diagram

image

Interval joins example

This example shows how to join tables whose events carry time context. The query retains rows where the time difference between the time when an order is placed (order_time) and the shipment time (shipment_time) is within three hours.

Create a draft on the ETL page by referring to the instructions in the Regular joins section.

CREATE TEMPORARY TABLE orders (
  id INT,
  order_time AS TIMESTAMPADD(HOUR, CAST(FLOOR(RAND()*(1-5+1)+5)*(-1) AS INT), CURRENT_TIMESTAMP)    -- Generate a random timestamp between 2 and 4 hours before the local time.
)
WITH (
  'connector' = 'datagen', -- Datagen connector, which is used to periodically generate random data.
  "rows-per-second'='10", -- The rate at which random data is generated, which is 10 rows per second.
  'fields.id.kind'='sequence', -- The sequence generator.
  'fields.id.start'='1',       -- The sequence value starts from 1.
  'fields.id.end'='100'        -- The sequence value ends at 100.
);

CREATE TEMPORARY TABLE shipments (
  order_id INT,
  shipment_time AS TIMESTAMPADD(HOUR, CAST(FLOOR(RAND()*3)*(-1) AS INT), CURRENT_TIMESTAMP)   -- Generate a random timestamp between 0 and 2 hours before the local time.
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='5',
  'fields.order_id.kind'='sequence',
  'fields.order_id.start'='1',
  'fields.order_id.end'='100'
);

SELECT
  o.id AS order_id,
  o.order_time,
  s.shipment_time,
  TIMESTAMPDIFF(HOUR,o.order_time,s.shipment_time) AS hour_diff    -- The time difference between the time when an order is placed (order_time) and the shipment time (shipment_time).
FROM orders o
JOIN shipments s ON o.id = s.order_id   
WHERE 
    o.order_time BETWEEN s.shipment_time - INTERVAL '3' HOUR AND s.shipment_time;  -- Filter the data where the time difference between the time when an order is placed and the shipment time is within three hours.

Click Debug in the upper-right corner and click OK. The following figure shows the debugging results.

image

For more information about how to use interval joins, see Interval join.

Temporal joins

Temporal joins are commonly used to join temporal tables (also called dynamic tables in Apache Flink). A temporal table is a table that changes over time. Each record is associated with one or more time periods. For example, exchange rates or product prices may fluctuate over time. In this case, temporal joins can be used to map the time when a transaction occurs to the related exchange rate or price at a specific time for accurate calculation.
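The point-in-time lookup that a temporal join performs can be modeled in a few lines of Python. This is a simplified sketch with made-up rate values, not Flink code: for each transaction, pick the latest rate version whose effective time is not later than the transaction time.

```python
import bisect

# Versioned rate table: (effective_time, rate), sorted by effective_time.
# Hypothetical values for illustration.
rate_versions = [(0, 1.00), (10, 1.05), (20, 1.10)]
times = [t for t, _ in rate_versions]

def rate_as_of(ts):
    # Latest version with effective_time <= ts, mirroring FOR SYSTEM_TIME AS OF.
    i = bisect.bisect_right(times, ts) - 1
    return rate_versions[i][1] if i >= 0 else None

# A transaction at time 15 sees the rate published at time 10,
# not the later version published at time 20.
print(rate_as_of(15))   # 1.05
print(rate_as_of(25))   # 1.10
```

This "latest version at or before the event time" rule is exactly why, in the debugging walkthrough below, a transaction that occurs before a rate change is calculated with the earlier rate.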

Temporal joins diagram

image

Temporal joins example

This example shows a business scenario in which orders need to be calculated based on the exchange rate in effect at the time. The exchange rate changes over different time periods.

(Optional) Generate simulated data

  1. Create a draft on the ETL page by referring to the instructions in the Regular joins section.

  2. Use the Faker connector to generate simulated data and write it to the simulated exchange rate dynamic table in Kafka by using the Upsert Kafka connector.

CREATE TEMPORARY TABLE currency_rates (
  `currency_code` STRING,
  `eur_rate` DECIMAL(6,4),
  `rate_time` TIMESTAMP(3),
  WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECOND,
  PRIMARY KEY (currency_code) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'currency_rates',
  'properties.bootstrap.servers' = '${secret_values.kafkahost}',
  'key.format' = 'raw',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE transactions (
  `id` STRING,
  `currency_code` STRING,
  `total` DECIMAL(10,2),
  `transaction_time` TIMESTAMP(3),
  WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'transactions',
  'properties.bootstrap.servers' = '${secret_values.kafkahost}',
  'key.format' = 'raw',
  'key.fields' = 'id',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE currency_rates_faker (
  `currency_code` STRING,
  `eur_rate` DECIMAL(6,4),
  `rate_time` TIMESTAMP(3)
)
WITH (
  'connector' = 'faker',
  'fields.currency_code.expression' = '#{Currency.code}',
  'fields.eur_rate.expression' = '#{Number.randomDouble ''4'',''0'',''10''}',
  'fields.rate_time.expression' = '#{date.past ''15'',''SECONDS''}',
  'rows-per-second' = '2'
);

CREATE TEMPORARY TABLE transactions_faker (
  `id` STRING,
  `currency_code` STRING,
  `total` DECIMAL(10,2),
  `transaction_time` TIMESTAMP(3)
)
WITH (
  'connector' = 'faker',
  'fields.id.expression' = '#{Internet.UUID}',
  'fields.currency_code.expression' = '#{Currency.code}',
  'fields.total.expression' = '#{Number.randomDouble ''2'',''10'',''1000''}',
  'fields.transaction_time.expression' = '#{date.past ''30'',''SECONDS''}',
  'rows-per-second' = '2'
);

BEGIN STATEMENT SET;

INSERT INTO currency_rates
SELECT * FROM currency_rates_faker;

INSERT INTO transactions
SELECT * FROM transactions_faker;

END;
  3. In the upper-right corner of the SQL editor, click Deploy.

  4. In the left-side navigation pane, choose O&M > Deployments. On the Deployments page, find the target deployment and click Start in the Actions column. In the Start Job panel, select Initial Mode and click Start.

Create a draft on the ETL page by referring to the instructions in the Regular joins section and read the simulated data for debugging.

CREATE TEMPORARY TABLE currency_rates (
  `currency_code` STRING,
  `eur_rate` DECIMAL(6,4),
  `rate_time` TIMESTAMP(3),
  WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECOND,
  PRIMARY KEY (currency_code) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'currency_rates',
  'properties.bootstrap.servers' = '${secret_values.kafkahost}',
  'properties.auto.offset.reset' = 'earliest',
  'properties.group.id' = 'currency_rates',
  'key.format' = 'raw',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE transactions (
  `id` STRING,
  `currency_code` STRING,
  `total` DECIMAL(10,2),
  `transaction_time` TIMESTAMP(3),
  WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'transactions',
  'properties.bootstrap.servers' = '${secret_values.kafkahost}',
  'properties.auto.offset.reset' = 'earliest',
  'properties.group.id' = 'transactions',
  'key.format' = 'raw',
  'key.fields' = 'id',
  'value.format' = 'json'
);

SELECT
  t.id,
  t.total * c.eur_rate AS total_eur,
  c.eur_rate,
  t.total,
  c.currency_code,
  c.rate_time,
  t.transaction_time
FROM transactions t
JOIN currency_rates FOR SYSTEM_TIME AS OF t.transaction_time AS c
ON t.currency_code = c.currency_code 
;

In the upper-right corner of the SQL editor, click Debug. In the dialog box that appears, click OK. The following figure shows the debugging results.

In this example, the exchange rate changed at 20:16:11 and again at 20:35:22. A transaction occurred at 20:35:14, before the second rate change took effect. Therefore, the rate published at 20:16:11 is used for the calculation.

image

Lookup joins

Lookup joins are commonly used to enrich or supplement streaming data with data retrieved from external systems. Not all data changes frequently, even in real-time workflows. In some cases, you may need to enrich your streaming data with static data stored in an external system. For example, product data may be stored in a relational database that Apache Flink can access directly. Flink SQL allows you to look up this reference data and combine it with streaming data by using a lookup join. The operation requires that one table contain a processing-time attribute and that the other table be a dimension table accessed through a connector, such as the MySQL connector.
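The enrichment pattern can be sketched in Python. This is illustrative only, not Flink code; the product table and stream records are made up. Each streaming record triggers a key lookup against the dimension data:

```python
# Dimension data, as it might look after being read from an external database.
products = {1: "keyboard", 2: "mouse", 3: "monitor"}

# Streaming fact records: (order_id, product_id, order_total) - made-up data.
orders = [("o-1", 2, 1200), ("o-2", 3, 4500), ("o-3", 9, 800)]

def lookup_join(stream, dim):
    # Inner lookup join: records whose key has no match in the dimension
    # table are dropped from the result.
    out = []
    for order_id, product_id, total in stream:
        name = dim.get(product_id)
        if name is not None:
            out.append((order_id, name, total))
    return out

print(lookup_join(orders, products))
```

Unlike a regular join, the stream side drives the operation: the dimension side is probed per record rather than kept as join state, which is what makes this pattern suitable for large, slowly changing reference tables.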

Lookup joins diagram

image
Note
  • You must append FOR SYSTEM_TIME AS OF PROCTIME() to the dimension table. This way, each source row is joined with the rows that are visible in the snapshot of the dimension table at the current time. Therefore, if data in the dimension table is added, updated, or deleted after the join is performed, the previously joined results remain unchanged.

  • The ON clause must contain equality conditions on fields of the dimension table that support direct lookups.

Lookup joins example

This example shows how to use an external connector to enrich order data, supplementing product names from static data stored externally.

Create a draft on the ETL page by referring to the instructions in the Regular joins section.

CREATE TEMPORARY TABLE orders ( 
    order_id STRING,
    product_id INT,
    order_total INT
) WITH (
  'connector' = 'faker',   -- Faker connector
  'fields.order_id.expression' = '#{Internet.uuid}',   -- Generate a random universally unique identifier (UUID).
  'fields.product_id.expression' = '#{number.numberBetween ''1'',''5''}',   -- Generate a number from 1 to 5.
  'fields.order_total.expression' = '#{number.numberBetween ''1000'',''5000''}', -- Generate a number from 1000 to 5000.
  'number-of-rows' = '10'  -- Number of generated data rows.
);

-- Associate the static product data by using the MySQL connector.
CREATE TEMPORARY TABLE products (
 product_id INT,
 product_name STRING
)
WITH(
  'connector' = 'mysql',
  'hostname' = '${secret_values.mysqlhost}',
  'port' = '3306',
  'username' = '${secret_values.username}',
  'password' = '${secret_values.password}',
  'database-name' = 'db2024',
  'table-name' = 'products'
);

SELECT 
    o.order_id,
    p.product_name,
    o.order_total,
  CASE 
    WHEN o.order_total > 3000 THEN 1    
    ELSE 0
  END AS is_importance          -- Add the is_importance field. If the value of the order_total field is greater than 3000, the value of this field is 1, which indicates that the order is an important order. 
FROM orders o
JOIN products FOR SYSTEM_TIME AS OF PROCTIME() AS p -- The FOR SYSTEM_TIME AS OF PROCTIME() clause ensures that each row in the orders table is associated with rows that match the join conditions in the products table when a JOIN operation is performed on the orders table. 
ON o.product_id = p.product_id;

The following figure shows an example of product data that is associated by using the MySQL connector.

image

In the upper-right corner of the SQL editor, click Debug. In the dialog box that appears, click OK. The following figure shows the debugging results.

image

For more information about how to use lookup joins, see JOIN statements for dimension tables.

Lateral joins

Lateral joins allow you to specify a subquery in the FROM clause and execute that subquery for each row of the outer query. This improves the flexibility of SQL queries and can optimize query efficiency by reducing the number of table scans. However, a LATERAL JOIN may degrade performance if the inner query is complex or the amount of processed data is large.
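The per-row subquery behavior can be modeled in Python. This is a simplified sketch with made-up data, not Flink code: for each row of the outer table, a correlated "subquery" runs with that row's values in scope, mirroring the top-three-products query in this section's SQL example:

```python
products = [(1, "keyboard"), (2, "mouse"), (3, "monitor")]      # (product_id, product_name)
sales = [(1, 5), (1, 2), (2, 9), (3, 4), (3, 4)]                # (product_id, sale_num)

def lateral_total(outer, sale_rows):
    # For each outer row, evaluate a correlated "subquery" that sums
    # the sale_num values of matching sales records.
    out = []
    for pid, name in outer:
        total = sum(n for sid, n in sale_rows if sid == pid)
        out.append((name, total))
    return out

# Top three products by total sales, as in ORDER BY total_sales DESC LIMIT 3.
top3 = sorted(lateral_total(products, sales), key=lambda r: r[1], reverse=True)[:3]
print(top3)
```

The defining property is the correlation: the inner computation can reference columns of the current outer row (here, pid), which an ordinary uncorrelated subquery in the FROM clause cannot do.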

Lateral joins diagram

image

Lateral joins example

This example shows how to aggregate sales records and return the top three products by total sales quantity.

Create a draft on the ETL page by referring to the instructions in the Regular joins section.

CREATE TEMPORARY TABLE sale (
  sale_id STRING,
  product_id INT,
  sale_num INT
)
WITH (
  'connector' = 'faker',   -- Faker connector
  'fields.sale_id.expression' = '#{Internet.uuid}',   -- Generate a random UUID.
  'fields.product_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}',  -- Randomly select one of the numbers 1 to 5.
  'fields.sale_num.expression' = '#{number.numberBetween ''1'',''10''}',  -- Generate a random integer from 1 to 10.
  'number-of-rows' = '50'  -- Generate 50 rows of data.
);

CREATE TEMPORARY TABLE products (
 product_id INT,
 product_name STRING,
 PRIMARY KEY(product_id) NOT ENFORCED
)
WITH(
  'connector' = 'mysql',
  'hostname' = '${secret_values.mysqlhost}',
  'port' = '3306',
  'username' = '${secret_values.username}',
  'password' = '${secret_values.password}',
  'database-name' = 'db2024',
  'table-name' = 'products'
);

SELECT 
    p.product_name,
    s.total_sales
FROM  products p
LEFT JOIN LATERAL
    (SELECT SUM(sale_num) AS total_sales FROM sale WHERE sale.product_id = p.product_id) s ON TRUE
    ORDER BY total_sales DESC
    LIMIT 3;

In the upper-right corner of the SQL editor, click Debug. In the dialog box that appears, click OK. The following figure shows the debugging results.

image

References