全部产品
Search
文档中心

Realtime Compute for Apache Flink:Menangani peristiwa changelog yang tidak berurutan

更新时间:Oct 30, 2025

Jika Anda menggunakan Flink SQL untuk pemrosesan data real-time, peristiwa changelog yang tidak berurutan dapat memengaruhi akurasi data. Topik ini menjelaskan cara kerja mekanisme changelog dalam Flink SQL, penyebab peristiwa changelog yang tidak berurutan, dan bagaimana menanganinya.

Changelog dalam Flink SQL

Informasi latar belakang

Log biner (binlog) dalam database relasional seperti MySQL mencatat semua operasi modifikasi dalam database, termasuk operasi INSERT, UPDATE, dan DELETE. Demikian pula, changelog dalam Flink SQL mencatat semua perubahan data untuk memfasilitasi pemrosesan data inkremental.

Dalam MySQL, Anda dapat menggunakan binlog untuk cadangan data, pemulihan, sinkronisasi, dan replikasi. Misalnya, Anda dapat membaca dan mengurai catatan operasi dalam binlog untuk melakukan sinkronisasi dan replikasi data inkremental. Change Data Capture (CDC) adalah teknologi umum yang digunakan untuk sinkronisasi data. Alat CDC dapat memantau perubahan data dalam database dan mengonversi perubahan tersebut menjadi aliran peristiwa untuk memfasilitasi pemrosesan real-time. Anda dapat menggunakan alat CDC untuk mengirim perubahan data dari database relasional ke gudang data atau sistem lain untuk analisis dan pelaporan real-time. Beberapa alat CDC populer meliputi Debezium dan Maxwell. Apache Flink menambahkan dukungan CDC untuk mengatasi FLINK-15331, memungkinkan integrasi data CDC dari sistem eksternal untuk menerapkan sinkronisasi dan analisis data real-time.

Pembuatan dan pemrosesan peristiwa changelog

Seperti disebutkan dalam bagian "Informasi Latar Belakang" dari topik ini, peristiwa changelog dapat dihasilkan oleh sumber eksternal seperti file binlog dan alat CDC. Peristiwa changelog juga dapat dihasilkan oleh operasi internal Flink SQL. Aliran changelog yang hanya berisi peristiwa INSERT disebut append stream atau non-update stream. Aliran changelog yang berisi jenis peristiwa lain, seperti peristiwa UPDATE, disebut update stream. Beberapa operasi dalam Flink, seperti agregasi grup dan deduplikasi, dapat menghasilkan peristiwa UPDATE. Dalam kebanyakan kasus, operator yang menghasilkan peristiwa UPDATE mempertahankan status dan disebut sebagai operator stateful. Harap diperhatikan bahwa tidak semua operator stateful mendukung aliran update sebagai input. Misalnya, operator over aggregation dan interval join tidak mendukung aliran update.

Untuk informasi tentang operasi kueri yang didukung dalam Realtime Compute for Apache Flink menggunakan Ververica Runtime (VVR) 6.0 atau lebih baru, operator runtime yang sesuai, serta dukungan untuk mengonsumsi dan menghasilkan aliran update, lihat Eksekusi Kueri.

Jenis peristiwa changelog

Apache Flink memperkenalkan mekanisme retraction sebagaimana diusulkan dalam FLINK-6047 untuk mengimplementasikan algoritma pembaruan inkremental untuk operator SQL streaming. Dalam mekanisme ini, peristiwa dikategorikan sebagai INSERT atau DELETE. Pengkategorian ini berlaku untuk sumber data yang hanya mendukung peristiwa INSERT. Kemudian, Apache Flink merefaktor sistem peristiwa changelog untuk mengatasi FLINK-16987. Sistem yang direfaktor menggunakan jenis peristiwa changelog berikut untuk memfasilitasi integrasi dengan ekosistem CDC:

/**
 * Jenis baris dalam sebuah Changelog.
 */
@PublicEvolving
public enum RowKind {

	/**
	 * Operasi penyisipan.
	 */
	INSERT,

	/**
	 * Konten sebelumnya dari baris yang diperbarui.
	 */
	UPDATE_BEFORE,

	/**
	 * Konten baru dari baris yang diperbarui.
	 */
	UPDATE_AFTER,

	/**
	 * Operasi penghapusan.
	 */
	DELETE
}

Seperti ditunjukkan dalam kode sebelumnya, UPDATE_BEFORE dan UPDATE_AFTER merupakan jenis peristiwa terpisah. Apache Flink tidak menggabungkan jenis peristiwa ini menjadi tipe peristiwa UPDATE komposit karena alasan berikut:

  • Peristiwa jenis UPDATE_BEFORE dan UPDATE_AFTER memiliki struktur yang sama. Satu-satunya perbedaan adalah properti RowKind, yang membuat serialisasi lebih mudah. Sebaliknya, jika tipe peristiwa UPDATE komposit digunakan, peristiwa tersebut menjadi heterogen atau menyamakan peristiwa INSERT dan DELETE dengan peristiwa UPDATE. Misalnya, peristiwa INSERT hanya berisi peristiwa UPDATE_AFTER, sedangkan peristiwa DELETE hanya berisi peristiwa UPDATE_BEFORE.

  • Operasi pengacakan data, seperti join dan agregasi, sering terjadi dalam lingkungan terdistribusi. Ini memerlukan peristiwa UPDATE komposit dibagi menjadi peristiwa DELETE dan INSERT terpisah dalam skenario tertentu, seperti ditunjukkan dalam contoh berikut.

Contoh

Dalam skenario sampel ini, peristiwa UPDATE komposit dibagi menjadi peristiwa DELETE dan INSERT. Kode SQL berikut juga digunakan untuk mendemonstrasikan masalah peristiwa changelog yang tidak berurutan dan solusi yang sesuai, yang dijelaskan dalam bagian selanjutnya dari topik ini.

-- Tabel sumber CDC: s1 & s2
CREATE TEMPORARY TABLE s1 (
  id BIGINT, 
  level BIGINT,
  PRIMARY KEY(id) NOT ENFORCED
)WITH (...);

CREATE TEMPORARY TABLE s2 (
  id BIGINT, 
  attr VARCHAR, 
  PRIMARY KEY(id) NOT ENFORCED
)WITH (...);

-- tabel sink: t1
CREATE TEMPORARY TABLE t1 (
  id BIGINT, 
  level BIGINT, 
  attr VARCHAR,
  PRIMARY KEY(id) NOT ENFORCED
)WITH (...);

-- gabungkan s1 dan s2 dan masukkan hasilnya ke t1 
INSERT INTO t1
SELECT 
  s1.*, s2.attr
FROM s1 JOIN s2
ON s1.level = s2.id;

Dalam aliran changelog tabel s1, jika rekaman (id=1, level=10) dimasukkan pada waktu t0 dan diperbarui menjadi (id=1, level=20) pada waktu t1, tiga peristiwa terpisah dihasilkan. Tabel berikut menggambarkan peristiwa-peristiwa ini.

s1

Jenis peristiwa

+I (id=1, level=10)

INSERT

-U (id=1, level=10)

UPDATE_BEFORE

+U (id=1, level=20)

UPDATE_AFTER

Kunci utama tabel s1 adalah id. Namun, operasi join memerlukan data diacak berdasarkan kolom level, seperti yang ditentukan dalam klausa ON.

image.png

Jika operator Join memiliki paralelisme 2, tiga peristiwa sebelumnya mungkin dikirim ke dua tugas. Dalam hal ini, bahkan jika tipe peristiwa UPDATE komposit digunakan, peristiwa tersebut masih perlu dibagi selama pengacakan untuk memastikan pemrosesan paralel.

image.png

Peristiwa changelog yang tidak berurutan

Penyebab

Bagian ini menggunakan contoh sebelumnya dan menambahkan kondisi berikut: Operator Join menerima +I (id=10, attr='a1') dan +I (id=20, attr='b1') peristiwa dari tabel s2, kemudian menerima tiga peristiwa changelog dari tabel s1. Dalam lingkungan terdistribusi, urutan kedatangan peristiwa changelog sebelumnya ke operator Sink hilir dapat bervariasi karena operasi join dilakukan secara paralel oleh dua tugas. Tabel berikut menjelaskan kemungkinan urutan peristiwa.

image

Kasus 1

Kasus 2

Kasus 3

+I (id=1, level=10, attr='a1')

-U (id=1, level=10, attr='a1')

+U (id=1, level=20, attr='b1')

+U (id=1, level=20, attr='b1')

+I (id=1, level=10, attr='a1')

-U (id=1, level=10, attr='a1')

+I (id=1, level=10, attr='a1')

+U (id=1, level=20, attr='b1')

-U (id=1, level=10, attr='a1')

Urutan peristiwa dalam Kasus 1 sama dengan pemrosesan berurutan. Dalam Kasus 2 dan Kasus 3, peristiwa changelog tiba di luar urutan di operator hilir, yang dapat menyebabkan hasil yang salah dalam Flink SQL. Dalam contoh ini, kunci utama tabel sink adalah id. Jika penyimpanan eksternal melakukan operasi upsert dalam Kasus 2 dan Kasus 3, rekaman dengan id 1 secara salah dihapus dari penyimpanan eksternal. Namun, hasil yang diharapkan adalah bahwa rekaman (id=1, level=20, attr='b1') ada.

Gunakan SinkUpsertMaterializer sebagai solusi

Dalam contoh ini, operator Join menghasilkan aliran update karena output berisi peristiwa INSERT (+I) dan peristiwa UPDATE (-U dan +U). Jika peristiwa changelog yang tidak berurutan tidak ditangani dengan benar, hasil akhirnya mungkin salah.

Kunci unik dan kunci upsert

Kunci unik merujuk pada satu atau lebih kolom yang memenuhi batasan UNIQUE setelah operasi SQL. Dalam contoh ini, (s1.id), (s1.id, s1.level), dan (s1.id, s2.id) semuanya adalah kunci unik.

Mekanisme changelog dalam Flink SQL mirip dengan mekanisme binlog tetapi memiliki implementasi yang disederhanakan. Alih-alih mencatat timestamp setiap pembaruan, Flink menentukan urutan peristiwa pembaruan untuk kunci utama berdasarkan analisis global dalam SQL planner. Kolom yang mempertahankan pengurutan kunci unik disebut sebagai kunci upsert. Jika kunci upsert ada, operator hilir dapat menerima peristiwa pembaruan dalam urutan yang benar. Jika operasi shuffle mengganggu pengurutan kunci unik, kunci upsert menjadi kosong. Dalam hal ini, operator hilir harus menggunakan algoritma, seperti algoritma penghitungan, untuk memastikan konsistensi akhir.

Dalam contoh ini, baris dalam tabel s1 diacak berdasarkan kolom level. Akibatnya, output operator Join berisi baris yang memiliki nilai s1.id yang sama tanpa pengurutan pada kunci unik. Ini berarti bahwa kunci upsert kosong. Dalam hal ini, Flink harus menyimpan semua rekaman input dan membandingkan semua kolom untuk membedakan antara operasi UPDATE dan INSERT.

Selain itu, kunci utama tabel sink adalah kolom id, yang tidak cocok dengan kunci upsert dalam output operator Join. Oleh karena itu, baris output operator Join harus dikonversi dengan benar ke baris yang diperlukan dalam tabel sink.

SinkUpsertMaterializer

Seperti disebutkan dalam bagian "Kunci Unik dan Kunci Upsert" dari topik ini, jika operator Join menghasilkan aliran update yang kunci upsertnya tidak cocok dengan kunci utama tabel sink, langkah perantara diperlukan untuk menghasilkan peristiwa changelog berdasarkan kunci utama tabel sink. Oleh karena itu, Flink memperkenalkan operator SinkUpsertMaterializer untuk menyelesaikan FLINK-20374 guna menghubungkan operator Join dan operator hilirnya.

Peristiwa changelog yang tidak berurutan, seperti peristiwa yang dijelaskan dalam bagian "Penyebab" dari topik ini, mengikuti aturan tertentu. Misalnya, untuk kunci upsert tertentu atau untuk semua kolom jika kunci upsert kosong, peristiwa ADD (+I dan +U) terjadi sebelum peristiwa RETRACT yang sesuai (-D dan -U). Sepasang peristiwa changelog yang memiliki kunci upsert yang sama diproses oleh tugas yang sama meskipun terjadi pengacakan data. Ini juga menjelaskan mengapa urutan peristiwa changelog dalam contoh ini hanya memiliki tiga kemungkinan kasus, seperti yang dijelaskan dalam bagian "Penyebab" dari topik ini.

Operator SinkUpsertMaterializer diimplementasikan berdasarkan aturan sebelumnya. Gambar berikut menunjukkan cara kerja operator. Operator SinkUpsertMaterializer mempertahankan daftar nilai RowData dalam statusnya. Ketika baris dimasukkan, operator memeriksa apakah baris ada dalam daftar RowData berdasarkan kunci upsert yang disimpulkan atau seluruh baris jika kunci upsert kosong. Kemudian, operator menambahkan atau memperbarui baris dalam statusnya dalam kasus peristiwa ADD, atau menghapus baris dari statusnya dalam kasus peristiwa RETRACT. Terakhir, operator menghasilkan peristiwa changelog berdasarkan kunci utama tabel sink. Untuk informasi lebih lanjut, lihat kode sumber SinkUpsertMaterializer.

image.png

Gambar berikut menunjukkan bagaimana operator SinkUpsertMaterializer mentransformasi peristiwa changelog keluaran dari operator Join menjadi peristiwa changelog masukan tabel sink dalam contoh ini. Dalam Kasus 2, operator SinkUpsertMaterializer menghapus baris terakhir dari statusnya dan menghasilkan peristiwa UPDATE untuk baris kedua terakhir ketika peristiwa -U (id=1, level=10, attr='a1') tiba. Dalam Kasus 3, operator SinkUpsertMaterializer meneruskan peristiwa +U (id=1, level=20, attr='b1') ke hilir ketika peristiwa tersebut tiba. Kemudian, operator menghapus baris yang sesuai dari statusnya tanpa menghasilkan peristiwa ketika peristiwa -U (id=1, level=10, attr='a1') tiba. Dengan cara ini, operator SinkUpsertMaterializer memastikan bahwa hasil akhirnya sesuai harapan dalam Kasus 2 dan Kasus 3, yaitu (id=1, level=20, attr='b1').

image.png

Kasus penggunaan umum

Operator SinkUpsertMaterializer digunakan dalam skenario berikut:

  • Tabel sink memiliki kunci utama, tetapi data yang ditulis ke tabel tidak memenuhi batasan UNIQUE. Penyebab potensial meliputi, namun tidak terbatas pada, operasi berikut:

    • Menetapkan kunci utama untuk tabel sink ketika tabel sumber tidak memiliki kunci utama.

    • Tidak menyertakan kolom kunci utama tabel sumber saat memasukkan data ke tabel sink, atau menggunakan kolom non-kunci utama tabel sumber sebagai kunci utama tabel sink.

    • Tipe data kolom kunci utama tabel sumber menjadi kurang presisi setelah konversi atau agregasi grup. Contohnya, kolom diubah dari tipe BIGINT ke tipe INT.

    • Melakukan transformasi data pada kolom kunci utama tabel sumber atau kunci unik yang dihasilkan oleh agregasi grup. Contohnya, menggabungkan beberapa kunci utama menjadi satu kolom.

      CREATE TABLE students (
        student_id BIGINT NOT NULL,
        student_name STRING NOT NULL,
        course_id BIGINT NOT NULL,
        score DOUBLE NOT NULL,
        PRIMARY KEY(student_id) NOT ENFORCED
      ) WITH (...);
      
      CREATE TABLE performance_report (
        student_info STRING NOT NULL PRIMARY KEY NOT ENFORCED,
        avg_score DOUBLE NOT NULL
      ) WITH (...);
      
      CREATE TEMPORARY VIEW v AS
      SELECT student_id, student_name, AVG(score) AS avg_score
      FROM students
      GROUP BY student_id, student_name;
      
      -- Hasil penggabungan tidak lagi memenuhi batasan UNIQUE tetapi digunakan sebagai kunci utama tabel sink.
      INSERT INTO performance_report
      SELECT 
        CONCAT('id:', student_id, ',name:', student_name) AS student_info,
        avg_score
      FROM v;
  • Pengurutan asli data masukan terganggu sebelum ditulis ke tabel sink, seperti yang diilustrasikan dalam contoh pada topik ini. Operasi join yang dilakukan pada tabel s1 dan s2 tidak didasarkan pada kunci utama s1, tetapi tabel sink memiliki kunci utama yang sama dengan tabel s1. Hal ini mengakibatkan gangguan dalam pengurutan data.

  • Parameter table.exec.sink.upsert-materialize diatur ke 'force'. Untuk informasi lebih lanjut, lihat bagian "Konfigurasi parameter" pada topik ini.

Catatan penggunaan

Operator SinkUpsertMaterializer menyimpan daftar nilai RowData dalam statusnya. Hal ini dapat meningkatkan ukuran status dan menambah overhead I/O untuk mengakses data status, yang memengaruhi throughput. Kami menyarankan untuk menghindari penggunaan operator SinkUpsertMaterializer.

Konfigurasi parameter

Gunakan parameter table.exec.sink.upsert-materialize untuk mengonfigurasi operator SinkUpsertMaterializer. Nilai valid:

  • auto (default): Flink akan menyimpulkan apakah peristiwa changelog tidak berurutan dan menambahkan operator SinkUpsertMaterializer jika diperlukan.

  • none: Jangan gunakan operator SinkUpsertMaterializer.

  • force: Selalu gunakan operator SinkUpsertMaterializer. Dalam hal ini, operator ditambahkan meskipun tidak ada kunci utama yang ditentukan untuk tabel sink. Ini memastikan materialisasi data.

Perlu diperhatikan bahwa jika Anda mengatur parameter ke auto, penambahan operator SinkUpsertMaterializer tidak selalu berarti bahwa peristiwa tidak berurutan. Sebagai contoh, jika Anda menggunakan klausa GROUPING SETS bersama dengan fungsi COALESCE untuk mengonversi nilai null, SQL planner mungkin gagal menentukan apakah kunci upsert yang dihasilkan cocok dengan kunci utama tabel sink. Dalam hal ini, Flink menambahkan operator SinkUpsertMaterializer untuk memastikan kebenaran hasil. Namun, jika hasil akhirnya benar tanpa operator SinkUpsertMaterializer, kami menyarankan Anda mengatur parameter table.exec.sink.upsert-materialize ke none.

Hindari menggunakan SinkUpsertMaterializer

Untuk menghindari penggunaan operator SinkUpsertMaterializer, perhatikan hal-hal berikut:

  • Pastikan bahwa kunci partisi yang digunakan untuk operasi seperti deduplikasi dan agregasi grup sama dengan kunci utama tabel sink.

  • Untuk mencegah data tidak berurutan ketika paralelisme tunggal sesuai dengan set data Anda, atur paralelisme ke 1. Dalam skenario ini, nonaktifkan operator SinkUpsertMaterializer dengan mengatur table.exec.sink.upsert-materialize ke none.

  • Jika rantai operator dibuat untuk operator Sink dan operator hulu, seperti operator deduplikasi atau agregasi grup, dan jika tidak ada masalah akurasi data saat menggunakan versi VVR sebelum 6.0, Anda dapat memigrasikan penyebaran untuk menggunakan VVR 6.0 atau lebih baru. Pastikan bahwa Anda mengatur parameter table.exec.sink.upsert-materialize ke none dan mempertahankan konfigurasi lainnya. Untuk informasi lebih lanjut tentang cara memigrasikan penyebaran, lihat Tingkatkan Versi Mesin Penyebaran.

Jika Anda perlu menggunakan operator SinkUpsertMaterializer, perhatikan hal-hal berikut:

  • Jangan tambahkan kolom yang dihasilkan oleh fungsi non-deterministik, seperti CURRENT_TIMESTAMP dan NOW, saat menulis data ke tabel sink. Ini mencegah pembengkakan abnormal dalam status operator SinkUpsertMaterializer saat kunci upsert tidak tersedia.

  • Jika status operator SinkUpsertMaterializer besar dan memengaruhi kinerja, tingkatkan paralelisme penyebaran. Untuk informasi lebih lanjut, lihat Konfigurasikan Sumber Daya untuk Penyebaran.

Masalah yang diketahui

Operator SinkUpsertMaterializer menyelesaikan masalah peristiwa changelog yang tidak berurutan, tetapi dapat menyebabkan peningkatan terus-menerus dalam ukuran status karena alasan berikut:

  • Periode retensi status terlalu lama karena tidak ada waktu hidup status (TTL) yang dikonfigurasi atau TTL status terlalu lama. Namun, TTL yang terlalu pendek dapat mengakibatkan retensi data kotor yang tidak perlu dalam status, seperti yang dijelaskan dalam FLINK-29225. Masalah ini terjadi ketika interval waktu antara peristiwa DELETE dan peristiwa ADD yang sesuai melebihi TTL yang dikonfigurasi. Dalam hal ini, Flink menghasilkan pesan peringatan berikut dalam log:

    int index = findremoveFirst(values, row);     
    if (index == -1) {          
        LOG.info(STATE_CLEARED_WARN_MSG);          
        return;     
    }

    Kami menyarankan Anda mengonfigurasi TTL berdasarkan kebutuhan bisnis Anda. Untuk informasi lebih lanjut, lihat Konfigurasikan Penyebaran. Realtime Compute for Apache Flink yang menggunakan VVR 8.0.7 atau lebih baru memungkinkan Anda mengonfigurasi nilai TTL yang berbeda untuk operator yang berbeda untuk mengurangi konsumsi sumber daya penyebaran dengan status besar. Untuk informasi lebih lanjut, lihat Konfigurasikan Paralelisme, Strategi Chaining, dan TTL Operator.

  • Jika kunci upsert tidak dapat disimpulkan untuk aliran update yang diterima oleh operator SinkUpsertMaterializer dan aliran update mencakup kolom non-deterministik, data historis tidak dapat dihapus seperti yang diharapkan. Ini menghasilkan peningkatan terus-menerus dalam ukuran status.

Referensi

Untuk informasi mengenai pemetaan versi mesin antara Realtime Compute for Apache Flink dan Apache Flink, lihat Catatan Rilis.