Topik ini menjelaskan cara mengintegrasikan Hologres dengan Flink untuk menghitung jumlah pengunjung unik (UV) secara real-time.
Prasyarat
Instans Hologres telah dibeli dan terhubung menggunakan alat pengembangan. Contoh ini menggunakan HoloWeb. Untuk informasi lebih lanjut tentang cara terhubung ke instans Hologres menggunakan HoloWeb, lihat Terhubung ke HoloWeb dan Melakukan Kueri.
Kluster Flink telah dibuat. Anda dapat menggunakan Flink yang Sepenuhnya Dikelola dari Realtime Compute for Apache Flink atau Apache Flink.
Informasi latar belakang
Hologres sangat kompatibel dengan Flink. Hologres mendukung penulisan data berkapasitas tinggi dari Flink secara real-time dan kueri real-time dari data yang telah ditulis. Hologres memungkinkan Anda menggabungkan tabel sumber dengan tabel dimensi melalui Pernyataan SQL Flink. Hologres juga mendukung fitur change data capture (CDC) untuk analitik data. Anda dapat mengintegrasikan Hologres dengan Flink untuk menghitung jumlah UV secara real-time. Gambar berikut menunjukkan alur kerja.
Flink berlangganan data baru yang dikumpulkan secara real-time. Data tersebut dapat berasal dari log seperti Kafka.
Flink mengonversi aliran data menjadi tabel sumber. Kemudian, Flink menggabungkan tabel sumber dengan tabel dimensi Hologres untuk menulis data dari tabel sumber ke Hologres secara real-time.
Hologres memproses data yang telah ditulis secara real-time.
Data yang telah diproses digunakan oleh layanan data lapisan atas seperti DataService Studio dan Quick BI.
Cara kerjanya
Integrasi tingkat tinggi antara Flink dan Hologres serta dukungan asli roaring bitmaps di Hologres memungkinkan Anda menghitung jumlah UV secara real-time berdasarkan deduplikasi berbasis tag. Gambar berikut menunjukkan diagram alir.
Di Flink, berlangganan data pengguna dari sumber data seperti Kafka dan Redis, dan gunakan program DataStream untuk mengonversi aliran data menjadi tabel sumber.
Buat tabel pemetaan ID unik (UID) di Hologres untuk menyimpan UID pengguna historis dan UID 32-bit auto-increment mereka.
CatatanDalam banyak kasus, UID yang dikumpulkan dalam aktivitas bisnis atau titik pelacakan adalah tipe STRING atau LONG. Dalam kasus ini, Anda harus membuat tabel pemetaan UID. UID yang disimpan dalam roaring bitmaps harus berupa bilangan bulat 32-bit, dan bilangan bulat berturut-turut lebih disukai. Tabel pemetaan UID berisi kolom tipe SERIAL yang terdiri dari bilangan bulat 32-bit auto-increment. Dengan cara ini, pemetaan UID dikelola secara otomatis dan tetap stabil.
Di Flink, gunakan tabel pemetaan UID di Hologres sebagai tabel dimensi, dan gunakan fitur insertIfNotExists dari tabel dimensi Hologres untuk memetakan UID secara efisien berdasarkan bilangan bulat 32-bit auto-increment. Gabungkan tabel sumber dengan tabel dimensi Hologres dan konversikan hasil gabungan menjadi aliran data.
Buat tabel di Hologres untuk mengagregasi hasil yang diproses. Flink memproses hasil gabungan berdasarkan jendela waktu dan menjalankan fungsi roaring bitmap berdasarkan dimensi kueri.
Kueri tabel hasil agregasi berdasarkan dimensi kueri. Lakukan operasi
ORpada bidang roaring bitmap dan hitung jumlah entri data. Jumlah UV diperoleh.
Dengan cara ini, Anda dapat memperoleh data UV dan page view (PV) yang lebih halus. Anda dapat menyesuaikan jendela statistik minimum seperti UV dalam 5 menit terakhir sesuai kebutuhan bisnis Anda. Ini memiliki efek serupa dengan pemantauan real-time dan menampilkan data dengan lebih baik di layar besar alat business intelligence (BI). Solusi ini memberikan performa yang lebih baik dalam deduplikasi data yang lebih halus pada tanggal bisnis tertentu dibandingkan dengan deduplikasi per hari, minggu, atau bulan. Solusi ini juga dapat menyediakan data deduplikasi untuk periode waktu yang relatif lama dengan mengagregasi hasil deduplikasi. Jika hasilnya diagregasi secara halus tetapi kondisi filter atau dimensi agregasi tidak diberikan, hasilnya mungkin diagregasi sekali lagi saat dikueri. Hal ini berdampak negatif pada performa perhitungan.
Solusi ini mudah digunakan. Anda dapat menetapkan dimensi untuk perhitungan. Solusi ini menyimpan data dalam bitmap, yang secara signifikan mengurangi ruang penyimpanan yang diperlukan. Selain itu, solusi ini mengembalikan hasil deduplikasi secara real-time. Semua manfaat ini bersama-sama membantu membangun gudang data multi-dimensi yang menyediakan fitur yang melimpah dan mendukung analitik data fleksibel secara real-time.
Prosedur
Buat tabel di Hologres.
Buat tabel pemetaan UID.
Eksekusi pernyataan berikut untuk membuat tabel pemetaan UID bernama uid_mapping di Hologres. Tabel pemetaan UID digunakan untuk menetapkan pemetaan antara UID dan bilangan bulat 32-bit mereka. Jika UID asli adalah bilangan bulat 32-bit, lewati langkah ini.
Dalam banyak kasus, UID yang dikumpulkan dalam aktivitas bisnis atau titik pelacakan adalah tipe STRING atau LONG. Dalam kasus ini, Anda harus membuat tabel pemetaan UID. UID yang disimpan dalam roaring bitmaps harus berupa bilangan bulat 32-bit, dan bilangan bulat berturut-turut lebih disukai. Tabel pemetaan UID berisi kolom tipe SERIAL yang terdiri dari bilangan bulat 32-bit auto-increment. Dengan cara ini, pemetaan UID dikelola secara otomatis dan tetap stabil.
Aliran data tentang UID dikumpulkan secara real-time dan diubah menjadi tabel sumber berorientasi baris. Ini meningkatkan jumlah permintaan per detik (QPS) ketika Anda menggabungkan tabel sumber dengan tabel dimensi Hologres di Flink.
Parameter Grand Unified Configuration (GUC) harus ditentukan untuk menggunakan mesin eksekusi yang dioptimalkan untuk menulis data ke tabel yang berisi kolom tipe SERIAL. Untuk informasi lebih lanjut, lihat Percepat Eksekusi Pernyataan SQL dengan Menggunakan Rencana Tetap.
-- Tentukan parameter GUC untuk mengizinkan penggunaan rencana tetap untuk menulis data ke tabel yang berisi kolom tipe data SERIAL. alter database <dbname> set hg_experimental_enable_fixed_dispatcher_autofill_series=on; alter database <dbname> set hg_experimental_enable_fixed_dispatcher_for_multi_values=on; BEGIN; CREATE TABLE public.uid_mapping ( uid text NOT NULL, uid_int32 serial, PRIMARY KEY (uid) ); -- Konfigurasikan kolom UID sebagai kunci klastering dan kunci distribusi untuk menemukan bilangan bulat 32-bit yang sesuai dengan UID dengan cepat. CALL set_table_property('public.uid_mapping', 'clustering_key', 'uid'); CALL set_table_property('public.uid_mapping', 'distribution_key', 'uid'); CALL set_table_property('public.uid_mapping', 'orientation', 'row'); COMMIT;Buat tabel hasil agregasi.
Buat tabel hasil agregasi bernama dws_app untuk menyimpan hasil agregasi.
Sebelum menggunakan fungsi roaring bitmap, pastikan bahwa Anda telah menginstal ekstensi untuk roaring bitmaps dan versi instans Hologres Anda adalah V0.10 atau lebih baru.
CREATE EXTENSION IF NOT EXISTS roaringbitmap;Dibandingkan dengan tabel hasil offline, tabel hasil agregasi ini menambahkan kolom timestamp untuk menghitung data yang dikumpulkan berdasarkan siklus hidup jendela waktu Flink. Pernyataan DDL berikut memberikan contoh:
BEGIN; CREATE TABLE dws_app( country text, prov text, city text, ymd text NOT NULL, -- Kolom tanggal. timetz TIMESTAMPTZ, -- Kolom timestamp yang digunakan untuk menghitung data yang dikumpulkan berdasarkan siklus hidup jendela waktu Flink. uid32_bitmap roaringbitmap, -- Data roaring bitmap yang digunakan untuk menghitung UV. PRIMARY KEY (country, prov, city, ymd, timetz) -- Konfigurasikan kolom tentang dimensi kueri dan waktu sebagai kolom kunci utama untuk mencegah data dimasukkan berulang kali. ); CALL set_table_property('public.dws_app', 'orientation', 'column'); -- Tetapkan kolom tanggal sebagai kunci klastering dan kolom waktu acara untuk menyaring data. CALL set_table_property('public.dws_app', 'clustering_key', 'ymd'); CALL set_table_property('public.dws_app', 'event_time_column', 'ymd'); -- Tetapkan kolom tentang dimensi kueri sebagai kolom kunci distribusi. CALL set_table_property('public.dws_app', 'distribution_key', 'country,prov,city'); COMMIT;
Gunakan Flink untuk membaca aliran data secara real-time dan memperbarui tabel hasil agregasi.
Untuk informasi tentang kode sampel lengkap, lihat alibabacloud-hologres-connectors. Langkah-langkah berikut dilakukan di Flink dalam contoh ini:
Baca aliran data dan ubah data menjadi tabel sumber.
Gunakan Flink untuk membaca data dari sumber data dalam mode streaming. Anda dapat memilih file CSV atau sumber data Kafka atau Redis berdasarkan kebutuhan bisnis Anda. Kode sampel berikut memberikan contoh tentang cara mengonversi data menjadi tabel:
// Dalam contoh ini, sumber data adalah file CSV. Anda juga dapat memilih sumber data Kafka atau Redis. DataStreamSource odsStream = env.createInput(csvInput, typeInfo); // Sebelum Anda menggabungkan tabel sumber dengan tabel dimensi, tambahkan kolom yang menggambarkan properti proctime ke tabel sumber. Table odsTable = tableEnv.fromDataStream( odsStream, $("uid"), $("country"), $("prov"), $("city"), $("ymd"), $("proctime").proctime()); -- Buat tampilan katalog. tableEnv.createTemporaryView("odsTable", odsTable);Gabungkan tabel sumber dengan tabel dimensi Hologres bernama uid_mapping.
Saat membuat tabel dimensi Hologres di Flink, atur parameter
insertIfNotExistske true. Ini memastikan bahwa Anda dapat memasukkan data secara manual ke tabel dimensi jika tidak ada data yang ditemukan. Bidang uid_int32 adalah kolom tipe SERIAL yang berisi bilangan bulat 32-bit auto-increment. Kode sampel berikut memberikan contoh tentang cara menggabungkan tabel:-- Buat tabel dimensi Hologres. Parameter insertIfNotExists menentukan apakah akan memasukkan data secara manual ke tabel dimensi jika data tidak dapat ditemukan. String createUidMappingTable = String.format( "create table uid_mapping_dim(" + " uid string," + " uid_int32 INT" + ") with (" + " 'connector'='hologres'," + " 'dbname' = '%s'," // Basis data Hologres tempat tabel dimensi Hologres berada. + " 'tablename' = '%s'," // Nama tabel dimensi Hologres. + " 'username' = '%s'," // ID AccessKey akun Alibaba Cloud Anda. + " 'password' = '%s'," // Rahasia AccessKey akun Alibaba Cloud Anda. + " 'endpoint' = '%s'," //Titik akhir Hologres + " 'insertifnotexists'='true'" + ")", database, dimTableName, username, password, endpoint); tableEnv.executeSql(createUidMappingTable); -- Gabungkan tabel sumber dengan tabel dimensi Hologres. String odsJoinDim = "SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32" + " FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim" + " ON ods.uid = dim.uid"; Table joinRes = tableEnv.sqlQuery(odsJoinDim);Ubah hasil gabungan menjadi aliran data.
Gunakan jendela waktu Flink untuk memproses aliran data dan jalankan fungsi roaring bitmap untuk menghapus duplikat data. Kode sampel berikut memberikan contoh:
DataStream<Tuple6<String, String, String, String, Timestamp, byte[]>> processedSource = source -- Dimensi berdasarkan mana data dikueri. Dalam contoh ini, dimensinya adalah kolom country, prov, city, dan ymd. .keyBy(0, 1, 2, 3) -- Jendela tumbling Flink. Dalam contoh ini, sumber data adalah file CSV, sehingga aliran data dialokasikan ke jendela berdasarkan waktu pemrosesan. Dalam skenario nyata, Anda dapat mengalokasikan aliran data berdasarkan waktu pemrosesan atau waktu acara berdasarkan kebutuhan bisnis Anda. .window(TumblingProcessingTimeWindows.of(Time.minutes(5))) -- Pemicu. Anda dapat memperoleh hasil agregasi sebelum jendela dihapus. .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1))) .aggregate( -- Fungsi agregat, yang digunakan untuk mengagregasi hasil berdasarkan dimensi kueri yang ditentukan. new AggregateFunction< Tuple5<String, String, String, String, Integer>, RoaringBitmap, RoaringBitmap>() { @Override public RoaringBitmap createAccumulator() { return new RoaringBitmap(); } @Override public RoaringBitmap add( Tuple5<String, String, String, String, Integer> in, RoaringBitmap acc) { -- Jalankan fungsi roaring bitmap untuk UID 32-digit untuk menghapus UID duplikat. acc.add(in.f4); return acc; } @Override public RoaringBitmap getResult(RoaringBitmap acc) { return acc; } @Override public RoaringBitmap merge( RoaringBitmap acc1, RoaringBitmap acc2) { return RoaringBitmap.or(acc1, acc2); } }, -- Fungsi jendela, yang digunakan untuk menghasilkan hasil agregasi. new WindowFunction< RoaringBitmap, Tuple6<String, String, String, String, Timestamp, byte[]>, Tuple, TimeWindow>() { @Override public void apply( Tuple keys, TimeWindow timeWindow, Iterable<RoaringBitmap> iterable, Collector< Tuple6<String, String, String, String, Timestamp, byte[]>> out) throws Exception { RoaringBitmap result = iterable.iterator().next(); // Optimalkan hasil fungsi roaring bitmap. result.runOptimize(); // Ubah hasil fungsi roaring bitmap menjadi larik byte dan simpan di Hologres. byte[] byteArray = new byte[result.serializedSizeInBytes()]; result.serialize(ByteBuffer.wrap(byteArray)); // Parameter Tuple6 menentukan bahwa aliran data diproses berdasarkan siklus hidup jendela. Nilai parameter diukur dalam detik. out.collect( new Tuple6<>( keys.getField(0), keys.getField(1), keys.getField(2), keys.getField(3), new Timestamp( timeWindow.getEnd() / 1000 * 1000), byteArray)); } });Tulis data unik ke tabel hasil agregasi Hologres.
Tulis data unik ke tabel hasil agregasi Hologres bernama dws_app. Hasil fungsi roaring bitmap disimpan sebagai larik byte di Flink. Kode sampel berikut memberikan contoh:
-- Ubah hasil yang diproses menjadi tabel. Table resTable = tableEnv.fromDataStream( processedSource, $("country"), $("prov"), $("city"), $("ymd"), $("timest"), $("uid32_bitmap")); -- Buat tabel hasil agregasi di Hologres. Simpan hasil fungsi roaring bitmap ke tabel sebagai larik byte. String createHologresTable = String.format( "create table sink(" + " country string," + " prov string," + " city string," + " ymd string," + " timetz timestamp," + " uid32_bitmap BYTES" + ") with (" + " 'connector'='hologres'," + " 'dbname' = '%s'," + " 'tablename' = '%s'," + " 'username' = '%s'," + " 'password' = '%s'," + " 'endpoint' = '%s'," + " 'connectionSize' = '%s'," + " 'mutatetype' = 'insertOrReplace'" + ")", database, dwsTableName, username, password, endpoint, connectionSize); tableEnv.executeSql(createHologresTable); -- Tulis hasil ke tabel bernama dws_app. tableEnv.executeSql("insert into sink select * from " + resTable);
Kueri data.
Hitung UV berdasarkan data di tabel dws_app. Lakukan operasi agregasi berdasarkan dimensi kueri dan kueri jumlah bit dalam bitmap. Dengan cara ini, Anda dapat menghitung UV di bawah kondisi yang ditentukan oleh klausa GROUP BY.
Contoh 1: Kueri jumlah UV setiap kota pada hari tertentu
-- Lakukan operasi RB_AGG berikut untuk mengkueri data. Anda dapat menonaktifkan fitur agregasi tiga tahap untuk performa yang lebih baik. Anda dapat mengaktifkan atau menonaktifkan fitur ini berdasarkan kebutuhan Anda. Secara default, fitur ini dinonaktifkan. set hg_experimental_enable_force_three_stage_agg=off; SELECT country ,prov ,city ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv FROM dws_app WHERE ymd = '20210329' GROUP BY country ,prov ,city ;Contoh 2: Kueri UV dan PV setiap provinsi dalam periode waktu tertentu
-- Lakukan operasi RB_AGG berikut untuk mengkueri data. Anda dapat menonaktifkan fitur agregasi tiga tahap untuk performa yang lebih baik. Anda dapat mengaktifkan atau menonaktifkan fitur ini berdasarkan kebutuhan Anda. Secara default, fitur ini dinonaktifkan. set hg_experimental_enable_force_three_stage_agg=off; SELECT country ,prov ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv ,SUM(pv) AS pv FROM dws_app WHERE time > '2021-04-19 18:00:00+08' and time < '2021-04-19 19:00:00+08' GROUP BY country ,prov ;
Tampilkan data secara visual.
Dalam kebanyakan kasus, Anda perlu menggunakan alat Business Intelligence (BI) untuk menampilkan UV dan PV yang dihitung secara visual. Dalam proses perhitungan, fungsi RB_CARDINALITY dan RB_OR_AGG digunakan untuk mengagregasi data. Oleh karena itu, alat BI harus mendukung fungsi agregasi kustom. Anda dapat menggunakan alat BI umum seperti Apache Superset dan Tableau.
Apache Superset
Hubungkan Apache Superset ke Hologres. Untuk informasi lebih lanjut, lihat Apache Superset.
Tetapkan tabel dws_app sebagai dataset.

Buat metrik bernama UV di dataset menggunakan ekspresi yang ditunjukkan pada gambar berikut.

RB_CARDINALITY(RB_OR_AGG(uid32_bitmap))Kemudian, Anda dapat mulai mengeksplorasi data.
Opsional. Buat dasbor.
Untuk informasi lebih lanjut tentang cara membuat dasbor, lihat Membuat Dasbor Pertama Anda.
Tableau
Hubungkan Tableau ke Hologres. Untuk informasi lebih lanjut, lihat Tableau.
Anda dapat menggunakan fungsi pass-through di Tableau untuk menyesuaikan fungsi. Untuk informasi lebih lanjut, lihat Fungsi Pass-Through (RAWSQL).
Buat bidang kalkulasi menggunakan ekspresi yang ditunjukkan pada gambar berikut.

RAWSQLAGG_INT("RB_CARDINALITY(RB_OR_AGG(%1))", [Uid32 Bitmap])Kemudian, Anda dapat mulai mengeksplorasi data.
Opsional. Buat dasbor.
Untuk informasi lebih lanjut tentang cara membuat dasbor, lihat Buat Dasbor.