Panduan ini menjelaskan cara membangun gudang data waktu nyata dengan menggunakan Realtime Compute for Apache Flink dan Hologres. Solusi ini menggabungkan pemrosesan aliran andal dari Flink dan fitur unik Hologres—seperti binary logging, penyimpanan hybrid baris-kolom, serta isolasi sumber daya yang kuat—untuk menangani volume data yang terus bertambah dan memenuhi kebutuhan bisnis waktu nyata.
Latar Belakang
Seiring dengan meningkatnya digitalisasi bisnis, permintaan terhadap data mutakhir tumbuh pesat. Gudang data offline tradisional, yang dirancang untuk pemrosesan batch volume data besar, tidak lagi mencukupi. Banyak skenario bisnis modern memerlukan pemrosesan, penyimpanan, dan analisis data secara waktu nyata. Meskipun metodologi pembangunan gudang data offline dengan arsitektur berlapis seperti ODS, DWD, dan DWS sudah mapan, kerangka kerja yang jelas untuk padanan waktu nyatanya masih kurang tersedia. Penggunaan gudang data waktu nyata memungkinkan aliran data waktu nyata yang efisien antar setiap lapisan data.
Kasus penggunaan
Panduan ini menggunakan platform e-commerce sebagai contoh untuk menunjukkan cara membangun gudang data waktu nyata dengan mengintegrasikan Flink dan Hologres. Pendekatan ini memungkinkan Anda memproses dan membersihkan data secara waktu nyata, menyediakan data berlapis dan dapat digunakan ulang bagi aplikasi downstream, serta mendukung berbagai skenario bisnis, termasuk dasbor waktu nyata (pemantauan transaksi, analitik perilaku, profil pengguna) dan rekomendasi personalisasi.
Arsitektur solusi
-
Bangun lapisan ODS (operational data store): Ingesti data dari database bisnis secara waktu nyata.
Flink menyinkronkan tiga tabel bisnis dari MySQL—
orders(tabel pesanan),orders_pay(tabel pembayaran), danproduct_catalog(kamus kategori produk)—ke Hologres secara waktu nyata. Tabel-tabel ini membentuk lapisan ODS. -
Bangun lapisan DWD (data warehouse detail): Buat tabel lebar waktu nyata.
Flink melakukan join tabel-tabel ODS secara waktu nyata untuk membuat tabel lebar bagi lapisan DWD.
-
Bangun lapisan DWS (data warehouse service): Hitung metrik waktu nyata.
Flink mengonsumsi perubahan binary logging dari tabel lebar secara event-driven untuk mengagregasi metrik ke dalam tabel spesifik pengguna dan toko bagi lapisan DWS.
-
Layani kueri aplikasi dengan Hologres.
-
Kueri tabel metrik agregat di lapisan DWS, menangani jutaan permintaan per detik (RPS).
-
Jalankan kueri OLAP pada tabel lebar DWD atau tampilkan laporan waktu nyata berdasarkan datanya, dengan tanggapan dalam hitungan detik.
-
Manfaat dan kemampuan inti
Solusi ini memberikan manfaat berikut:
-
Pembaruan efisien dan kueri langsung: Hologres mendukung pembaruan, koreksi, dan akses kueri langsung yang efisien untuk setiap lapisan data. Ini mengatasi tantangan umum pada gudang data waktu nyata tradisional, di mana data antara sulit dikueri, diperbarui, dan dikoreksi.
-
Pelapisan dan penggunaan ulang data: Setiap lapisan data di Hologres dapat melayani aplikasi eksternal secara independen. Hal ini memungkinkan penggunaan ulang data yang efisien, menciptakan gudang data berlapis dan dapat digunakan ulang.
-
Arsitektur yang disederhanakan dan efisiensi yang meningkat: Menggunakan Flink SQL untuk membangun pipeline ETL waktu nyata dan menyimpan semua lapisan data (ODS, DWD, dan DWS) di Hologres menyederhanakan arsitektur serta meningkatkan efisiensi pemrosesan data.
Solusi ini bergantung pada tiga kemampuan inti Hologres, seperti ditunjukkan pada tabel berikut.
|
Kemampuan inti |
Deskripsi |
|
Hologres menyediakan binary logging, yang memungkinkan Flink membaca perubahan data secara waktu nyata. Hal ini memungkinkan Hologres berperan sebagai sumber streaming untuk pekerjaan Flink. |
|
|
Hologres mendukung format penyimpanan hybrid di mana satu tabel menyimpan data dalam format berorientasi baris dan berorientasi kolom sekaligus dengan konsistensi kuat. Hal ini memungkinkan tabel antara berfungsi sebagai sumber Flink, tabel dimensi untuk kueri titik dan temporal join, serta sumber data bagi aplikasi lain seperti kueri OLAP atau layanan online. |
|
|
Isolasi sumber daya yang kuat |
Beban tinggi pada instans Hologres dapat memengaruhi performa kueri titik pada lapisan data antara. Hologres mendukung isolasi sumber daya yang kuat melalui pemisahan baca/tulis untuk instans primer dan sekunder (penyimpanan bersama) atau arsitektur virtual warehouse. Hal ini memastikan bahwa ingesti data Flink dari log biner tidak mengganggu layanan online. |
Catatan penggunaan
-
Solusi gudang data waktu nyata ini hanya didukung pada instans Hologres dedicated.
-
Ruang kerja Realtime Compute for Apache Flink, instans ApsaraDB RDS for MySQL, dan instans Hologres Anda harus berada dalam VPC yang sama. Jika berada di VPC berbeda, Anda harus menghubungkannya terlebih dahulu atau menggunakan titik akhir publik. Untuk informasi selengkapnya, lihat Bagaimana cara mengakses layanan lain lintas VPC? dan Bagaimana cara mengakses Internet?.
-
Jika Anda menggunakan RAM user atau RAM role untuk mengakses resource Realtime Compute for Apache Flink, Hologres, dan ApsaraDB RDS for MySQL, pastikan memiliki izin yang diperlukan.
Langkah 1: Siapkan lingkungan
Buat instans RDS for MySQL dan siapkan data
-
Buat instans ApsaraDB RDS for MySQL. Untuk informasi selengkapnya, lihat Buat instans ApsaraDB RDS for MySQL.
Instans ApsaraDB RDS for MySQL harus berada dalam VPC yang sama dengan ruang kerja Flink dan instans Hologres Anda.
-
Buat database dan akun.
Untuk instans target, buat database bernama
order_dwdan akun standar dengan izin baca dan tulis pada database tersebut. Untuk informasi selengkapnya, lihat Buat database dan Buat akun. -
Siapkan sumber data CDC MySQL.
-
Pada halaman detail instans, klik Log On to Database.
-
Pada halaman login, masukkan username dan password untuk akun database yang telah Anda buat, lalu klik Log On.
-
Setelah login, klik dua kali database
order_dwuntuk beralih ke database tersebut. -
Pada SQL Console, masukkan pernyataan DDL berikut untuk membuat tabel bisnis dan pernyataan INSERT untuk mengisinya.
CREATE TABLE `orders` ( order_id bigint not null primary key, user_id varchar(50) not null, shop_id bigint not null, product_id bigint not null, buy_fee numeric(20,2) not null, create_time timestamp not null, update_time timestamp not null default now(), state int not null ); CREATE TABLE `orders_pay` ( pay_id bigint not null primary key, order_id bigint not null, pay_platform int not null, create_time timestamp not null ); CREATE TABLE `product_catalog` ( product_id bigint not null primary key, catalog_name varchar(50) not null ); -- Prepare data. INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee'); INSERT INTO orders VALUES (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1), (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1), (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1), (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1), (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1), (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1), (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1); INSERT INTO orders_pay VALUES (2001, 100001, 1, '2023-02-15 17:40:56'), (2002, 100002, 1, '2023-02-15 17:40:56'), (2003, 100003, 0, '2023-02-15 17:40:56'), (2004, 100004, 0, '2023-02-15 17:40:56'), (2005, 100005, 0, '2023-02-15 18:40:56'), (2006, 100006, 0, '2023-02-15 18:40:56'), (2007, 100007, 0, '2023-02-15 18:40:56');
-
-
Klik Execute, lalu klik Direct Execution.
Buat instans Hologres dan kelompok komputasi
-
Beli instans Hologres dedicated. Untuk informasi selengkapnya, lihat Beli instans Hologres.
Instans Hologres harus berada dalam VPC yang sama dengan instans ApsaraDB RDS for MySQL. Untuk mengalami isolasi sumber daya yang kuat melalui pemisahan baca/tulis, contoh ini menggunakan tipe instans Virtual Warehouse, dan mengatur Reserved Compute Resource menjadi 64 sehingga Anda dapat membuat kelompok komputasi tambahan.
-
Setelah Anda login ke instans, buat database dan berikan izin.
Buat database bernama order_dw (dengan model izin sederhana diaktifkan), dan berikan hak istimewa administratif kepada pengguna. Untuk detail tentang manajemen dan otorisasi database, lihat Kelola database.
Catatan-
Jika Anda tidak menemukan akun dalam daftar drop-down User, artinya akun tersebut belum ditambahkan ke instans. Buka halaman User Management dan tambahkan pengguna sebagai SuperUser.
-
Pada Hologres V2.0 dan versi lebih baru, ekstensi binary logging diaktifkan secara default. Anda tidak perlu mengaktifkannya secara manual.
-
-
Buat kelompok komputasi baru.
Anda dapat menggunakan kelompok komputasi berbeda untuk mengisolasi sumber daya. Gunakan kelompok komputasi awal
init_warehouseuntuk penulisan data dan kelompok komputasiread_warehouse_1untuk melayani kueri.Semua sumber daya komputasi yang dipesan dialokasikan ke kelompok komputasi awal
init_warehousesecara default. Anda harus mengurangi sumber dayanya terlebih dahulu sebelum dapat membuat kelompok komputasi baru. Untuk informasi selengkapnya, lihat Buat instans kelompok komputasi baru.-
Buka dan konfirmasi nama instans.
-
Pada baris untuk kelompok komputasi
init_warehouse, klik Modify Configuration di kolom Actions. Kurangi sumber daya yang dialokasikan dan klik OK. -
Klik Create Compute Group, buat kelompok komputasi baru bernama
read_warehouse_1, lalu klik OK.
-
Buat ruang kerja Flink dan katalog
-
Buat ruang kerja Flink. Untuk informasi selengkapnya, lihat Aktifkan Realtime Compute for Apache Flink.
Ruang kerja Flink harus berada dalam VPC yang sama dengan instans ApsaraDB RDS for MySQL dan Hologres.
-
Login ke Konsol Realtime Compute for Apache Flink dan klik Console di kolom Actions ruang kerja Anda.
-
Buat kluster session untuk menyediakan lingkungan eksekusi guna membuat katalog dan menjalankan skrip. Untuk informasi selengkapnya, lihat Langkah 1: Buat kluster session.
-
Buat katalog Hologres.
Pada halaman , di tab Scripts, salin kode berikut, ganti nilai placeholder, pilih kode tersebut, lalu klik Run. Ini menggunakan kluster session yang telah Anda buat sebagai lingkungan eksekusi.
CREATE CATALOG dw WITH ( 'type' = 'hologres', 'endpoint' = '<ENDPOINT>', 'username' = 'BASIC$flinktest', 'password' = '${secret_values.holosecrect}', 'dbname' = 'order_dw@init_warehouse', -- Tentukan nama database dan hubungkan ke kelompok komputasi init_warehouse. 'binlog' = 'true', -- Anda dapat mengatur opsi WITH default untuk tabel sumber, dimensi, dan hasil saat membuat katalog. Tabel yang dibuat di bawah katalog ini akan mewarisi pengaturan default ini. 'sdkMode' = 'jdbc', -- Mode jdbc direkomendasikan. 'cdcmode' = 'true', 'connectionpoolname' = 'the_conn_pool', 'ignoredelete' = 'true', -- Diperlukan untuk merge tabel lebar guna mencegah retraksi. 'partial-insert.enabled' = 'true', -- Diperlukan untuk merge tabel lebar guna mengaktifkan pembaruan kolom parsial. 'mutateType' = 'insertOrUpdate', -- Diperlukan untuk merge tabel lebar guna mengaktifkan pembaruan kolom parsial. 'table_property.binlog.level' = 'replica', -- Anda juga dapat meneruskan properti tabel Hologres persisten saat membuat katalog. Tabel yang dibuat nanti akan memiliki binary logging yang diaktifkan secara default. 'table_property.binlog.ttl' = '259200' );Modifikasi parameter berikut dengan informasi layanan Hologres Anda yang sebenarnya.
Parameter
Deskripsi
Catatan
endpoint
Titik akhir instans Hologres Anda.
Pada halaman detail instans Hologres, peroleh nama domain untuk VPC yang ditentukan. Untuk informasi selengkapnya tentang nama domain, lihat Endpoints.
username
Pilih salah satu dari berikut:
-
Username untuk akun kustom harus diformat sebagai
BASIC$<user_name>. -
ID AccessKey Akun Alibaba Cloud atau RAM user Anda.
-
Pengguna yang dikonfigurasi harus memiliki akses ke database Hologres yang sesuai. Untuk detailnya, lihat Model izin Hologres dan Kelola pengguna.
-
Contoh ini menggunakan akun kustom bernama
BASIC$flinktestdan mengatur password-nya menggunakan variabel proyek bernama holosecrect untuk menghindari risiko keamanan penyimpanan password dalam teks biasa. Untuk informasi selengkapnya, lihat Variabel proyek.
password
-
Password akun kustom.
-
Rahasia AccessKey Akun Alibaba Cloud atau RAM user Anda.
CatatanSaat membuat katalog, Anda dapat mengatur opsi WITH default untuk tabel sumber, dimensi, dan hasil. Anda juga dapat mengatur properti default untuk tabel fisik Hologres, seperti parameter yang diawali dengan
table_property. Untuk informasi selengkapnya, lihat Kelola katalog Hologres dan Konektor Hologres untuk gudang data waktu nyata (parameter WITH). -
-
Buat katalog MySQL.
Salin kode berikut ke tab Scripts, modifikasi nilai parameternya, pilih kode tersebut, lalu klik Run. Ini menggunakan kluster session yang telah Anda buat sebagai lingkungan eksekusi.
CREATE CATALOG mysqlcatalog WITH( 'type' = 'mysql', 'hostname' = '<hostname>', 'port' = '<port>', 'username' = '<username>', 'password' = '${secret_values.mysql_pw}', 'default-database' = 'order_dw' );Modifikasi parameter berikut dengan informasi layanan MySQL Anda yang sebenarnya.
Parameter
Deskripsi
hostname
Alamat IP atau hostname database MySQL Anda. Pada halaman informasi dasar database, klik View Connection Details di area Network Type untuk memperoleh titik akhir internal.
port
Nomor port layanan database MySQL Anda. Nilai default-nya adalah 3306.
username
Username untuk layanan database MySQL Anda.
password
Password untuk layanan database MySQL Anda.
Contoh ini menggunakan variabel bernama
mysql_pwuntuk password guna menghindari eksposur teks biasa. Untuk informasi selengkapnya, lihat Kelola variabel.
Langkah 2: Bangun gudang data waktu nyata
Bangun lapisan ODS: Ingesti data bisnis
Dengan pernyataan CREATE DATABASE AS (CDAS) berbasis katalog, Anda dapat membuat lapisan ODS dalam satu langkah. Lapisan ODS biasanya berfungsi sebagai sumber event untuk pekerjaan streaming, bukan langsung untuk kueri OLAP atau kueri titik. Mengaktifkan binary logging sudah cukup untuk tujuan ini. Binary logging merupakan kemampuan inti Hologres. Konektor Hologres juga mendukung mode full-plus-incremental: pertama membaca snapshot lengkap, lalu mengonsumsi log biner secara inkremental.
-
Buat pekerjaan sinkronisasi CDAS ODS.
-
Pada halaman , buat draft stream SQL baru bernama ODS dan salin kode berikut ke editor SQL.
-- Parameter table_property.binlog.level telah diatur saat membuat katalog, sehingga semua tabel yang dibuat oleh CDAS memiliki binary logging yang diaktifkan. CREATE DATABASE IF NOT EXISTS dw.order_dw AS DATABASE mysqlcatalog.order_dw INCLUDING all tables -- Anda dapat memilih tabel upstream yang akan diingesti sesuai kebutuhan. /*+ OPTIONS('server-id'='8001-8004') */ ; -- Tentukan rentang server-id untuk instans mysql-cdc.Catatan-
Secara default, contoh ini menyinkronkan data ke skema Public dari database
order_dw. Anda juga dapat menyinkronkan data ke skema tertentu dalam database Hologres target. Untuk informasi selengkapnya, lihat Gunakan katalog Hologres sebagai tujuan dalam pernyataan CREATE DATABASE AS.... Setelah Anda menentukan skema, format nama tabel akan berubah saat menggunakan katalog. Untuk detailnya, lihat Gunakan katalog Hologres. -
Jika skema tabel sumber berubah, skema tabel hasil tidak akan diperbarui hingga terjadi perubahan data (hapus, sisip, atau perbarui) pada tabel sumber.
-
-
Di pojok kanan atas, klik Deploy untuk men-deploy pekerjaan.
-
Di panel navigasi kiri, pilih . Pada baris untuk pekerjaan ODS yang baru saja di-deploy, klik Start di kolom Actions. Pilih Initial Mode, lalu klik Start.
-
-
Muat data ke kelompok komputasi.
Kelompok tabel merupakan pembawa data di Hologres. Saat Anda menggunakan kelompok komputasi read_warehouse_1 untuk mengkueri data dari kelompok tabel dalam database order_dw, seperti order_dw_tg_default (untuk membuat kelompok tabel, lihat Manajemen Kelompok Tabel), kelompok tabel order_dw_tg_default dimuat untuk kelompok komputasi read_warehouse_1. Hal ini memungkinkan Anda menggunakan kelompok komputasi
init_warehouseuntuk menulis data dan menggunakan kelompok komputasiread_warehouse_1untuk kueri layanan.Pada halaman pengembangan HoloWeb, klik SQL Editor. Konfirmasi nama instans dan nama database, lalu eksekusi perintah berikut. Untuk informasi selengkapnya, lihat Buat instans kelompok komputasi baru. Setelah dimuat, Anda dapat melihat bahwa
read_warehouse_1telah memuat data dari kelompok tabelorder_dw_tg_default.-- Daftar kelompok tabel dalam database saat ini. SELECT tablegroup_name FROM hologres.hg_table_group_properties GROUP BY tablegroup_name; -- Muat kelompok tabel ke kelompok komputasi. CALL hg_table_group_load_to_warehouse ('order_dw.order_dw_tg_default', 'read_warehouse_1', 1); -- Periksa kelompok tabel yang dimuat ke kelompok komputasi. select * from hologres.hg_warehouse_table_groups; -
Di pojok kanan atas, alihkan kelompok komputasi ke
read_warehouse_1. Kueri dan analisis selanjutnya akan menggunakan kelompok komputasi ini.Di pojok kanan atas halaman HoloWeb, pilih
read_warehouse_1dari daftar drop-down kelompok komputasi. -
Pada halaman SQL Editor, eksekusi perintah berikut untuk melihat data yang disinkronkan dari MySQL ke tiga tabel Hologres.
-- Kueri data dari tabel orders. SELECT * FROM orders; -- Kueri data dari tabel orders_pay. SELECT * FROM orders_pay; -- Kueri data dari tabel product_catalog. SELECT * FROM product_catalog;Hasil kueri untuk tabel
product_catalogberisi dua kolom, product_id (1 hingga 5) dan catalog_name (phone_aaa, phone_bbb, phone_ccc, phone_ddd, phone_eee), dengan total 5 catatan. Hal ini menunjukkan bahwa data berhasil disinkronkan ke Hologres.
Bangun lapisan DWD: Buat tabel lebar waktu nyata
Langkah ini menggunakan kemampuan pembaruan kolom parsial konektor Hologres. Anda dapat menyatakan pembaruan parsial dengan DML INSERT. Pekerjaan ini mengkueri beberapa tabel dimensi menggunakan kueri titik berkinerja tinggi, yang diaktifkan oleh penyimpanan baris dan penyimpanan hybrid baris-kolom Hologres. Dengan isolasi sumber daya yang kuat, beban kerja tulis, baca, dan analitik tidak saling mengganggu.
-
Gunakan fitur katalog Flink untuk membuat tabel lebar lapisan DWD
dwd_ordersdi Hologres.Pada halaman , salin kode berikut ke tab Scripts, pilih kode tersebut, lalu klik Run.
-- Kolom tabel lebar harus nullable karena aliran berbeda menulis ke tabel hasil yang sama, dan kolom apa pun dapat bernilai null. CREATE TABLE dw.order_dw.dwd_orders ( order_id bigint not null, order_user_id string, order_shop_id bigint, order_product_id bigint, order_product_catalog_name string, order_fee numeric(20,2), order_create_time timestamp, order_update_time timestamp, order_state int, pay_id bigint, pay_platform int comment 'platform 0: phone, 1: pc', pay_create_time timestamp, PRIMARY KEY(order_id) NOT ENFORCED ); -- Anda dapat memodifikasi properti tabel fisik Hologres melalui katalog. ALTER TABLE dw.order_dw.dwd_orders SET ( 'table_property.binlog.ttl' = '604800' -- Ubah TTL log biner menjadi satu minggu. ); -
Konsumsi perubahan binary logging dari tabel lapisan ODS
ordersdanorders_paysecara waktu nyata.Pada halaman , buat draft stream SQL baru bernama DWD. Salin kode berikut ke editor SQL, lalu Deploy dan Start pekerjaan. Pekerjaan SQL ini melakukan join tabel
ordersdanproduct_catalogmenggunakan temporal join dan menulis hasilnya ke tabeldwd_orders, memperkaya data secara waktu nyata.BEGIN STATEMENT SET; INSERT INTO dw.order_dw.dwd_orders ( order_id, order_user_id, order_shop_id, order_product_id, order_fee, order_create_time, order_update_time, order_state, order_product_catalog_name ) SELECT o.*, dim.catalog_name FROM dw.order_dw.orders as o LEFT JOIN dw.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim ON o.product_id = dim.product_id; INSERT INTO dw.order_dw.dwd_orders (pay_id, order_id, pay_platform, pay_create_time) SELECT * FROM dw.order_dw.orders_pay; END; -
Lihat data dalam tabel lebar
dwd_orders.Hubungkan ke instans Hologres pada halaman pengembangan HoloWeb, login ke database target, lalu eksekusi perintah berikut di SQL Editor.
SELECT * FROM dwd_orders;Setelah eksekusi berhasil, hasil kueri mengembalikan data dari tabel lebar dwd_orders, termasuk bidang seperti order_id, order_user_id, order_shop_id, order_product_id, order_product_catalog_name, order_fee, order_create_time, dan order_update_time.
Bangun lapisan DWS: Hitung metrik waktu nyata
-
Gunakan fitur katalog Flink untuk membuat tabel agregat lapisan DWS
dws_usersdandws_shopsdi Hologres.Pada halaman , salin kode berikut ke tab Scripts, pilih kode tersebut, lalu klik Run.
-- Tabel agregat dimensi pengguna. CREATE TABLE dw.order_dw.dws_users ( user_id string not null, ds string not null, paied_buy_fee_sum numeric(20,2) not null comment 'Total jumlah pembayaran yang diselesaikan pada hari tersebut', primary key(user_id,ds) NOT ENFORCED ); -- Tabel agregat dimensi toko. CREATE TABLE dw.order_dw.dws_shops ( shop_id bigint not null, ds string not null, paied_buy_fee_sum numeric(20,2) not null comment 'Total jumlah pembayaran yang diselesaikan pada hari tersebut', primary key(shop_id,ds) NOT ENFORCED ); -
Konsumsi tabel lebar lapisan DWD
dw.order_dw.dwd_orderssecara waktu nyata, lakukan agregasi di Flink, dan tulis hasil akhir ke tabel DWS di Hologres.Pada halaman , buat draft stream SQL baru bernama DWS. Salin kode berikut ke editor SQL, lalu Deploy dan Start pekerjaan.
BEGIN STATEMENT SET; INSERT INTO dw.order_dw.dws_users SELECT order_user_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds, SUM (order_fee) FROM dw.order_dw.dwd_orders c WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- Data aliran pesanan dan pembayaran telah ditulis ke tabel lebar. GROUP BY order_user_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd'); INSERT INTO dw.order_dw.dws_shops SELECT order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds, SUM (order_fee) FROM dw.order_dw.dwd_orders c WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- Data aliran pesanan dan pembayaran telah ditulis ke tabel lebar. GROUP BY order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd'); END; -
Lihat hasil agregasi di lapisan DWS. Hasilnya diperbarui secara waktu nyata seiring perubahan data upstream.
-
Lihat data di konsol Hologres sebelum perubahan.
tabel dws_users
SELECT * FROM dws_users;Setelah menjalankan kueri, hasilnya mengembalikan data dari tabel dws_users, yang mencakup tiga kolom: user_id, ds, dan paied_buy_fee_sum. Dalam hasil contoh, kolom user_id berisi nilai
user_001,user_002, danuser_003; kolom ds berisi nilai20230215; dan kolom paied_buy_fee_sum berisi nilai8000.08,5000.05, dan5000.05secara berurutan. Kolom user_id mengidentifikasi setiap pengguna secara unik.tabel dws_shops
SELECT * FROM dws_shops;Hasil kueri menunjukkan bahwa tabel
dws_shopsberisi tiga kolom: shop_id (ID toko), ds (partisi tanggal), dan paied_buy_fee_sum (jumlah pembayaran). Empat baris data sampel dikembalikan, mengonfirmasi bahwa tabel lapisan DWS telah berhasil dibangun. -
Di konsol RDS, sisipkan satu catatan baru ke masing-masing tabel
ordersdanorders_paydalam databaseorder_dw.INSERT INTO orders VALUES (100008, 'user_003', 12345, 5, 6000.02, '2023-02-15 09:40:56', '2023-02-15 18:42:56', 1); INSERT INTO orders_pay VALUES (2008, 100008, 1, '2023-02-15 19:40:56'); -
Lihat data yang diperbarui di konsol Hologres.
tabel dwd_orders
SELECT * FROM dwd_orders;Setelah menjalankan kueri, delapan catatan pesanan dikembalikan dari tabel
dwd_orders, termasuk bidang seperti order_id, order_user_id, order_shop_id, order_product_id, order_product_catalog_name, order_fee, order_create_time, dan order_update_time. Catatan kedelapan (order_id=100008,user_003,phone_eee,6000.02) merupakan data yang baru ditulis.tabel dws_users
SELECT * FROM dws_users;Kueri mengembalikan tiga baris dengan tiga kolom: user_id, ds, dan paied_buy_fee_sum:
user_001 / 20230215 / 8000.08,user_002 / 20230215 / 5000.05, danuser_003 / 20230215 / 11000.07. Jumlah pembayaran total tertinggi adalah untuk user_003 (11000.07).tabel dws_shops
SELECT * FROM dws_shops;Setelah kueri dijalankan, hasilnya berisi tiga kolom, shop_id, ds, dan paied_buy_fee_sum, dengan empat baris data yang menunjukkan nilai biaya (11000.07, 4000.04, 7000.07, dan 2000.02) untuk toko 12345, 12346, 12347, dan 12348 pada 20230215. Kolom shop_id dan paied_buy_fee_sum merupakan kolom metrik utama.
-
Profil data
Karena binary logging diaktifkan, Anda dapat langsung memeriksa perubahan data. Jika Anda perlu melakukan eksplorasi data bisnis ad-hoc pada hasil antara atau memverifikasi kebenaran komputasi akhir, setiap lapisan solusi ini dipertahankan, sehingga memudahkan pemeriksaan proses antara.
Profil mode streaming
Anda dapat menggunakan konektor Print untuk mengonfirmasi apakah pesan yang dioutput ke tabel hasil lain sesuai harapan.
-
Buat dan mulai pekerjaan profil data streaming.
Pada halaman , buat draft stream SQL baru bernama Data-exploration. Salin kode berikut ke editor SQL, lalu Deploy dan Start pekerjaan.
-- Profil mode streaming. Output Print menunjukkan perubahan data waktu nyata. CREATE TEMPORARY TABLE print_sink( order_id bigint not null, order_user_id string, order_shop_id bigint, order_product_id bigint, order_product_catalog_name string, order_fee numeric(20,2), order_create_time timestamp, order_update_time timestamp, order_state int, pay_id bigint, pay_platform int, pay_create_time timestamp, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'print' ); INSERT INTO print_sink SELECT * FROM dw.order_dw.dwd_orders /*+ OPTIONS('startTime'='2023-02-15 12:00:00') */ -- Di sini, startTime adalah waktu pembuatan log biner. WHERE order_user_id = 'user_001'; -
Lihat hasil profil data.
Pada halaman detail , klik nama pekerjaan target. Pada tab Logs, klik tab Task Manager Logs, lalu klik tautan Path, ID di bawah Running Task Managers. Pada halaman Stdout, cari informasi log yang terkait dengan
user_001.Output log menampilkan catatan perubahan data CDC dengan awalan
+I(insert),-U(sebelum update), dan+U(setelah update), termasuk bidang sepertiorder_id,order_user_id,order_shop_id,order_fee, danorder_create_time.
Profil mode batch
Profil mode batch tidak menulis data ke tabel hasil. Sebaliknya, ia mengambil status akhir data pada saat ini, memungkinkan Anda melihat hasilnya langsung di output debug.
Pada halaman , buat draft stream SQL, salin kode berikut ke editor SQL, lalu klik Debug. Untuk informasi selengkapnya, lihat Debug pekerjaan.
Hasil debugging pada halaman pengembangan pekerjaan Flink adalah sebagai berikut.
SELECT *
FROM dw.order_dw.dwd_orders /*+ OPTIONS('binlog'='false') */
WHERE order_user_id = 'user_001' and order_create_time > '2023-02-15 12:00:00'; -- Mode batch mendukung filter pushdown untuk meningkatkan efisiensi eksekusi pekerjaan batch.
Setelah debugging selesai, hasil kueri mengembalikan dua catatan pesanan yang memenuhi kondisi filter, termasuk bidang seperti order_id, order_user_id, order_shop_id, order_product_id, order_product_catalog_name, order_fee, order_create_time, dan order_update_time. Hal ini memverifikasi bahwa hasil profil mode batch sesuai harapan.
Langkah 3: Gunakan gudang data waktu nyata
Langkah 2 menunjukkan cara menggunakan katalog Flink untuk membangun gudang data waktu nyata berlapis berdasarkan Flink dan Hologres. Bagian berikut menjelaskan beberapa skenario aplikasi sederhana.
Kueri titik
Kueri tabel metrik agregat lapisan DWS berdasarkan kunci primer, dengan dukungan jutaan RPS.
Pada halaman pengembangan HoloWeb, jalankan SQL berikut untuk mengkueri jumlah konsumsi pengguna tertentu pada tanggal tertentu.
-- holo sql
SELECT * FROM dws_users WHERE user_id ='user_001' AND ds = '20230215';
Hasil kueri mengembalikan tiga kolom: user_id, ds, dan paied_buy_fee_sum (jumlah konsumsi). Jumlah konsumsi untuk user_001 pada 20230215 adalah 8000.08.
Kueri OLAP
Jalankan kueri OLAP pada tabel lebar lapisan DWD.
Pada halaman pengembangan HoloWeb, jalankan SQL berikut untuk mengkueri detail pesanan pelanggan tertentu pada platform pembayaran tertentu pada Februari 2023.
-- holo sql
SELECT * FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00' and order_create_time < '2023-03-01 00:00:00'
AND order_user_id = 'user_001'
AND pay_platform = 0
ORDER BY order_create_time LIMIT 100;
Hasil kueri mengembalikan dua catatan pesanan dengan bidang seperti order_id, order_user_id, order_shop_id, order_product_id, order_fee, order_create_time, dan order_update_time. Contoh ID pesanan adalah 100006 dan 100004.
Laporan waktu nyata
Buat laporan waktu nyata berdasarkan data dari tabel lebar lapisan DWD. Penyimpanan hybrid baris-kolom dan tabel berorientasi kolom di Hologres menyediakan kemampuan kueri OLAP yang sangat baik, mendukung tanggapan dalam hitungan detik.
Pada halaman pengembangan HoloWeb, jalankan SQL berikut untuk mengkueri jumlah total pesanan dan jumlah total pesanan untuk setiap kategori produk pada Februari 2023.
-- holo sql
SELECT
TO_CHAR(order_create_time, 'YYYYMMDD') AS order_create_date,
order_product_catalog_name,
COUNT(*),
SUM(order_fee)
FROM
dwd_orders
WHERE
order_create_time >= '2023-02-01 00:00:00' and order_create_time < '2023-03-01 00:00:00'
GROUP BY
order_create_date, order_product_catalog_name
ORDER BY
order_create_date, order_product_catalog_name;
Setelah menjalankan pernyataan SQL, tab Result menampilkan tabel dengan empat kolom: order_create_date, order_product_catalog_name, count, dan sum. Misalnya, hasil untuk tanggal 20230215 menunjukkan bahwa kategori phone_aaa memiliki 2 pesanan dengan total 6000.06, kategori phone_bbb memiliki 1 pesanan senilai 4000.04, dan seterusnya untuk kelima kategori tersebut.
Referensi
-
Tutorial untuk skenario terkait:
-
Untuk informasi selengkapnya tentang kemampuan binary logging Hologres, lihat Berlangganan Binlog Hologres.
-
Flink mendukung beberapa pernyataan INSERT INTO dalam satu pekerjaan. Untuk detail sintaksis, lihat Pernyataan INSERT INTO.
-
Realtime Compute for Apache Flink mendukung berbagai konektor. Untuk informasi selengkapnya, lihat Konektor yang didukung.