実行中の Realtime Compute for Apache Flink デプロイメントの SQL ステートメントを変更すると、Flink は既存の状態データが再利用可能かどうかを評価します。状態互換性を保持する変更を理解することで、予期しないデータ損失や完全な再処理を回避できます。
このトピックでは、GROUP BY とともにウィンドウ集計関数を使用する SQL ステートメントの状態互換性の結果について説明します。
変更の結果は、次の 3 つのいずれかになります。
完全互換 — デプロイメントはすべての既存の状態データを保持したまま継続します。
部分互換 — 変更されていない集計は既存の状態から継続し、新しく追加された集計はゼロから再開します。
非互換 — デプロイメントは状態データを再利用できず、最初から再開する必要があります。
Flink による状態互換性の評価方法
Flink は、SQL ステートメントのどのコンポーネントが変更されるかに基づいて、状態の再利用を評価します。
| コンポーネント | 柔軟性 | 例 |
|---|---|---|
| 非 DISTINCT 集計 | 柔軟 — 部分互換性が可能 | SUM(b)、MAX(c)、COUNT(c) |
| ウィンドウ属性フィールド | 柔軟 — 変更は完全互換 | tumble_start、tumble_end |
| GROUP BY 内のウィンドウ関数の位置 | 柔軟 — ウィンドウキーの並べ替えは完全互換 | GROUP BY 内の TUMBLE(ts, ...) |
| ウィンドウ定義 | 固定 — 変更すると互換性が失われる | ウィンドウタイプ、ウィンドウサイズ、時間属性 |
| GROUP BY キー (統計ディメンション) | 固定 — 変更すると互換性が失われる | GROUP BY a, d, ... |
| DISTINCT 集計 | 固定 — 変更すると互換性が失われる | COUNT(DISTINCT c) |
| 早期発火および遅延発火設定 | 固定 — 変更すると互換性が失われる | table.exec.emit.early-fire.enabled |
このフレームワークは、一部の変更が部分的に対応できる理由 (Flink は完全な状態を再構築することなく個々の集計を破棄または追加できる理由) と、他の変更が完全な状態の再起動を必要とする理由 (計算の基本的な構造が変更されるため) を説明します。
クイックリファレンス
この表を使用して、変更を適用する前に特定の変更の互換性への影響を判断してください。
| 変更 | 互換性 |
|---|---|
| 非 DISTINCT 集計の追加 | 部分 |
| 非 DISTINCT 集計の削除 | フル |
| 1 回の変更で非 DISTINCT 集計を追加および削除 | 部分的 |
| 非 DISTINCT 集計の変更 | 部分的 |
| 非 DISTINCT 集計の並べ替え | フル |
| 非 DISTINCT 集計内のフィールドの計算ロジックの変更 | 部分的 |
ウィンドウ属性フィールド (tumble_start、tumble_end) の追加または削除 | フル |
| GROUP BY キーの並べ替え — ウィンドウ関数キーの位置のみ変更 | 完全 |
| 変更前後に集計がない | フル |
| ウィンドウタイプの変更 (例: TUMBLE から HOP へ) | 非互換 |
| ウィンドウサイズの変更 | 非互換 |
時間属性の変更 (rowtime から proctime へ、またはその逆) | 非互換 |
| GROUP BY キー (統計ディメンション) の追加、削除、または変更 | 非互換 |
| DISTINCT 集計の追加、削除、または変更 | 非互換 |
| すべての集計の削除 | 非互換 |
| 早期発火または遅延発火の追加または削除 | 非互換 |
| 非ウィンドウ GROUP BY キーの並べ替え | 非互換 |
| 集計がなかったデプロイメントへの集計の追加 | 非互換 |
| 集計が 1 つのみ存在し、その計算ロジックが変更される | 非互換 |
| 集計のセットが完全に変更される (重複なし) | 非互換 |
変更前後に SQL に 'table.exec.emit.early-fire.enabled' = 'true' または 'table.exec.emit.late-fire.enabled' = 'true' が含まれる | 非互換 |
| 変更前後に SQL に Python ユーザー定義集計関数 (UDAF) が含まれる | 不明 |
状態を保持または部分的に保持する変更
非 DISTINCT 集計の追加、削除、または変更
以下の変更は、部分的な互換性または完全な互換性をもたらします。
追加:部分互換。新しい集計は、デプロイメントが再起動するとゼロからカウントを開始します。
削除:完全互換。削除された集計の状態データは破棄され、他のすべての集計は影響を受けません。
1 回の変更で追加および削除:部分互換。追加された集計はゼロから開始し、削除された集計の状態は破棄されます。
変更 (ある集計を別の集計に変更する):部分互換。元の集計は削除されたものとして扱われ、その状態は破棄されます。新しい集計は追加されたものとして扱われ、ゼロから開始します。
変更しない集計の場合、状態再利用後の結果は、完全な既存データから計算された結果と同一です。
-- 元の SQL ステートメント
SELECT a, SUM(b), MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- COUNT(c) を追加:部分互換。
-- SUM(b) と MAX(c) は影響を受けません。COUNT(c) は 0 から開始します。
SELECT a, SUM(b), MAX(c), COUNT(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- SUM(b) を削除:完全互換。
-- MAX(c) は影響を受けません。
SELECT a, MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- MAX(c) を MIN(c) に変更:部分互換。
-- SUM(b) は影響を受けません。MAX(c) の状態は破棄され、MIN(c) は 0 から開始します。
SELECT a, SUM(b), MIN(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);非 DISTINCT 集計の並べ替え
SELECT リスト内の集計の並べ替えは互換性に影響しません。すべての結果は正しいままです。
-- 元の SQL ステートメント
SELECT a, SUM(b), MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- SUM(b) と MAX(c) を入れ替え:完全互換。
SELECT a, MAX(c), SUM(b)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);非 DISTINCT 集計内のフィールドの計算ロジックの変更
入力フィールドの計算方法を変更することは、集計の変更とみなされます。元の集計は破棄され、新しい集計は最初から開始します。
-- 元の SQL ステートメント
SELECT a, SUM(b), MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- MAX(c) を MAX(SUBSTRING(c, 1, 5)) に変更:部分互換。
-- SUM(b) は影響を受けません。MAX(c) の状態は破棄され、MAX(SUBSTRING(c, 1, 5)) は 0 から開始します。
SELECT a, SUM(b), MAX(c)
FROM (SELECT a, b, SUBSTRING(c, 1, 5) AS c FROM MyTable)
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);ウィンドウ属性フィールドの追加または削除
tumble_start、tumble_end、または類似のウィンドウ属性フィールドの追加または削除は完全互換です。
-- 元の SQL ステートメント
SELECT a,
SUM(b),
MAX(c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- window_end を追加:完全互換。
SELECT a,
SUM(b),
MAX(c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start,
tumble_end(ts, INTERVAL '1' MINUTE) AS window_end
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- window_start を削除:完全互換。
SELECT a,
SUM(b),
MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);GROUP BY キーの並べ替え — ウィンドウ関数キーのみ
GROUP BY 句内のウィンドウ関数キーの位置のみが変更され、他のすべてのキーが同じ順序のままであれば、互換性は保持されます。
-- 元の SQL ステートメント
SELECT a, SUM(DISTINCT b), MAX(DISTINCT c), COUNT(c)
FROM MyTable
GROUP BY a, b, TUMBLE(rowtime, INTERVAL '15' MINUTE);
-- ウィンドウ関数キーを移動:完全互換。
-- a と b の位置は変更されません。
SELECT a, SUM(DISTINCT b), MAX(DISTINCT c), COUNT(c)
FROM MyTable
GROUP BY a, TUMBLE(rowtime, INTERVAL '15' MINUTE), b;状態互換性を損なう変更
以下の変更は、デプロイメントを既存の状態データと非互換にします。デプロイメントはすべての状態を破棄し、入力の最初から結果を再計算する必要があります。
ウィンドウ属性の変更
ウィンドウタイプ、ウィンドウサイズ、または時間属性の変更は互換性を損ないます。
-- 元の SQL ステートメント
SELECT a,
SUM(b),
MAX(c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- ウィンドウタイプを TUMBLE から HOP に変更:非互換。
SELECT a,
SUM(b),
MAX(c),
hop_start(ts, INTERVAL '1' MINUTE, INTERVAL '2' MINUTE) AS window_start
FROM MyTable
GROUP BY a, HOP(ts, INTERVAL '1' MINUTE, INTERVAL '2' MINUTE);
-- ウィンドウサイズを 1 分から 2 分に変更:非互換。
SELECT a,
SUM(b),
MAX(c),
tumble_start(ts, INTERVAL '2' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '2' MINUTE);
-- 時間属性を ts (rowtime) から proctime に変更:非互換。
SELECT a,
SUM(b),
MAX(c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(proctime, INTERVAL '1' MINUTE);GROUP BY キーの追加、削除、または変更
GROUP BY キーは統計ディメンションを定義します。これらのキーへの変更 (キーとして使用されるフィールドの計算ロジックの変更を含む) は互換性を損ないます。
-- 元の SQL ステートメント
SELECT a,
SUM(b),
MAX(c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- GROUP BY キー d を追加:非互換。
SELECT a,
SUM(b),
MAX(c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, d, TUMBLE(ts, INTERVAL '1' MINUTE);DISTINCT 集計の追加、削除、または変更
DISTINCT を使用する集計への変更は、他のすべての集計が変更されないままであっても、互換性を損ないます。
-- 元の SQL ステートメント
SELECT a,
SUM(b),
MAX(c),
COUNT(DISTINCT c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- COUNT(DISTINCT b) を追加:非互換。
SELECT a,
SUM(b),
COUNT(DISTINCT b),
MAX(c),
COUNT(DISTINCT c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);すべての集計の削除
SQL ステートメントからすべての集計を削除すると互換性が損なわれます。すべての状態は破棄されます。
-- 元の SQL ステートメント
SELECT a,
SUM(b),
COUNT(DISTINCT b),
MAX(c),
COUNT(DISTINCT c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- すべての集計を削除:非互換。
SELECT a,
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);早期発火または遅延発火の追加または削除
early-fire または late-fire の出力設定への変更は互換性を損ないます。
非ウィンドウ GROUP BY キーの並べ替え
ウィンドウ関数キー以外の GROUP BY キーの順序を変更すると互換性が損なわれます。
-- 元の SQL ステートメント
SELECT a, SUM(DISTINCT b), MAX(DISTINCT c), COUNT(c)
FROM MyTable
GROUP BY a, b, TUMBLE(rowtime, INTERVAL '15' MINUTE);
-- a と b を入れ替え:非互換。
SELECT a, SUM(DISTINCT b), MAX(DISTINCT c), COUNT(c)
FROM MyTable
GROUP BY b, a, TUMBLE(rowtime, INTERVAL '15' MINUTE);集計がないデプロイメントへの集計の追加
元の SQL に集計がなく、集計を追加すると、互換性が損なわれます。
-- 元の SQL ステートメント (集計なし)
SELECT a, b, c
FROM MyTable
GROUP BY a, b, c, TUMBLE(rowtime, INTERVAL '15' MINUTE);
-- COUNT(c) を追加:非互換。
SELECT a, b, c, COUNT(c)
FROM MyTable
GROUP BY a, b, c, TUMBLE(rowtime, INTERVAL '15' MINUTE);集計が 1 つのみ存在し、その計算ロジックが変更される
デプロイメントに集計が 1 つだけあり、その計算方法を変更すると、互換性が損なわれます。
-- 元の SQL ステートメント
SELECT a, SUM(b), MAX(b), MAX(c)
FROM MyTable
GROUP BY b, a, TUMBLE(ts, INTERVAL '15' MINUTE);
-- 残りの唯一の集計を変更:非互換。
SELECT a, MAX(c)
FROM (SELECT a, b, c + 1 AS c, ts FROM MyTable)
GROUP BY b, a, TUMBLE(ts, INTERVAL '15' MINUTE);変更前後の統計メトリックが完全に異なる
変更前の集計が変更後に残っていない場合、互換性は損なわれます。
-- 元の SQL ステートメント
SELECT a, SUM(b), MAX(b), MAX(c) FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' SECOND);
-- 完全に異なる集計セットに置き換え:非互換。
SELECT a, MIN(b), AVG(b) FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' SECOND);SQL に早期発火または遅延発火の構成が含まれる
変更前後に SQL ステートメントに 'table.exec.emit.early-fire.enabled' = 'true' または 'table.exec.emit.late-fire.enabled' = 'true' が含まれる場合、互換性は損なわれます。
-- 元の SQL ステートメント
SELECT a, MAX(c)
FROM (SELECT a, b, c + 1 AS c, ts FROM MyTable)
GROUP BY b, a, TUMBLE(ts, INTERVAL '15' MINUTE);
-- 早期発火または遅延発火の構成を追加:非互換。
SET 'table.exec.emit.early-fire.enabled' = 'true';
SET 'table.exec.emit.early-fire.delay' = '500ms';
-- または
SET 'table.exec.emit.late-fire.enabled' = 'true';
SET 'table.exec.emit.late-fire.delay' = '1s';
SET 'table.exec.emit.allow-lateness' = '5s';
SELECT a, MAX(c)
FROM (SELECT a, b, c + 1 AS c, ts FROM MyTable)
GROUP BY b, a, TUMBLE(ts, INTERVAL '15' MINUTE);SQL に Python UDAF が含まれる
変更前後に SQL ステートメントに Python ユーザー定義集計関数 (UDAF) が含まれる場合、デプロイメントとその状態データ間の互換性は不明です。
-- 変更前後に Python UDAF (weighted_avg) を含む SQL — 互換性は不明。
SELECT COUNT(DISTINCT b), a, SUM(DISTINCT b), weighted_avg(a, b)
FROM MyTable
GROUP BY a, c;