このトピックでは、チェックポイントまたはセーブポイントに基づいてジョブを開始する前に、ジョブの SQL ステートメントを変更する場合の制限について説明します。ジョブのクエリ、ソーステーブル、およびシンクテーブルの変更に関する制限は除きます。
開始するジョブは、チェックポイントまたはセーブポイントが生成されたジョブと同じ Apache Flink バージョンを使用する必要があります。そうでない場合、システムは互換性を判断できません。
開始するジョブの依存関係は、チェックポイントまたはセーブポイントが生成されたジョブの依存関係と互換性がある必要があります。カスタムコネクタまたはユーザー定義関数(UDF)の依存関係を変更する場合は、変更後に互換性を確保する必要があります。システムはこの場合に発生する可能性のある互換性の問題を特定できないためです。
互換性チェックの前に複数の項目を変更すると、システムは互換性を判断できません。項目とは、ステートフルな計算に影響する集計関数、シンクテーブル、または WHERE 句のことです。
-- 元の SQL ステートメント: CREATE TABLE MyTable ( a int, b bigint, c varchar ) WITH ( 'connector' = 'datagen' ); CREATE TABLE MySink ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable group by a; -- -- 互換性が不明になる変更の例: シンクテーブルの名前を MySink2 に変更し、集計関数を max(c) から min(c) に変更します。 CREATE TABLE MySink2 ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink2 SELECT a, sum(b), min(c) FROM MyTable group by a; -- -- 互換性が不明になる変更の例: a > 10 条件を含む WHERE 句を追加し、table.optimizer.state-compatibility.ignore-filter パラメーターを true に設定し、集計関数を max(c) から min(c) に変更します。 INSERT INTO MySink SELECT a, sum(b), min(c) FROM ( SELECT * FROM MyTable where a > 10 ) GROUP BY a;新しい状態データを生成するクエリを追加すると、互換性の問題が発生します。
-- 元の SQL ステートメント: CREATE TABLE MyTable ( a int, b bigint, c varchar ) WITH ( 'connector' = 'datagen' ); CREATE TABLE MySink ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink SELECT a, b, c FROM MyTable; -- グループ集計クエリを追加すると、互換性のない変更になります。 INSERT INTO MySink SELECT a, sum(b), min(c) FROM MyTable GROUP BY a;シンクテーブルを削除し、シンクテーブルまたは対応するソーステーブルの TEMPORARY TABLE ステートメントを変更または削除すると、ジョブを開始する前にシステムは互換性を判断できません。対応する TEMPORARY TABLE ステートメントを変更せずにシンクテーブルを削除した場合、互換性には影響しません。
-- 元の SQL ステートメント -- ソーステーブル 1 CREATE TEMPORARY TABLE MyTable ( a int, b bigint, c bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); -- ソーステーブル 2 CREATE TEMPORARY TABLE MyTable2 ( a int, b bigint, c bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); -- シンクテーブル 1 CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector'='print'); -- シンクテーブル 2 CREATE TEMPORARY TABLE MySink2 (a int, b bigint) WITH ('connector'='print'); --クエリ BEGIN STATEMENT SET; INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a; INSERT INTO MySink2 SELECT a, b FROM MyTable2 where a > 10; END; -- 互換性が不明になる変更の例: シンクテーブルを削除し、シンクテーブルまたは対応するソーステーブルの TEMPORARY TABLE ステートメントを変更または削除します。 -- ソーステーブル 1 CREATE TEMPORARY TABLE MyTable ( a int, b bigint, c bigint, d bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); -- シンクテーブル 1 CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector'='print'); --クエリ INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a; -- 完全な互換性のある変更の例: 対応する TEMPORARY TABLE ステートメントを変更せずにシンクテーブルを削除します。 -- ソーステーブル 1 CREATE TEMPORARY TABLE MyTable ( a int, b bigint, c bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); -- ソーステーブル 2 CREATE TEMPORARY TABLE MyTable2 ( a int, b bigint, c bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); -- シンクテーブル 1 CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector'='print'); -- シンクテーブル 2 CREATE TEMPORARY TABLE MySink2 (a int, b bigint) WITH ('connector'='print'); --クエリ INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a;