このトピックでは、デプロイメントの SQL 文でシンク コネクタを変更した後、デプロイメントと状態データ間の互換性がどのように影響を受けるかについて説明します。
互換性に影響を与えない、または部分的に影響を与える変更
同じソース コネクタに接続されているシンク コネクタの 1 つを削除します。この変更後、デプロイメントは状態データと完全に互換性を維持します。
-- 元の SQL 文: CREATE TABLE MyTable ( a int, b bigint, c varchar ); CREATE TABLE MySink1 ( a int, b bigint, c varchar ); CREATE TABLE MySink2 ( a int, b bigint, c varchar ); INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a; INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10; -- MySink1 の SELECT 文を削除します。この変更後、デプロイメントは状態データと完全に互換性を維持します。 -- SELECT 文の集計関数に対応する状態データは破棄されます。 INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10; -- MySink2 の SELECT 文を削除します。この変更後、デプロイメントは状態データと完全に互換性を維持します。 INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;シンク コネクタとステートレス クエリを追加します。この変更後、デプロイメントは状態データと完全に互換性を維持します。
-- 元の SQL 文: CREATE TABLE MyTable ( a int, b bigint, c varchar ); CREATE TABLE MySink1 ( a int, b bigint, c varchar ); INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a; -- ステートレス クエリを追加します。この変更後、デプロイメントは状態データと完全に互換性を維持します。 CREATE TABLE MySink2 ( a int, b bigint, c varchar ); INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a; INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10;ほとんどのシンク コネクタはステートレスです。したがって、シンクはデフォルトでステートレス オペレーターと見なされます。デプロイメントの WITH 句のテーブル名、コネクタ タイプ、および属性が変更された場合、デプロイメントは状態データと完全に互換性を維持します。
-- 元の SQL 文: CREATE TABLE MyTable ( a int, b bigint, c varchar ); CREATE TABLE MySink ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a; -- デプロイメントの SQL 文で、テーブル名やコネクタ タイプなどの構成を変更します。この変更後、デプロイメントは状態データと完全に互換性を維持します。 CREATE TABLE MySink2 ( a int, b bigint, c varchar ) WITH ( 'connector' = 'kafka', ... ); INSERT INTO MySink2 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
非互換性をもたらす変更
シンク コネクタとステートフル クエリを追加します。この変更後、デプロイメントは状態データと互換性がなくなります。
-- 元の SQL 文: CREATE TABLE MyTable ( a int, b bigint, c varchar ); CREATE TABLE MySink1 ( a int, b bigint, c varchar ); INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a; -- ステートフル クエリを追加します。この変更後、デプロイメントは状態データと互換性がなくなります。 CREATE TABLE MySink2 ( b bigint, a int, c varchar ); INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a; INSERT INTO MySink2 SELECT b, sum(a), max(c) FROM MyTable GROUP BY b;シンクの table.optimizer.state-compatibility.ignore-sink パラメーターを false に設定し、デプロイメントのテーブル名またはコネクタ タイプを変更します。この変更後、デプロイメントは状態データと互換性がなくなります。 table.optimizer.state-compatibility.ignore-sink パラメーターを false に設定すると、シンクはステートフル オペレーターと見なされます。
-- 元の SQL 文: CREATE TABLE MyTable ( a int, b bigint, c varchar ); CREATE TABLE MySink ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a; -- シンクの table.optimizer.state-compatibility.ignore-sink パラメーターを false に設定します。 -- table.optimizer.state-compatibility.ignore-sink パラメーターを false に設定すると、シンクはステートフル オペレーターと見なされます。テーブル名を変更します。この変更後、デプロイメントは状態データと互換性がなくなります。 CREATE TABLE MySink2 ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink2 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a; -- シンクの table.optimizer.state-compatibility.ignore-sink パラメーターを false に設定します。 -- table.optimizer.state-compatibility.ignore-sink パラメーターを false に設定すると、シンクはステートフル オペレーターと見なされます。コネクタ タイプを変更します。たとえば、コネクタ タイプを print から blackhole に変更できます。この変更後、デプロイメントは状態データと互換性がなくなります。 create table MySink ( a int, b bigint, c varchar ) WITH ( 'connector' = 'blackhole', ... ); INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;デフォルトでは、シンクの table.optimizer.state-compatibility.ignore-sink パラメーターは true に設定されており、互換性チェック中にシンクが無視されます。したがって、シンクはほとんどの場合、ステートレス オペレーターと見なされます。順序が正しくないデータが存在する場合、順序が正しくないデータを処理し、データの正確性を確保するために、シンクに基づいてステートフル SinkMaterializer オペレーターが生成されます。詳細については、「Flink SQL で順序が正しくない changelog イベントを処理する」をご参照ください。この場合、デプロイメントの開始時に実行した互換性チェックの結果が 完全互換 である場合、変更後にデプロイメントが状態データと互換性がなくなる可能性があります。たとえば、ソースのプライマリ キーを変更すると、シンクのアップストリーム upsert キーが変更されます。この場合、前述の互換性の問題が発生する可能性があります。
-- 元のクエリ CREATE TEMPORARY TABLE MyTable ( a int primary key not enforced, b bigint, c bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); CREATE TEMPORARY TABLE MySink (a int, b bigint, c bigint primary key not enforced) with ('connector'='print'); INSERT INTO MySink SELECT a, b, c FROM MyTable; -- プライマリ キーとして使用される列を変更します。 CREATE TEMPORARY TABLE MyTable ( a int, b bigint, c bigint primary key not enforced, d bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); CREATE TEMPORARY TABLE MySink (a int, b bigint primary key not enforced, c bigint) with ('connector'='print'); INSERT INTO MySink SELECT a, b, c FROM MyTable;シンクの table.optimizer.state-compatibility.ignore-sink パラメーターを true から false に設定します。シンクはステートレス オペレーターからステートフル オペレーターに変更され、互換性チェックに含まれます。この変更後、デプロイメントは状態データと互換性がなくなります。
不明な互換性をもたらす変更
シンクを削除し、デプロイメントのシンクまたはソースの TEMPORARY TABLE 文を変更または削除した場合、デプロイメントと状態データ間の互換性は判断されません。詳細については、「その他の制限」をご参照ください。