すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:ソーステーブルの変更

最終更新日:Jan 08, 2025

このトピックでは、ジョブの SQL ステートメントで定義されているソーステーブルを変更した後に、ジョブとジョブの開始に使用される状態データ間の互換性について説明します。

背景情報

ソーステーブルの作成に使用される DDL ステートメントで定義されているスキーマを変更する場合、システムは、テーブルを含むクエリ全体に対する変更に基づいて互換性を判断します。 WITH 句を変更する場合、システムは互換性チェック中にコネクタパラメータの変更のみを検出します。

互換性のある変更

  • 完全または部分的に互換性がある: ダウンストリームオペレーターの状態の互換性に影響を与えずにフィールドを変更します。

    -- 元の SQL ステートメント:
    create table MyTable (
      a int,
      b bigint,
      c varchar
    ) with (
      'connector'='datagen'
    );
    
    select a, sum(b), max(c) from MyTable group by a;
    
    
    -- d フィールドの追加は、完全に互換性のある変更です。
    -- これは、SELECT ステートメントで d フィールドが使用されていないためです。
    create table MyTable (
      a int,
      b bigint,
      c varchar,
      d int
    ) with (
      'connector'='datagen'
    );
    
    select a, sum(b), max(c) from MyTable group by a;
    
    
    -- b フィールドを b as d + 1 に変更することは、部分的に互換性のある変更です。
    -- max(c) の計算結果には影響しません。列 b が変更されているため、元の sum(b) フィールドは削除されたと見なされます。
    -- 元の sum(b) フィールドの状態データは破棄されます。新しい sum(b) フィールドは追加されたと見なされ、ジョブの開始時にフィールド値が増加します。
    create table MyTable (
      a int,
      d bigint,
      c varchar,
      b as d + 1
    ) with (
      'connector'='datagen'
    );
    
    select a, sum(b), max(c) from MyTable group by a;
  • 完全に互換性がある: ウォーターマーク間隔を変更します。

  • プライマリキーを変更すると、ダウンストリームオペレーターの状態の互換性に影響する可能性があります。たとえば、upsert キーを変更すると、互換性の問題が発生する可能性があります。

  • WITH 句でプライマリキーまたはシャーディングに使用される列を変更すると、ソーステーブルの状態の互換性に影響する可能性があります。たとえば、MySQL コネクタの scan.incremental.snapshot.chunk.key-column パラメータを変更すると、フルスキャンフェーズのソーステーブルの状態データが使用できなくなる可能性があります。

  • 変更ログモードの変更によってダウンストリームオペレーターに送信されるイベントのタイプが変更される可能性がある場合、ダウンストリームオペレーターの状態の互換性に影響する可能性があります。イベントタイプの例: DELETE、UPDATE_BEFORE、UPDATE_AFTER。イベントタイプを変更する可能性のある変更の例: (1) Hologres コネクタの WITH 句の cdcMode パラメータを変更します。このパラメータは、変更データキャプチャ (CDC) を使用してバイナリログを読み取るかどうかを指定します。(2) MongoDB または PostgreSQL コネクタの UPSERT モードでバイナリログを読み取るかどうかを変更します。

互換性のない変更

  • コネクタタイプを変更します。

    -- 元の SQL ステートメント:
    create table MyTable (
      a int,
      b bigint,
      c varchar
    ) with (
      'connector'='datagen'
    );
    
    select a, sum(b), max(c) from MyTable group by a;
    
    
    -- 互換性なし: コネクタタイプを datagen から kafka に変更します。
    create table MyTable (
      a int,
      b bigint,
      c varchar
    ) with (
      'connector'='kafka', 
      ...
    );
    
    select a, sum(b), max(c) from MyTable group by a;
  • テーブル名を変更します。

    -- 元の SQL ステートメント:
    create table MyTable (
      a int,
      b bigint,
      c varchar
    ) with (
      'connector'='datagen'
    );
    select a, sum(b), max(c) from MyTable group by a;
    
    
    -- 互換性なし: テーブル名を MyTable から MyTable2 に変更します。
    create table MyTable2 (
      a int,
      b bigint,
      c varchar
    ) with (
      'connector'='datagen'
    );
    
    select a, sum(b), max(c) from MyTable2 group by a;
  • ダウンストリームオペレーターの状態の互換性に影響するフィールドを変更します。

    -- 元の SQL ステートメント:
    create table MyTable1 (
      a int,
      b bigint
    ) with (
      'connector'='datagen'
    );
    
    create table MyTable2 (
      c int,
      d bigint
    ) with (
      'connector'='datagen'
    );
    
    select * from MyTable1 join MyTable2 on c = d;
    
    
    -- 互換性なし: MyTable2 テーブルに e フィールドを追加します。
    -- これにより、結合操作の入力フィールドが変更されます。
    create table MyTable1 (
      a int,
      b bigint
    ) with (
      'connector'='datagen'
    );
    
    create table MyTable2 (
      c int,
      d bigint,
      e varchar
    ) with (
      'connector'='datagen'
    );
    
    -- 元の SQL ステートメント:
    select * from MyTable1 join MyTable2 on c = d;