ウィンドウ Top-N は、ウィンドウテーブル値関数(TVF)と Top-N クエリの両方の変更ルールに同時に従う必要があります。両方のルールセットが適用されるため、ウィンドウ Top-N は単体のウィンドウ TVF や Top-N クエリよりも少ない互換性のある変更しかサポートしません。本トピックでは、ウィンドウ Top-N クエリに対する変更のうち、既存のタスク状態データと互換性があるものと、タスクを再起動する前に完全な状態リセットが必要となるものを説明します。
状態の互換性を破壊する変更の理由
ウィンドウ Top-N クエリを変更すると、Flink のクエリ オプテマイザーが異なる実行計画を生成することがあります。これにより、演算子のトポロジーや中間演算子の状態スキーマが変更される可能性があります。新しい計画が既存のセーブポイントにマップされない場合、タスクは以前の状態から再開できません。
ウィンドウ属性、GROUP BY フィールド、パーティションキー、ORDER BY フィールド、N の値、または集約フィールドを変更する操作は、再計画をトリガーし、状態の非互換性を引き起こします。
ウィンドウ Top-N の構文構造
各変更がどの句を対象としているかを理解することで、以下に示す互換性ルールを適切に適用できます。ウィンドウ Top-N クエリの構造は次のとおりです。
-- 外側のクエリ:上位ランクの行を選択
SELECT [column_list]
FROM (
-- 中間レイヤー:ROW_NUMBER() を使用して行番号を割り当てる
SELECT [column_list],
ROW_NUMBER() OVER (
PARTITION BY window_start, window_end [, partition_key...]
ORDER BY col [ASC|DESC] [, col [ASC|DESC]...]
) AS rk
FROM (
-- 内側のクエリ:ウィンドウ TVF を使用したウィンドウ集計
SELECT [agg_fields], window_start, window_end
FROM TABLE(TUMBLE(TABLE source_table, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY [group_keys], window_start, window_end
)
)
WHERE rk < N;以下の互換性ルールは、この構造内の特定の句に対応しています。
互換性のある変更
以下の変更は、既存の状態データと完全に互換性があります。タスクの状態をリセットすることなく実施できます。
クエリ結果からウィンドウ属性フィールドを追加または削除
外側の SELECT リストから window_start または window_end を追加または削除しても、内部の状態構造には影響しません。
-- 元のクエリ:window_start のみを選択
SELECT a, b, c, window_start FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, window_start, window_end ORDER BY c) AS rk
FROM (
SELECT a, sum(b) AS b, max(c) AS c, window_start, window_end
FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, window_start, window_end
)
) WHERE rk < 3;
-- 互換性あり:出力に window_end を追加
SELECT a, b, c, window_start, window_end FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, window_start, window_end ORDER BY c) AS rk
FROM (
SELECT a, sum(b) AS b, max(c) AS c, window_start, window_end
FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, window_start, window_end
)
) WHERE rk < 3;クエリ結果にランキング位置フィールドを含めるまたは除外
外側の SELECT で rk フィールド(ROW_NUMBER の出力)を含めるまたは除外しても、完全に互換性があります。
-- 元のクエリ:出力に rk は含まれない
SELECT a, b, c, window_start FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, window_start, window_end ORDER BY c) AS rk
FROM (
SELECT a, sum(b) AS b, max(c) AS c, window_start, window_end
FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, window_start, window_end
)
) WHERE rk < 3;
-- 互換性あり:出力に rk を含める
SELECT a, b, c, window_start, rk FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, window_start, window_end ORDER BY c) AS rk
FROM (
SELECT a, sum(b) AS b, max(c) AS c, window_start, window_end
FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, window_start, window_end
)
) WHERE rk < 3;OVER 句内のパーティションキーの順序を変更
PARTITION BY 内のパーティションキーの順序を変更しても、パーティション化ロジックや状態構造は変わりません。
-- 元のクエリ:PARTITION BY a, b, window_start, window_end
SELECT a, b, c, window_start FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a, b, window_start, window_end ORDER BY c) AS rk
FROM (
SELECT a, sum(b) AS b, max(c) AS c, window_start, window_end
FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, b, window_start, window_end
)
) WHERE rk < 3;
-- 互換性あり:PARTITION BY b, a, window_start, window_end に順序を変更
SELECT a, b, c, window_start FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, a, window_start, window_end ORDER BY c) AS rk
FROM (
SELECT a, sum(b) AS b, max(c) AS c, window_start, window_end
FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, b, window_start, window_end
)
) WHERE rk < 3;互換性のない変更
以下の変更は、既存の状態データと互換性がありません。これらの変更を実施した後は、タスクを再起動する前に状態をリセットする必要があります。
| 変更内容 | 詳細 |
|---|---|
| ウィンドウ属性を変更 | ウィンドウタイプ、ウィンドウサイズ、または時間属性を含みます。例については、「完全な非互換性を引き起こす変更」をご参照ください。 |
| GROUP BY 句のフィールドを追加・削除・変更、またはその計算ロジックを変更 | 内側の集約状態に影響します。例については、「完全な非互換性を引き起こす変更」をご参照ください。 |
| 集約フィールドを追加・削除・変更、または Top-N クエリへの入力を変更 | ROW_NUMBER レイヤーへの入力データのスキーマが変更されます。以下の例をご参照ください。 |
| パーティションキーを追加・削除・変更、またはパーティションキー フィールドの計算ロジックを変更 | 中間レイヤーの ROW_NUMBER 状態に影響します。例については、「互換性のない変更」をご参照ください。 |
| ORDER BY 句のフィールドまたは並び順を変更 | ランキングロジックと状態に影響します。例については、「互換性のない変更」をご参照ください。 |
| N の値を変更 | N は返される上位ランクの結果の件数を指定します。例については、「互換性のない変更」をご参照ください。 |
| GROUP BY 句内でウィンドウ TVF 関連フィールドのみの順序を変更 | ウィンドウ以外の GROUP BY フィールドは同じでも、window_start および window_end の順序を変更すると、ウィンドウベースのランキング結果が変わり、互換性がありません。以下の例をご参照ください。 |
| GROUP BY 句内でウィンドウ以外のフィールドのみの順序を変更 | ウィンドウ以外のフィールド(例: a と b を入れ替える)の順序を変更すると、内側の集約状態が変わり、互換性がありません。以下の例をご参照ください。 |
例:集約フィールドの追加
内側の集約に min(d) AS d を追加すると、Top-N クエリへの入力スキーマが変更され、状態の互換性が失われます。
-- 元のクエリ
SELECT a, b, c, window_start FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, window_start, window_end ORDER BY c) AS rk
FROM (
SELECT a, sum(b) AS b, max(c) AS c, window_start, window_end
FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, window_start, window_end
)
) WHERE rk < 3;
-- 非互換:min(d) as d を追加し、Top-N 入力を変更
SELECT a, b, c, d, window_start FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, window_start, window_end ORDER BY c) AS rk
FROM (
SELECT a, sum(b) AS b, max(c) AS c, min(d) AS d, window_start, window_end
FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, window_start, window_end
)
) WHERE rk < 3;例:GROUP BY フィールドの順序変更
以下の両方の変更は、フィールドの追加や削除がなくても非互換です。
-- 元のクエリ
SELECT a, b, c, window_start FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, window_start, window_end ORDER BY c) AS rk
FROM (
SELECT a, sum(b) AS b, max(c) AS c, window_start, window_end
FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, b, window_start, window_end
)
) WHERE rk < 3;
-- 非互換:ウィンドウ TVF フィールドの順序を変更(window_end を window_start の前に)
-- これにより、ウィンドウベースのランキング結果が変化します。
SELECT a, b, c, window_start FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, window_start, window_end ORDER BY c) AS rk
FROM (
SELECT a, sum(b) AS b, max(c) AS c, window_start, window_end
FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, b, window_end, window_start
)
) WHERE rk < 3;
-- 非互換:ウィンドウ以外のフィールドの順序を変更(b を a の前に)
SELECT a, b, c, window_start FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, window_start, window_end ORDER BY c) AS rk
FROM (
SELECT a, sum(b) AS b, max(c) AS c, window_start, window_end
FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY b, a, window_start, window_end
)
) WHERE rk < 3;