全部產品
Search
文件中心

Realtime Compute for Apache Flink:Flink SQL Join快速入門

更新時間:Sep 02, 2025

Flink SQL支援在動態表上進行複雜的串連操作,提供多種查詢語義和join類型。使用時需避免引發笛卡爾積,因為Flink SQL不支援這種操作,會導致查詢失敗。預設情況下,join順序未最佳化。為提高效能,可在FROM子句中調整表順序,將更新頻率最低的表放在前面,最高的放在後面。

Joins概覽

連線類型

類型說明

文法差異

Regular Joins

Regular Join是最通用的join類型。在這種join下,join兩側表的任何新記錄或變更都是可見的,並會影響整個join的結果

流式查詢中的regular join文法非常靈活,支援對輸入表進行插入、更新和刪除操作。然而,regular join需要將兩邊的輸入資料永久儲存在狀態中,這可能導致狀態無限增長,具體取決於輸入的資料量。為了防止狀態過大,可以設定狀態的time-to-live (TTL),但這可能影響結果的準確性。

Interval Joins

Interval Join是返回一個符合join條件和時間限制的簡單笛卡爾積。

需要至少一個等值串連條件和一個在兩邊均包含的時間限定串連條件。時間範圍的判斷可以定義為一個條件(如 <, <=, >=, >),也可以使用一個BETWEEN 條件,或者對兩邊表中相同類型的時間屬性(即:處理時間或事件時間)進行等式判斷。

Temporal Joins

Temporal Join允許對版本表基於事件時間或處理時間進行join。 這意味著可以基於某個時間點串連版本表的特定時間版本資料。

要求兩邊表具備相同類型的時間處理語義(即:處理時間或事件時間)。且注意串連結果的生命週期,串連條件通常為某個特定的時間戳記。

Lookup Joins

Lookup Join通常用於使用從外部系統查詢的資料來豐富表。Join要求一個表具有處理時間屬性,另一個表提供維表的支援。

要求一個表具備處理時間屬性,而另一個表則需通過尋找源連接器進行支援。兩個表之間還需要一個強制的相等串連條件。

Lateral Joins

Lateral Join將表與資料表值函式的結果串連。左表的每一行都與資料表值函式相應調用產生的所有行相串連。

要求ON 子句中有一個固定的TRUE串連條件。如JOIN LATERAL TABLE(table_func(order_id)) t(res) ON TRUE

Regular Joins

常用的四種連線類型是:

  • INNER JOIN:返回兩個表中滿足串連條件的記錄(交集)。

  • LEFT JOIN:返回左表中的所有記錄,即使右表中沒有匹配的記錄(保留左表)。

  • RIGHT JOIN:返回右表中的所有記錄,即使左表中沒有匹配的記錄(保留右表)。

  • FULL OUTER JOIN:返回兩個表的並集,包含匹配和不匹配的記錄。

Regular Joins圖解

image

Regular Joins樣本

  1. 登入Realtime Compute控制台

  2. 單擊目標工作空間操作列下的控制台

  3. 在左側導覽列,單擊資料開發 > ETL

  4. 單擊image後,單擊建立流作業,填寫檔案名稱並選擇引擎版本,單擊建立

    此樣本將展示如何使用串連來關聯多個表之間的行,將超級英雄的暱稱和他們的真實姓名關聯起來。
    CREATE TEMPORARY TABLE NOC (
      agent_id STRING,
      codename STRING
    )
    WITH (
      'connector' = 'faker',   --faker類比資料產生連接器
      'fields.agent_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}',  --從五個數字中選擇產生一個數
      'fields.codename.expression' = '#{superhero.name}',   --fake內建函數,將隨機產生一個超級英雄的名字
      'number-of-rows' = '10'   --指定生產的資料為十行  
    );
    
    CREATE TEMPORARY TABLE RealNames (
      agent_id STRING,
      name     STRING
    )
    WITH (
      'connector' = 'faker',
      'fields.agent_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}',
      'fields.name.expression' = '#{Name.full_name}',  --fake內建函數,將隨機產生一個名字
      'number-of-rows' = '10'
    );
    
    SELECT
        name,
        codename
    FROM NOC
    INNER JOIN RealNames ON NOC.agent_id = RealNames.agent_id;  --如果兩張表的agent_id(1-5)相等,則輸出name和codename。
  5. 單擊右上方的調試,選擇調試叢集,單擊確認。如果還沒有Session叢集,詳情請參見建立Session叢集

    image

更多Regular Joins用法詳情請參見雙流JOIN語句

Interval Joins

Interval Joins是將兩組資料進行串連的過程,每組資料被劃分為若干區間,區間由開始時間和結束時間定義。每組中的資料根據其時間戳記被分配到相應的區間。區間串連通常用於比較在一定時間間隔內的兩組資料。

Interval Joins圖解

Interval Joins樣本

此樣本將展示如何在具有時間上下文相關的事件的表之間執行串連。將訂單時間(order_time)與發貨時間(shipment_time)相差三小時內的資料篩選出來。

參見Regular Joins建立ETL作業。

CREATE TEMPORARY TABLE orders (
  id INT,
  order_time AS TIMESTAMPADD(HOUR, CAST(FLOOR(RAND()*(1-5+1)+5)*(-1) AS INT), CURRENT_TIMESTAMP)    --根據本機時間,隨機擷取-4,-3,-2小時前的時間
)
WITH (
  'connector' = 'datagen',     --datagen連接器,可以周期性產生隨機資料
  'rows-per-second'='10',      --產生隨機資料的速率,10條/s
  'fields.id.kind'='sequence', --序列產生器
  'fields.id.start'='1',       --序列值從1開始
  'fields.id.end'='100'        --序列值從100結束
);

CREATE TEMPORARY TABLE shipments (
  order_id INT,
  shipment_time AS TIMESTAMPADD(HOUR, CAST(FLOOR(RAND()*(1-5+1))+1 AS INT), CURRENT_TIMESTAMP)   --根據本機時間,隨機擷取-2,-1,0小時前的時間
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='5',
  'fields.order_id.kind'='sequence',
  'fields.order_id.start'='1',
  'fields.order_id.end'='100'
);

SELECT
  o.id AS order_id,
  o.order_time,
  s.shipment_time,
  TIMESTAMPDIFF(HOUR,o.order_time,s.shipment_time) AS hour_diff    --訂單時間(order_time)與發貨時間(shipment_time)相差的時間
FROM orders o
JOIN shipments s ON o.id = s.order_id   
WHERE 
    o.order_time BETWEEN s.shipment_time - INTERVAL '3' HOUR AND s.shipment_time;  --根據發貨時間篩選,訂單是不是在三小時內下單的

單擊右上方的調試,單擊確認。調試結果如下:

image

更多Interval joins的用法詳情請參見IntervalJoin語句

Temporal Joins

時間串連常用於串連時態表(在Flink中也稱為動態表)。時態表是隨時間變化的表,每條記錄都關聯了一個或多個時間段。如匯率或商品的價格在不同時間會有波動,此時需要採用時間串連,將事務發生的時間對應到當時相應的匯率或價格進行計算。

Temporal Joins圖解

Temporal Joins樣本

此樣本將展示匯率在不同時間段發生變化後,訂單也需要採用當時生效的匯率進行計算的業務情境。

(可選)產生類比資料

  1. 參見Regular Joins建立ETL作業。

  1. 使用Faker連接器產生類比資料,以Upsert Kafka連接器寫入Kafka類比匯率動態表。

CREATE TEMPORARY TABLE currency_rates (
  `currency_code` STRING,
  `eur_rate` DECIMAL(6,4),
  `rate_time` TIMESTAMP(3),
  WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECOND,
  PRIMARY KEY (currency_code) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'currency_rates',
  'properties.bootstrap.servers' = '${secret_values.kafkahost}',
  'key.format' = 'raw',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE transactions (
  `id` STRING,
  `currency_code` STRING,
  `total` DECIMAL(10,2),
  `transaction_time` TIMESTAMP(3),
  WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'transactions',
  'properties.bootstrap.servers' = '${secret_values.kafkahost}',
  'key.format' = 'raw',
  'key.fields' = 'id',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE currency_rates_faker (
  `currency_code` STRING,
  `eur_rate` DECIMAL(6,4),
  `rate_time` TIMESTAMP(3)
)
WITH (
  'connector' = 'faker',
  'fields.currency_code.expression' = '#{Currency.code}',
  'fields.eur_rate.expression' = '#{Number.randomDouble ''4'',''0'',''10''}',
  'fields.rate_time.expression' = '#{date.past ''15'',''SECONDS''}',
  'rows-per-second' = '2'
);

CREATE TEMPORARY TABLE transactions_faker (
  `id` STRING,
  `currency_code` STRING,
  `total` DECIMAL(10,2),
  `transaction_time` TIMESTAMP(3)
)
WITH (
  'connector' = 'faker',
  'fields.id.expression' = '#{Internet.UUID}',
  'fields.currency_code.expression' = '#{Currency.code}',
  'fields.total.expression' = '#{Number.randomDouble ''2'',''10'',''1000''}',
  'fields.transaction_time.expression' = '#{date.past ''30'',''SECONDS''}',
  'rows-per-second' = '2'
);

BEGIN STATEMENT SET;

INSERT INTO currency_rates
SELECT * FROM currency_rates_faker;

INSERT INTO transactions
SELECT * FROM transactions_faker;

END;
  1. 單擊右上方的部署,進行作業部署。

  2. 單擊左側導覽列的營運中心 > 作業營運,單擊目標作業操作列的啟動,選擇無狀態啟動後單擊啟動

參見Regular Joins建立ETL作業,讀模數擬資料進行調試。

CREATE TEMPORARY TABLE currency_rates (
  `currency_code` STRING,
  `eur_rate` DECIMAL(6,4),
  `rate_time` TIMESTAMP(3),
  WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECOND,
  PRIMARY KEY (currency_code) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'currency_rates',
  'properties.bootstrap.servers' = '${secret_values.kafkahost}',
  'properties.auto.offset.reset' = 'earliest',
  'properties.group.id' = 'currency_rates',
  'key.format' = 'raw',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE transactions (
  `id` STRING,
  `currency_code` STRING,
  `total` DECIMAL(10,2),
  `transaction_time` TIMESTAMP(3),
  WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'transactions',
  'properties.bootstrap.servers' = '${secret_values.kafkahost}',
  'properties.auto.offset.reset' = 'earliest',
  'properties.group.id' = 'transactions',
  'key.format' = 'raw',
  'key.fields' = 'id',
  'value.format' = 'json'
);

SELECT
  t.id,
  t.total * c.eur_rate AS total_eur,
  c.eur_rate,
  t.total,
  c.currency_code,
  c.rate_time,
  t.transaction_time
FROM transactions t
JOIN currency_rates FOR SYSTEM_TIME AS OF t.transaction_time AS c
ON t.currency_code = c.currency_code 
;

單擊右上方的調試,單擊確認。調試結果如下:

如圖所示,匯率發生了兩次變更,時間分別在20:16:1120:35:22。其中有一個事務訂單發生的時間在20:35:14,但此時匯率還沒有發生變化,所以需要採用20:16:11變更的匯率進行計算。

image

Lookup Joins

尋找串連常用於利用從外部系統查詢的資料實現資料擴充或補充。並非所有資料都會頻繁更新,即時工作流程中亦是如此。在某些情況下,您可能需要使用儲存在外部的待用資料來豐富流資料。例如,商品資料可能儲存在需要與Flink直接連接的關聯式資料庫中。Flink SQL允許您尋找引用資料,並使用尋找串連將其與流資料結合。串連要求一個表具有時間屬性,而另一個表則通過連接器進行串連,例如MySQL連接器。

Lookup Joins圖解

說明
  • 必須加上FOR SYSTEM_TIME AS OF PROCTIME(),表示JOIN維表當前時刻資料快照所看到的每條資料。如果JOIN行為發生後,維表中的資料發生了變化(新增、更新或刪除),則已關聯的維表資料不會被同步變化。

  • ON條件中必須包含維表實際能支援隨機尋找的欄位的等值條件。

Lookup Joins樣本

此樣本將展示串連外部連接器的待用資料豐富訂單資料,補充商品名稱。

參見Regular Joins建立ETL作業。

CREATE TEMPORARY TABLE orders ( 
    order_id STRING,
    product_id INT,
    order_total INT
) WITH (
  'connector' = 'faker',   --faker類比資料產生連接器
  'fields.order_id.expression' = '#{Internet.uuid}',   --隨機產生UUID
  'fields.product_id.expression' = '#{number.numberBetween ''1'',''5''}',   --從數字1-5中產生一個數
  'fields.order_total.expression' = '#{number.numberBetween ''1000'',''5000''}',  --從數字1000-5000中產生一個數
  'number-of-rows' = '10'  --產生的資料數量
);

--串連MySQL的商品待用資料
CREATE TEMPORARY TABLE products (
 product_id INT,
 product_name STRING
)
WITH(
  'connector' = 'mysql',
  'hostname' = '${secret_values.mysqlhost}',
  'port' = '3306',
  'username' = '${secret_values.username}',
  'password' = '${secret_values.password}',
  'database-name' = 'db2024',
  'table-name' = 'products'
);

SELECT 
    o.order_id,
    p.product_name,
    o.order_total,
  CASE 
    WHEN o.order_total > 3000 THEN 1    
    ELSE 0
  END AS is_importance          --增加欄位is_importance,如果銷售額大於3000則值為1,表示是否為重要訂單。
FROM orders o
JOIN products FOR SYSTEM_TIME AS OF PROCTIME() AS p --FOR SYSTEM_TIME AS OF PROCTIME()子句確保在聯結運算子處理orders行時,orders的每一行都與join條件匹配的products行串連。
ON o.product_id = p.product_id;

MySQL商品資料表示例

image

單擊右上方的調試,單擊確認。調試結果如下:

image

更多Lookup Joins的用法詳情請參見維表JOIN語句

Lateral Joins

Lateral Joins允許在FROM子句中指定子查詢,針對外部查詢的每一行執行此子查詢,從而提高 SQL 查詢的靈活性和效能,通常能夠通過減少表掃描的次數來最佳化查詢效率。然而,當內部查詢複雜或處理的資料量較大時,此操作可能會導致效能下降。

Lateral Joins樣本

此樣本將合計銷售訂單記錄,將篩選出銷售記錄前三的商品和銷售條數。

參見Regular Joins建立ETL作業。

CREATE TEMPORARY TABLE sale (
  sale_id STRING,
  product_id INT,
  sale_num INT
)
WITH (
  'connector' = 'faker',    --faker類比資料產生連接器
  'fields.sale_id.expression' = '#{Internet.uuid}',   --隨機產生UUID
  'fields.product_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}',   --從五個數字中選擇產生一個數
  'fields.sale_num.expression' = '#{number.numberBetween ''1'',''10''}',  --從數字1-10中隨機產生一個整數
  'number-of-rows' = '50'   --產生50條資料
);

CREATE TEMPORARY TABLE products (
 product_id INT,
 product_name STRING,
 PRIMARY KEY(product_id) NOT ENFORCED
)
WITH(
  'connector' = 'mysql',
  'hostname' = '${secret_values.mysqlhost}',
  'port' = '3306',
  'username' = '${secret_values.username}',
  'password' = '${secret_values.password}',
  'database-name' = 'db2024',
  'table-name' = 'products'
);

SELECT 
    p.product_name,
    s.total_sales
FROM  products p
LEFT JOIN LATERAL
    (SELECT SUM(sale_num) AS total_sales FROM sale WHERE sale.product_id = p.product_id) s ON TRUE
    ORDER BY total_sales DESC
    LIMIT 3;

單擊右上方的調試,單擊確認。調試結果如下:

image

相關文檔