Topik ini menjelaskan cara menggunakan Hologres dan Flink untuk melakukan deduplikasi pengunjung unik (UV) waktu nyata yang akurat.
Prasyarat
Anda telah mengaktifkan Hologres dan menghubungkannya ke alat pengembang. Topik ini menggunakan HoloWeb sebagai contoh. Untuk informasi lebih lanjut, lihat Hubungkan ke HoloWeb dan jalankan kueri.
Anda telah menyiapkan kluster Flink. Anda dapat menggunakan fully managed Flink dari Alibaba Cloud atau open source Flink.
Informasi latar belakang
Hologres terintegrasi erat dengan Flink. Hologres mendukung penulisan data waktu nyata ber-throughput tinggi dari Flink dengan visibilitas langsung. Hologres juga mendukung join tabel dimensi Flink SQL dan dapat berfungsi sebagai sumber Change Data Capture (CDC) untuk pengembangan berbasis event. Deduplikasi UV waktu nyata terutama menggunakan Flink dan Hologres. Diagram berikut menunjukkan arsitektur tersebut.
Flink berlangganan data waktu nyata dari sumber seperti log Kafka.
Flink memproses data, mengonversi aliran menjadi tabel, melakukan join dengan tabel dimensi Hologres, dan menulis hasilnya ke Hologres secara waktu nyata.
Hologres memproses data dari Flink secara waktu nyata.
Hasil kueri diumpankan ke aplikasi data tingkat lebih tinggi, seperti DataService Studio dan Quick BI.
Alur perhitungan UV waktu nyata
Flink dan Hologres terintegrasi erat. Anda dapat menggunakan dukungan RoaringBitmap native di Hologres untuk menghitung UV waktu nyata dan menghapus duplikat tag pengguna. Diagram berikut menunjukkan proses tersebut.
Flink berlangganan data pengguna waktu nyata dari sumber seperti Kafka atau Redis dan mengonversinya menjadi tabel sumber menggunakan DataStream.
Buat tabel pemetaan pengguna di Hologres. Tabel ini menyimpan ID pengguna historis (UID) dan UID auto-increment 32-bit yang sesuai.
CatatanID pengguna dari sistem bisnis atau instrumentasi umumnya berupa string atau integer panjang. RoaringBitmap mengharuskan ID pengguna berupa integer 32-bit yang sedens mungkin, artinya ID harus berurutan. Tabel pemetaan menggunakan tipe `SERIAL` di Hologres, yaitu integer 32-bit auto-increment, untuk mengelola pemetaan pengguna secara otomatis dan konsisten.
Di Flink, gunakan tabel pemetaan pengguna Hologres sebagai tabel dimensi Flink. Anda dapat menggunakan atribut `insertIfNotExists` dengan auto-increment field untuk memetakan UID secara efisien. Kemudian, lakukan join tabel dimensi dengan tabel sumber data dan konversi hasilnya menjadi DataStream.
Flink memproses data hasil join tabel dimensi berdasarkan jendela waktu, menerapkan fungsi RoaringBitmap berdasarkan dimensi kueri, dan menyimpan hasilnya ke tabel sink agregasi di Hologres.
Kueri mirip dengan metode offline. Anda dapat mengambil jumlah pengguna dengan langsung mengkueri tabel sink agregasi berdasarkan kondisi kueri Anda, melakukan operasi
orpada bidang RoaringBitmap utama, lalu menghitung kardinalitasnya.
Metode ini menyediakan data UV dan page view (PV) pengguna waktu nyata dengan detail halus. Anda dapat dengan mudah menyesuaikan jendela waktu minimum, seperti UV dalam lima menit terakhir, untuk membuat dasbor pemantauan waktu nyata guna tampilan Intelijen Bisnis (BI). Pendekatan ini lebih unggul untuk statistik detail halus selama event dibandingkan deduplikasi harian, mingguan, atau bulanan. Anda juga dapat menggunakan agregasi sederhana untuk mendapatkan statistik dalam satuan waktu yang lebih besar. Jika granularitas agregasi terlalu detail dan kueri tidak memiliki kondisi filter atau dimensi agregasi yang sesuai, hal ini dapat menyebabkan operasi agregasi tambahan yang menurunkan performa.
Solusi ini menyediakan tautan data yang sederhana dan perhitungan fleksibel di berbagai dimensi. Solusi ini hanya memerlukan satu Bitmap untuk penyimpanan, sehingga menghindari pembengkakan penyimpanan dan memastikan pembaruan waktu nyata. Hal ini menciptakan gudang data yang lebih responsif, fleksibel, dan kaya fitur untuk analisis multidimensi.
Prosedur
Buat tabel dasar yang diperlukan di Hologres
Buat tabel pemetaan pengguna
Di Hologres, buat tabel bernama `uid_mapping` sebagai tabel pemetaan pengguna. Tabel ini memetakan UID ke tipe integer 32-bit. Jika UID asli Anda sudah berupa integer 32-bit, Anda dapat melewati langkah ini.
ID pengguna dari sistem bisnis atau instrumentasi umumnya berupa string atau integer panjang. RoaringBitmap mengharuskan ID pengguna berupa integer 32-bit yang sedens mungkin, artinya ID harus berurutan. Tabel pemetaan menggunakan tipe `SERIAL` di Hologres, yaitu integer 32-bit auto-increment, untuk pemetaan pengguna otomatis dan konsisten.
Karena ini adalah data waktu nyata, tabel dibuat sebagai tabel berorientasi baris di Hologres untuk meningkatkan permintaan per detik (QPS) join waktu nyata dengan tabel dimensi Flink.
Aktifkan parameter Grand Unified Configuration (GUC) yang sesuai untuk menggunakan mesin eksekusi yang dioptimalkan saat menulis ke tabel yang berisi kolom serial. Untuk informasi lebih lanjut, lihat Percepat eksekusi SQL dengan Fixed Plan.
-- Aktifkan GUC untuk mendukung penulisan Fixed Plan pada kolom bertipe Serial. alter database <db_name> set hg_experimental_enable_fixed_dispatcher_autofill_series=on; alter database <db_name> set hg_experimental_enable_fixed_dispatcher_for_multi_values=on; BEGIN; CREATE TABLE public.uid_mapping ( uid text NOT NULL, uid_int32 serial, PRIMARY KEY (uid) ); -- Tetapkan uid sebagai clustering_key dan distribution_key untuk menemukan nilai int32-nya dengan cepat. CALL set_table_property('public.uid_mapping', 'clustering_key', 'uid'); CALL set_table_property('public.uid_mapping', 'distribution_key', 'uid'); CALL set_table_property('public.uid_mapping', 'orientation', 'row'); COMMIT;Buat tabel sink untuk agregasi
Buat tabel bernama `dws_app` untuk menyimpan hasil agregasi.
Sebelum menggunakan fungsi RoaringBitmap, Anda harus membuat ekstensi RoaringBitmap. Instans Hologres Anda harus versi V0.10 atau lebih baru.
CREATE EXTENSION IF NOT EXISTS roaringbitmap;Berbeda dengan tabel sink offline, tabel ini mencakup bidang timestamp untuk statistik berdasarkan epoch jendela Flink. Berikut adalah Data Definition Language (DDL) untuk tabel sink tersebut.
BEGIN; CREATE TABLE dws_app( country text, prov text, city text, ymd text NOT NULL, -- Bidang tanggal timetz TIMESTAMPTZ, -- Timestamp statistik, memungkinkan statistik berdasarkan epoch jendela Flink uid32_bitmap roaringbitmap, -- Gunakan roaringbitmap untuk mencatat UV PRIMARY KEY (country, prov, city, ymd, timetz)-- Gunakan dimensi kueri dan waktu sebagai primary key untuk mencegah penyisipan data duplikat. ); CALL set_table_property('public.dws_app', 'orientation', 'column'); -- Tetapkan bidang tanggal sebagai clustering_key dan event_time_column untuk mempermudah filtering. CALL set_table_property('public.dws_app', 'clustering_key', 'ymd'); CALL set_table_property('public.dws_app', 'event_time_column', 'ymd'); -- Tetapkan bidang group by sebagai distribution_key. CALL set_table_property('public.dws_app', 'distribution_key', 'country,prov,city'); COMMIT;
Baca data secara waktu nyata dengan Flink dan perbarui tabel sink
Untuk kode sumber contoh lengkap di Flink, lihat contoh alibabacloud-hologres-connectors. Langkah-langkah berikut menjelaskan operasi di Flink.
Baca sumber data sebagai DataStream di Flink dan konversi menjadi tabel sumber
Di Flink, baca data dari sumber streaming. Sumber data dapat berupa file CSV, Kafka, Redis, atau sumber lainnya, tergantung skenario Anda. Contoh kode berikut menunjukkan cara mengonversi aliran menjadi tabel sumber di Flink.
// File CSV digunakan sebagai sumber data di sini. Anda juga dapat menggunakan Kafka, Redis, atau sumber lainnya. DataStreamSource odsStream = env.createInput(csvInput, typeInfo); // Bidang proctime harus ditambahkan untuk join dengan tabel dimensi. Table odsTable = tableEnv.fromDataStream( odsStream, $("uid"), $("country"), $("prov"), $("city"), $("ymd"), $("proctime").proctime()); // Daftarkan tabel di lingkungan katalog. tableEnv.createTemporaryView("odsTable", odsTable);Lakukan join tabel sumber dengan tabel dimensi Hologres (uid_mapping)
Saat membuat tabel dimensi Hologres di Flink, Anda harus menggunakan parameter
insertIfNotExistsuntuk menyisipkan data jika belum ada. Hal ini memungkinkan bidang `uid_int32` melakukan auto-increment menggunakan tipe `Serial` di Hologres. Contoh kode berikut menunjukkan cara melakukan join tabel:-- Buat tabel dimensi Hologres. insertIfNotExists berarti jika catatan tidak ditemukan, akan disisipkan secara otomatis. String createUidMappingTable = String.format( "create table uid_mapping_dim(" + " uid string," + " uid_int32 INT" + ") with (" + " 'connector'='hologres'," + " 'dbname' = '%s'," // Nama DB Hologres + " 'tablename' = '%s',"// Nama tabel Hologres + " 'username' = '%s'," // ID AccessKey akun saat ini + " 'password' = '%s'," // Rahasia AccessKey akun saat ini + " 'endpoint' = '%s'," // Titik akhir Hologres + " 'insertifnotexists'='true'" + ")", database, dimTableName, username, password, endpoint); tableEnv.executeSql(createUidMappingTable); -- Lakukan join tabel sumber dan tabel dimensi. String odsJoinDim = "SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32" + " FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim" + " ON ods.uid = dim.uid"; Table joinRes = tableEnv.sqlQuery(odsJoinDim);Konversi hasil join menjadi DataStream
Proses data menggunakan jendela waktu Flink dan gunakan RoaringBitmap untuk menghapus duplikat metrik. Kode berikut memberikan contohnya.
DataStream<Tuple6<String, String, String, String, Timestamp, byte[]>> processedSource = source -- Filter berdasarkan dimensi untuk statistik (country, prov, city, ymd). .keyBy(0, 1, 2, 3) -- Jendela waktu tumbling. Karena file CSV digunakan untuk mensimulasikan aliran input, ProcessingTime digunakan di sini. Dalam skenario dunia nyata, Anda dapat menggunakan EventTime. .window(TumblingProcessingTimeWindows.of(Time.minutes(5))) -- Pemicu, yang dapat menghasilkan hasil agregasi sebelum jendela ditutup. .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1))) .aggregate( -- Fungsi agregasi, yang melakukan agregasi berdasarkan dimensi yang difilter oleh keyBy. new AggregateFunction< Tuple5<String, String, String, String, Integer>, RoaringBitmap, RoaringBitmap>() { @Override public RoaringBitmap createAccumulator() { return new RoaringBitmap(); } @Override public RoaringBitmap add( Tuple5<String, String, String, String, Integer> in, RoaringBitmap acc) { -- Tambahkan UID 32-bit ke RoaringBitmap untuk deduplikasi. acc.add(in.f4); return acc; } @Override public RoaringBitmap getResult(RoaringBitmap acc) { return acc; } @Override public RoaringBitmap merge( RoaringBitmap acc1, RoaringBitmap acc2) { return RoaringBitmap.or(acc1, acc2); } }, -- Fungsi jendela, yang mengeluarkan hasil agregasi. new WindowFunction< RoaringBitmap, Tuple6<String, String, String, String, Timestamp, byte[]>, Tuple, TimeWindow>() { @Override public void apply( Tuple keys, TimeWindow timeWindow, Iterable<RoaringBitmap> iterable, Collector< Tuple6<String, String, String, String, Timestamp, byte[]>> out) throws Exception { RoaringBitmap result = iterable.iterator().next(); // Optimalkan RoaringBitmap. result.runOptimize(); // Konversi RoaringBitmap menjadi array byte untuk disimpan di Hologres. byte[] byteArray = new byte[result.serializedSizeInBytes()]; result.serialize(ByteBuffer.wrap(byteArray)); // Bidang Tuple6.f4(Timestamp) menunjukkan bahwa statistik dilakukan berdasarkan epoch panjang jendela, dalam satuan detik. out.collect( new Tuple6<>( keys.getField(0), keys.getField(1), keys.getField(2), keys.getField(3), new Timestamp( timeWindow.getEnd() / 1000 * 1000), byteArray)); } });Tulis data ke tabel sink Hologres
Tulis data yang telah dideduplikasi dari Flink ke tabel sink dws_app di Hologres. Perhatikan bahwa tipe `RoaringBitmap` di Hologres bersesuaian dengan tipe array byte di Flink. Berikut adalah kode Flink-nya.
-- Konversi hasil perhitungan menjadi tabel. Table resTable = tableEnv.fromDataStream( processedSource, $("country"), $("prov"), $("city"), $("ymd"), $("timest"), $("uid32_bitmap")); -- Buat tabel sink Hologres. Tipe RoaringBitmap di Hologres disimpan sebagai array byte. String createHologresTable = String.format( "create table sink(" + " country string," + " prov string," + " city string," + " ymd string," + " timetz timestamp," + " uid32_bitmap BYTES" + ") with (" + " 'connector'='hologres'," + " 'dbname' = '%s'," + " 'tablename' = '%s'," + " 'username' = '%s'," + " 'password' = '%s'," + " 'endpoint' = '%s'," + " 'connectionSize' = '%s'," + " 'mutatetype' = 'insertOrReplace'" + ")", database, dwsTableName, username, password, endpoint, connectionSize); tableEnv.executeSql(createHologresTable); -- Tulis hasil perhitungan ke tabel dws_app. tableEnv.executeSql("insert into sink select * from " + resTable);
Kueri data
Di Hologres, hitung UV dari tabel sink `dws_app`. Lakukan agregasi berdasarkan dimensi kueri Anda dan kueri kardinalitas bitmap untuk mengambil jumlah pengguna berdasarkan kondisi `GROUP BY`.
Contoh 1: Kueri UV setiap kota pada hari tertentu
-- Untuk menjalankan kueri RB_AGG berikut, Anda dapat menonaktifkan sakelar agregasi tiga tahap (secara default dinonaktifkan) demi performa yang lebih baik. Langkah ini opsional. set hg_experimental_enable_force_three_stage_agg=off; SELECT country ,prov ,city ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv FROM dws_app WHERE ymd = '20210329' GROUP BY country ,prov ,city ;Contoh 2: Kueri UV dan PV setiap provinsi dalam rentang waktu tertentu
-- Untuk menjalankan kueri RB_AGG berikut, Anda dapat menonaktifkan sakelar agregasi tiga tahap (secara default dinonaktifkan) demi performa yang lebih baik. Langkah ini opsional. set hg_experimental_enable_force_three_stage_agg=off; SELECT country ,prov ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv ,SUM(pv) AS pv FROM dws_app WHERE time > '2021-04-19 18:00:00+08' and time < '2021-04-19 19:00:00+08' GROUP BY country ,prov ;
Visualisasikan data
Setelah menghitung UV dan PV, Anda dapat menggunakan alat BI untuk visualisasi. Karena agregasi memerlukan RB_CARDINALITY dan RB_OR_AGG, alat BI harus mendukung user-defined aggregate function. Alat BI yang mendukung fitur ini antara lain Apache Superset dan Tableau.
Apache Superset
Hubungkan Apache Superset ke Hologres. Untuk informasi lebih lanjut, lihat Apache Superset.
Tetapkan tabel dws_app sebagai dataset.

Di dataset, buat metrik bernama UV dengan ekspresi berikut.

RB_CARDINALITY(RB_OR_AGG(uid32_bitmap))Anda kini dapat mulai menjelajahi data Anda.
(Opsional) Buat dasbor.
Untuk membuat dasbor, lihat Create Dashboard.
Tableau
Hubungkan Tableau ke Hologres. Untuk informasi lebih lanjut, lihat Tableau.
Anda dapat menggunakan fungsi passthrough Tableau untuk langsung mengimplementasikan user-defined function. Untuk informasi lebih lanjut, lihat Passthrough Functions.
Buat bidang terhitung dengan ekspresi berikut.

RAWSQLAGG_INT("RB_CARDINALITY(RB_OR_AGG(%1))", [Uid32 Bitmap])Anda kini dapat mulai menjelajahi data Anda.
(Opsional) Buat dasbor.
Untuk membuat dasbor, lihat Create a Dashboard.