すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:シンク コネクタの変更

最終更新日:Jan 21, 2025

このトピックでは、デプロイメントの SQL 文でシンク コネクタを変更した後、デプロイメントと状態データ間の互換性がどのように影響を受けるかについて説明します。

互換性に影響を与えない、または部分的に影響を与える変更

  • 同じソース コネクタに接続されているシンク コネクタの 1 つを削除します。この変更後、デプロイメントは状態データと完全に互換性を維持します。

    -- 元の SQL 文:
    CREATE TABLE MyTable (
      a int,
      b bigint,
      c varchar
    );
    CREATE TABLE MySink1 (
      a int,
      b bigint,
      c varchar
    );
    CREATE TABLE MySink2 (
      a int,
      b bigint,
      c varchar
    );
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10;
    
    -- MySink1 の SELECT 文を削除します。この変更後、デプロイメントは状態データと完全に互換性を維持します。
    -- SELECT 文の集計関数に対応する状態データは破棄されます。
    INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10;
    
    -- MySink2 の SELECT 文を削除します。この変更後、デプロイメントは状態データと完全に互換性を維持します。
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
  • シンク コネクタとステートレス クエリを追加します。この変更後、デプロイメントは状態データと完全に互換性を維持します。

    -- 元の SQL 文:
    CREATE TABLE MyTable (
      a int,
      b bigint,
      c varchar
    );
    
    CREATE TABLE MySink1 (
      a int,
      b bigint,
      c varchar
    );
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    
    -- ステートレス クエリを追加します。この変更後、デプロイメントは状態データと完全に互換性を維持します。
    CREATE TABLE MySink2 (
      a int,
      b bigint,
      c varchar
    );
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10;
  • ほとんどのシンク コネクタはステートレスです。したがって、シンクはデフォルトでステートレス オペレーターと見なされます。デプロイメントの WITH 句のテーブル名、コネクタ タイプ、および属性が変更された場合、デプロイメントは状態データと完全に互換性を維持します。

    -- 元の SQL 文:
    CREATE TABLE MyTable (
      a int,
      b bigint,
      c varchar
    );
    
    CREATE TABLE MySink (
      a int,
      b bigint,
      c varchar
    ) WITH (
      'connector' = 'print'
    );
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    
    -- デプロイメントの SQL 文で、テーブル名やコネクタ タイプなどの構成を変更します。この変更後、デプロイメントは状態データと完全に互換性を維持します。
    CREATE TABLE MySink2 (
      a int,
      b bigint,
      c varchar
    ) WITH (
      'connector' = 'kafka',
      ...
    );
    INSERT INTO MySink2 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;

非互換性をもたらす変更

  • シンク コネクタとステートフル クエリを追加します。この変更後、デプロイメントは状態データと互換性がなくなります。

    -- 元の SQL 文:
    CREATE TABLE MyTable (
      a int,
      b bigint,
      c varchar
    );
    
    CREATE TABLE MySink1 (
      a int,
      b bigint,
      c varchar
    );
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    
    -- ステートフル クエリを追加します。この変更後、デプロイメントは状態データと互換性がなくなります。
    CREATE TABLE MySink2 (
      b bigint,
      a int,
      c varchar
    );
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    INSERT INTO MySink2 SELECT b, sum(a), max(c) FROM MyTable GROUP BY b;
  • シンクの table.optimizer.state-compatibility.ignore-sink パラメーターを false に設定し、デプロイメントのテーブル名またはコネクタ タイプを変更します。この変更後、デプロイメントは状態データと互換性がなくなります。 table.optimizer.state-compatibility.ignore-sink パラメーターを false に設定すると、シンクはステートフル オペレーターと見なされます。

    -- 元の SQL 文:
    CREATE TABLE MyTable (
      a int,
      b bigint,
      c varchar
    );
    
    CREATE TABLE MySink (
      a int,
      b bigint,
      c varchar
    ) WITH (
      'connector' = 'print'
    );
    INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    
    -- シンクの table.optimizer.state-compatibility.ignore-sink パラメーターを false に設定します。
    -- table.optimizer.state-compatibility.ignore-sink パラメーターを false に設定すると、シンクはステートフル オペレーターと見なされます。テーブル名を変更します。この変更後、デプロイメントは状態データと互換性がなくなります。
    CREATE TABLE MySink2 (
      a int,
      b bigint,
      c varchar
    ) WITH (
      'connector' = 'print'
    );
    INSERT INTO MySink2 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    
    -- シンクの table.optimizer.state-compatibility.ignore-sink パラメーターを false に設定します。
    -- table.optimizer.state-compatibility.ignore-sink パラメーターを false に設定すると、シンクはステートフル オペレーターと見なされます。コネクタ タイプを変更します。たとえば、コネクタ タイプを print から blackhole に変更できます。この変更後、デプロイメントは状態データと互換性がなくなります。
    create table MySink (
      a int,
      b bigint,
      c varchar
    ) WITH (
      'connector' = 'blackhole',
      ...
    );
    INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
  • デフォルトでは、シンクの table.optimizer.state-compatibility.ignore-sink パラメーターは true に設定されており、互換性チェック中にシンクが無視されます。したがって、シンクはほとんどの場合、ステートレス オペレーターと見なされます。順序が正しくないデータが存在する場合、順序が正しくないデータを処理し、データの正確性を確保するために、シンクに基づいてステートフル SinkMaterializer オペレーターが生成されます。詳細については、「Flink SQL で順序が正しくない changelog イベントを処理する」をご参照ください。この場合、デプロイメントの開始時に実行した互換性チェックの結果が 完全互換 である場合、変更後にデプロイメントが状態データと互換性がなくなる可能性があります。たとえば、ソースのプライマリ キーを変更すると、シンクのアップストリーム upsert キーが変更されます。この場合、前述の互換性の問題が発生する可能性があります。

    -- 元のクエリ
    CREATE TEMPORARY TABLE MyTable (
      a int primary key not enforced,
      b bigint,
      c bigint,
      ts timestamp(3),
      proctime as proctime(),
      watermark for ts AS ts - interval '1' second
    ) WITH ('connector' = 'datagen');
    
    CREATE TEMPORARY TABLE MySink (a int, b bigint, c bigint  primary key not enforced) with ('connector'='print');
    
    INSERT INTO MySink SELECT a, b, c FROM MyTable;
    
    -- プライマリ キーとして使用される列を変更します。
    CREATE TEMPORARY TABLE MyTable (
      a int,
      b bigint,
      c bigint primary key not enforced,
      d bigint,
      ts timestamp(3),
      proctime as proctime(),
      watermark for ts AS ts - interval '1' second
    ) WITH ('connector' = 'datagen');
    
    CREATE TEMPORARY TABLE MySink (a int, b bigint primary key not enforced, c bigint) with ('connector'='print');
    
    INSERT INTO MySink SELECT a, b, c FROM MyTable;
  • シンクの table.optimizer.state-compatibility.ignore-sink パラメーターを true から false に設定します。シンクはステートレス オペレーターからステートフル オペレーターに変更され、互換性チェックに含まれます。この変更後、デプロイメントは状態データと互換性がなくなります。

不明な互換性をもたらす変更

シンクを削除し、デプロイメントのシンクまたはソースの TEMPORARY TABLE 文を変更または削除した場合、デプロイメントと状態データ間の互換性は判断されません。詳細については、「その他の制限」をご参照ください。