Topik ini menjelaskan cara membangun danau data terpadu aliran (streaming data lakehouse) menggunakan Realtime Compute for Apache Flink, Paimon, dan StarRocks.
Informasi latar belakang
Seiring meningkatnya digitalisasi bisnis, permintaan terhadap data yang tepat waktu juga bertambah. Gudang data offline tradisional mengikuti metodologi yang jelas, yaitu menggunakan pekerjaan offline terjadwal untuk menggabungkan perubahan dari periode sebelumnya ke dalam hierarki gudang data yang mencakup lapisan Operational Data Store (ODS), Data Warehouse Detail (DWD), Data Warehouse Summary (DWS), dan Application Data Store (ADS). Namun, pendekatan ini memiliki dua masalah utama: latensi tinggi dan biaya tinggi. Pekerjaan offline biasanya hanya dijalankan sekali per jam atau bahkan sekali per hari, sehingga konsumen data hanya dapat melihat data dari jam atau hari sebelumnya. Selain itu, pembaruan data sering kali menimpa seluruh partisi, yang tidak efisien karena memerlukan pembacaan ulang seluruh data asli dalam partisi untuk digabungkan dengan perubahan baru.
Anda dapat membangun danau data terpadu aliran menggunakan Realtime Compute for Apache Flink dan Paimon untuk mengatasi keterbatasan gudang data offline tradisional. Kemampuan komputasi real-time Flink memungkinkan data mengalir antar lapisan gudang data secara real time, sedangkan kemampuan pembaruan efisien Paimon memungkinkan perubahan data dikirimkan ke konsumen hilir dengan latensi tingkat menit. Dengan demikian, danau data terpadu aliran unggul dalam hal latensi maupun biaya.
Untuk informasi lebih lanjut tentang fitur-fitur Paimon, lihat Fitur dan situs resmi Apache Paimon.
Arsitektur dan manfaat
Arsitektur
Realtime Compute for Apache Flink adalah mesin pemrosesan aliran andal yang mendukung pemrosesan efisien volume besar data real-time. Paimon adalah format penyimpanan danau terpadu untuk pemrosesan aliran dan batch yang mendukung pembaruan throughput tinggi dan kueri latensi rendah. Paimon terintegrasi erat dengan Flink untuk menyediakan solusi danau data terpadu aliran yang terpadu. Gambar berikut menunjukkan arsitektur danau data terpadu aliran yang dibangun dengan Flink dan Paimon:
Flink menulis data dari sumber data ke Paimon untuk membentuk lapisan ODS.
Flink berlangganan changelog lapisan ODS, memproses data, lalu menuliskannya kembali ke Paimon untuk membentuk lapisan DWD.
Flink berlangganan changelog lapisan DWD, memproses data, lalu menuliskannya kembali ke Paimon untuk membentuk lapisan DWS.
Akhirnya, StarRocks di E-MapReduce membaca tabel eksternal Paimon untuk kueri aplikasi.

Manfaat
Solusi ini memberikan manfaat sebagai berikut:
Setiap lapisan Paimon dapat mengirimkan perubahan ke konsumen hilir dengan latensi tingkat menit, sehingga mengurangi latensi gudang data offline tradisional dari hitungan jam atau bahkan hari menjadi hitungan menit.
Setiap lapisan Paimon dapat langsung menerima data perubahan tanpa menimpa partisi, yang secara signifikan mengurangi biaya pembaruan dan koreksi data pada gudang data offline tradisional serta mengatasi tantangan dalam mengkueri, memperbarui, dan mengoreksi data pada lapisan antara.
Modelnya terpadu dan arsitekturnya disederhanakan. Logika pipeline ekstrak, transformasi, dan muat (ETL) diimplementasikan menggunakan Flink SQL, sedangkan data pada lapisan ODS, DWD, dan DWS disimpan secara seragam dalam format Paimon. Hal ini mengurangi kompleksitas arsitektur dan meningkatkan efisiensi pemrosesan data.
Solusi ini bergantung pada tiga kemampuan inti Paimon. Tabel berikut memberikan rincian lebih lanjut.
Kemampuan inti Paimon | Rincian |
Pembaruan tabel primary key | Paimon menggunakan struktur data Log-Structured Merge-tree (LSM Tree) di lapisan dasar untuk mencapai pembaruan data yang efisien. Untuk informasi lebih lanjut tentang tabel primary key Paimon dan struktur data dasarnya, lihat Primary Key Table dan File Layouts. |
Mekanisme generasi data inkremental (Changelog Producer) | Paimon dapat menghasilkan changelog lengkap untuk setiap aliran data input. Setiap catatan update_after memiliki catatan update_before yang sesuai. Hal ini memastikan bahwa perubahan sepenuhnya diteruskan ke konsumen hilir. Untuk informasi lebih lanjut, lihat Mekanisme generasi data inkremental. |
Mekanisme penggabungan data (Merge Engine) | Saat tabel primary key Paimon menerima beberapa catatan dengan primary key yang sama, tabel hasil akan menggabungkannya menjadi satu catatan untuk menjaga keunikan. Paimon mendukung berbagai perilaku penggabungan, seperti deduplikasi, pembaruan parsial, dan pra-agregasi. Untuk informasi lebih lanjut, lihat Mekanisme penggabungan data. |
Skenario
Topik ini menggunakan platform E-dagang sebagai contoh untuk menunjukkan cara membangun danau data terpadu aliran guna memproses dan membersihkan data serta mendukung kueri aplikasi. Danau data terpadu aliran ini menerapkan pelapisan dan penggunaan ulang data, serta mendukung berbagai skenario bisnis seperti kueri laporan untuk dasbor transaksi, analitik data perilaku, penandaan persona pengguna, dan rekomendasi personalisasi.

Membangun lapisan ODS: Ingesti data database bisnis secara real-time ke gudang data.
Database MySQL memiliki tiga tabel bisnis: tabel orders, tabel orders_pay, dan tabel kamus product_catalog. Flink menulis data dari tabel-tabel ini ke OSS secara real-time dalam format Paimon sebagai lapisan ODS.Membangun lapisan DWD: Membuat tabel lebar berbasis topik.
Tabel orders, tabel product_catalog, dan tabel orders_pay dilebarkan menggunakan mekanisme penggabungan pembaruan parsial Paimon, menghasilkan tabel lebar lapisan DWD beserta changelog-nya dengan latensi tingkat menit.Membangun lapisan DWS: Menghitung metrik.
Flink mengonsumsi changelog dari tabel lebar secara real-time dan menggunakan mekanisme penggabungan pra-agregasi Paimon untuk menghasilkan lapisan DWM `dwm_users_shops` (tabel antara agregat pengguna-pedagang). Akhirnya, Flink menghasilkan lapisan DWS `dws_users` (tabel metrik agregat pengguna) dan `dws_shops` (tabel metrik agregat pedagang).
Prasyarat
Anda telah mengaktifkan Data Lake Formation. Kami merekomendasikan penggunaan DLF 2.5 sebagai layanan penyimpanan. Untuk informasi lebih lanjut, lihat Memulai dengan DLF.
Anda telah mengaktifkan Flink yang sepenuhnya dikelola. Untuk informasi lebih lanjut, lihat Aktifkan Realtime Compute for Apache Flink.
Anda telah mengaktifkan StarRocks di EMR. Untuk informasi lebih lanjut, lihat Cepat menggunakan instans terpisah komputasi-penyimpanan.
Instans StarRocks, DLF, dan ruang kerja Flink harus berada di Wilayah yang sama.
Batasan
Hanya Ververica Runtime (VVR) 11.1.0 dan versi yang lebih baru yang mendukung solusi danau data terpadu aliran ini.
Membangun danau data terpadu aliran
Siapkan sumber data CDC MySQL
Topik ini menggunakan instans ApsaraDB RDS for MySQL sebagai contoh. Anda perlu membuat database bernama order_dw dan mengisi tiga tabel bisnis dengan data.
Buat instans ApsaraDB RDS for MySQL.
PentingInstans ApsaraDB RDS for MySQL harus berada di VPC yang sama dengan ruang kerja Flink. Jika tidak berada di VPC yang sama, lihat Bagaimana cara mengakses layanan lain lintas VPC?
Buat database dan akun (Usang, dialihkan ke "Langkah 1").
Buat database bernama order_dw. Buat akun istimewa atau akun standar dengan izin baca dan tulis pada database order_dw.
Buat tiga tabel dan masukkan data ke dalamnya.
CREATE TABLE `orders` ( order_id bigint not null primary key, user_id varchar(50) not null, shop_id bigint not null, product_id bigint not null, buy_fee bigint not null, create_time timestamp not null, update_time timestamp not null default now(), state int not null ); CREATE TABLE `orders_pay` ( pay_id bigint not null primary key, order_id bigint not null, pay_platform int not null, create_time timestamp not null ); CREATE TABLE `product_catalog` ( product_id bigint not null primary key, catalog_name varchar(50) not null ); -- Siapkan data. INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee'); INSERT INTO orders VALUES (100001, 'user_001', 12345, 1, 5000, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1), (100002, 'user_002', 12346, 2, 4000, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1), (100003, 'user_003', 12347, 3, 3000, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1), (100004, 'user_001', 12347, 4, 2000, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1), (100005, 'user_002', 12348, 5, 1000, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1), (100006, 'user_001', 12348, 1, 1000, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1), (100007, 'user_003', 12347, 4, 2000, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1); INSERT INTO orders_pay VALUES (2001, 100001, 1, '2023-02-15 17:40:56'), (2002, 100002, 1, '2023-02-15 17:40:56'), (2003, 100003, 0, '2023-02-15 17:40:56'), (2004, 100004, 0, '2023-02-15 17:40:56'), (2005, 100005, 0, '2023-02-15 18:40:56'), (2006, 100006, 0, '2023-02-15 18:40:56'), (2007, 100007, 0, '2023-02-15 18:40:56');
Kelola metadata
Buat Katalog Paimon
Login ke Konsol Realtime Compute for Apache Flink.
Di panel navigasi sebelah kiri, buka halaman Metadata Management dan klik Create Catalog.
Di tab Built-in Catalog, klik Apache Paimon, lalu klik Next.
Masukkan parameter berikut, pilih DLF sebagai jenis penyimpanan, lalu klik OK.
Item konfigurasi
Deskripsi
Wajib
Keterangan
metastore
Jenis metastore.
Ya
Dalam contoh ini, pilih dlf.
catalog name
Nama katalog data DLF.
PentingJika Anda menggunakan pengguna atau role Resource Access Management (RAM), pastikan Anda memiliki izin baca dan tulis pada data DLF. Untuk informasi lebih lanjut, lihat Manajemen otorisasi data.
Ya
Kami merekomendasikan penggunaan DLF 2.5. Anda tidak perlu memasukkan AccessKey atau informasi lainnya. Anda dapat langsung memilih katalog data DLF yang sudah ada. Untuk membuat katalog data, lihat Katalog data.
Setelah Anda membuat katalog data bernama paimoncatalog, pilih dari daftar.
Buat database order_dw di katalog data. Database ini digunakan untuk menyinkronkan data dari semua tabel di database order_dw MySQL.
Di panel navigasi sebelah kiri, pilih . Klik New untuk membuat kueri sementara.
-- Gunakan sumber data paimoncatalog. USE CATALOG paimoncatalog; -- Buat database order_dw. CREATE DATABASE order_dw;Pesan
Pernyataan berikut telah berhasil dieksekusi!menunjukkan bahwa database telah dibuat.
Untuk informasi lebih lanjut tentang cara menggunakan Katalog Paimon, lihat Kelola Katalog Paimon.
Buat Katalog MySQL
Di halaman Metadata Management, klik Create Catalog.
Di tab Built-in Catalog, klik MySQL, lalu klik Next.
Masukkan parameter berikut dan klik OK untuk membuat katalog MySQL bernama mysqlcatalog.
Item konfigurasi
Deskripsi
Wajib
Keterangan
catalog name
Nama katalog.
Ya
Masukkan nama kustom dalam bahasa Inggris. Topik ini menggunakan mysqlcatalog sebagai contoh.
hostname
Alamat IP atau hostname database MySQL.
Ya
Untuk informasi lebih lanjut, lihat Lihat dan kelola titik akhir serta port instans. Karena instans ApsaraDB RDS for MySQL dan ruang kerja Flink yang sepenuhnya dikelola berada di VPC yang sama, masukkan alamat jaringan pribadi.
port
Nomor port layanan database MySQL. Nilai default adalah 3306.
Tidak
Untuk informasi lebih lanjut, lihat Lihat dan kelola titik akhir serta port instans.
default-database
Nama database MySQL default.
Ya
Masukkan nama database yang akan disinkronkan, yaitu order_dw dalam topik ini.
username
Username untuk layanan database MySQL.
Ya
Ini adalah akun yang dibuat di bagian Siapkan sumber data CDC MySQL.
password
Password untuk layanan database MySQL.
Ya
Ini adalah password untuk akun yang dibuat di bagian Siapkan sumber data CDC MySQL.
Bangun lapisan ODS: Ingesti data database bisnis secara real-time
Gunakan Flink Change Data Capture (CDC) dan pekerjaan ingesti data YAML untuk menyinkronkan data dari MySQL ke Paimon dalam satu langkah guna membangun lapisan ODS.
Buat dan mulai pekerjaan ingesti data YAML.
Di Konsol Realtime Compute for Apache Flink, di halaman , buat pekerjaan draft YAML kosong bernama ods.
Salin kode berikut ke editor. Pastikan untuk memodifikasi parameter seperti username dan password.
source: type: mysql name: MySQL Source hostname: rm-bp1e********566g.mysql.rds.aliyuncs.com port: 3306 username: ${secret_values.username} password: ${secret_values.password} tables: order_dw.\.* # Gunakan ekspresi reguler untuk membaca semua tabel di database order_dw. # (Opsional) Sinkronkan data dari tabel yang baru dibuat selama fase inkremental. scan.binlog.newly-added-table.enabled: true # (Opsional) Sinkronkan komentar tabel dan field. include-comments.enabled: true # (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi error OutOfMemory TaskManager. scan.incremental.snapshot.unbounded-chunk-first.enabled: true # (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan. scan.only.deserialize.captured.tables.changelog.enabled: true sink: type: paimon name: Paimon Sink catalog.properties.metastore: rest catalog.properties.uri: http://cn-beijing-vpc.dlf.aliyuncs.com catalog.properties.warehouse: paimoncatalog catalog.properties.token.provider: dlf pipeline: name: MySQL to Paimon PipelineItem konfigurasi
Deskripsi
Wajib
Contoh
catalog.properties.metastoreJenis metastore. Nilainya tetap rest.
Ya
rest
catalog.properties.token.providerPenyedia token. Nilainya tetap dlf.
Ya
dlf
catalog.properties.uriURI untuk mengakses DLF Rest Catalog Server. Formatnya
http://[region-id]-vpc.dlf.aliyuncs.com. Untuk informasi lebih lanjut tentang ID Wilayah, lihat Titik akhir.Ya
http://cn-beijing-vpc.dlf.aliyuncs.com
catalog.properties.warehouseNama Katalog DLF.
Ya
paimoncatalog
Untuk informasi lebih lanjut tentang cara mengoptimalkan performa penulisan Paimon, lihat Optimasi performa Paimon.
Di pojok kanan atas, klik Deploy.
Di halaman , temukan pekerjaan ods yang baru saja Anda sebarkan. Di kolom Actions, klik Start. Pilih Stateless Start untuk memulai pekerjaan. Untuk informasi lebih lanjut tentang konfigurasi pemulaan pekerjaan, lihat Mulai pekerjaan.
Lihat data dari tiga tabel yang disinkronkan dari MySQL ke Paimon.
Di tab Query Script halaman di Konsol Realtime Compute for Apache Flink, salin kode berikut ke skrip kueri. Pilih potongan kode tersebut dan klik Run di pojok kanan atas.
SELECT * FROM paimoncatalog.order_dw.orders ORDER BY order_id;
Bangun lapisan DWD: Tabel lebar berbasis topik
Buat tabel lebar DWD Paimon dwd_orders
Di tab Query Script halaman di Konsol Realtime Compute for Apache Flink, salin kode berikut ke skrip kueri. Pilih potongan kode tersebut dan klik Run di pojok kanan atas.
CREATE TABLE paimoncatalog.order_dw.dwd_orders ( order_id BIGINT, order_user_id STRING, order_shop_id BIGINT, order_product_id BIGINT, order_product_catalog_name STRING, order_fee BIGINT, order_create_time TIMESTAMP, order_update_time TIMESTAMP, order_state INT, pay_id BIGINT, pay_platform INT COMMENT 'platform 0: phone, 1: pc', pay_create_time TIMESTAMP, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'merge-engine' = 'partial-update', -- Gunakan mesin penggabungan partial-update untuk menghasilkan tabel lebar. 'changelog-producer' = 'lookup' -- Gunakan produsen changelog lookup untuk menghasilkan changelog dengan latensi rendah. );Pesan
Kueri telah dieksekusimenunjukkan bahwa tabel telah dibuat.Konsumsi changelog dari tabel lapisan ODS orders dan orders_pay secara real-time
Di Konsol Realtime Compute for Apache Flink, di halaman , buat pekerjaan aliran SQL bernama dwd, salin kode berikut ke editor SQL, Deploy pekerjaan tersebut, lalu Start secara stateless.
Pekerjaan SQL ini melakukan join tabel dimensi antara tabel orders dan tabel product_catalog. Hasil join tersebut, bersama dengan data dari tabel orders_pay, ditulis ke tabel dwd_orders. Mesin penggabungan partial-update Paimon digunakan untuk melebarkan data untuk catatan dengan order_id yang sama di tabel orders dan orders_pay.
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3'; SET 'table.exec.sink.upsert-materialize' = 'NONE'; SET 'execution.checkpointing.interval' = '10s'; SET 'execution.checkpointing.min-pause' = '10s'; -- Paimon saat ini tidak mendukung beberapa pernyataan INSERT ke tabel yang sama dalam satu pekerjaan. Oleh karena itu, gunakan UNION ALL. INSERT INTO paimoncatalog.order_dw.dwd_orders SELECT o.order_id, o.user_id, o.shop_id, o.product_id, dim.catalog_name, o.buy_fee, o.create_time, o.update_time, o.state, NULL, NULL, NULL FROM paimoncatalog.order_dw.orders o LEFT JOIN paimoncatalog.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim ON o.product_id = dim.product_id UNION ALL SELECT order_id, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, pay_id, pay_platform, create_time FROM paimoncatalog.order_dw.orders_pay;Lihat data di tabel lebar dwd_orders
Di tab Query Script halaman di Konsol Realtime Compute for Apache Flink, salin kode berikut ke skrip kueri. Pilih potongan kode tersebut dan klik Run di pojok kanan atas.
SELECT * FROM paimoncatalog.order_dw.dwd_orders ORDER BY order_id;
Bangun lapisan DWS: Perhitungan metrik
Buat tabel agregasi lapisan DWS dws_users dan dws_shops
Di tab Query Script halaman di Konsol Realtime Compute for Apache Flink, salin kode berikut ke skrip kueri. Pilih potongan kode tersebut dan klik Run di pojok kanan atas.
-- Tabel metrik agregat berdimensi pengguna. CREATE TABLE paimoncatalog.order_dw.dws_users ( user_id STRING, ds STRING, paid_buy_fee_sum BIGINT COMMENT 'Total jumlah yang dibayarkan pada hari ini', PRIMARY KEY (user_id, ds) NOT ENFORCED ) WITH ( 'merge-engine' = 'aggregation', -- Gunakan mesin penggabungan agregasi untuk menghasilkan tabel agregasi. 'fields.paid_buy_fee_sum.aggregate-function' = 'sum' -- Jumlahkan data di field paid_buy_fee_sum untuk menghasilkan hasil agregat. -- Karena tabel dws_users tidak dikonsumsi secara streaming di hilir, Anda tidak perlu menentukan produsen changelog. ); -- Tabel metrik agregat berdimensi pedagang. CREATE TABLE paimoncatalog.order_dw.dws_shops ( shop_id BIGINT, ds STRING, paid_buy_fee_sum BIGINT COMMENT 'Total jumlah yang dibayarkan pada hari ini', uv BIGINT COMMENT 'Total jumlah pengguna unik yang melakukan pembelian pada hari ini', pv BIGINT COMMENT 'Total jumlah pembelian oleh pengguna pada hari ini', PRIMARY KEY (shop_id, ds) NOT ENFORCED ) WITH ( 'merge-engine' = 'aggregation', -- Gunakan mesin penggabungan agregasi untuk menghasilkan tabel agregasi. 'fields.paid_buy_fee_sum.aggregate-function' = 'sum', -- Jumlahkan data di field paid_buy_fee_sum untuk menghasilkan hasil agregat. 'fields.uv.aggregate-function' = 'sum', -- Jumlahkan data di field uv untuk menghasilkan hasil agregat. 'fields.pv.aggregate-function' = 'sum' -- Jumlahkan data di field pv untuk menghasilkan hasil agregat. -- Karena tabel dws_shops tidak dikonsumsi secara streaming di hilir, Anda tidak perlu menentukan produsen changelog. ); -- Untuk menghitung tabel agregasi dari perspektif pengguna dan pedagang secara bersamaan, buat tabel antara dengan primary key komposit pengguna dan pedagang. CREATE TABLE paimoncatalog.order_dw.dwm_users_shops ( user_id STRING, shop_id BIGINT, ds STRING, paid_buy_fee_sum BIGINT COMMENT 'Total jumlah yang dibayarkan pengguna di pedagang pada hari ini', pv BIGINT COMMENT 'Jumlah kali pengguna melakukan pembelian di pedagang pada hari ini', PRIMARY KEY (user_id, shop_id, ds) NOT ENFORCED ) WITH ( 'merge-engine' = 'aggregation', -- Gunakan mesin penggabungan agregasi untuk menghasilkan tabel agregasi. 'fields.paid_buy_fee_sum.aggregate-function' = 'sum', -- Jumlahkan data di field paid_buy_fee_sum untuk menghasilkan hasil agregat. 'fields.pv.aggregate-function' = 'sum', -- Jumlahkan data di field pv untuk menghasilkan hasil agregat. 'changelog-producer' = 'lookup', -- Gunakan produsen changelog lookup untuk menghasilkan changelog dengan latensi rendah. -- Tabel antara di lapisan DWM umumnya tidak dikueri langsung oleh aplikasi lapisan atas, sehingga Anda dapat mengoptimalkan performa penulisannya. 'file.format' = 'avro', -- Format penyimpanan baris Avro memberikan performa penulisan yang lebih efisien. 'metadata.stats-mode' = 'none' -- Mengabaikan statistik meningkatkan biaya kueri OLAP tetapi meningkatkan performa penulisan. Hal ini tidak memengaruhi pemrosesan aliran kontinu. );Tanggapan
Kueri telah dieksekusimenunjukkan bahwa pembuatan berhasil.Ubah data di tabel dwd_orders di lapisan DWD
Di tab di Konsol Realtime Compute for Apache Flink, buat pekerjaan aliran SQL baru bernama dwm. Salin kode berikut ke editor SQL, deploy pekerjaan tersebut, lalu start secara stateless.
Pekerjaan SQL ini menulis data dari tabel dwd_orders ke tabel dwm_users_shops. Pekerjaan ini menggunakan mesin penggabungan pra-agregasi Paimon untuk secara otomatis menjumlahkan order_fee guna menghitung total jumlah konsumsi per pengguna per pedagang. Pekerjaan ini juga menjumlahkan nilai 1 untuk menghitung jumlah pembelian per pengguna per pedagang.
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3'; SET 'table.exec.sink.upsert-materialize' = 'NONE'; SET 'execution.checkpointing.interval' = '10s'; SET 'execution.checkpointing.min-pause' = '10s'; INSERT INTO paimoncatalog.order_dw.dwm_users_shops SELECT order_user_id, order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds, order_fee, 1 -- Satu catatan input mewakili satu pembelian. FROM paimoncatalog.order_dw.dwd_orders WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL;Konsumsi changelog dari tabel lapisan DWM dwm_users_shops secara real-time
Di halaman di Konsol Realtime Compute for Apache Flink, buat pekerjaan aliran SQL baru bernama dws. Salin kode berikut ke editor SQL, deploy pekerjaan tersebut, lalu start secara stateless.
Pekerjaan SQL ini menulis data dari tabel dwm_users_shops ke tabel dws_users dan dws_shops. Pekerjaan ini menggunakan mesin penggabungan pra-agregasi Paimon untuk menghitung total jumlah konsumsi untuk setiap pengguna (paid_buy_fee_sum) di tabel dws_users. Di tabel dws_shops, pekerjaan ini menghitung total pendapatan untuk setiap pedagang (paid_buy_fee_sum), jumlah pengguna yang melakukan pembelian (dengan menjumlahkan 1), dan total jumlah pembelian (pv).
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3'; SET 'table.exec.sink.upsert-materialize' = 'NONE'; SET 'execution.checkpointing.interval' = '10s'; SET 'execution.checkpointing.min-pause' = '10s'; -- Berbeda dengan pekerjaan DWD, setiap pernyataan INSERT di sini menulis ke tabel Paimon yang berbeda, sehingga dapat berada dalam satu pekerjaan yang sama. BEGIN STATEMENT SET; INSERT INTO paimoncatalog.order_dw.dws_users SELECT user_id, ds, paid_buy_fee_sum FROM paimoncatalog.order_dw.dwm_users_shops; -- Primary key-nya adalah pedagang. Volume data untuk beberapa pedagang populer mungkin jauh lebih tinggi daripada yang lain. -- Oleh karena itu, gunakan local merge untuk pra-agregasi data di memori sebelum menulis ke Paimon. Hal ini membantu mengurangi kesenjangan data. INSERT INTO paimoncatalog.order_dw.dws_shops /*+ OPTIONS('local-merge-buffer-size' = '64mb') */ SELECT shop_id, ds, paid_buy_fee_sum, 1, -- Satu catatan input mewakili seluruh konsumsi oleh satu pengguna di pedagang tersebut. pv FROM paimoncatalog.order_dw.dwm_users_shops; END;Lihat data di tabel dws_users dan dws_shops
Di tab Query Script halaman di Konsol Realtime Compute for Apache Flink, salin kode berikut ke skrip kueri. Pilih potongan kode tersebut dan klik Run di pojok kanan atas.
--Lihat data di tabel dws_users. SELECT * FROM paimoncatalog.order_dw.dws_users ORDER BY user_id;
--Lihat data di tabel dws_shops. SELECT * FROM paimoncatalog.order_dw.dws_shops ORDER BY shop_id;
Tangkap perubahan di database bisnis
Setelah danau data terpadu aliran dibangun, Anda dapat menguji kemampuannya dalam menangkap perubahan dari database bisnis.
Masukkan data berikut ke database order_dw di MySQL.
INSERT INTO orders VALUES (100008, 'user_001', 12345, 3, 3000, '2023-02-15 17:40:56', '2023-02-15 18:42:56', 1), (100009, 'user_002', 12348, 4, 1000, '2023-02-15 18:40:56', '2023-02-15 19:42:56', 1), (100010, 'user_003', 12348, 2, 2000, '2023-02-15 19:40:56', '2023-02-15 20:42:56', 1); INSERT INTO orders_pay VALUES (2008, 100008, 1, '2023-02-15 18:40:56'), (2009, 100009, 1, '2023-02-15 19:40:56'), (2010, 100010, 0, '2023-02-15 20:40:56');Lihat data di tabel dws_users dan dws_shops. Di tab Query Script halaman di Konsol Realtime Compute for Apache Flink, salin kode berikut ke skrip kueri. Pilih potongan kode tersebut dan klik Run di pojok kanan atas.
tabel dws_users
SELECT * FROM paimoncatalog.order_dw.dws_users ORDER BY user_id;
tabel dws_shops
SELECT * FROM paimoncatalog.order_dw.dws_shops ORDER BY shop_id;
Gunakan danau data terpadu aliran
Bagian sebelumnya menunjukkan cara membuat Katalog Paimon dan menulis ke tabel Paimon di Flink. Bagian ini menunjukkan beberapa skenario sederhana untuk analitik data menggunakan StarRocks setelah Anda membangun danau data terpadu aliran.
Pertama, login ke instans StarRocks dan buat katalog untuk oss-paimon. Untuk informasi lebih lanjut, lihat Katalog Paimon.
CREATE EXTERNAL CATALOG paimon_catalog
PROPERTIES
(
'type' = 'paimon',
'paimon.catalog.type' = 'filesystem',
'aliyun.oss.endpoint' = 'oss-cn-beijing-internal.aliyuncs.com',
'paimon.catalog.warehouse' = 'oss://<bucket>/<object>'
);Properti | Wajib | Keterangan |
type | Ya | Jenis sumber data. Nilainya adalah paimon. |
paimon.catalog.type | Ya | Jenis metastore yang digunakan oleh Paimon. Contoh ini menggunakan filesystem sebagai jenis metastore. |
aliyun.oss.endpoint | Ya | Jika Anda menggunakan OSS atau OSS-HDFS sebagai gudang, Anda harus menentukan titik akhir yang sesuai. |
paimon.catalog.warehouse | Ya | Formatnya adalah oss://<bucket>/<object>. Dalam format ini:
Anda dapat melihat nama bucket dan object Anda di Konsol OSS. |
Kueri peringkat
Analisis tabel agregasi lapisan DWS. Contoh berikut menunjukkan cara menggunakan StarRocks untuk mengkueri tiga pedagang teratas dengan jumlah transaksi tertinggi pada 15 Februari 2023.
SELECT ROW_NUMBER() OVER (ORDER BY paid_buy_fee_sum DESC) AS rn, shop_id, paid_buy_fee_sum
FROM dws_shops
WHERE ds = '20230215'
ORDER BY rn LIMIT 3;
Kueri detail
Contoh ini menunjukkan cara menganalisis tabel lebar lapisan DWD menggunakan StarRocks untuk mengkueri detail pesanan pelanggan tertentu yang membayar menggunakan platform pembayaran tertentu pada Februari 2023.
SELECT * FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00' AND order_create_time < '2023-03-01 00:00:00'
AND order_user_id = 'user_001'
AND pay_platform = 0
ORDER BY order_create_time;;
Laporan data
Contoh berikut menunjukkan cara menganalisis tabel lebar lapisan DWD. Anda dapat menggunakan StarRocks untuk mengkueri jumlah total pesanan dan jumlah total pesanan untuk setiap kategori produk pada Februari 2023.
SELECT
order_create_time AS order_create_date,
order_product_catalog_name,
COUNT(*),
SUM(order_fee)
FROM
dwd_orders
WHERE
order_create_time >= '2023-02-01 00:00:00' and order_create_time < '2023-03-01 00:00:00'
GROUP BY
order_create_date, order_product_catalog_name
ORDER BY
order_create_date, order_product_catalog_name;
Referensi
Untuk membangun danau data Paimon offline menggunakan kemampuan pemrosesan batch Flink, lihat Memulai cepat pemrosesan batch Flink.