Topik ini menjelaskan batasan dalam memodifikasi pernyataan SQL sebelum memulai job berdasarkan checkpoint atau savepoint, tidak termasuk batasan terkait query, tabel sumber, dan tabel sink.
Job yang akan dimulai harus menggunakan versi Apache Flink yang sama dengan job tempat checkpoint atau savepoint dihasilkan. Jika tidak, sistem tidak dapat menentukan kompatibilitas.
Dependensi dari job yang akan dimulai harus kompatibel dengan dependensi dari job tempat checkpoint atau savepoint dihasilkan. Jika Anda memodifikasi konektor kustom atau dependensi fungsi yang ditentukan pengguna (UDF), pastikan kompatibilitas setelah modifikasi karena sistem tidak dapat mengidentifikasi masalah kompatibilitas yang mungkin terjadi.
Jika Anda memodifikasi item tertentu sebelum pemeriksaan kompatibilitas, sistem tidak dapat menentukan kompatibilitas. Item mencakup fungsi agregat, tabel sink, atau klausa WHERE yang memengaruhi komputasi stateful.
-- Pernyataan SQL asli: CREATE TABLE MyTable ( a int, b bigint, c varchar ) WITH ( 'connector' = 'datagen' ); CREATE TABLE MySink ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable group by a; -- -- Contoh perubahan yang menghasilkan kompatibilitas yang tidak diketahui: Ubah nama tabel sink menjadi MySink2 dan ubah fungsi agregat dari max(c) menjadi min(c). CREATE TABLE MySink2 ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink2 SELECT a, sum(b), min(c) FROM MyTable group by a; -- -- Contoh perubahan yang menghasilkan kompatibilitas yang tidak diketahui: Tambahkan klausa WHERE yang berisi kondisi a > 10, atur parameter table.optimizer.state-compatibility.ignore-filter ke true, dan ubah fungsi agregat dari max(c) menjadi min(c). INSERT INTO MySink SELECT a, sum(b), min(c) FROM ( SELECT * FROM MyTable where a > 10 ) GROUP BY a;Menambahkan query yang menghasilkan data state baru akan menyebabkan masalah kompatibilitas.
-- Pernyataan SQL asli: CREATE TABLE MyTable ( a int, b bigint, c varchar ) WITH ( 'connector' = 'datagen' ); CREATE TABLE MySink ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink SELECT a, b, c FROM MyTable; -- Menambahkan query agregat grup menghasilkan perubahan yang tidak kompatibel. INSERT INTO MySink SELECT a, sum(b), min(c) FROM MyTable GROUP BY a;Menghapus tabel sink dan memodifikasi atau menghapus pernyataan TEMPORARY TABLE untuk tabel sink atau tabel sumber yang sesuai akan membuat sistem tidak dapat menentukan kompatibilitas sebelum job dimulai. Namun, jika Anda menghapus tabel sink tanpa memodifikasi pernyataan TEMPORARY TABLE yang sesuai, kompatibilitas tidak terpengaruh.
-- Pernyataan SQL asli -- Tabel sumber 1 CREATE TEMPORARY TABLE MyTable ( a int, b bigint, c bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); -- Tabel sumber 2 CREATE TEMPORARY TABLE MyTable2 ( a int, b bigint, c bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); -- Tabel sink 1 CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector'='print'); -- Tabel sink 2 CREATE TEMPORARY TABLE MySink2 (a int, b bigint) WITH ('connector'='print'); --Query BEGIN STATEMENT SET; INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a; INSERT INTO MySink2 SELECT a, b FROM MyTable2 where a > 10; END; -- Contoh perubahan yang menghasilkan kompatibilitas yang tidak diketahui: Hapus tabel sink dan modifikasi atau hapus pernyataan TEMPORARY TABLE untuk tabel sink atau tabel sumber yang sesuai. -- Tabel sumber 1 CREATE TEMPORARY TABLE MyTable ( a int, b bigint, c bigint, d bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); -- Tabel sink 1 CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector'='print'); --Query INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a; -- Contoh perubahan yang sepenuhnya kompatibel: Hapus tabel sink tanpa memodifikasi pernyataan TEMPORARY TABLE yang sesuai. -- Tabel sumber 1 CREATE TEMPORARY TABLE MyTable ( a int, b bigint, c bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); -- Tabel sumber 2 CREATE TEMPORARY TABLE MyTable2 ( a int, b bigint, c bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); -- Tabel sink 1 CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector'='print'); -- Tabel sink 2 CREATE TEMPORARY TABLE MySink2 (a int, b bigint) WITH ('connector'='print'); --Query INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a;