Topik ini menjelaskan beberapa parameter utama yang mungkin perlu dikonfigurasi saat mengembangkan draf SQL. Topik ini juga memberikan contoh cara mengonfigurasi parameter tersebut.
table.exec.sink.keyed-shuffle
Parameter table.exec.sink.keyed-shuffle menyelesaikan ketidakteraturan data yang ditulis ke tabel dengan kunci utama. Mengaktifkannya memungkinkan pekerjaan melakukan hash shuffling, memastikan data dengan kunci utama yang sama diarahkan ke tugas operator yang sama, sehingga mengurangi kemungkinan masalah ketidakteraturan.
Catatan Penggunaan
Hash shuffling hanya membantu jika operator hulu dapat memastikan urutan valid dari catatan pembaruan di bidang kunci utama.
Jika Anda mengubah paralelisme operator untuk pekerjaan yang berjalan dalam mode ahli, aturan paralelisme berikut tidak berlaku.
Nilai Valid
AUTO(default): Jika paralelisme operator sink bukan 1 dan berbeda dari paralelisme operator hulu, Realtime Compute for Apache Flink secara otomatis melakukan hash shuffling pada bidang kunci utama saat data mengalir ke operator sink.FORCE: Jika paralelisme operator sink bukan 1, Realtime Compute for Apache Flink secara paksa melakukan hash shuffling pada bidang kunci utama saat data mengalir ke operator sink.NONE: Realtime Compute for Apache Flink tidak melakukan hash shuffling berdasarkan paralelisme operator sink dan paralelisme operator hulu.
Contoh
AUTO
Salin dan tempel kode ke draf streaming SQL, lalu terapkan draf tersebut. Secara eksplisit atur paralelisme sink menjadi 2:
CREATE TEMPORARY TABLE s1 ( a INT, b INT, ts TIMESTAMP(3) ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'fields.ts.kind'='random','fields.ts.max-past'='5s', 'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10' ); CREATE TEMPORARY TABLE sink ( a INT, b INT, ts TIMESTAMP(3), PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector'='print', -- Atur paralelisme operator sink menjadi 2. 'sink.parallelism'='2' ); INSERT INTO sink SELECT * FROM s1; -- Anda juga dapat mengonfigurasi opsi tabel dinamis untuk menentukan paralelisme operator sink. --INSERT INTO sink /*+ OPTIONS('sink.parallelism' = '2') */ SELECT * FROM s1;Aktifkan hash shuffling otomatis.
Buka halaman detail pekerjaan.
Di tab Configuration, temukan bagian Resources, lalu klik Edit.
Atur Parallelism menjadi 1.

Di bagian Parameters, klik Edit.
Di bidang Other Configuration, secara eksplisit atur
table.exec.sink.keyed-shuffle: AUTO. Sebagai alternatif, biarkan bidang kosong.
Mulai pekerjaan.
Di tab Status, mode koneksi data antara operator sink dan operator hulu adalah HASH.

FORCE
Salin dan tempel kode ke draf streaming SQL, lalu terapkan draf tersebut. Jangan secara eksplisit mengatur paralelisme sink.
CREATE TEMPORARY TABLE s1 ( a INT, b INT, ts TIMESTAMP(3) ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'fields.ts.kind'='random','fields.ts.max-past'='5s', 'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10' ); CREATE TEMPORARY TABLE sink ( a INT, b INT, ts TIMESTAMP(3), PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector'='print' ); INSERT INTO sink SELECT * FROM s1;Aktifkan hash shuffling
FORCE.Buka halaman detail pekerjaan.
Di tab Configuration, temukan bagian Resources, klik Edit, lalu atur Parallelism menjadi 2.
Di bagian Parameters, klik Edit.
Di bidang Other Configuration, tambahkan
table.exec.sink.keyed-shuffle: FORCE.
Mulai pekerjaan.
Di tab Status, baik paralelisme operator sink maupun operator hulu adalah 2, dan mode koneksi data adalah HASH.

table.exec.mini-batch.size
Parameter ini menentukan jumlah maksimum catatan input yang dapat disimpan di operator komputasi untuk operasi mikro-batch. Jika jumlah catatan data yang disimpan mencapai ukuran MiniBatch, perhitungan dan keluaran data dipicu. Parameter ini hanya berlaku jika digunakan bersama dengan parameter table.exec.mini-batch.enabled dan table.exec.mini-batch.allow-latency. Untuk informasi lebih lanjut tentang optimasi terkait miniBatch, lihat MiniBatch Aggregation dan MiniBatch Regular Joins.
Catatan Penggunaan
Jika Anda tidak mengonfigurasi parameter ini secara eksplisit di bagian Parameters sebelum pekerjaan dimulai, memori terkelola digunakan untuk menyimpan data dalam mode pemrosesan miniBatch. Jika salah satu dari kondisi berikut terpenuhi, perhitungan akhir dan keluaran data dipicu:
Operator komputasi menerima pesan watermark yang dikirim oleh operator
MiniBatchAssigner.Memori terkelola penuh.
Sebelum checkpointing.
Pekerjaan dibatalkan.
Nilai Valid
-1 (default): Memori terkelola digunakan untuk menyimpan data.
Nilai negatif dari tipe LONG: Mekanisme pemrosesan sama dengan mekanisme pemrosesan untuk nilai default.
Nilai positif dari tipe LONG: Memori heap digunakan untuk menyimpan data. Saat jumlah catatan input yang disimpan mencapai nilai yang ditentukan, keluaran data dipicu.
Contoh
Buat draf streaming SQL, salin pernyataan SQL uji berikut, lalu terapkan draf tersebut.
CREATE TEMPORARY TABLE s1 ( a INT, b INT, ts TIMESTAMP(3), PRIMARY KEY (a) NOT ENFORCED, WATERMARK FOR ts AS ts - INTERVAL '1' SECOND ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'fields.ts.kind'='random', 'fields.ts.max-past'='5s', 'fields.b.kind'='random', 'fields.b.min'='0', 'fields.b.max'='10' ); CREATE TEMPORARY TABLE sink ( a INT, b BIGINT, PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector'='print' ); INSERT INTO sink SELECT a, sum(b) FROM s1 GROUP BY a;Konfigurasikan ukuran MiniBatch.
Pergi ke halaman detail pekerjaan.
Di tab Configuration, temukan bagian Parameters, dan klik Edit.
Di Other Configuration, atur
table.exec.mini-batch.enabled: truedantable.exec.mini-batch.allow-latency: 2s.Atur
table.exec.mini-batch.sizemenjadi-1atau lewati parameter ini.
Mulai pekerjaan.
Di tab Status, topologi pekerjaan berisi operator
MiniBatchAssigner,LocalGroupAggregate, danGlobalGroupAggregate.
table.exec.agg.mini-batch.output-identical-enabled
Parameter ini menentukan apakah data duplikat akan dikirim ke operator hilir jika state TTL diaktifkan dan hasil agregasi tetap tidak berubah setelah data dikonsumsi oleh operator MinibatchGlobalAgg dan MinibatchAgg. Secara default, data duplikat tidak dikirim. Namun, perilaku ini dapat menyebabkan state operator hilir kedaluwarsa karena tidak ada data yang diterima selama periode waktu tertentu. Anda dapat mengatur parameter ini ke true untuk operator MinibatchGlobalAgg dan MinibatchAgg. Jika periode perubahan hasil agregasi pekerjaan Anda lebih kecil dari TTL state yang ditentukan, lewati pengaturan ini. Untuk informasi lebih lanjut, lihat FLINK-33936.
Catatan Penggunaan
Saklar ini hanya berlaku di Ververica Runtime (VVR) 8.0.8 atau versi lebih baru.
Jika Anda mengubah nilai dari
falsemenjaditrue, jumlah data yang dikirim oleh operatorMinibatchGlobalAggdanMinibatchAggmungkin meningkat. Hal ini dapat memberikan tekanan tambahan pada operator hilir.
Nilai Valid
false(default): Jika TTL data state diaktifkan dan hasil agregasi tetap tidak berubah setelah data dikonsumsi oleh operatorMinibatchGlobalAggdanMinibatchAgg, data duplikat tidak dikirim ke operator hilir.true: Jika TTL data state diaktifkan dan hasil agregasi tetap tidak berubah setelah data dikonsumsi oleh operatorMinibatchGlobalAggdanMinibatchAgg, data duplikat dikirim ke operator hilir.
Contoh
Buat draf streaming SQL, salin pernyataan SQL uji berikut, lalu terapkan draf tersebut.
create temporary table src( a int, b string ) with ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.a.min' = '1', 'fields.a.max' = '1', 'fields.b.length' = '3' ); create temporary table snk( a int, max_length_b bigint ) with ( 'connector' = 'blackhole' ); insert into snk select a, max(CHAR_LENGTH(b)) from src group by a;Aktifkan optimasi agregat MiniBatch.
Pergi ke halaman detail pekerjaan.
Di tab Configuration, temukan bagian Parameters, dan klik Edit.
Di Other Configuration, tambahkan
table.exec.mini-batch.enabled: truedantable.exec.mini-batch.allow-latency: 2s.
Mulai pekerjaan.
Di tab Status, Anda dapat melihat operator
MinibatchGlobalAggregate. Klik + pada operator untuk memverifikasi bahwa operatorGlobalGroupAggregatebelum mengirimkan data ke operator hilir jika hasil agregasi tetap tidak berubah.
Hentikan pekerjaan. Di tab Configuration, temukan bagian Parameters, dan klik Edit. Di Other Configuration, tambahkan
table.exec.agg.mini-batch.output-identical-enabled: true.Mulai pekerjaan.
Di tab Status, Anda dapat melihat operator
MinibatchGlobalAggregate. Klik + pada operator untuk memeriksa bahwa operatorGlobalGroupAggregatemengirimkan data ke operator hilir jika hasil agregasi tetap tidak berubah.
table.exec.async-lookup.key-ordered-enabled
Dalam kasus penggunaan yang melibatkan lookup joins, Anda dapat mengaktifkan mode asinkron untuk meningkatkan throughput. Tabel berikut menggambarkan urutan data operasi I/O asinkron berdasarkan pengaturan parameter table.exec.async-lookup.output-mode dalam lookup joins dan apakah aliran masukan merupakan aliran pembaruan.
| Aliran Pembaruan | Aliran Non-Pembaruan |
ORDERED | Mode Terurut | Mode Terurut |
ALLOW_UNORDERED | Mode Terurut | Mode Tidak Terurut |
Jika table.exec.async-lookup.output-mode diatur ke ALLOW_UNORDERED untuk aliran pembaruan, kebenaran data dijamin melalui mode terurut tetapi throughput berkurang. Untuk menyelesaikan masalah ini, parameter table.exec.async-lookup.key-ordered-enabled diperkenalkan untuk memastikan kebenaran data aliran pembaruan dan throughput operasi I/O asinkron. Pesan dengan kunci pembaruan yang sama (dianggap sebagai kunci utama log perubahan) dalam aliran diproses berdasarkan urutan pesan masuk ke operator.
Mode Terurut: Dalam mode ini, urutan aliran tetap tidak berubah. Urutan pengiriman pesan hasil sama dengan urutan permintaan asinkron dipicu (urutan pesan masuk ke operator).
Mode Tidak Terurut: Dalam mode ini, pesan hasil dikirim segera setelah permintaan asinkron selesai. Urutan pesan dalam aliran berubah setelah pesan diproses oleh operator I/O asinkron. Untuk informasi lebih lanjut, lihat Async I/O | Apache Flink.
Skenario
Jumlah pesan dengan kunci pembaruan yang sama dalam aliran kecil selama periode waktu tertentu. Misalnya, kunci pembaruan dianggap sebagai kunci utama dan data dengan kunci utama yang sama tidak sering diperbarui. Selain itu, urutan pemrosesan berdasarkan kunci pembaruan diperlukan saat tabel dimensi digabungkan. Dalam hal ini, Anda dapat melakukan optimasi dengan menentukan parameter table.exec.async-lookup.key-ordered-enabled. Ini dapat memastikan urutan pemrosesan data berdasarkan kunci pembaruan.
Dalam aliran Change Data Capture (CDC) yang berisi kunci utama, tabel dimensi digabungkan untuk menghasilkan tabel lebar untuk penulisan data ke sink. Kunci utama sink konsisten dengan kunci utama sumber. Selain itu, kunci gabungan operasi JOIN pada tabel dimensi tidak konsisten dengan kunci utama. Kunci gabungan dianggap sebagai kunci utama. Dalam hal ini, Anda dapat melakukan optimasi dengan menentukan parameter table.exec.async-lookup.key-ordered-enabled. Ini memungkinkan sistem untuk mengacak data berdasarkan kunci utama CDC, yang dianggap sebagai kunci pembaruan. Anda juga dapat mengaktifkan kebijakan join SHUFFLE_HASH untuk mengoptimalkan skenario ini. Dalam skenario konkurensi tinggi, dibandingkan dengan metode ini, optimasi dengan menentukan parameter table.exec.async-lookup.key-ordered-enabled dapat mencegah pembuatan operator SinkMaterializer sebelum data ditulis ke sink. Ini mencegah potensi masalah kinerja yang disebabkan oleh operator, terutama masalah bahwa sejumlah besar data state dihasilkan selama operasi jangka panjang. Untuk informasi lebih lanjut tentang operator SinkUpsertMaterializer, lihat Catatan Penggunaan.
Kunci gabungan operasi JOIN pada tabel dimensi tidak konsisten dengan kunci utama. Kunci gabungan tabel dimensi dianggap sebagai kunci utama dan operator peringkat tersedia setelah operasi JOIN dilakukan. Dalam hal ini, optimasi dengan menentukan parameter table.exec.async-lookup.key-ordered-enabled memungkinkan sistem untuk mengacak data berdasarkan kunci utama CDC, yang dianggap sebagai kunci pembaruan. Anda juga dapat mengaktifkan kebijakan join SHUFFLE_HASH untuk mengoptimalkan skenario ini. Dibandingkan dengan metode ini, optimasi dengan menentukan parameter table.exec.async-lookup.key-ordered-enabled dapat mencegah UpdateFastRank menurun menjadi RetractRank. Untuk informasi lebih lanjut tentang cara mengubah RetractRank menjadi UpdateFastRank, lihat Praktik TopN.
Catatan Penggunaan
Jika tidak ada kunci pembaruan yang tersedia dalam aliran, seluruh baris data digunakan sebagai kunci.
Throughput berkurang saat kunci pembaruan yang sama sering diperbarui dalam waktu singkat. Hal ini karena data dengan kunci pembaruan yang sama diproses dalam urutan yang ketat.
Dibandingkan dengan operasi JOIN pada tabel dimensi dalam mode asinkron sebelum optimasi, mode Key-Ordered menyediakan Keyed State. Jika Anda mengaktifkan atau menonaktifkan mode Key-Ordered, kompatibilitas data state terpengaruh.
Optimasi berlaku hanya saat Anda menambahkan konfigurasi
table.exec.async-lookup.output-mode='ALLOW_UNORDERED'dantable.exec.async-lookup.key-ordered-enabled='true'ke operasi JOIN pada tabel dimensi di VVR 8.0.10 atau lebih baru dan aliran masukan merupakan aliran pembaruan.
Nilai Valid
false(default): Menonaktifkan mode Key-Ordered.true: Mengaktifkan mode Key-Ordered.
Contoh
Salin dan tempel kode berikut ke draf streaming dan terapkan:
create TEMPORARY table bid_source( auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR, proc_time as proctime(), WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND ) with ( 'connector' = 'kafka', -- A non-insert-only stream connector. 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); CREATE TEMPORARY TABLE users ( user_id STRING PRIMARY KEY NOT ENFORCED, -- Define the primary key. user_name VARCHAR(255) NOT NULL, age INT NOT NULL ) WITH ( 'connector' = 'hologres', -- The connector that supports the asynchronous lookup feature. 'async' = 'true', 'dbname' = 'holo db name', -- The name of the Hologres database. 'tablename' = 'schema_name.table_name', -- The name of the Hologres table that is used to receive data. 'username' = 'access id', -- The AccessKey ID of your Alibaba Cloud account. 'password' = 'access key', -- The AccessKey secret of your Alibaba Cloud account. 'endpoint' = 'holo vpc endpoint', -- The virtual private cloud (VPC) endpoint of your Hologres instance. ); CREATE TEMPORARY TABLE bh ( auction BIGINT, age int ) WITH ( 'connector' = 'blackhole' ); insert into bh SELECT bid_source.auction, u.age FROM bid_source JOIN users FOR SYSTEM_TIME AS OF bid_source.proc_time AS u ON bid_source.channel = u.user_id;Pergi ke halaman detail pekerjaan. Di tab Configuration, temukan bagian Parameters, dan klik Edit. Di Other Configuration, tambahkan
table.exec.async-lookup.output-mode='ALLOW_UNORDERED'dantable.exec.async-lookup.key-ordered-enabled='true'.Mulai pekerjaan.
Di tab Status, Anda dapat melihat KEY_ORDERED:true di atribut async pekerjaan.

table.optimizer.window-join-enabled
Parameter ini mengontrol apakah akan mengaktifkan window joins. Saat diaktifkan, Flink mengoptimalkan rencana eksekusi berdasarkan pendekatan window join. Untuk jendela kecil, ini dapat mengurangi ukuran state dan meningkatkan kinerja. Selain itu, tidak seperti regular joins, mengaktifkan window join dapat menghindari pengiriman pesan pembaruan ke operator hilir. Ini cocok untuk kasus penggunaan yang memerlukan join pada jendela kecil.
Window joins vs. regular joins
Window joins memiliki batasan sintaksis tambahan limitasi. Selain itu, mereka tidak mendukung aliran pembaruan.
Window joins biasanya memiliki latensi lebih tinggi. Latensi aktual tergantung pada ukuran jendela dan seberapa cepat watermark maju.
Setelah Anda mengaktifkan window joins berbasis waktu peristiwa, data terlambat dibuang. Regular joins tidak membuang data terlambat.
Setelah mengaktifkan atau menonaktifkan window joins, Anda tidak dapat memulihkan dari checkpoint yang ada. Hal ini karena struktur state dasar dari window joins dan regular joins tidak kompatibel.
Nilai Valid
false(nilai default): Pernyataan window join diubah menjadi regular joins untuk dieksekusi.true: Pernyataan yang sesuai diubah menjadi window joins untuk dieksekusi.
Contoh
Buat draf streaming SQL dan jalankan kode berikut. Kami langsung mengatur
table.optimizer.window-join-enabledmenjaditruemelalui pernyataanSET:SET 'table.optimizer.window-join-enabled' = 'true'; CREATE TEMPORARY TABLE LeftTable ( id VARCHAR, row_time TIMESTAMP_LTZ(3), num INT, WATERMARK FOR row_time as row_time - INTERVAL '5' SECONDS ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE RightTable ( id VARCHAR, row_time TIMESTAMP_LTZ(3), num INT, WATERMARK FOR row_time as row_time - INTERVAL '10' SECONDS ) WITH ( 'connector'='datagen' ); EXPLAIN SELECT L.num as L_Num, L.id as L_Id, R.num as R_Num, R.id as R_Id, COALESCE(L.window_start, R.window_start) as window_start, COALESCE(L.window_end, R.window_end) as window_end FROM ( SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES)) ) L JOIN ( SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES)) ) R ON L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end;Anda dapat melihat operator
WindowJoindalam rencana eksekusi yang dioptimalkan.== Optimized Execution Plan == Calc(select=[num AS L_Num, id AS L_Id, num0 AS R_Num, id0 AS R_Id, CASE(window_start IS NOT NULL, window_start, window_start0) AS window_start, CASE(window_end IS NOT NULL, window_end, window_end0) AS window_end]) +- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 min])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 min])], joinType=[InnerJoin], where=[(num = num0)], select=[id, num, window_start, window_end, id0, num0, window_start0, window_end0]) :- Exchange(distribution=[hash[num]]) : +- Calc(select=[id, num, window_start, window_end]) : +- WindowTableFunction(window=[TUMBLE(time_col=[row_time], size=[5 min])]) : +- WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)]) : +- TableSourceScan(table=[[vvp, default, LeftTable]], fields=[id, row_time, num]) +- Exchange(distribution=[hash[num]]) +- Calc(select=[id, num, window_start, window_end]) +- WindowTableFunction(window=[TUMBLE(time_col=[row_time], size=[5 min])]) +- WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 10000:INTERVAL SECOND)]) +- TableSourceScan(table=[[vvp, default, RightTable]], fields=[id, row_time, num])Ubah nilai
table.optimizer.window-join-enableddalam pernyataanSETmenjadifalse, dan jalankan kode:-- atur menjadi 'false' atau hapus klausa pengaturan ini SET 'table.optimizer.window-join-enabled' = 'false'; CREATE TEMPORARY TABLE LeftTable ( id VARCHAR, row_time TIMESTAMP_LTZ(3), num INT, WATERMARK FOR row_time as row_time - INTERVAL '5' SECONDS ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE RightTable ( id VARCHAR, row_time TIMESTAMP_LTZ(3), num INT, WATERMARK FOR row_time as row_time - INTERVAL '10' SECONDS ) WITH ( 'connector'='datagen' ); EXPLAIN SELECT L.num as L_Num, L.id as L_Id, R.num as R_Num, R.id as R_Id, COALESCE(L.window_start, R.window_start) as window_start, COALESCE(L.window_end, R.window_end) as window_end FROM ( SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES)) ) L JOIN ( SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES)) ) R ON L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end;Operator
Joinreguler telah menggantikan operatorWindowJoin:== Optimized Execution Plan == Calc(select=[num AS L_Num, id AS L_Id, num0 AS R_Num, id0 AS R_Id, CASE(window_start IS NOT NULL, window_start, window_start0) AS window_start, CASE(window_end IS NOT NULL, window_end, window_end0) AS window_end]) +- Join(joinType=[InnerJoin], where=[((num = num0) AND (window_start = window_start0) AND (window_end = window_end0))], select=[id, num, window_start, window_end, id0, num0, window_start0, window_end0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[num, window_start, window_end]]) : +- Calc(select=[id, num, window_start, window_end]) : +- WindowTableFunction(window=[TUMBLE(time_col=[row_time], size=[5 min])]) : +- WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)]) : +- TableSourceScan(table=[[vvp, default, LeftTable]], fields=[id, row_time, num]) +- Exchange(distribution=[hash[num, window_start, window_end]]) +- Calc(select=[id, num, window_start, window_end]) +- WindowTableFunction(window=[TUMBLE(time_col=[row_time], size=[5 min])]) +- WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 10000:INTERVAL SECOND)]) +- TableSourceScan(table=[[vvp, default, RightTable]], fields=[id, row_time, num])