All Products
Search
Document Center

Realtime Compute for Apache Flink:Parameter Utama

Last Updated:Aug 07, 2025

Topik ini menjelaskan beberapa parameter utama yang mungkin perlu dikonfigurasi saat mengembangkan draf SQL. Topik ini juga memberikan contoh cara mengonfigurasi parameter tersebut.

table.exec.sink.keyed-shuffle

Parameter table.exec.sink.keyed-shuffle menyelesaikan ketidakteraturan data yang ditulis ke tabel dengan kunci utama. Mengaktifkannya memungkinkan pekerjaan melakukan hash shuffling, memastikan data dengan kunci utama yang sama diarahkan ke tugas operator yang sama, sehingga mengurangi kemungkinan masalah ketidakteraturan.

Catatan Penggunaan

  • Hash shuffling hanya membantu jika operator hulu dapat memastikan urutan valid dari catatan pembaruan di bidang kunci utama.

  • Jika Anda mengubah paralelisme operator untuk pekerjaan yang berjalan dalam mode ahli, aturan paralelisme berikut tidak berlaku.

Nilai Valid

  • AUTO (default): Jika paralelisme operator sink bukan 1 dan berbeda dari paralelisme operator hulu, Realtime Compute for Apache Flink secara otomatis melakukan hash shuffling pada bidang kunci utama saat data mengalir ke operator sink.

  • FORCE: Jika paralelisme operator sink bukan 1, Realtime Compute for Apache Flink secara paksa melakukan hash shuffling pada bidang kunci utama saat data mengalir ke operator sink.

  • NONE: Realtime Compute for Apache Flink tidak melakukan hash shuffling berdasarkan paralelisme operator sink dan paralelisme operator hulu.

Contoh

  • AUTO

    1. Salin dan tempel kode ke draf streaming SQL, lalu terapkan draf tersebut. Secara eksplisit atur paralelisme sink menjadi 2:

      CREATE TEMPORARY TABLE s1 (
        a INT,
        b INT,
        ts TIMESTAMP(3)
      ) WITH (
        'connector'='datagen',
        'rows-per-second'='1',
        'fields.ts.kind'='random','fields.ts.max-past'='5s',
        'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10'
      );
      
      CREATE TEMPORARY TABLE sink (
        a INT,
        b INT,
        ts TIMESTAMP(3),
        PRIMARY KEY (a) NOT ENFORCED
      ) WITH (
        'connector'='print',
         -- Atur paralelisme operator sink menjadi 2.
        'sink.parallelism'='2'
      );
      
      INSERT INTO sink SELECT * FROM s1;
      -- Anda juga dapat mengonfigurasi opsi tabel dinamis untuk menentukan paralelisme operator sink.
      --INSERT INTO sink /*+ OPTIONS('sink.parallelism' = '2') */ SELECT * FROM s1;
    2. Aktifkan hash shuffling otomatis.

      1. Buka halaman detail pekerjaan.

      2. Di tab Configuration, temukan bagian Resources, lalu klik Edit.

      3. Atur Parallelism menjadi 1.

        image

      4. Di bagian Parameters, klik Edit.

      5. Di bidang Other Configuration, secara eksplisit atur table.exec.sink.keyed-shuffle: AUTO. Sebagai alternatif, biarkan bidang kosong.

    3. Mulai pekerjaan.

      Di tab Status, mode koneksi data antara operator sink dan operator hulu adalah HASH.

      image

  • FORCE

    1. Salin dan tempel kode ke draf streaming SQL, lalu terapkan draf tersebut. Jangan secara eksplisit mengatur paralelisme sink.

      CREATE TEMPORARY TABLE s1 (
        a INT,
        b INT,
        ts TIMESTAMP(3)
      ) WITH (
        'connector'='datagen',
        'rows-per-second'='1',
        'fields.ts.kind'='random','fields.ts.max-past'='5s',
        'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10'
      );
      
      CREATE TEMPORARY TABLE sink (
        a INT,
        b INT,
        ts TIMESTAMP(3),
        PRIMARY KEY (a) NOT ENFORCED
      ) WITH (
        'connector'='print'
      );
      
      INSERT INTO sink
      SELECT * FROM s1;
    2. Aktifkan hash shuffling FORCE.

      1. Buka halaman detail pekerjaan.

      2. Di tab Configuration, temukan bagian Resources, klik Edit, lalu atur Parallelism menjadi 2.

      3. Di bagian Parameters, klik Edit.

      4. Di bidang Other Configuration, tambahkan table.exec.sink.keyed-shuffle: FORCE.

        image

    3. Mulai pekerjaan.

      Di tab Status, baik paralelisme operator sink maupun operator hulu adalah 2, dan mode koneksi data adalah HASH.

      image

table.exec.mini-batch.size

Parameter ini menentukan jumlah maksimum catatan input yang dapat disimpan di operator komputasi untuk operasi mikro-batch. Jika jumlah catatan data yang disimpan mencapai ukuran MiniBatch, perhitungan dan keluaran data dipicu. Parameter ini hanya berlaku jika digunakan bersama dengan parameter table.exec.mini-batch.enabled dan table.exec.mini-batch.allow-latency. Untuk informasi lebih lanjut tentang optimasi terkait miniBatch, lihat MiniBatch Aggregation dan MiniBatch Regular Joins.

Catatan Penggunaan

Jika Anda tidak mengonfigurasi parameter ini secara eksplisit di bagian Parameters sebelum pekerjaan dimulai, memori terkelola digunakan untuk menyimpan data dalam mode pemrosesan miniBatch. Jika salah satu dari kondisi berikut terpenuhi, perhitungan akhir dan keluaran data dipicu:

  • Operator komputasi menerima pesan watermark yang dikirim oleh operator MiniBatchAssigner.

  • Memori terkelola penuh.

  • Sebelum checkpointing.

  • Pekerjaan dibatalkan.

Nilai Valid

  • -1 (default): Memori terkelola digunakan untuk menyimpan data.

  • Nilai negatif dari tipe LONG: Mekanisme pemrosesan sama dengan mekanisme pemrosesan untuk nilai default.

  • Nilai positif dari tipe LONG: Memori heap digunakan untuk menyimpan data. Saat jumlah catatan input yang disimpan mencapai nilai yang ditentukan, keluaran data dipicu.

Contoh

  1. Buat draf streaming SQL, salin pernyataan SQL uji berikut, lalu terapkan draf tersebut.

    CREATE TEMPORARY TABLE s1 (
      a INT,
      b INT,
      ts TIMESTAMP(3),
      PRIMARY KEY (a) NOT ENFORCED,
      WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
    ) WITH (
      'connector'='datagen',
      'rows-per-second'='1',
      'fields.ts.kind'='random',
      'fields.ts.max-past'='5s',
      'fields.b.kind'='random',
      'fields.b.min'='0',
      'fields.b.max'='10'
    );
    
    CREATE TEMPORARY TABLE sink (
      a INT,
      b BIGINT,
      PRIMARY KEY (a) NOT ENFORCED
    ) WITH (
      'connector'='print'
    );
    
    INSERT INTO sink SELECT a, sum(b) FROM s1 GROUP BY a;
  2. Konfigurasikan ukuran MiniBatch.

    1. Pergi ke halaman detail pekerjaan.

    2. Di tab Configuration, temukan bagian Parameters, dan klik Edit.

    3. Di Other Configuration, atur table.exec.mini-batch.enabled: true dan table.exec.mini-batch.allow-latency: 2s.

    4. Atur table.exec.mini-batch.size menjadi -1 atau lewati parameter ini.

  3. Mulai pekerjaan.

    Di tab Status, topologi pekerjaan berisi operator MiniBatchAssigner, LocalGroupAggregate, dan GlobalGroupAggregate.

    image

table.exec.agg.mini-batch.output-identical-enabled

Parameter ini menentukan apakah data duplikat akan dikirim ke operator hilir jika state TTL diaktifkan dan hasil agregasi tetap tidak berubah setelah data dikonsumsi oleh operator MinibatchGlobalAgg dan MinibatchAgg. Secara default, data duplikat tidak dikirim. Namun, perilaku ini dapat menyebabkan state operator hilir kedaluwarsa karena tidak ada data yang diterima selama periode waktu tertentu. Anda dapat mengatur parameter ini ke true untuk operator MinibatchGlobalAgg dan MinibatchAgg. Jika periode perubahan hasil agregasi pekerjaan Anda lebih kecil dari TTL state yang ditentukan, lewati pengaturan ini. Untuk informasi lebih lanjut, lihat FLINK-33936.

Catatan Penggunaan

  • Saklar ini hanya berlaku di Ververica Runtime (VVR) 8.0.8 atau versi lebih baru.

  • Jika Anda mengubah nilai dari false menjadi true, jumlah data yang dikirim oleh operator MinibatchGlobalAgg dan MinibatchAgg mungkin meningkat. Hal ini dapat memberikan tekanan tambahan pada operator hilir.

Nilai Valid

  • false (default): Jika TTL data state diaktifkan dan hasil agregasi tetap tidak berubah setelah data dikonsumsi oleh operator MinibatchGlobalAgg dan MinibatchAgg, data duplikat tidak dikirim ke operator hilir.

  • true: Jika TTL data state diaktifkan dan hasil agregasi tetap tidak berubah setelah data dikonsumsi oleh operator MinibatchGlobalAgg dan MinibatchAgg, data duplikat dikirim ke operator hilir.

Contoh

  1. Buat draf streaming SQL, salin pernyataan SQL uji berikut, lalu terapkan draf tersebut.

    create temporary table src(
        a int,
        b string
    ) with (
        'connector' = 'datagen',
        'rows-per-second' = '10',
        'fields.a.min' = '1',
        'fields.a.max' = '1',
        'fields.b.length' = '3'
    );
    
    create temporary table snk(
        a int,
        max_length_b bigint
    ) with (
        'connector' = 'blackhole'
    );
    
    insert into snk select a, max(CHAR_LENGTH(b)) from src group by a; 
  2. Aktifkan optimasi agregat MiniBatch.

    1. Pergi ke halaman detail pekerjaan.

    2. Di tab Configuration, temukan bagian Parameters, dan klik Edit.

    3. Di Other Configuration, tambahkan table.exec.mini-batch.enabled: true dan table.exec.mini-batch.allow-latency: 2s.

  3. Mulai pekerjaan.

    Di tab Status, Anda dapat melihat operator MinibatchGlobalAggregate. Klik + pada operator untuk memverifikasi bahwa operator GlobalGroupAggregate belum mengirimkan data ke operator hilir jika hasil agregasi tetap tidak berubah.

    image

  4. Hentikan pekerjaan. Di tab Configuration, temukan bagian Parameters, dan klik Edit. Di Other Configuration, tambahkan table.exec.agg.mini-batch.output-identical-enabled: true.

  5. Mulai pekerjaan.

    Di tab Status, Anda dapat melihat operator MinibatchGlobalAggregate. Klik + pada operator untuk memeriksa bahwa operator GlobalGroupAggregate mengirimkan data ke operator hilir jika hasil agregasi tetap tidak berubah.image

table.exec.async-lookup.key-ordered-enabled

Dalam kasus penggunaan yang melibatkan lookup joins, Anda dapat mengaktifkan mode asinkron untuk meningkatkan throughput. Tabel berikut menggambarkan urutan data operasi I/O asinkron berdasarkan pengaturan parameter table.exec.async-lookup.output-mode dalam lookup joins dan apakah aliran masukan merupakan aliran pembaruan.

table.exec.async-lookup.output-mode

Aliran Pembaruan

Aliran Non-Pembaruan

ORDERED

Mode Terurut

Mode Terurut

ALLOW_UNORDERED

Mode Terurut

Mode Tidak Terurut

Jika table.exec.async-lookup.output-mode diatur ke ALLOW_UNORDERED untuk aliran pembaruan, kebenaran data dijamin melalui mode terurut tetapi throughput berkurang. Untuk menyelesaikan masalah ini, parameter table.exec.async-lookup.key-ordered-enabled diperkenalkan untuk memastikan kebenaran data aliran pembaruan dan throughput operasi I/O asinkron. Pesan dengan kunci pembaruan yang sama (dianggap sebagai kunci utama log perubahan) dalam aliran diproses berdasarkan urutan pesan masuk ke operator.

Catatan
  • Mode Terurut: Dalam mode ini, urutan aliran tetap tidak berubah. Urutan pengiriman pesan hasil sama dengan urutan permintaan asinkron dipicu (urutan pesan masuk ke operator).

  • Mode Tidak Terurut: Dalam mode ini, pesan hasil dikirim segera setelah permintaan asinkron selesai. Urutan pesan dalam aliran berubah setelah pesan diproses oleh operator I/O asinkron. Untuk informasi lebih lanjut, lihat Async I/O | Apache Flink.

Skenario

  • Jumlah pesan dengan kunci pembaruan yang sama dalam aliran kecil selama periode waktu tertentu. Misalnya, kunci pembaruan dianggap sebagai kunci utama dan data dengan kunci utama yang sama tidak sering diperbarui. Selain itu, urutan pemrosesan berdasarkan kunci pembaruan diperlukan saat tabel dimensi digabungkan. Dalam hal ini, Anda dapat melakukan optimasi dengan menentukan parameter table.exec.async-lookup.key-ordered-enabled. Ini dapat memastikan urutan pemrosesan data berdasarkan kunci pembaruan.

  • Dalam aliran Change Data Capture (CDC) yang berisi kunci utama, tabel dimensi digabungkan untuk menghasilkan tabel lebar untuk penulisan data ke sink. Kunci utama sink konsisten dengan kunci utama sumber. Selain itu, kunci gabungan operasi JOIN pada tabel dimensi tidak konsisten dengan kunci utama. Kunci gabungan dianggap sebagai kunci utama. Dalam hal ini, Anda dapat melakukan optimasi dengan menentukan parameter table.exec.async-lookup.key-ordered-enabled. Ini memungkinkan sistem untuk mengacak data berdasarkan kunci utama CDC, yang dianggap sebagai kunci pembaruan. Anda juga dapat mengaktifkan kebijakan join SHUFFLE_HASH untuk mengoptimalkan skenario ini. Dalam skenario konkurensi tinggi, dibandingkan dengan metode ini, optimasi dengan menentukan parameter table.exec.async-lookup.key-ordered-enabled dapat mencegah pembuatan operator SinkMaterializer sebelum data ditulis ke sink. Ini mencegah potensi masalah kinerja yang disebabkan oleh operator, terutama masalah bahwa sejumlah besar data state dihasilkan selama operasi jangka panjang. Untuk informasi lebih lanjut tentang operator SinkUpsertMaterializer, lihat Catatan Penggunaan.

  • Kunci gabungan operasi JOIN pada tabel dimensi tidak konsisten dengan kunci utama. Kunci gabungan tabel dimensi dianggap sebagai kunci utama dan operator peringkat tersedia setelah operasi JOIN dilakukan. Dalam hal ini, optimasi dengan menentukan parameter table.exec.async-lookup.key-ordered-enabled memungkinkan sistem untuk mengacak data berdasarkan kunci utama CDC, yang dianggap sebagai kunci pembaruan. Anda juga dapat mengaktifkan kebijakan join SHUFFLE_HASH untuk mengoptimalkan skenario ini. Dibandingkan dengan metode ini, optimasi dengan menentukan parameter table.exec.async-lookup.key-ordered-enabled dapat mencegah UpdateFastRank menurun menjadi RetractRank. Untuk informasi lebih lanjut tentang cara mengubah RetractRank menjadi UpdateFastRank, lihat Praktik TopN.

Catatan Penggunaan

  • Jika tidak ada kunci pembaruan yang tersedia dalam aliran, seluruh baris data digunakan sebagai kunci.

  • Throughput berkurang saat kunci pembaruan yang sama sering diperbarui dalam waktu singkat. Hal ini karena data dengan kunci pembaruan yang sama diproses dalam urutan yang ketat.

  • Dibandingkan dengan operasi JOIN pada tabel dimensi dalam mode asinkron sebelum optimasi, mode Key-Ordered menyediakan Keyed State. Jika Anda mengaktifkan atau menonaktifkan mode Key-Ordered, kompatibilitas data state terpengaruh.

  • Optimasi berlaku hanya saat Anda menambahkan konfigurasi table.exec.async-lookup.output-mode='ALLOW_UNORDERED' dan table.exec.async-lookup.key-ordered-enabled='true' ke operasi JOIN pada tabel dimensi di VVR 8.0.10 atau lebih baru dan aliran masukan merupakan aliran pembaruan.

Nilai Valid

  • false (default): Menonaktifkan mode Key-Ordered.

  • true: Mengaktifkan mode Key-Ordered.

Contoh

  1. Salin dan tempel kode berikut ke draf streaming dan terapkan:

    create TEMPORARY table bid_source(
      auction  BIGINT,
      bidder  BIGINT,
      price  BIGINT,
      channel  VARCHAR,
      url  VARCHAR,
      dateTime  TIMESTAMP(3),
      extra  VARCHAR,
      proc_time as proctime(),
      WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
    ) with (
      'connector' = 'kafka', -- A non-insert-only stream connector.
      'topic' = 'user_behavior',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'csv'
    );
    
    CREATE TEMPORARY TABLE users (
        user_id STRING PRIMARY KEY NOT ENFORCED, -- Define the primary key.
        user_name VARCHAR(255) NOT NULL, 
        age INT NOT NULL
    ) WITH (
        'connector' = 'hologres', -- The connector that supports the asynchronous lookup feature.
        'async' = 'true',
        'dbname' = 'holo db name', -- The name of the Hologres database.
        'tablename' = 'schema_name.table_name', -- The name of the Hologres table that is used to receive data.
        'username' = 'access id', -- The AccessKey ID of your Alibaba Cloud account.
        'password' = 'access key', -- The AccessKey secret of your Alibaba Cloud account.
        'endpoint' = 'holo vpc endpoint', -- The virtual private cloud (VPC) endpoint of your Hologres instance.
    );
    
    CREATE TEMPORARY TABLE bh ( 
        auction  BIGINT,
        age int
    ) WITH (
        'connector' = 'blackhole'
    );
    
    insert into bh
    SELECT
        bid_source.auction,
        u.age
    FROM bid_source
        JOIN users FOR SYSTEM_TIME AS OF bid_source.proc_time AS u
        ON bid_source.channel = u.user_id;
    
  2. Pergi ke halaman detail pekerjaan. Di tab Configuration, temukan bagian Parameters, dan klik Edit. Di Other Configuration, tambahkan table.exec.async-lookup.output-mode='ALLOW_UNORDERED' dan table.exec.async-lookup.key-ordered-enabled='true'.

  3. Mulai pekerjaan.

    Di tab Status, Anda dapat melihat KEY_ORDERED:true di atribut async pekerjaan.

    image

table.optimizer.window-join-enabled

Parameter ini mengontrol apakah akan mengaktifkan window joins. Saat diaktifkan, Flink mengoptimalkan rencana eksekusi berdasarkan pendekatan window join. Untuk jendela kecil, ini dapat mengurangi ukuran state dan meningkatkan kinerja. Selain itu, tidak seperti regular joins, mengaktifkan window join dapat menghindari pengiriman pesan pembaruan ke operator hilir. Ini cocok untuk kasus penggunaan yang memerlukan join pada jendela kecil.

Window joins vs. regular joins

  • Window joins memiliki batasan sintaksis tambahan limitasi. Selain itu, mereka tidak mendukung aliran pembaruan.

  • Window joins biasanya memiliki latensi lebih tinggi. Latensi aktual tergantung pada ukuran jendela dan seberapa cepat watermark maju.

  • Setelah Anda mengaktifkan window joins berbasis waktu peristiwa, data terlambat dibuang. Regular joins tidak membuang data terlambat.

  • Setelah mengaktifkan atau menonaktifkan window joins, Anda tidak dapat memulihkan dari checkpoint yang ada. Hal ini karena struktur state dasar dari window joins dan regular joins tidak kompatibel.

Nilai Valid

  • false (nilai default): Pernyataan window join diubah menjadi regular joins untuk dieksekusi.

  • true: Pernyataan yang sesuai diubah menjadi window joins untuk dieksekusi.

Contoh

  1. Buat draf streaming SQL dan jalankan kode berikut. Kami langsung mengatur table.optimizer.window-join-enabled menjadi true melalui pernyataan SET:

    SET 'table.optimizer.window-join-enabled' = 'true';
    
    CREATE TEMPORARY TABLE LeftTable (
      id VARCHAR,
      row_time TIMESTAMP_LTZ(3),
      num INT,
      WATERMARK FOR row_time as row_time - INTERVAL '5' SECONDS
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE RightTable (
      id VARCHAR,
      row_time TIMESTAMP_LTZ(3),
      num INT,
      WATERMARK FOR row_time as row_time - INTERVAL '10' SECONDS
    ) WITH (
      'connector'='datagen'
    );
    
    EXPLAIN
    SELECT L.num as L_Num, L.id as L_Id, R.num as R_Num, R.id as R_Id,
               COALESCE(L.window_start, R.window_start) as window_start,
               COALESCE(L.window_end, R.window_end) as window_end
               FROM (
                   SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
               ) L
               JOIN (
                   SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
               ) R
               ON L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end;

    Anda dapat melihat operator WindowJoin dalam rencana eksekusi yang dioptimalkan.

    == Optimized Execution Plan ==
    Calc(select=[num AS L_Num, id AS L_Id, num0 AS R_Num, id0 AS R_Id, CASE(window_start IS NOT NULL, window_start, window_start0) AS window_start, CASE(window_end IS NOT NULL, window_end, window_end0) AS window_end])
    +- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 min])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 min])], joinType=[InnerJoin], where=[(num = num0)], select=[id, num, window_start, window_end, id0, num0, window_start0, window_end0])
       :- Exchange(distribution=[hash[num]])
       :  +- Calc(select=[id, num, window_start, window_end])
       :     +- WindowTableFunction(window=[TUMBLE(time_col=[row_time], size=[5 min])])
       :        +- WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)])
       :           +- TableSourceScan(table=[[vvp, default, LeftTable]], fields=[id, row_time, num])
       +- Exchange(distribution=[hash[num]])
          +- Calc(select=[id, num, window_start, window_end])
             +- WindowTableFunction(window=[TUMBLE(time_col=[row_time], size=[5 min])])
                +- WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 10000:INTERVAL SECOND)])
                   +- TableSourceScan(table=[[vvp, default, RightTable]], fields=[id, row_time, num])
  2. Ubah nilai table.optimizer.window-join-enabled dalam pernyataan SET menjadi false, dan jalankan kode:

    -- atur menjadi 'false' atau hapus klausa pengaturan ini
    SET 'table.optimizer.window-join-enabled' = 'false';
    
    CREATE TEMPORARY TABLE LeftTable (
      id VARCHAR,
      row_time TIMESTAMP_LTZ(3),
      num INT,
      WATERMARK FOR row_time as row_time - INTERVAL '5' SECONDS
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE RightTable (
      id VARCHAR,
      row_time TIMESTAMP_LTZ(3),
      num INT,
      WATERMARK FOR row_time as row_time - INTERVAL '10' SECONDS
    ) WITH (
      'connector'='datagen'
    );
    
    EXPLAIN
    SELECT L.num as L_Num, L.id as L_Id, R.num as R_Num, R.id as R_Id,
               COALESCE(L.window_start, R.window_start) as window_start,
               COALESCE(L.window_end, R.window_end) as window_end
               FROM (
                   SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
               ) L
               JOIN (
                   SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
               ) R
               ON L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end;

    Operator Join reguler telah menggantikan operator WindowJoin:

    == Optimized Execution Plan ==
    Calc(select=[num AS L_Num, id AS L_Id, num0 AS R_Num, id0 AS R_Id, CASE(window_start IS NOT NULL, window_start, window_start0) AS window_start, CASE(window_end IS NOT NULL, window_end, window_end0) AS window_end])
    +- Join(joinType=[InnerJoin], where=[((num = num0) AND (window_start = window_start0) AND (window_end = window_end0))], select=[id, num, window_start, window_end, id0, num0, window_start0, window_end0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
       :- Exchange(distribution=[hash[num, window_start, window_end]])
       :  +- Calc(select=[id, num, window_start, window_end])
       :     +- WindowTableFunction(window=[TUMBLE(time_col=[row_time], size=[5 min])])
       :        +- WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)])
       :           +- TableSourceScan(table=[[vvp, default, LeftTable]], fields=[id, row_time, num])
       +- Exchange(distribution=[hash[num, window_start, window_end]])
          +- Calc(select=[id, num, window_start, window_end])
             +- WindowTableFunction(window=[TUMBLE(time_col=[row_time], size=[5 min])])
                +- WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 10000:INTERVAL SECOND)])
                   +- TableSourceScan(table=[[vvp, default, RightTable]], fields=[id, row_time, num])

Referensi

Mengapa keluaran data pada operator LocalGroupAggregate tertunda untuk waktu yang lama dan tidak menghasilkan keluaran data?