全部产品
Search
文档中心

Realtime Compute for Apache Flink:Ubah konektor sink

更新时间:Jun 19, 2025

Topik ini menjelaskan bagaimana kompatibilitas antara deployment dan data state terpengaruh setelah Anda mengubah konektor sink dalam pernyataan SQL untuk deployment tersebut.

Perubahan yang tidak memengaruhi atau sebagian memengaruhi kompatibilitas

  • Menghapus salah satu konektor sink yang terhubung ke konektor sumber yang sama. Deployment tetap sepenuhnya kompatibel dengan data state setelah perubahan ini.

    -- Pernyataan SQL asli: 
    CREATE TABLE MyTable (
      a int,
      b bigint,
      c varchar
    );
    CREATE TABLE MySink1 (
      a int,
      b bigint,
      c varchar
    );
    CREATE TABLE MySink2 (
      a int,
      b bigint,
      c varchar
    );
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10;
    
    -- Hapus pernyataan SELECT untuk MySink1. Setelah perubahan ini, deployment tetap sepenuhnya kompatibel dengan data state.
    -- Data state yang sesuai dengan fungsi agregat dalam pernyataan SELECT dibuang.
    INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10;
    
    -- Hapus pernyataan SELECT untuk MySink2. Setelah perubahan ini, deployment tetap sepenuhnya kompatibel dengan data state.
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
  • Menambahkan konektor sink dan query tanpa state. Deployment tetap sepenuhnya kompatibel dengan data state setelah perubahan ini.

    -- Pernyataan SQL asli: 
    CREATE TABLE MyTable (
      a int,
      b bigint,
      c varchar
    );
    
    CREATE TABLE MySink1 (
      a int,
      b bigint,
      c varchar
    );
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    
    -- Tambahkan query tanpa state. Setelah perubahan ini, deployment tetap sepenuhnya kompatibel dengan data state.
    CREATE TABLE MySink2 (
      a int,
      b bigint,
      c varchar
    );
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10;
  • Sebagian besar konektor sink bersifat tanpa state, sehingga sink dianggap sebagai operator tanpa state secara default. Jika nama tabel, tipe konektor, dan atribut dalam klausa WITH diubah, deployment tetap sepenuhnya kompatibel dengan data state.

    -- Pernyataan SQL asli: 
    CREATE TABLE MyTable (
      a int,
      b bigint,
      c varchar
    );
    
    CREATE TABLE MySink (
      a int,
      b bigint,
      c varchar
    ) WITH (
      'connector' = 'print'
    );
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    
    -- Ubah konfigurasi seperti nama tabel dan tipe konektor dalam pernyataan SQL untuk deployment. Setelah perubahan ini, deployment tetap sepenuhnya kompatibel dengan data state.
    CREATE TABLE MySink2 (
      a int,
      b bigint,
      c varchar
    ) WITH (
      'connector' = 'kafka',
      ...
    );
    INSERT INTO MySink2 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;

Perubahan yang menyebabkan ketidaksesuaian

  • Menambahkan konektor sink dan query berstate. Deployment menjadi tidak kompatibel dengan data state setelah perubahan ini.

    -- Pernyataan SQL asli: 
    CREATE TABLE MyTable (
      a int,
      b bigint,
      c varchar
    );
    
    CREATE TABLE MySink1 (
      a int,
      b bigint,
      c varchar
    );
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    
    -- Tambahkan query berstate. Setelah perubahan ini, deployment menjadi tidak kompatibel dengan data state.
    CREATE TABLE MySink2 (
      b bigint,
      a int,
      c varchar
    );
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    INSERT INTO MySink2 SELECT b, sum(a), max(c) FROM MyTable GROUP BY b;
  • Mengatur parameter table.optimizer.state-compatibility.ignore-sink untuk sink ke false dan mengubah nama tabel atau tipe konektor untuk deployment. Deployment menjadi tidak kompatibel dengan data state setelah perubahan ini. Ketika parameter table.optimizer.state-compatibility.ignore-sink diatur ke false, sink dianggap sebagai operator berstate.

    -- Pernyataan SQL asli: 
    CREATE TABLE MyTable (
      a int,
      b bigint,
      c varchar
    );
    
    CREATE TABLE MySink (
      a int,
      b bigint,
      c varchar
    ) WITH (
      'connector' = 'print'
    );
    INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    
    -- Atur parameter table.optimizer.state-compatibility.ignore-sink untuk sink ke false.
    -- Setelah Anda mengatur parameter table.optimizer.state-compatibility.ignore-sink ke false, sink dianggap sebagai operator berstate. Ubah nama tabel. Setelah perubahan ini, deployment menjadi tidak kompatibel dengan data state.
    CREATE TABLE MySink2 (
      a int,
      b bigint,
      c varchar
    ) WITH (
      'connector' = 'print'
    );
    INSERT INTO MySink2 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    
    -- Atur parameter table.optimizer.state-compatibility.ignore-sink untuk sink ke false.
    -- Setelah Anda mengatur parameter table.optimizer.state-compatibility.ignore-sink ke false, sink dianggap sebagai operator berstate. Ubah tipe konektor. Misalnya, Anda dapat mengubah tipe konektor dari print menjadi blackhole. Setelah perubahan ini, deployment menjadi tidak kompatibel dengan data state.
    create table MySink (
      a int,
      b bigint,
      c varchar
    ) WITH (
      'connector' = 'blackhole',
      ...
    );
    INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
  • Secara default, parameter table.optimizer.state-compatibility.ignore-sink untuk sink diatur ke true untuk mengabaikan sink selama pemeriksaan kompatibilitas. Oleh karena itu, sink dianggap sebagai operator tanpa state dalam banyak kasus. Namun, jika ada data yang tidak berurutan, operator SinkMaterializer berstate dihasilkan berdasarkan sink untuk menangani data yang tidak berurutan dan memastikan akurasi data. Untuk informasi lebih lanjut, lihat Menangani peristiwa changelog yang tidak berurutan dalam Flink SQL. Dalam hal ini, jika hasil pemeriksaan kompatibilitas saat memulai deployment adalah Full Compatible, deployment mungkin menjadi tidak kompatibel dengan data state setelah perubahan. Sebagai contoh, jika Anda mengubah kunci utama dari sumber, kunci upsert upstream dari sink berubah, yang dapat menyebabkan masalah kompatibilitas seperti yang disebutkan di atas.

    -- Query asli
    CREATE TEMPORARY TABLE MyTable (
      a int primary key not enforced,
      b bigint,
      c bigint,
      ts timestamp(3),
      proctime as proctime(),
      watermark for ts AS ts - interval '1' second
    ) WITH ('connector' = 'datagen');
    
    CREATE TEMPORARY TABLE MySink (a int, b bigint, c bigint  primary key not enforced) with ('connector'='print');
    
    INSERT INTO MySink SELECT a, b, c FROM MyTable;
    
    -- Ubah kolom yang digunakan sebagai kunci utama.
    CREATE TEMPORARY TABLE MyTable (
      a int,
      b bigint,
      c bigint primary key not enforced,
      d bigint,
      ts timestamp(3),
      proctime as proctime(),
      watermark for ts AS ts - interval '1' second
    ) WITH ('connector' = 'datagen');
    
    CREATE TEMPORARY TABLE MySink (a int, b bigint primary key not enforced, c bigint) with ('connector'='print');
    
    INSERT INTO MySink SELECT a, b, c FROM MyTable;
  • Mengubah parameter table.optimizer.state-compatibility.ignore-sink dari true ke false. Sink diubah dari operator tanpa state menjadi operator berstate dan dilibatkan dalam pemeriksaan kompatibilitas. Deployment menjadi tidak kompatibel dengan data state setelah perubahan ini.

Perubahan yang menyebabkan kompatibilitas tidak diketahui

Jika Anda menghapus sink dan memodifikasi atau menghapus pernyataan TEMPORARY TABLE untuk sink atau sumber dalam deployment, kompatibilitas antara deployment dan data state tidak ditentukan. Untuk informasi lebih lanjut, lihat Batasan lainnya.