Topik ini menjelaskan cara menggunakan tipe data BITMAP di Realtime Compute for Apache Flink untuk deduplikasi eksak, serta menyediakan contoh SQL untuk skenario aplikasi khas.
Informasi latar belakang
Dalam komputasi real-time Flink, solusi deduplikasi tradisional memiliki keterbatasan berikut:
Overhead resource tinggi: Fungsi
COUNT DISTINCTharus menyimpan semua kunci deduplikasi dalam state. Memproses volume data besar menyebabkan state membengkak, sehingga mengonsumsi memori dan sumber daya CPU secara signifikan.Skalabilitas multidimensi sulit: Solusi pra-agregasi kurang fleksibel. Untuk mendukung kueri pada kombinasi apa pun dari N dimensi, Anda harus membuat daftar 2N kelompok, yang membuat logika pekerjaan menjadi kompleks.
Akurasi terganggu: Algoritma aproksimasi seperti
APPROX_COUNT_DISTINCTmemiliki kesalahan statistik dan tidak dapat memenuhi persyaratan akurasi ketat pada beberapa skenario bisnis.
Solusi BITMAP
Untuk menyeimbangkan akurasi dan performa, Realtime Compute for Apache Flink memperkenalkan tipe data BITMAP.
Cara kerja: Solusi ini biasanya berbasis algoritma RoaringBitmap. Data integer detail dikompresi menjadi objek bitmap. Logika komputasi berubah dari pemeliharaan set yang mahal menjadi operasi logika bitmap yang efisien, seperti union, intersection, dan difference.
Keunggulan utama: BITMAP secara signifikan mengurangi kebutuhan penyimpanan state dan memungkinkan roll-up, drill-down, serta analisis silang pada dimensi apa pun secara fleksibel tanpa kehilangan akurasi.
Batasan
Hanya Ververica Runtime (VVR) versi 11.5 dan yang lebih baru dari Realtime Compute for Apache Flink yang mendukung tipe data BITMAP beserta fungsi terkaitnya.
Penggunaan
Tipe BITMAP adalah himpunan bilangan bulat tak bertanda 32-bit berdasarkan standar RoaringBitmap. Tipe ini mendukung perhitungan kardinalitas dalam waktu konstan dan operasi aljabar set yang efisien. Ada dua cara utama menggunakannya:
Deduplikasi eksak real-time
Anda dapat memproses data detail mentah secara langsung. Ini memberikan kemampuan deduplikasi yang sama seperti COUNT DISTINCT, tetapi dengan efisiensi memori dan performa komputasi yang lebih baik untuk data berskala besar. (Lihat Contoh 1)
Pra-agregasi untuk analisis multidimensi
Anda dapat menghasilkan bitmap yang dipotong berdasarkan dimensi bisnis dan menyimpannya ke penyimpanan persisten. Hal ini memungkinkan penggunaan fungsi operasi set untuk melakukan analisis roll-up dan drill-down secara fleksibel pada dimensi apa pun tanpa mengakses data detail. (Lihat Contoh 2, 3, dan 4)
Untuk informasi selengkapnya tentang fungsi bawaan BITMAP dan penggunaannya, lihat Fungsi BITMAP.
Contoh
Empat contoh berikut menunjukkan bagaimana BITMAP menyelesaikan masalah performa dan fleksibilitas pada berbagai tahap bisnis:
Skenario dasar (Contoh 1): Fokus pada output hasil real-time. Contoh ini menunjukkan cara mengganti
COUNT DISTINCTsecara langsung dengan BITMAP untuk mendapatkan hasil deduplikasi eksak yang sama dengan konsumsi resource lebih rendah.Skenario lanjutan: Fokus pada pelapisan gudang data dan analisis fleksibel. Ketiga contoh ini membentuk alur kerja lengkap pra-agregasi dan pasca-komputasi:
Penyimpanan (Contoh 2): Mengubah data detail menjadi potongan bitmap dan menyimpannya secara persisten untuk membangun lapisan antara yang dapat digunakan kembali.
Analisis (Contoh 3): Berdasarkan data lapisan antara, Anda dapat menggunakan operasi bitmap untuk melakukan roll-up dan analisis silang pada dimensi apa pun tanpa memproses ulang data historis.
Integrasi (Contoh 4): Menggabungkan aliran real-time dengan bitmap historis offline. Hal ini memungkinkan perhitungan metrik integrasi stream-batch dengan biaya rendah, seperti month-over-month, year-over-year, dan laju retensi.
1. Deduplikasi eksak real-time
Gunakan fungsi BITMAP_BUILD_CARDINALITY_AGG untuk menghitung jumlah pengunjung unik (UV) per menit dari log real-time.
DDL
-- Tabel sumber data
CREATE TEMPORARY TABLE user_events
(
user_id INT
,tag STRING -- Tag event
,event_time TIMESTAMP(3)
,WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH ('connector' = 'kafka' ...)
;
-- Tabel sink untuk statistik UV per menit
CREATE TEMPORARY TABLE minute_uv
(
ts_date STRING
,ts_hour STRING
,ts_minute STRING
,tag STRING -- Tag event
,minute_uv BIGINT -- Jumlah UV
)
WITH ('connector' = 'jdbc' ...)
;DML
Logika inti: Gunakan fungsi
BITMAP_BUILD_CARDINALITY_AGGuntuk melakukan deduplikasi dan penghitungan secara simultan saat perhitungan window selesai.Skenario: Dasbor real-time dan metrik pemantauan sederhana per menit.
Catatan: Output berupa nilai skalar dan tidak aditif. Karena prinsip deduplikasi set, Anda tidak dapat langsung menjumlahkan nilai statistik dari jendela waktu berbeda untuk mendapatkan total UV dalam rentang waktu yang lebih panjang. Misalnya, Anda tidak bisa hanya menjumlahkan dua hitungan "UV 1 menit" untuk mendapatkan hitungan "UV deduplikasi 2 menit".
INSERT INTO minute_uv
SELECT
DATE_FORMAT(window_start, 'yyyy-MM-dd') AS ts_date,
DATE_FORMAT(window_start, 'HH') AS ts_hour,
DATE_FORMAT(window_start, 'mm') AS ts_minute,
tag,
-- Fungsi inti: Membangun bitmap dan langsung mengembalikan kardinalitasnya (jumlah deduplikasi).
-- Logika ini setara dengan COUNT(DISTINCT user_id) tetapi menawarkan performa lebih baik.
BITMAP_BUILD_CARDINALITY_AGG(user_id) AS uv
FROM TABLE(
TUMBLE(
TABLE user_events,
DESCRIPTOR(event_time),
INTERVAL '1' MINUTE
)
)
GROUP BY
window_start,
window_end,
tag;2. Menyimpan bitmap per menit
Simpan data bitmap per menit dari log real-time ke sistem penyimpanan eksternal.
DDL
-- Tabel sumber data
CREATE TEMPORARY TABLE user_events
(
user_id INT
,tag STRING -- Tag event
,event_time TIMESTAMP(3)
,WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH ('connector' = 'kafka' ...)
;
-- Tabel sink untuk bitmap per menit
CREATE TEMPORARY TABLE minute_bitmaps
(
ts_date STRING
,ts_hour STRING
,ts_minute STRING
,tag STRING -- Tag event
,minute_bm BYTES -- Bitmap per menit yang telah diserialisasi
)
WITH ('connector' = 'jdbc' ...)
;DML
Logika inti: Lakukan transformasi ringan dari data detail ke bitmap di sisi Flink. Alih-alih langsung mengeluarkan nilai hitungan, Anda dapat mengeluarkan data biner yang telah diserialisasi.
Detail teknis: Gunakan
BITMAP_BUILD_AGGuntuk membuat objek bitmap. Kemudian, gunakanBITMAP_TO_BYTESuntuk mengubahnya menjadi larik byte (VARBINARY/BYTES) yang sesuai dengan standar RoaringBitmap.Skenario: Membangun lapisan antara gudang data (DWD/DWS) atau menghubungkan ke mesin Pemrosesan Analitik Online (OLAP) yang mendukung bitmap, seperti StarRocks atau Hologres.
Keunggulan utama: Bersifat aditif. Data bitmap biner output mempertahankan informasi set. Pekerjaan downstream atau mesin kueri dapat memuat data ini kapan saja, lalu menggunakan operasi union bitmap (OR) untuk menggabungkan beberapa bitmap per menit menjadi bitmap per jam atau per hari, memungkinkan roll-up fleksibel lintas jendela waktu.
INSERT INTO minute_bitmaps
SELECT
DATE_FORMAT(window_start, 'yyyy-MM-dd') AS ts_date,
DATE_FORMAT(window_start, 'HH') AS ts_hour,
DATE_FORMAT(window_start, 'mm') AS ts_minute,
tag,
-- Proses inti: Bangun dan serialisasi bitmap.
-- 1. BITMAP_BUILD_AGG: Mengagregasi user_id dalam window menjadi objek bitmap di memori.
-- 2. BITMAP_TO_BYTES: Mengenkoding objek bitmap ke format biner standar untuk penyimpanan persisten.
BITMAP_TO_BYTES(
BITMAP_BUILD_AGG(user_id)
) AS user_id_bitmap
FROM TABLE(
TUMBLE(
TABLE user_events,
DESCRIPTOR(event_time),
INTERVAL '1' MINUTE
)
)
GROUP BY
window_start,
window_end,
tag;3. Profil pengguna dan filter kelompok
Berdasarkan data bitmap per menit, hitung jumlah pengguna per jam yang mencari kata kunci (A) dan menambahkan produk ke keranjang belanja mereka (B), tetapi tidak menyelesaikan pembayaran (C).
DDL
-- Tabel bitmap per menit yang telah dipra-hitung
CREATE TEMPORARY TABLE minute_bitmaps
(
ts_date STRING
,ts_hour STRING
,ts_minute STRING
,tag STRING -- Tag event
,minute_bm BYTES -- Bitmap per menit yang telah diserialisasi (dari Contoh 2)
)
WITH ('connector' = 'jdbc' ...)
;
-- Tabel analisis UV per jam
CREATE TEMPORARY TABLE hour_combined_uv
(
ts_date STRING
,ts_hour STRING
,tag STRING -- Tag event
,uv BIGINT
)
WITH ('connector' = 'jdbc' ...)
;DML
Logika inti:
Deserialisasi dan roll-up: Ubah data biner dari lapisan penyimpanan kembali menjadi objek bitmap. Lalu, gunakan operasi union untuk menggabungkan beberapa bitmap per menit menjadi bitmap per jam.
Operasi aljabar set: Lakukan operasi intersection, difference, dan lainnya pada bitmap dari dimensi berbeda di memori.
Perhitungan kardinalitas: Hitung jumlah elemen dalam set hasil akhir.
Skenario bisnis: Analisis persona pengguna, analisis funnel, atau filter kelompok spesifik.
Keunggulan utama: Logika komputasi sepenuhnya terurai. Perubahan logika bisnis, seperti penambahan kondisi filter, hanya memerlukan modifikasi kueri SQL tanpa mengubah pipeline generasi data upstream, sehingga memberikan tingkat fleksibilitas analitis yang tinggi.
INSERT INTO hour_combined_uv
SELECT
ts_date,
ts_hour,
'A and B andnot C' AS metric_name,
-- Langkah 3: Hitung kardinalitas.
-- Hitung jumlah pengguna dalam bitmap akhir dan keluarkan sebagai BIGINT.
BITMAP_CARDINALITY(
-- Langkah 2: Lakukan operasi aljabar set.
-- Ekspresi logika: (Tag A ∩ Tag B) - Tag C
BITMAP_ANDNOT(
BITMAP_AND(hour_bm_a, hour_bm_b),
hour_bm_c
)
) AS uv
FROM (
SELECT
ts_date,
ts_hour,
-- Langkah 1: Deserialisasi dan lakukan roll-up waktu.
-- Kembalikan data biner yang tersimpan (BYTES) ke bitmap,
-- dan gunakan BITMAP_OR_AGG untuk menggabungkan (union) semua bitmap per menit dalam satu jam.
BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm))
FILTER (WHERE tag = 'A') AS hour_bm_a,
BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm))
FILTER (WHERE tag = 'B') AS hour_bm_b,
BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm))
FILTER (WHERE tag = 'C') AS hour_bm_c
FROM minute_bitmaps
WHERE tag IN ('A', 'B', 'C')
GROUP BY
ts_date,
ts_hour
);4. Analisis retensi real-time
Berdasarkan data bitmap per menit dan per hari, contoh ini menghitung jumlah pengunjung yang kembali secara real-time untuk halaman tertentu hingga menit tertentu pada hari ini.
DDL
-- Tabel bitmap per menit yang telah dipra-hitung
CREATE TEMPORARY TABLE minute_bitmaps
(
ts_date STRING
,ts_hour STRING
,ts_minute STRING
,tag STRING -- Nama halaman
,minute_bm BYTES -- Bitmap per menit yang telah diserialisasi
)
WITH ('connector' = 'jdbc' ...)
;
-- Tabel bitmap per hari yang telah dipra-hitung
CREATE TEMPORARY TABLE daily_bitmaps
(
ts_date STRING
,tag STRING -- Nama halaman
,daily_bm BYTES -- Bitmap per hari yang telah diserialisasi
)
WITH ('connector' = 'jdbc' ...)
;
-- Tabel sink untuk metrik retensi real-time
CREATE TEMPORARY TABLE realtime_retention_metrics
(
ts_date STRING
,tag STRING -- Nama halaman
,retained_users BIGINT -- Jumlah kunjungan ulang pada hari ini
,total_users BIGINT -- Total jumlah pengunjung dari hari sebelumnya
,update_time TIMESTAMP(3)
,PRIMARY KEY (ts_date, tag) NOT ENFORCED
)
WITH ('connector' = 'jdbc' ...)
;DML
Logika inti:
Agregasi real-time: Agregasi aliran bitmap per menit dari hari ini secara real-time untuk membuat bitmap kumulatif harian.
Join stream-batch: Gunakan operasi JOIN untuk mengasosiasikan bitmap real-time hari ini dengan bitmap penuh hari sebelumnya dari lapisan penyimpanan.
Operasi intersection: Lakukan operasi
BITMAP_AND(intersection) pada bitmap kelompok pengguna kedua hari tersebut untuk mendapatkan set pengguna yang tumpang tindih.
Skenario: Pemantauan real-time respons pengguna terhadap promosi penjualan atau analisis daya tempel pengguna aktif harian dalam game.
Keunggulan utama:
Komputasi sangat cepat: Solusi tradisional perlu melakukan join dua tabel besar dengan ratusan juta baris untuk menghitung
COUNT DISTINCT, yang memiliki overhead besar dan latensi tinggi. Solusi BITMAP hanya perlu melakukan operasi bitmap pada beberapa objek biner, dan komputasi selesai dalam milidetik.Penguraian keterkaitan penyimpanan: Data historis, seperti daily_bitmaps, dapat disimpan di sistem eksternal seperti HDFS, OSS, atau JDBC, sehingga menghindari penggunaan jangka panjang sumber daya state Flink.
INSERT INTO realtime_retention_metrics
SELECT
T.ts_date,
T.tag,
-- Metrik 1: Kunjungan ulang real-time pada hari ini (pengguna yang dipertahankan).
-- Logika: Pengguna yang muncul hari ini ∩ Pengguna yang muncul kemarin.
BITMAP_CARDINALITY(
BITMAP_AND(
T.today_bm, -- Bitmap real-time untuk hari ini
BITMAP_FROM_BYTES(Y.daily_bm) -- Bitmap historis dari kemarin (perlu deserialisasi)
)
) AS retained_users,
-- Metrik 2: Jumlah dasar pengunjung dari kemarin.
-- Logika: Langsung baca kardinalitas bitmap kemarin untuk digunakan sebagai penyebut perhitungan laju retensi.
BITMAP_CARDINALITY(
BITMAP_FROM_BYTES(Y.daily_bm)
) AS total_users,
CURRENT_TIMESTAMP AS update_time
FROM (
-- Subkueri T: Pemrosesan aliran real-time.
-- Mengagregasi potongan per menit hari ini menjadi bitmap "sampai saat ini" untuk hari ini secara real-time.
SELECT
ts_date,
tag,
BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm)) AS today_bm
FROM minute_bitmaps
GROUP BY ts_date, tag
) T
-- Join inti: Menggabungkan stream (hari ini) dengan batch (kemarin).
INNER JOIN daily_bitmaps Y
ON T.tag = Y.tag
-- Kondisi join: Cocokkan data dari hari sebelum T.
AND CAST(T.ts_date AS DATE) = CAST(Y.ts_date AS DATE) + INTERVAL '1' DAY;