Flink SQL は、動的テーブルに対する複雑な JOIN 操作をサポートし、複数のクエリセマンティクスと結合タイプを提供します。Flink SQL で JOIN 操作を実行する場合、クエリ結果にデカルト積が含まれないようにする必要があります。含まれる場合、Flink SQL はデカルト積をサポートしていないため、クエリは失敗します。デフォルトでは、JOIN 操作の順序は最適化されていません。パフォーマンスを向上させるには、FROM 句のテーブル順序を調整して、更新頻度が最も低いテーブルを最初に配置し、更新頻度が最も高いテーブルを最後に配置します。
結合の概要
結合タイプ | 説明 | 構文の違い |
標準結合は、最も一般的に使用される結合です。結合されるテーブルへの新しいレコードまたは更新は表示され、JOIN 操作全体の結果に影響します。 | ストリーミングクエリにおける標準結合の構文は非常に柔軟で、入力テーブルに対する INSERT、UPDATE、および DELETE 操作をサポートしています。ただし、標準結合は、結合されるテーブルの入力データを状態データとして永続的に保存するため、状態データが無制限に増加する可能性があります。状態データの過剰な増加を防ぐために、状態データの Time-To-Live(TTL)を設定できます。ただし、これにより結果の精度が影響を受ける可能性があります。 | |
インターバル結合は、結合条件と時間制限を満たす単純なデカルト積を生成します。 | 結合されるテーブルに含まれる、少なくとも 1 つの等価結合条件と 1 つの時間ベースの結合条件が必要です。 時間範囲を条件として定義できます。 たとえば、小なり (<)、小なりイコール (<=)、大なりイコール (>=)、大なり (>) などの比較演算子を使用して時間範囲を指定できます。 また、 | |
テンポラル結合は、イベント時間または処理時間に基づいてバージョン管理テーブルを結合するために使用されます。これにより、バージョン管理テーブルの特定の時間のバージョンデータをある時点で結合できます。 | 結合されるテーブルには、同じタイプの処理時間セマンティクス(処理時間またはイベント時間)が含まれている必要があります。結合結果のライフサイクルに注意してください。結合条件は通常、特定のタイムスタンプに基づいています。 | |
ルックアップ結合は、外部データソースのディメンションテーブルをファクトテーブルに結合して、データストリームを強化するために使用されます。このタイプの JOIN 操作では、一方のテーブルに処理時間属性が含まれており、もう一方のテーブルがディメンションテーブルである必要があります。 | 結合されるテーブルは、次の要件を満たしている必要があります。一方のテーブルに処理時間属性が含まれている必要があり、もう一方のテーブルのデータは、ルックアップソースコネクタを使用して取得する必要があります。 2 つのテーブルを結合するには、等価結合条件も必要です。 | |
ラテラル結合は、テーブル値関数(TVF)を使用して生成された結果テーブルにテーブルを結合するために使用されます。左テーブルの各行は、TVF を使用して生成された結果テーブルのすべての行に結合されます。 |
|
標準結合
次の 4 つの JOIN 操作が一般的に使用されます。
INNER JOIN:2 つのテーブルの結合条件を満たすデータエントリを返します。
LEFT JOIN:左テーブルのすべてのデータエントリを返します。右テーブルの行と一致しない行も含まれます。
RIGHT JOIN:右テーブルのすべてのデータエントリを返します。左テーブルの行と一致しない行も含まれます。
FULL OUTER JOIN:2 つのテーブルの和集合を返します。2 つのテーブルのすべての行が含まれます。
標準結合 図

標準結合の例
Realtime Compute for Apache Flink コンソール にログインします。
管理するワークスペースを見つけ、[操作] 列の [コンソール] をクリックします。
左側のナビゲーションウィンドウで、 を選択します。
[+] アイコンをクリックし、[新しい空白のストリームドラフト] を選択します。[新しいドラフト] ダイアログで、名前を入力し、エンジンバージョンを選択して、[作成] をクリックします。
次のサンプル コードは、JOIN 操作を使用して複数のテーブルの行を結合し、スーパーヒーローのニックネームを実際の名前と関連付ける方法を示しています。
CREATE TEMPORARY TABLE NOC ( agent_id STRING, codename STRING ) WITH ( 'connector' = 'faker', -- Faker コネクタ 'fields.agent_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}', -- 5 つの数字から 1 つの数字を生成します。 'fields.codename.expression' = '#{superhero.name}', -- スーパーヒーロー名を生成するために使用されるフェイク ビルトイン関数です。 'number-of-rows' = '10' -- 10 行のデータが生成されることを示します。 ); CREATE TEMPORARY TABLE RealNames ( agent_id STRING, name STRING ) WITH ( 'connector' = 'faker', 'fields.agent_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}', 'fields.name.expression' = '#{Name.full_name}', -- ランダムに名前を生成するために使用されるフェイク ビルトイン関数です。 'number-of-rows' = '10' ); SELECT name, codename FROM NOC INNER JOIN RealNames ON NOC.agent_id = RealNames.agent_id; -- 2 つのテーブルの agent_id の値が同じであれば、名前とコードネームが生成されます。 agent_id の値の範囲は 1 ~ 5 です。右上隅にある [デバッグ] をクリックし、デバッグするクラスタを選択して、[OK] をクリックします。使用可能なセッションクラスタがない場合は、セッションクラスタを作成します。詳細については、「セッションクラスタを作成する」をご参照ください。

標準結合の使用方法の詳細については、「標準結合ステートメント」をご参照ください。
インターバル結合
インターバル結合は、2 つのデータセットを結合するために使用されます。各データセットは、開始時間と終了時間で定義される複数の時間間隔に基づいて分割されます。各セットのデータは、タイムスタンプに基づいて関連する間隔に割り当てられます。ほとんどの場合、インターバル結合は、特定の時間間隔内の 2 つのデータセットを比較するために使用されます。
インターバル結合の図
インターバル結合の例
この例では、時間コンテキストを含むイベントを含むテーブルを結合する方法を示します。注文が行われた時間(order_time)と出荷時間(shipment_time)の時間差が 3 時間以内であるデータがフィルタリングされます。
標準結合 セクションの手順を参照して、ETL ページでドラフトを作成します。
CREATE TEMPORARY TABLE orders (
id INT,
order_time AS TIMESTAMPADD(HOUR, CAST(FLOOR(RAND()*(1-5+1)+5)*(-1) AS INT), CURRENT_TIMESTAMP) -- ローカル時間の 2 ~ 4 時間前のランダムなタイムスタンプを生成します。
)
WITH (
'connector' = 'datagen', -- ランダムデータを定期的に生成するために使用される Datagen コネクタ。
"rows-per-second'='10", -- ランダムデータの生成レート。1 秒あたり 10 行です。
'fields.id.kind'='sequence', -- シーケンスジェネレータ。
'fields.id.start'='1', -- シーケンス値は 1 から始まります。
'fields.id.end'='100' -- シーケンス値は 100 で終わります。
);
CREATE TEMPORARY TABLE shipments (
order_id INT,
shipment_time AS TIMESTAMPADD(HOUR, CAST(FLOOR(RAND()*(1-5+1))+1 AS INT), CURRENT_TIMESTAMP) -- ローカル時間の 0 ~ 2 時間前のランダムなタイムスタンプを生成します。
)
WITH (
'connector' = 'datagen',
'rows-per-second'='5',
'fields.order_id.kind'='sequence',
'fields.order_id.start'='1',
'fields.order_id.end'='100'
);
SELECT
o.id AS order_id,
o.order_time,
s.shipment_time,
TIMESTAMPDIFF(HOUR,o.order_time,s.shipment_time) AS hour_diff -- 注文が行われた時間(order_time)と出荷時間(shipment_time)の時間差。
FROM orders o
JOIN shipments s ON o.id = s.order_id
WHERE
o.order_time BETWEEN s.shipment_time - INTERVAL '3' HOUR AND s.shipment_time; -- 注文が行われた時間と出荷時間の時間差が 3 時間以内であるデータをフィルタリングします。[デバッグ] をクリックし、[OK] をクリックします。次の図は、デバッグ結果を示しています。

インターバル結合の使用方法の詳細については、「インターバル結合」をご参照ください。
テンポラル結合
テンポラル結合は、一般的にテンポラルテーブル(Apache Flink では動的テーブルとも呼ばれます)を結合するために使用されます。テンポラルテーブルは、時間とともに変化するテーブルです。各レコードは、1 つ以上の期間に関連付けられています。たとえば、為替レートや製品価格は時間とともに変動する可能性があります。この場合、テンポラル結合を使用して、トランザクションが発生した時間を特定の時間の関連する為替レートまたは価格にマッピングして、正確な計算を行うことができます。
テンポラル結合の図
テンポラル結合の例
この例では、注文をその時点の有効な為替レートに基づいて計算する必要があるビジネスシナリオを示します。為替レートは、期間によって変化します。
標準結合 セクションの手順を参照して ETL ページでドラフトを作成し、デバッグのためにシミュレートされたデータを読み取ります。
CREATE TEMPORARY TABLE currency_rates (
`currency_code` STRING,
`eur_rate` DECIMAL(6,4),
`rate_time` TIMESTAMP(3),
WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECOND,
PRIMARY KEY (currency_code) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'currency_rates',
'properties.bootstrap.servers' = '${secret_values.kafkahost}',
'properties.auto.offset.reset' = 'earliest',
'properties.group.id' = 'currency_rates',
'key.format' = 'raw',
'value.format' = 'json'
);
CREATE TEMPORARY TABLE transactions (
`id` STRING,
`currency_code` STRING,
`total` DECIMAL(10,2),
`transaction_time` TIMESTAMP(3),
WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'transactions',
'properties.bootstrap.servers' = '${secret_values.kafkahost}',
'properties.auto.offset.reset' = 'earliest',
'properties.group.id' = 'transactions',
'key.format' = 'raw',
'key.fields' = 'id',
'value.format' = 'json'
);
SELECT
t.id,
t.total * c.eur_rate AS total_eur,
c.eur_rate,
t.total,
c.currency_code,
c.rate_time,
t.transaction_time
FROM transactions t
JOIN currency_rates FOR SYSTEM_TIME AS OF t.transaction_time AS c
ON t.currency_code = c.currency_code
;SQL エディタの右上隅にある [デバッグ] をクリックします。表示されるダイアログボックスで、[OK] をクリックします。次の図は、デバッグ結果を示しています。
為替レートは 20:16:11 と 20:35:22 に変更されました。トランザクション注文は 20:35:14 に発生し、その時点では為替レートは変更されていませんでした。したがって、計算には 20:16:11 の為替レートが使用されます。

ルックアップ結合
ルックアップ結合は、外部システムからデータを取得してデータを強化または補足するためによく使用されます。リアルタイムワークフローのデータでも、すべてのデータが頻繁に更新されるわけではありません。特定のケースでは、外部システムに保存されている静的データを使用してストリーミングデータを強化する必要がある場合があります。たとえば、製品データを Apache Flink に直接接続されているリレーショナルデータベースに保存する必要がある場合があります。Flink SQL を使用すると、ルックアップ結合を使用して参照データをルックアップし、そのデータをストリーミングデータと組み合わせることができます。JOIN 操作では、一方のテーブルに時間属性が含まれており、もう一方のテーブルが MySQL コネクタなどのコネクタを介して Apache Flink に接続されている必要があります。
ルックアップ結合の図
ディメンションテーブルに FOR SYSTEM_TIME AS OF PROCTIME() を追加する必要があります。これにより、JOIN 操作が orders テーブルで実行されるときに、orders テーブルの各行が現在の時刻のディメンションテーブルのスナップショットで表示できる行に関連付けられます。したがって、JOIN 操作の実行後にディメンションテーブルのデータが追加、更新、または削除された場合、関連付けられたデータは変更されません。
ON 条件には、ディメンションテーブルでランダムルックアップをサポートするフィールドの等価条件が含まれている必要があります。
ルックアップ結合の例
この例では、外部コネクタを使用して注文データを強化し、外部に保存されている静的データを使用して製品名を補足する方法を示します。
標準結合 セクションの手順を参照して、ETL ページでドラフトを作成します。
CREATE TEMPORARY TABLE orders (
order_id STRING,
product_id INT,
order_total INT
) WITH (
'connector' = 'faker', -- Faker コネクタ
'fields.order_id.expression' = '#{Internet.uuid}', -- ランダムな汎用一意識別子 (UUID) を生成します。
'fields.product_id.expression' = '#{number.numberBetween ''1'',''5''}', -- 1 から 5 までの数値を生成します。
'fields.order_total.expression' = '#{number.numberBetween ''1000','5000'}', -- 1000 から 5000 までの数値を生成します。
'number-of-rows' = '10' -- 生成されるデータ行数。
);
-- MySQL コネクタを使用して静的プロダクトデータを関連付けます。
CREATE TEMPORARY TABLE products (
product_id INT,
product_name STRING
)
WITH(
'connector' = 'mysql',
'hostname' = '${secret_values.mysqlhost}',
'port' = '3306',
'username' = '${secret_values.username}',
'password' = '${secret_values.password}',
'database-name' = 'db2024',
'table-name' = 'products'
);
SELECT
o.order_id,
p.product_name,
o.order_total,
CASE
WHEN o.order_total > 3000 THEN 1
ELSE 0
END AS is_importance -- is_importance フィールドを追加します。order_total フィールドの値が 3000 より大きい場合、このフィールドの値は 1 となり、重要な注文であることを示します。
FROM orders o
JOIN products FOR SYSTEM_TIME AS OF PROCTIME() AS p -- FOR SYSTEM_TIME AS OF PROCTIME() 句により、orders テーブルで JOIN 操作が実行される際に、orders テーブルの各行が products テーブルの結合条件に一致する行と確実に関連付けられます。
ON o.product_id = p.product_id;次の図は、MySQL コネクタを使用して関連付けられた製品データの例を示しています。

SQL エディタの右上隅にある [デバッグ] をクリックします。表示されるダイアログボックスで、[OK] をクリックします。次の図は、デバッグ結果を示しています。

ルックアップ結合の使用方法の詳細については、「ディメンションテーブルの JOIN ステートメント」をご参照ください。
ラテラル結合
ラテラル結合を使用すると、FROM 句でサブクエリを指定し、外部クエリの各行に対してこのサブクエリを実行できます。これにより、SQL クエリの柔軟性とパフォーマンスが向上し、テーブルスキャンの回数を減らすことでクエリ効率が最適化されます。ただし、内部クエリが複雑な場合、または処理されるデータ量が多い場合、LATERAL JOIN 操作によってパフォーマンスが低下する可能性があります。
ラテラル結合の例
この例では、販売注文レコードを集計し、販売レコードとその販売レコード数量で上位 3 つの製品をフィルタリングする方法を示します。
標準結合 セクションの手順を参照して、ETL ページでドラフトを作成します。
CREATE TEMPORARY TABLE sale (
sale_id STRING,
product_id INT,
sale_num INT
)
WITH (
'connector' = 'faker', -- Faker コネクタ
'fields.sale_id.expression' = '#{Internet.uuid}', -- ランダムな UUID を生成します。
'fields.product_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}', -- 5 つの数字から 1 つの数字を選択します。
'fields.sale_num.expression' = '#{number.numberBetween ''1'',''10''}', -- 1 ~ 10 のランダムな整数を生成します。
'number-of-rows' = '50' -- 50 行のデータを生成します。
);
CREATE TEMPORARY TABLE products (
product_id INT,
product_name STRING,
PRIMARY KEY(product_id) NOT ENFORCED
)
WITH(
'connector' = 'mysql',
'hostname' = '${secret_values.mysqlhost}',
'port' = '3306',
'username' = '${secret_values.username}',
'password' = '${secret_values.password}',
'database-name' = 'db2024',
'table-name' = 'products'
);
SELECT
p.product_name,
s.total_sales
FROM products p
LEFT JOIN LATERAL
(SELECT SUM(sale_num) AS total_sales FROM sale WHERE sale.product_id = p.product_id) s ON TRUE
ORDER BY total_sales DESC
LIMIT 3;SQL エディタの右上隅にある [デバッグ] をクリックします。表示されるダイアログボックスで、[OK] をクリックします。次の図は、デバッグ結果を示しています。

関連情報
イベントの発生時間ではなく、イベントの処理時間だけに興味がある場合は、処理時間テンポラル結合ステートメントを使用できます。詳細については、「処理時間テンポラル結合ステートメント」をご参照ください。
標準結合の使用方法の詳細については、「標準結合ステートメント」をご参照ください。
インターバル結合の使用方法の詳細については、「インターバル結合」をご参照ください。
ルックアップ結合の使用方法の詳細については、「ディメンションテーブルの JOIN ステートメント」をご参照ください。