Anda dapat membangun gudang data real-time untuk mengimplementasikan pemrosesan dan analisis data yang efisien dan skalabel berdasarkan kemampuan pemrosesan data real-time dari Realtime Compute for Apache Flink serta fitur Hologres seperti pencatatan biner, penyimpanan hibrid baris-kolom, dan isolasi sumber daya yang kuat. Ini membantu Anda menghadapi peningkatan volume data dan kebutuhan bisnis real-time. Topik ini menjelaskan cara membangun gudang data real-time menggunakan Realtime Compute for Apache Flink dan Hologres.
Informasi latar belakang
Seiring dengan kemajuan digitalisasi, perusahaan memiliki permintaan yang semakin tinggi terhadap kesegaran data. Pengguna perlu memproses, menyimpan, dan menganalisis data secara real-time di banyak skenario bisnis, selain skenario pemrosesan data offline tradisional. Untuk gudang data offline tradisional, penjadwalan periodik dilakukan untuk memproses data pada lapisan-lapisan berikut: operational data store (ODS), data warehouse detail (DWD), data warehouse service (DWS), dan application data service (ADS). Namun, sistem metodologi untuk gudang data real-time belum jelas. Konsep Streaming Warehouse digunakan untuk mengimplementasikan aliran data real-time yang efisien antara lapisan-lapisan data. Ini dapat menyelesaikan masalah pengelompokan data dalam gudang data real-time.
Ikhtisar
Contoh ini menunjukkan cara membangun gudang data real-time untuk platform e-commerce menggunakan Flink dan Hologres. Gudang data real-time dibangun untuk memproses dan membersihkan data secara real-time serta menanyakan data dari aplikasi eksternal. Dengan cara ini, data real-time dilapis dan digunakan kembali untuk mendukung berbagai skenario bisnis, seperti laporan data, dasbor transaksi, analitik aliran klik, profil pengguna, penandaan, serta rekomendasi personalisasi.
Arsitektur
Lapisan ODS: Flink mengambil data secara real-time ke dalam gudang data.
MySQL memiliki tiga tabel bisnis: orders, orders_pay, dan product_catalog. Realtime Compute for Apache Flink mengambil data dari tabel-tabel tersebut ke Hologres secara real-time, membentuk lapisan ODS.
Lapisan DWD: Flink mengintegrasikan data di tabel terkait secara real-time.
Flink menggabungkan tabel orders, orders_pay, dan product_catalog secara real-time, membentuk tabel dwd_orders di lapisan DWD.
Lapisan DWS: Flink melakukan komputasi real-time.
Flink mengonsumsi peristiwa log biner dari tabel dwd_orders dan menghasilkan tabel agregat dws_users dan dws_shops di lapisan DWS.
Hologres merespons pertanyaan.
Hologres menangani pertanyaan pada tabel di lapisan DWS, mendukung hingga satu juta rekaman per detik (RPS).
Hologres melakukan analisis OLAP atau menghasilkan laporan real-time berdasarkan tabel dwd_orders, memberikan respons pertanyaan dalam hitungan detik.
Manfaat solusi dan kemampuan layanan
Solusi ini memberikan manfaat berikut:
Pembaruan Efisien dan Pertanyaan Ad-Hoc: Gudang real-time tradisional kesulitan dengan menanyakan, memperbarui, dan memperbaiki data di lapisan tengah. Hologres mengatasi masalah ini dengan mendukung pembaruan dan koreksi data yang efisien serta memastikan konsistensi setelah tulis di setiap lapisan.
Kegunaan Ulang Data: Hologres memungkinkan data di setiap lapisan digunakan secara independen untuk layanan eksternal, memungkinkan penggunaan ulang data yang efisien dalam gudang data.
Mempermudah Arsitektur dan Meningkatkan Efisiensi: Flink SQL digunakan untuk membangun pipeline ETL real-time, dengan data di lapisan ODS, DWD, dan DWS disimpan secara terpusat di Hologres. Pendekatan ini menyederhanakan arsitektur gudang data dan meningkatkan efisiensi pemrosesan data.
Solusi ini bergantung pada tiga kemampuan inti Hologres:
Kemampuan | Deskripsi |
Hologres memiliki fitur pencatatan biner yang mendorong Flink untuk melakukan komputasi real-time dan berfungsi sebagai komponen upstream dalam pipeline streaming. | |
Hologres mendukung penyimpanan hibrid baris-kolom. Tabel Hologres menyimpan data berorientasi baris dan kolom dan memastikan konsistensi kuat di antara keduanya. Fitur ini memastikan bahwa tabel di lapisan tengah dapat digunakan sebagai tabel sumber untuk Flink dan sebagai tabel dimensi Flink untuk join lookup dan pertanyaan titik berdasarkan kunci utama. Tabel di lapisan tengah juga dapat ditanyai oleh aplikasi lain, seperti aplikasi OLAP dan layanan online. | |
Isolasi sumber daya yang kuat | Jika beban pada instance Hologres tinggi, kinerja lapisan tengah untuk merespons pertanyaan titik mungkin terpengaruh. Hologres mengimplementasikan isolasi sumber daya yang kuat dengan mengonfigurasi pembagian baca/tulis untuk instance utama dan sekunder (penyimpanan bersama) atau menggunakan arsitektur gudang virtual. Ini memastikan bahwa layanan online tidak terpengaruh ketika Flink menarik data log biner dari Hologres. |
Catatan penggunaan
Anda memiliki instance eksklusif Hologres.
Instance ApsaraDB RDS for MySQL dan instance Hologres harus berada di virtual private cloud (VPC) yang sama dengan ruang kerja Realtime Compute for Apache Flink Anda. Jika tidak, Anda harus membuat koneksi antara VPC atau mengaktifkan Realtime Compute for Apache Flink untuk mengakses layanan lain melalui Internet. Untuk informasi lebih lanjut, lihat Bagaimana Realtime Compute for Apache Flink Mengakses Layanan Lintas VPC? dan Bagaimana Realtime Compute for Apache Flink Mengakses Internet?.
Pengguna RAM atau peran RAM yang digunakan untuk mengakses Flink, Hologres, dan MySQL memiliki izin yang diperlukan.
Langkah 1: Persiapan
Buat instance ApsaraDB RDS for MySQL dan siapkan data
Buat Instance ApsaraDB RDS for MySQL.
Instance ApsaraDB RDS for MySQL Anda, ruang kerja Flink, dan instance Hologres harus berada di VPC yang sama.
Buat Database bernama order_dw dan Akun Standar.
Siapkan Sumber Data CDC MySQL.
Di pojok kanan atas halaman detail instance MySQL, klik Log On to Database.
Di dialog, masukkan akun dan kata sandi Anda dan klik Login.
Klik dua kali database order_dw di panel navigasi sisi kiri.
Di tab Konsol SQL, tulis pernyataan DDL untuk membuat tiga tabel bisnis dan sisipkan data ke dalam tabel.
CREATE TABLE `orders` ( order_id bigint not null primary key, user_id varchar(50) not null, shop_id bigint not null, product_id bigint not null, buy_fee numeric(20,2) not null, create_time timestamp not null, update_time timestamp not null default now(), state int not null ); CREATE TABLE `orders_pay` ( pay_id bigint not null primary key, order_id bigint not null, pay_platform int not null, create_time timestamp not null ); CREATE TABLE `product_catalog` ( product_id bigint not null primary key, catalog_name varchar(50) not null ); -- Siapkan data. INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee'); INSERT INTO orders VALUES (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1), (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1), (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1), (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1), (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1), (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1), (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1); INSERT INTO orders_pay VALUES (2001, 100001, 1, '2023-02-15 17:40:56'), (2002, 100002, 1, '2023-02-15 17:40:56'), (2003, 100003, 0, '2023-02-15 17:40:56'), (2004, 100004, 0, '2023-02-15 17:40:56'), (2005, 100005, 0, '2023-02-15 18:40:56'), (2006, 100006, 0, '2023-02-15 18:40:56'), (2007, 100007, 0, '2023-02-15 18:40:56');Klik Execute(F8). Di panel yang muncul, klik Execute.
Buat instance Hologres dan gudang virtual
Buat Instance Eksklusif Hologres.
Untuk Product Type, pilih Exclusive instance (subscription) atau Exclusive instance (pay-as-you-go) sesuai kebutuhan.
Untuk Specifications, pilih Compute Group Type.
Untuk Reserved Computing Resources of Virtual Warehouse, masukkan 64 untuk mendukung penambahan gudang virtual tambahan.
Untuk VPC, pilih VPC instance MySQL Anda dari daftar drop-down.
Masuk ke instance Hologres Anda dan buat database.
Di panel navigasi sisi kiri konsol Hologres, pilih Instances.
Klik nama instance Hologres target.
Di halaman detail instance, klik Connect to Instance.
Di bilah navigasi atas konsol HoloWeb yang muncul, pilih Create Database.
Untuk Database Name, masukkan order_dw.
Untuk Policy, pilih SPM.
Klik OK.
Tetapkan peran admin ke akun Alibaba Cloud Anda, pengguna RAM, atau peran. Untuk informasi lebih lanjut, lihat Kelola Database.
CatatanJika Anda tidak menemukan pengguna RAM atau peran Anda di daftar drop-down User, tambahkan pengguna RAM sebagai superuser instance di halaman .
Ekspansi log biner diaktifkan secara default di Hologres V2.0 atau lebih baru.
Buat gudang virtual bernama read_warehouse_1 untuk menangani pertanyaan.
Gudang virtual awal init_warehouse digunakan untuk penulisan data.
Karena sumber daya komputasi cadangan secara otomatis dialokasikan ke gudang virtual awal, Anda harus mengurangi sumber daya komputasinya sebelum membuat instance gudang virtual. Lakukan langkah-langkah berikut:
Di bilah navigasi atas konsol HoloWeb, klik . Konfirmasikan nama instance sudah benar.
Temukan gudang virtual awal init_warehouse dan klik Modify Configuration di kolom Actions.
Di dialog, kurangi sumber daya grup komputasi, dan klik OK.
Klik Create Compute Group. Masukkan read_warehouse_1 di bidang Compute Group Name dan klik OK.
Buat ruang kerja Realtime Compute for Apache Flink dan katalog
Luncurkan ruang kerja Flink Anda di VPC yang sama dengan instance ApsaraDB RDS for MySQL dan instance Hologres Anda.
Masuk ke Konsol Realtime Compute for Apache Flink. Temukan ruang kerja target dan klik Console di kolom Actions.
Buat Klaster Sesi, yang menyediakan lingkungan eksekusi untuk membuat katalog dan skrip.
Buat katalog Hologres melalui Flink SQL.
Di panel navigasi sisi kiri konsol pengembangan Realtime Compute for Apache Flink, pilih .
Klik New untuk membuat skrip baru, salin kode berikut ke editor, dan ganti nilai placeholder dengan nilai aktual Anda.
Kemudian, pilih kode yang ingin Anda jalankan, dan klik Run.
Di pojok kanan bawah halaman, Anda dapat melihat lingkungan eksekusi adalah klaster sesi yang Anda buat.
CREATE CATALOG dw WITH (
'type' = 'hologres',
'endpoint' = '<ENDPOINT>',
'username' = 'BASIC$flinktest',
'password' = '${secret_values.holosecrect}',
'dbname' = 'order_dw@init_warehouse', --Hubungkan ke gudang virtual init_warehouse.
'binlog' = 'true', -- Opsi konektor yang Anda tentukan saat membuat katalog Hologres juga berlaku untuk tabel baru yang dibuat di katalog ini.
'sdkMode' = 'jdbc', -- Mode yang direkomendasikan.
'cdcmode' = 'true',
'connectionpoolname' = 'the_conn_pool',
'ignoredelete' = 'true', -- Aktifkan opsi ini untuk mencegah retraksi data saat mengkonsolidasikan tabel.
'partial-insert.enabled' = 'true', -- Aktifkan pembaruan parsial saat konsolidasi tabel terlibat.
'mutateType' = 'insertOrUpdate', -- Aktifkan pembaruan parsial saat konsolidasi tabel terlibat.
'table_property.binlog.level' = 'replica', -- Kirim properti tabel persisten, pencatatan biner diaktifkan dalam kasus ini, saat membuat katalog. Properti ini berlaku untuk semua tabel yang dibuat kemudian di katalog ini.
'table_property.binlog.ttl' = '259200'
);Ganti nilai placeholder berikut:
Opsi | Deskripsi | Catatan |
endpoint | Endpoint instance Hologres Anda. | Untuk mendapatkan nilai endpoint, ikuti langkah-langkah berikut:
Untuk informasi lebih lanjut, lihat Endpoint untuk menghubungkan ke Hologres. |
username | Masukkan salah satu dari berikut:
|
|
password |
|
Saat membuat katalog, Anda dapat menentukan opsi konektor, yang secara otomatis berlaku untuk tabel yang dibuat dalam katalog tersebut nanti. Selain itu, Anda dapat menentukan properti default, seperti yang memiliki awalan table_property, untuk tabel fisik Hologres. Untuk informasi lebih lanjut, lihat Kelola Katalog Hologres dan bagian "Opsi konektor dalam klausa WITH" dari topik Konektor Hologres.
Buat katalog MySQL.
Salin kode berikut di editor SQL, ganti nilai placeholder, pilih kode yang ingin Anda jalankan, lalu klik Run. Di pojok kanan bawah halaman, Anda dapat melihat lingkungan eksekusi adalah klaster sesi yang Anda buat.
CREATE CATALOG mysqlcatalog WITH( 'type' = 'mysql', 'hostname' = '<hostname>', 'port' = '<port>', 'username' = '<username>', 'password' = '${secret_values.mysql_pw}', 'default-database' = 'order_dw' );Ganti nilai placeholder berikut dengan nilai aktual Anda:
Opsi
Deskripsi
hostname
Alamat IP atau nama host yang digunakan untuk mengakses database ApsaraDB RDS for MySQL Anda. Untuk mendapatkan endpoint internal, ikuti langkah-langkah berikut:
Di panel navigasi sisi kiri konsol ApsaraDB RDS, pilih Instance.
Klik nama instance target.
Di bagian Basic Information halaman yang muncul, klik View Details di sebelah bidang Network Type.
Di panel yang muncul, salin nilai endpoint internal.
port
Nomor port database ApsaraDB RDS for MySQL Anda. Nilai default: 3306.
username
Nama pengguna yang digunakan untuk mengakses database ApsaraDB RDS for MySQL Anda.
password
Kata sandi yang digunakan untuk mengakses database ApsaraDB RDS for MySQL Anda.
Untuk keamanan yang lebih baik, contoh ini menggunakan variabel mysql_pw sebagai pengganti rahasia AccessKey teks biasa. Untuk informasi lebih lanjut, lihat Kelola variabel.
Langkah 2: Membangun gudang data real-time
Membangun lapisan ODS: Ingest data ke Hologres secara real-time dengan Flink
Anda dapat menjalankan CREATE DATABASE AS (CDAS) yang terkait dengan katalog untuk dengan mudah membuat lapisan ODS. Lapisan ODS sering kali tidak digunakan untuk OLAP atau pertanyaan titik. Sebagai gantinya, ia digunakan sebagai driver peristiwa untuk pekerjaan streaming. Oleh karena itu, mengaktifkan pencatatan biner untuk lapisan ini sudah cukup. Pencatatan biner adalah salah satu kemampuan inti Hologres. Konektor Hologres dapat digunakan untuk membaca semua rekaman dan kemudian mengonsumsi data log biner tambahan.
Buat pekerjaan sinkronisasi.
Di panel navigasi sisi kiri, pilih . Di halaman yang muncul, buat draft aliran SQL bernama ODS dan salin kode berikut ke editor SQL:
CREATE DATABASE IF NOT EXISTS dw.order_dw -- Karena opsi table_property.binlog.level ditentukan saat pembuatan katalog Hologres, pencatatan biner diaktifkan untuk semua tabel yang dibuat menggunakan pernyataan CDAS. AS DATABASE mysqlcatalog.order_dw INCLUDING all tables -- Anda dapat memilih tabel di database upstream yang perlu diambil ke dalam gudang data. /*+ OPTIONS('server-id'='8001-8004') */ ; -- Tentukan rentang nilai parameter server-id untuk instance CDC MySQL.CatatanDalam contoh ini, data disinkronkan ke skema publik database order_dw secara default. Anda juga dapat menyinkronkan data ke skema tertentu di database Hologres tujuan. Untuk informasi lebih lanjut, lihat bagian "Gunakan katalog Hologres yang Anda buat sebagai katalog penyimpanan tujuan yang digunakan dalam pernyataan CREATE TABLE AS" dari topik Kelola Katalog Hologres. Setelah Anda menentukan skema, format nama tabel berubah saat Anda menggunakan katalog. Untuk informasi lebih lanjut, lihat bagian "Gunakan katalog Hologres" dari topik Kelola Katalog Hologres.
Pembaruan skema tabel sumber tidak tercermin di tabel tujuan hingga data di tabel sumber dihapus, dimasukkan, atau diperbarui.
Di pojok kanan atas editor SQL, klik Deploy.
Mulai pekerjaan.
Di panel navigasi sisi kiri, pilih .
Di halaman Deployments, temukan penyebaran bernama
ODSdan klik Start di kolom Actions.Di panel Start Job, pilih opsi Initial Mode, lalu klik tombol Start.
Muat data ke gudang virtual.
Buat dan muat
order_dw_tg_defaultgrup tabel, yang menyimpan data di databaseorder_dw, ke gudang virtualread_warehouse_1. Dengan cara ini, gudang virtualread_warehouse_1menangani pertanyaan eksternal sementara gudang virtualinit_warehousedigunakan untuk penulisan data.Di bilah navigasi atas halaman HoloWeb, pilih SQL Editor. Setelah memastikan Anda menggunakan instance dan database yang benar, jalankan perintah berikut. Untuk informasi lebih lanjut, lihat Buat Instance Gudang Virtual. Setelah berhasil dieksekusi, Anda dapat melihat grup tabel
order_dw_tg_defaulttelah dimuat ke gudang virtual read_warehouse_1.-- Tampilkan grup tabel di database saat ini. SELECT tablegroup_name FROM hologres.hg_table_group_properties GROUP BY tablegroup_name; -- Muat grup tabel ke gudang virtual. CALL hg_table_group_load_to_warehouse ('order_dw.order_dw_tg_default', 'read_warehouse_1', 1); -- Lihat apakah grup tabel telah dimuat. select * from hologres.hg_warehouse_table_groups;Di pojok kanan atas, ubah gudang virtual menjadi read_warehouse_1 untuk pertanyaan dan analisis.

Jalankan perintah berikut di Editor SQL untuk melihat hasil sinkronisasi.
--- Query data di tabel orders. SELECT * FROM orders; --- Query data di tabel orders_pay. SELECT * FROM orders_pay; --- Query data di tabel product_catalog. SELECT * FROM product_catalog;
Membangun lapisan DWD: Konsolidasi tabel
Kemampuan pembaruan kolom spesifik yang didukung oleh konektor Hologres digunakan untuk membangun lapisan DWD. Anda dapat menggunakan pernyataan INSERT untuk melakukan pembaruan parsial yang efisien. Pertanyaan titik berperforma tinggi berdasarkan penyimpanan data berorientasi kolom dan penyimpanan data hibrid baris-kolom Hologres membantu Anda menanyakan data dari tabel dimensi yang berbeda. Hologres menggunakan arsitektur isolasi sumber daya yang kuat, yang mencegah gangguan antara beban kerja tulis, baca, dan analitik.
Buat tabel bernama dwd_orders di lapisan DWD di Hologres dengan menggunakan katalog Hologres Flink.
Di panel navigasi sisi kiri konsol pengembangan Realtime Compute for Apache Flink, pilih . Di editor SQL, salin kode berikut, pilih kode tersebut, lalu klik Run.
-- Saat data dari sumber yang berbeda ditulis ke tabel sink tunggal, nilai null mungkin muncul di kolom mana pun dari tabel. Oleh karena itu, pastikan bahwa bidang di tabel lebar bersifat nullable. CREATE TABLE dw.order_dw.dwd_orders ( order_id bigint not null, order_user_id string, order_shop_id bigint, order_product_id bigint, order_product_catalog_name string, order_fee numeric(20,2), order_create_time timestamp, order_update_time timestamp, order_state int, pay_id bigint, pay_platform int comment 'platform 0: telepon, 1: pc', pay_create_time timestamp, PRIMARY KEY(order_id) NOT ENFORCED ); -- Anda dapat memodifikasi properti tabel fisik Hologres melalui katalog Hologres. ALTER TABLE dw.order_dw.dwd_orders SET ( 'table_property.binlog.ttl' = '604800' -- Ubah periode timeout data log biner menjadi satu minggu. );Konsumsi data log biner tabel orders dan orders_pay di lapisan ODS secara real-time.
Di panel navigasi sisi kiri, pilih .
Di halaman yang muncul, buat draft aliran SQL bernama DWD dan salin kode berikut ke editor SQL.
BEGIN STATEMENT SET; INSERT INTO dw.order_dw.dwd_orders ( order_id, order_user_id, order_shop_id, order_product_id, order_fee, order_create_time, order_update_time, order_state, order_product_catalog_name ) SELECT o.*, dim.catalog_name FROM dw.order_dw.orders as o LEFT JOIN dw.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim ON o.product_id = dim.product_id; INSERT INTO dw.order_dw.dwd_orders (pay_id, order_id, pay_platform, pay_create_time) SELECT * FROM dw.order_dw.orders_pay; END;Klik Deploy untuk membuat penyebaran dari draft.
Pergi ke , dan klik Start di kolom Actions penyebaran.
Pernyataan SQL di atas menggabungkan tabel
ordersdengan tabelproduct_catalogdan menulis hasil akhir kedwd_orders. Dengan cara ini, data dikonsolidasikan dan ditulis ke tabeldwd_orderssecara real-time.Lihat data tabel dwd_orders.
Di konsol HoloWeb, sambungkan ke instance Hologres dan masuk ke database tujuan. Kemudian, jalankan pernyataan berikut di editor SQL:
SELECT * FROM dwd_orders;
Membangun lapisan DWS: Lakukan komputasi real-time
Buat tabel agregat bernama dws_users dan dws_shops di Hologres dengan menggunakan katalog Hologres Flink.
Di panel navigasi sisi kiri, pilih .
Di editor SQL, salin kode berikut, pilih kode tersebut, lalu klik Run.
-- Buat tabel agregat dimensi pengguna. CREATE TABLE dw.order_dw.dws_users ( user_id string not null, ds string not null, paied_buy_fee_sum numeric(20,2) not null comment 'Total jumlah pembayaran yang selesai pada hari itu', primary key(user_id,ds) NOT ENFORCED ); -- Buat tabel agregat dimensi vendor. CREATE TABLE dw.order_dw.dws_shops ( shop_id bigint not null, ds string not null, paied_buy_fee_sum numeric(20,2) not null comment 'Total jumlah pembayaran yang selesai pada hari itu', primary key(shop_id,ds) NOT ENFORCED );
Konsumsi data di tabel
dw.order_dw.dwd_ordersdi lapisan DWD secara real-time, lakukan agregasi dengan Flink, lalu tulis hasilnya ke tabel di lapisan DWS di Hologres.Di panel navigasi sisi kiri, pilih .
Di halaman yang muncul, buat draft aliran SQL bernama DWS dan salin kode berikut ke editor SQL.
BEGIN STATEMENT SET; INSERT INTO dw.order_dw.dws_users SELECT order_user_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds, SUM (order_fee) FROM dw.order_dw.dwd_orders c WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- Data streaming pesanan dan data streaming pembayaran telah ditulis ke tabel lebar. GROUP BY order_user_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd'); INSERT INTO dw.order_dw.dws_shops SELECT order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds, SUM (order_fee) FROM dw.order_dw.dwd_orders c WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- Data streaming pesanan dan data streaming pembayaran telah ditulis ke tabel lebar. GROUP BY order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd'); END;Klik Deploy untuk membuat penyebaran dari draft.
Pergi ke , dan klik Start di kolom Actions penyebaran.
Lihat hasil agregasi di lapisan DWS. Hasilnya diperbarui secara real-time berdasarkan perubahan data input.
Lihat hasil di konsol Hologres.
Tabel dws_users
SELECT * FROM dws_users;
Tabel dws_shops
SELECT * FROM dws_shops;
Di konsol ApsaraDB RDS, sisipkan catatan data ke tabel
ordersdanorders_paydi databaseorder_dw.INSERT INTO orders VALUES (100008, 'user_003', 12345, 5, 6000.02, '2023-02-15 09:40:56', '2023-02-15 18:42:56', 1); INSERT INTO orders_pay VALUES (2008, 100008, 1, '2023-02-15 19:40:56');Di konsol Hologres, lihat data yang diperbarui.
Tabel dwd_orders
SELECT * FROM dwd_orders;
Tabel dws_users
SELECT * FROM dws_users;
Tabel dws_shops
SELECT * FROM dws_shops;
Lakukan profiling data
Karena pencatatan biner diaktifkan, profiling data dapat dilakukan untuk membantu Anda langsung melihat perubahan data. Selain itu, data di setiap lapisan dipertahankan, sehingga Anda dapat melakukan pertanyaan ad-hoc pada hasil perantara atau memeriksa kebenaran hasil perhitungan akhir.
Mode streaming
Konektor Print dapat digunakan untuk memeriksa apakah pesan yang dikirim ke tabel sink lain memenuhi harapan.
Buat dan mulai pekerjaan streaming untuk profiling data.
Pergi ke konsol pengembangan Realtime Compute for Apache Flink.
Di panel navigasi sisi kiri, pilih .
Klik New untuk membuat draft aliran SQL bernama
Data-explorationdan salin kode berikut ke editor SQL.-- Lakukan profiling data dalam mode streaming. Anda dapat membuat tabel cetak untuk melihat perubahan data. CREATE TEMPORARY TABLE print_sink( order_id bigint not null, order_user_id string, order_shop_id bigint, order_product_id bigint, order_product_catalog_name string, order_fee numeric(20,2), order_create_time timestamp, order_update_time timestamp, order_state int, pay_id bigint, pay_platform int, pay_create_time timestamp, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'print' ); INSERT INTO print_sink SELECT * FROM dw.order_dw.dwd_orders /*+ OPTIONS('startTime'='2023-02-15 12:00:00') */ -- Parameter startTime menentukan waktu ketika data log biner dihasilkan. WHERE order_user_id = 'user_001';Klik Deploy untuk membuat penyebaran dari draft.
Pergi ke , dan klik Start di kolom Actions penyebaran.
Lihat hasil profiling data.
Di panel navigasi sisi kiri konsol pengembangan, pilih .
Di halaman Deployments, klik nama penyebaran target.
Di tab Logs, klik subtab kiri Logs.
Pilih tab Running Task Managers dan klik nilai di kolom Path, ID.
Di tab Stdout, cari log yang terkait dengan
user_001.
Mode batch
Dalam mode batch, data tidak ditulis ke tabel sink apa pun. Sebagai gantinya, data akhir pada saat ini diambil dan tersedia untuk dilihat. Lakukan langkah-langkah berikut:
Di panel navigasi sisi kiri, pilih .
Di halaman yang muncul, klik New untuk membuat draft aliran SQL.
Salin kode berikut ke editor SQL, lalu klik Debug. Untuk informasi lebih lanjut, lihat Debug Penyebaran.
SELECT *
FROM dw.order_dw.dwd_orders /*+ OPTIONS('binlog'='false') */
WHERE order_user_id = 'user_001' and order_create_time > '2023-02-15 12:00:00'; -- Profiling data dalam mode batch mendukung dorongan filter untuk meningkatkan efisiensi eksekusi pekerjaan batch.Gambar berikut menunjukkan hasil profiling data:

Langkah 3: Gunakan gudang data real-time
Bagian sebelumnya menjelaskan cara membangun gudang streaming bertingkat dengan Realtime Compute for Apache Flink dan Hologres. Bagian ini memperkenalkan beberapa kasus penggunaan umum dari gudang streaming.
Kueri berbasis kunci
Tanyakan tabel agregat di lapisan DWS berdasarkan kunci utama. Hingga satu juta RPS didukung.
Kode sampel berikut menunjukkan cara menanyakan jumlah pembayaran pengguna tertentu pada tanggal tertentu di konsol HoloWeb.
-- holo sql
SELECT * FROM dws_users WHERE user_id ='user_001' AND ds = '20230215';
Kueri rincian pesanan
Lakukan analisis OLAP pada tabel konsolidasi di lapisan DWD.
Kode sampel berikut memberikan contoh cara menanyakan detail pesanan pelanggan pada platform pembayaran tertentu pada Februari 2023 di konsol HoloWeb.
-- holo sql
SELECT * FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00' and order_create_time < '2023-03-01 00:00:00'
AND order_user_id = 'user_001'
AND pay_platform = 0
ORDER BY order_create_time LIMIT 100;
Laporan real-time
Tampilkan laporan real-time berdasarkan data tabel konsolidasi di lapisan DWD. Penyimpanan data hibrid baris-kolom dan penyimpanan data berorientasi kolom Hologres menyediakan kemampuan analitik OLAP tinggi. Pertanyaan data dapat direspons dalam hitungan detik.
Kode sampel berikut memberikan contoh cara menanyakan total volume pesanan dan total jumlah pesanan setiap kategori pada Februari 2023 di konsol HoloWeb.
-- holo sql
SELECT
TO_CHAR(order_create_time, 'YYYYMMDD') AS order_create_date,
order_product_catalog_name,
COUNT(*),
SUM(order_fee)
FROM
dwd_orders
WHERE
order_create_time >= '2023-02-01 00:00:00' and order_create_time < '2023-03-01 00:00:00'
GROUP BY
order_create_date, order_product_catalog_name
ORDER BY
order_create_date, order_product_catalog_name;
Referensi
Praktik:
Untuk informasi lebih lanjut tentang pencatatan biner Hologres, lihat Berlangganan Log Biner Hologres.
Beberapa pernyataan INSERT INTO dapat ditulis dalam satu penyebaran Realtime Compute for Apache Flink. Untuk informasi lebih lanjut tentang sintaks pernyataan INSERT INTO, lihat INSERT INTO.
Realtime Compute for Apache Flink mendukung berbagai konektor. Untuk informasi lebih lanjut, lihat Konektor yang Didukung.