全部产品
Search
文档中心

Realtime Compute for Apache Flink:Membangun danau data terpadu streaming dengan Paimon dan StarRocks

更新时间:Jan 27, 2026

Topik ini menjelaskan cara membangun danau data terpadu aliran (streaming data lakehouse) menggunakan Realtime Compute for Apache Flink, Paimon, dan StarRocks.

Informasi latar belakang

Seiring meningkatnya digitalisasi bisnis, permintaan terhadap data yang tepat waktu juga bertambah. Gudang data offline tradisional mengikuti metodologi yang jelas, yaitu menggunakan pekerjaan offline terjadwal untuk menggabungkan perubahan dari periode sebelumnya ke dalam hierarki gudang data yang mencakup lapisan Operational Data Store (ODS), Data Warehouse Detail (DWD), Data Warehouse Summary (DWS), dan Application Data Store (ADS). Namun, pendekatan ini memiliki dua masalah utama: latensi tinggi dan biaya tinggi. Pekerjaan offline biasanya hanya dijalankan sekali per jam atau bahkan sekali per hari, sehingga konsumen data hanya dapat melihat data dari jam atau hari sebelumnya. Selain itu, pembaruan data sering kali menimpa seluruh partisi, yang tidak efisien karena memerlukan pembacaan ulang seluruh data asli dalam partisi untuk digabungkan dengan perubahan baru.

Anda dapat membangun danau data terpadu aliran menggunakan Realtime Compute for Apache Flink dan Paimon untuk mengatasi keterbatasan gudang data offline tradisional. Kemampuan komputasi real-time Flink memungkinkan data mengalir antar lapisan gudang data secara real time, sedangkan kemampuan pembaruan efisien Paimon memungkinkan perubahan data dikirimkan ke konsumen hilir dengan latensi tingkat menit. Dengan demikian, danau data terpadu aliran unggul dalam hal latensi maupun biaya.

Untuk informasi lebih lanjut tentang fitur-fitur Paimon, lihat Fitur dan situs resmi Apache Paimon.

Arsitektur dan manfaat

Arsitektur

Realtime Compute for Apache Flink adalah mesin pemrosesan aliran andal yang mendukung pemrosesan efisien volume besar data real-time. Paimon adalah format penyimpanan danau terpadu untuk pemrosesan aliran dan batch yang mendukung pembaruan throughput tinggi dan kueri latensi rendah. Paimon terintegrasi erat dengan Flink untuk menyediakan solusi danau data terpadu aliran yang terpadu. Gambar berikut menunjukkan arsitektur danau data terpadu aliran yang dibangun dengan Flink dan Paimon:

  1. Flink menulis data dari sumber data ke Paimon untuk membentuk lapisan ODS.

  2. Flink berlangganan changelog lapisan ODS, memproses data, lalu menuliskannya kembali ke Paimon untuk membentuk lapisan DWD.

  3. Flink berlangganan changelog lapisan DWD, memproses data, lalu menuliskannya kembali ke Paimon untuk membentuk lapisan DWS.

  4. Akhirnya, StarRocks di E-MapReduce membaca tabel eksternal Paimon untuk kueri aplikasi.

image

Manfaat

Solusi ini memberikan manfaat sebagai berikut:

  • Setiap lapisan Paimon dapat mengirimkan perubahan ke konsumen hilir dengan latensi tingkat menit, sehingga mengurangi latensi gudang data offline tradisional dari hitungan jam atau bahkan hari menjadi hitungan menit.

  • Setiap lapisan Paimon dapat langsung menerima data perubahan tanpa menimpa partisi, yang secara signifikan mengurangi biaya pembaruan dan koreksi data pada gudang data offline tradisional serta mengatasi tantangan dalam mengkueri, memperbarui, dan mengoreksi data pada lapisan antara.

  • Modelnya terpadu dan arsitekturnya disederhanakan. Logika pipeline ekstrak, transformasi, dan muat (ETL) diimplementasikan menggunakan Flink SQL, sedangkan data pada lapisan ODS, DWD, dan DWS disimpan secara seragam dalam format Paimon. Hal ini mengurangi kompleksitas arsitektur dan meningkatkan efisiensi pemrosesan data.

Solusi ini bergantung pada tiga kemampuan inti Paimon. Tabel berikut memberikan rincian lebih lanjut.

Kemampuan inti Paimon

Rincian

Pembaruan tabel primary key

Paimon menggunakan struktur data Log-Structured Merge-tree (LSM Tree) di lapisan dasar untuk mencapai pembaruan data yang efisien.

Untuk informasi lebih lanjut tentang tabel primary key Paimon dan struktur data dasarnya, lihat Primary Key Table dan File Layouts.

Mekanisme generasi data inkremental (Changelog Producer)

Paimon dapat menghasilkan changelog lengkap untuk setiap aliran data input. Setiap catatan update_after memiliki catatan update_before yang sesuai. Hal ini memastikan bahwa perubahan sepenuhnya diteruskan ke konsumen hilir. Untuk informasi lebih lanjut, lihat Mekanisme generasi data inkremental.

Mekanisme penggabungan data (Merge Engine)

Saat tabel primary key Paimon menerima beberapa catatan dengan primary key yang sama, tabel hasil akan menggabungkannya menjadi satu catatan untuk menjaga keunikan. Paimon mendukung berbagai perilaku penggabungan, seperti deduplikasi, pembaruan parsial, dan pra-agregasi. Untuk informasi lebih lanjut, lihat Mekanisme penggabungan data.

Skenario

Topik ini menggunakan platform E-dagang sebagai contoh untuk menunjukkan cara membangun danau data terpadu aliran guna memproses dan membersihkan data serta mendukung kueri aplikasi. Danau data terpadu aliran ini menerapkan pelapisan dan penggunaan ulang data, serta mendukung berbagai skenario bisnis seperti kueri laporan untuk dasbor transaksi, analitik data perilaku, penandaan persona pengguna, dan rekomendasi personalisasi.

image

  1. Membangun lapisan ODS: Ingesti data database bisnis secara real-time ke gudang data.
    Database MySQL memiliki tiga tabel bisnis: tabel orders, tabel orders_pay, dan tabel kamus product_catalog. Flink menulis data dari tabel-tabel ini ke OSS secara real-time dalam format Paimon sebagai lapisan ODS.

  2. Membangun lapisan DWD: Membuat tabel lebar berbasis topik.
    Tabel orders, tabel product_catalog, dan tabel orders_pay dilebarkan menggunakan mekanisme penggabungan pembaruan parsial Paimon, menghasilkan tabel lebar lapisan DWD beserta changelog-nya dengan latensi tingkat menit.

  3. Membangun lapisan DWS: Menghitung metrik.
    Flink mengonsumsi changelog dari tabel lebar secara real-time dan menggunakan mekanisme penggabungan pra-agregasi Paimon untuk menghasilkan lapisan DWM `dwm_users_shops` (tabel antara agregat pengguna-pedagang). Akhirnya, Flink menghasilkan lapisan DWS `dws_users` (tabel metrik agregat pengguna) dan `dws_shops` (tabel metrik agregat pedagang).

Prasyarat

Catatan

Instans StarRocks, DLF, dan ruang kerja Flink harus berada di Wilayah yang sama.

Batasan

Hanya Ververica Runtime (VVR) 11.1.0 dan versi yang lebih baru yang mendukung solusi danau data terpadu aliran ini.

Membangun danau data terpadu aliran

Siapkan sumber data CDC MySQL

Topik ini menggunakan instans ApsaraDB RDS for MySQL sebagai contoh. Anda perlu membuat database bernama order_dw dan mengisi tiga tabel bisnis dengan data.

  1. Buat instans ApsaraDB RDS for MySQL.

    Penting

    Instans ApsaraDB RDS for MySQL harus berada di VPC yang sama dengan ruang kerja Flink. Jika tidak berada di VPC yang sama, lihat Bagaimana cara mengakses layanan lain lintas VPC?

  2. Buat database dan akun (Usang, dialihkan ke "Langkah 1").

    Buat database bernama order_dw. Buat akun istimewa atau akun standar dengan izin baca dan tulis pada database order_dw.

    Buat tiga tabel dan masukkan data ke dalamnya.

    CREATE TABLE `orders` (
      order_id bigint not null primary key,
      user_id varchar(50) not null,
      shop_id bigint not null,
      product_id bigint not null,
      buy_fee bigint not null,   
      create_time timestamp not null,
      update_time timestamp not null default now(),
      state int not null
    );
    
    CREATE TABLE `orders_pay` (
      pay_id bigint not null primary key,
      order_id bigint not null,
      pay_platform int not null, 
      create_time timestamp not null
    );
    
    CREATE TABLE `product_catalog` (
      product_id bigint not null primary key,
      catalog_name varchar(50) not null
    );
    
    -- Siapkan data.
    INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
    
    INSERT INTO orders VALUES
    (100001, 'user_001', 12345, 1, 5000, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
    (100002, 'user_002', 12346, 2, 4000, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
    (100003, 'user_003', 12347, 3, 3000, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
    (100004, 'user_001', 12347, 4, 2000, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
    (100005, 'user_002', 12348, 5, 1000, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
    (100006, 'user_001', 12348, 1, 1000, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
    (100007, 'user_003', 12347, 4, 2000, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
    
    INSERT INTO orders_pay VALUES
    (2001, 100001, 1, '2023-02-15 17:40:56'),
    (2002, 100002, 1, '2023-02-15 17:40:56'),
    (2003, 100003, 0, '2023-02-15 17:40:56'),
    (2004, 100004, 0, '2023-02-15 17:40:56'),
    (2005, 100005, 0, '2023-02-15 18:40:56'),
    (2006, 100006, 0, '2023-02-15 18:40:56'),
    (2007, 100007, 0, '2023-02-15 18:40:56');

Kelola metadata

Buat Katalog Paimon

  1. Login ke Konsol Realtime Compute for Apache Flink.

  2. Di panel navigasi sebelah kiri, buka halaman Metadata Management dan klik Create Catalog.

  3. Di tab Built-in Catalog, klik Apache Paimon, lalu klik Next.

  4. Masukkan parameter berikut, pilih DLF sebagai jenis penyimpanan, lalu klik OK.

    Item konfigurasi

    Deskripsi

    Wajib

    Keterangan

    metastore

    Jenis metastore.

    Ya

    Dalam contoh ini, pilih dlf.

    catalog name

    Nama katalog data DLF.

    Penting

    Jika Anda menggunakan pengguna atau role Resource Access Management (RAM), pastikan Anda memiliki izin baca dan tulis pada data DLF. Untuk informasi lebih lanjut, lihat Manajemen otorisasi data.

    Ya

    Kami merekomendasikan penggunaan DLF 2.5. Anda tidak perlu memasukkan AccessKey atau informasi lainnya. Anda dapat langsung memilih katalog data DLF yang sudah ada. Untuk membuat katalog data, lihat Katalog data.

    Setelah Anda membuat katalog data bernama paimoncatalog, pilih dari daftar.

  5. Buat database order_dw di katalog data. Database ini digunakan untuk menyinkronkan data dari semua tabel di database order_dw MySQL.

    Di panel navigasi sebelah kiri, pilih Data Exploration > Query Script. Klik New untuk membuat kueri sementara.

    -- Gunakan sumber data paimoncatalog.
    USE CATALOG paimoncatalog;
    -- Buat database order_dw.
    CREATE DATABASE order_dw;

    Pesan Pernyataan berikut telah berhasil dieksekusi! menunjukkan bahwa database telah dibuat.

Untuk informasi lebih lanjut tentang cara menggunakan Katalog Paimon, lihat Kelola Katalog Paimon.

Buat Katalog MySQL

  1. Di halaman Metadata Management, klik Create Catalog.

  2. Di tab Built-in Catalog, klik MySQL, lalu klik Next.

  3. Masukkan parameter berikut dan klik OK untuk membuat katalog MySQL bernama mysqlcatalog.

    Item konfigurasi

    Deskripsi

    Wajib

    Keterangan

    catalog name

    Nama katalog.

    Ya

    Masukkan nama kustom dalam bahasa Inggris. Topik ini menggunakan mysqlcatalog sebagai contoh.

    hostname

    Alamat IP atau hostname database MySQL.

    Ya

    Untuk informasi lebih lanjut, lihat Lihat dan kelola titik akhir serta port instans. Karena instans ApsaraDB RDS for MySQL dan ruang kerja Flink yang sepenuhnya dikelola berada di VPC yang sama, masukkan alamat jaringan pribadi.

    port

    Nomor port layanan database MySQL. Nilai default adalah 3306.

    Tidak

    Untuk informasi lebih lanjut, lihat Lihat dan kelola titik akhir serta port instans.

    default-database

    Nama database MySQL default.

    Ya

    Masukkan nama database yang akan disinkronkan, yaitu order_dw dalam topik ini.

    username

    Username untuk layanan database MySQL.

    Ya

    Ini adalah akun yang dibuat di bagian Siapkan sumber data CDC MySQL.

    password

    Password untuk layanan database MySQL.

    Ya

    Ini adalah password untuk akun yang dibuat di bagian Siapkan sumber data CDC MySQL.

Bangun lapisan ODS: Ingesti data database bisnis secara real-time

Gunakan Flink Change Data Capture (CDC) dan pekerjaan ingesti data YAML untuk menyinkronkan data dari MySQL ke Paimon dalam satu langkah guna membangun lapisan ODS.

  1. Buat dan mulai pekerjaan ingesti data YAML.

    1. Di Konsol Realtime Compute for Apache Flink, di halaman Data Studio > Data Ingestion, buat pekerjaan draft YAML kosong bernama ods.

    2. Salin kode berikut ke editor. Pastikan untuk memodifikasi parameter seperti username dan password.

      source:
        type: mysql
        name: MySQL Source
        hostname: rm-bp1e********566g.mysql.rds.aliyuncs.com
        port: 3306
        username: ${secret_values.username}
        password: ${secret_values.password}
        tables: order_dw.\.*  # Gunakan ekspresi reguler untuk membaca semua tabel di database order_dw.
        # (Opsional) Sinkronkan data dari tabel yang baru dibuat selama fase inkremental.
        scan.binlog.newly-added-table.enabled: true
        # (Opsional) Sinkronkan komentar tabel dan field.
        include-comments.enabled: true
        # (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi error OutOfMemory TaskManager.
        scan.incremental.snapshot.unbounded-chunk-first.enabled: true
        # (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
        scan.only.deserialize.captured.tables.changelog.enabled: true
      
      sink:
        type: paimon
        name: Paimon Sink
        catalog.properties.metastore: rest
        catalog.properties.uri: http://cn-beijing-vpc.dlf.aliyuncs.com
        catalog.properties.warehouse: paimoncatalog
        catalog.properties.token.provider: dlf
        
      pipeline:
        name: MySQL to Paimon Pipeline

      Item konfigurasi

      Deskripsi

      Wajib

      Contoh

      catalog.properties.metastore

      Jenis metastore. Nilainya tetap rest.

      Ya

      rest

      catalog.properties.token.provider

      Penyedia token. Nilainya tetap dlf.

      Ya

      dlf

      catalog.properties.uri

      URI untuk mengakses DLF Rest Catalog Server. Formatnya http://[region-id]-vpc.dlf.aliyuncs.com. Untuk informasi lebih lanjut tentang ID Wilayah, lihat Titik akhir.

      Ya

      http://cn-beijing-vpc.dlf.aliyuncs.com

      catalog.properties.warehouse

      Nama Katalog DLF.

      Ya

      paimoncatalog

      Untuk informasi lebih lanjut tentang cara mengoptimalkan performa penulisan Paimon, lihat Optimasi performa Paimon.

    3. Di pojok kanan atas, klik Deploy.

    4. Di halaman Operation Center > Deployments, temukan pekerjaan ods yang baru saja Anda sebarkan. Di kolom Actions, klik Start. Pilih Stateless Start untuk memulai pekerjaan. Untuk informasi lebih lanjut tentang konfigurasi pemulaan pekerjaan, lihat Mulai pekerjaan.

  2. Lihat data dari tiga tabel yang disinkronkan dari MySQL ke Paimon.

    Di tab Query Script halaman Data Development > Data Exploration di Konsol Realtime Compute for Apache Flink, salin kode berikut ke skrip kueri. Pilih potongan kode tersebut dan klik Run di pojok kanan atas.

    SELECT * FROM paimoncatalog.order_dw.orders ORDER BY order_id;

    Screenshot 2024-09-02 14

Bangun lapisan DWD: Tabel lebar berbasis topik

  1. Buat tabel lebar DWD Paimon dwd_orders

    Di tab Query Script halaman Data Development > Data Exploration di Konsol Realtime Compute for Apache Flink, salin kode berikut ke skrip kueri. Pilih potongan kode tersebut dan klik Run di pojok kanan atas.

    CREATE TABLE paimoncatalog.order_dw.dwd_orders (
        order_id BIGINT,
        order_user_id STRING,
        order_shop_id BIGINT,
        order_product_id BIGINT,
        order_product_catalog_name STRING,
        order_fee BIGINT,
        order_create_time TIMESTAMP,
        order_update_time TIMESTAMP,
        order_state INT,
        pay_id BIGINT,
        pay_platform INT COMMENT 'platform 0: phone, 1: pc',
        pay_create_time TIMESTAMP,
        PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'partial-update', -- Gunakan mesin penggabungan partial-update untuk menghasilkan tabel lebar.
        'changelog-producer' = 'lookup' -- Gunakan produsen changelog lookup untuk menghasilkan changelog dengan latensi rendah.
    );

    Pesan Kueri telah dieksekusi menunjukkan bahwa tabel telah dibuat.

  2. Konsumsi changelog dari tabel lapisan ODS orders dan orders_pay secara real-time

    Di Konsol Realtime Compute for Apache Flink, di halaman Data Development > ETL, buat pekerjaan aliran SQL bernama dwd, salin kode berikut ke editor SQL, Deploy pekerjaan tersebut, lalu Start secara stateless.

    Pekerjaan SQL ini melakukan join tabel dimensi antara tabel orders dan tabel product_catalog. Hasil join tersebut, bersama dengan data dari tabel orders_pay, ditulis ke tabel dwd_orders. Mesin penggabungan partial-update Paimon digunakan untuk melebarkan data untuk catatan dengan order_id yang sama di tabel orders dan orders_pay.

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';
    
    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';
    
    -- Paimon saat ini tidak mendukung beberapa pernyataan INSERT ke tabel yang sama dalam satu pekerjaan. Oleh karena itu, gunakan UNION ALL.
    INSERT INTO paimoncatalog.order_dw.dwd_orders 
    SELECT 
        o.order_id,
        o.user_id,
        o.shop_id,
        o.product_id,
        dim.catalog_name,
        o.buy_fee,
        o.create_time,
        o.update_time,
        o.state,
        NULL,
        NULL,
        NULL
    FROM
        paimoncatalog.order_dw.orders o 
        LEFT JOIN paimoncatalog.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim
        ON o.product_id = dim.product_id
    UNION ALL
    SELECT
        order_id,
        NULL,
        NULL,
        NULL,
        NULL,
        NULL,
        NULL,
        NULL,
        NULL,
        pay_id,
        pay_platform,
        create_time
    FROM
        paimoncatalog.order_dw.orders_pay;
  3. Lihat data di tabel lebar dwd_orders

    Di tab Query Script halaman Data Development > Data Exploration di Konsol Realtime Compute for Apache Flink, salin kode berikut ke skrip kueri. Pilih potongan kode tersebut dan klik Run di pojok kanan atas.

    SELECT * FROM paimoncatalog.order_dw.dwd_orders ORDER BY order_id;

    Screenshot 2024-09-02 14

Bangun lapisan DWS: Perhitungan metrik

  1. Buat tabel agregasi lapisan DWS dws_users dan dws_shops

    Di tab Query Script halaman Data Development > Data Exploration di Konsol Realtime Compute for Apache Flink, salin kode berikut ke skrip kueri. Pilih potongan kode tersebut dan klik Run di pojok kanan atas.

    -- Tabel metrik agregat berdimensi pengguna.
    CREATE TABLE paimoncatalog.order_dw.dws_users (
        user_id STRING,
        ds STRING,
        paid_buy_fee_sum BIGINT COMMENT 'Total jumlah yang dibayarkan pada hari ini',
        PRIMARY KEY (user_id, ds) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'aggregation', -- Gunakan mesin penggabungan agregasi untuk menghasilkan tabel agregasi.
        'fields.paid_buy_fee_sum.aggregate-function' = 'sum' -- Jumlahkan data di field paid_buy_fee_sum untuk menghasilkan hasil agregat.
        -- Karena tabel dws_users tidak dikonsumsi secara streaming di hilir, Anda tidak perlu menentukan produsen changelog.
    );
    
    -- Tabel metrik agregat berdimensi pedagang.
    CREATE TABLE paimoncatalog.order_dw.dws_shops (
        shop_id BIGINT,
        ds STRING,
        paid_buy_fee_sum BIGINT COMMENT 'Total jumlah yang dibayarkan pada hari ini',
        uv BIGINT COMMENT 'Total jumlah pengguna unik yang melakukan pembelian pada hari ini',
        pv BIGINT COMMENT 'Total jumlah pembelian oleh pengguna pada hari ini',
        PRIMARY KEY (shop_id, ds) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'aggregation', -- Gunakan mesin penggabungan agregasi untuk menghasilkan tabel agregasi.
        'fields.paid_buy_fee_sum.aggregate-function' = 'sum', -- Jumlahkan data di field paid_buy_fee_sum untuk menghasilkan hasil agregat.
        'fields.uv.aggregate-function' = 'sum', -- Jumlahkan data di field uv untuk menghasilkan hasil agregat.
        'fields.pv.aggregate-function' = 'sum' -- Jumlahkan data di field pv untuk menghasilkan hasil agregat.
        -- Karena tabel dws_shops tidak dikonsumsi secara streaming di hilir, Anda tidak perlu menentukan produsen changelog.
    );
    
    -- Untuk menghitung tabel agregasi dari perspektif pengguna dan pedagang secara bersamaan, buat tabel antara dengan primary key komposit pengguna dan pedagang.
    CREATE TABLE paimoncatalog.order_dw.dwm_users_shops (
        user_id STRING,
        shop_id BIGINT,
        ds STRING,
        paid_buy_fee_sum BIGINT COMMENT 'Total jumlah yang dibayarkan pengguna di pedagang pada hari ini',
        pv BIGINT COMMENT 'Jumlah kali pengguna melakukan pembelian di pedagang pada hari ini',
        PRIMARY KEY (user_id, shop_id, ds) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'aggregation', -- Gunakan mesin penggabungan agregasi untuk menghasilkan tabel agregasi.
        'fields.paid_buy_fee_sum.aggregate-function' = 'sum', -- Jumlahkan data di field paid_buy_fee_sum untuk menghasilkan hasil agregat.
        'fields.pv.aggregate-function' = 'sum', -- Jumlahkan data di field pv untuk menghasilkan hasil agregat.
        'changelog-producer' = 'lookup', -- Gunakan produsen changelog lookup untuk menghasilkan changelog dengan latensi rendah.
        -- Tabel antara di lapisan DWM umumnya tidak dikueri langsung oleh aplikasi lapisan atas, sehingga Anda dapat mengoptimalkan performa penulisannya.
        'file.format' = 'avro', -- Format penyimpanan baris Avro memberikan performa penulisan yang lebih efisien.
        'metadata.stats-mode' = 'none' -- Mengabaikan statistik meningkatkan biaya kueri OLAP tetapi meningkatkan performa penulisan. Hal ini tidak memengaruhi pemrosesan aliran kontinu.
    );

    Tanggapan Kueri telah dieksekusi menunjukkan bahwa pembuatan berhasil.

  2. Ubah data di tabel dwd_orders di lapisan DWD

    Di tab Data Development > ETL di Konsol Realtime Compute for Apache Flink, buat pekerjaan aliran SQL baru bernama dwm. Salin kode berikut ke editor SQL, deploy pekerjaan tersebut, lalu start secara stateless.

    Pekerjaan SQL ini menulis data dari tabel dwd_orders ke tabel dwm_users_shops. Pekerjaan ini menggunakan mesin penggabungan pra-agregasi Paimon untuk secara otomatis menjumlahkan order_fee guna menghitung total jumlah konsumsi per pengguna per pedagang. Pekerjaan ini juga menjumlahkan nilai 1 untuk menghitung jumlah pembelian per pengguna per pedagang.

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';
    
    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';
    
    INSERT INTO paimoncatalog.order_dw.dwm_users_shops
    SELECT
        order_user_id,
        order_shop_id,
        DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
        order_fee,
        1 -- Satu catatan input mewakili satu pembelian.
    FROM paimoncatalog.order_dw.dwd_orders
    WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL;
  3. Konsumsi changelog dari tabel lapisan DWM dwm_users_shops secara real-time

    Di halaman Data Development > ETL di Konsol Realtime Compute for Apache Flink, buat pekerjaan aliran SQL baru bernama dws. Salin kode berikut ke editor SQL, deploy pekerjaan tersebut, lalu start secara stateless.

    Pekerjaan SQL ini menulis data dari tabel dwm_users_shops ke tabel dws_users dan dws_shops. Pekerjaan ini menggunakan mesin penggabungan pra-agregasi Paimon untuk menghitung total jumlah konsumsi untuk setiap pengguna (paid_buy_fee_sum) di tabel dws_users. Di tabel dws_shops, pekerjaan ini menghitung total pendapatan untuk setiap pedagang (paid_buy_fee_sum), jumlah pengguna yang melakukan pembelian (dengan menjumlahkan 1), dan total jumlah pembelian (pv).

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';
    
    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';
    
    -- Berbeda dengan pekerjaan DWD, setiap pernyataan INSERT di sini menulis ke tabel Paimon yang berbeda, sehingga dapat berada dalam satu pekerjaan yang sama.
    BEGIN STATEMENT SET;
    
    INSERT INTO paimoncatalog.order_dw.dws_users
    SELECT 
        user_id,
        ds,
        paid_buy_fee_sum
    FROM paimoncatalog.order_dw.dwm_users_shops;
    
    -- Primary key-nya adalah pedagang. Volume data untuk beberapa pedagang populer mungkin jauh lebih tinggi daripada yang lain.
    -- Oleh karena itu, gunakan local merge untuk pra-agregasi data di memori sebelum menulis ke Paimon. Hal ini membantu mengurangi kesenjangan data.
    INSERT INTO paimoncatalog.order_dw.dws_shops /*+ OPTIONS('local-merge-buffer-size' = '64mb') */
    SELECT
        shop_id,
        ds,
        paid_buy_fee_sum,
        1, -- Satu catatan input mewakili seluruh konsumsi oleh satu pengguna di pedagang tersebut.
        pv
    FROM paimoncatalog.order_dw.dwm_users_shops;
    
    END;
  4. Lihat data di tabel dws_users dan dws_shops

    Di tab Query Script halaman Data Development > Data Exploration di Konsol Realtime Compute for Apache Flink, salin kode berikut ke skrip kueri. Pilih potongan kode tersebut dan klik Run di pojok kanan atas.

    --Lihat data di tabel dws_users.
    SELECT * FROM paimoncatalog.order_dw.dws_users ORDER BY user_id;

    image

    --Lihat data di tabel dws_shops.
    SELECT * FROM paimoncatalog.order_dw.dws_shops ORDER BY shop_id;

    Screenshot 2024-09-02 14

Tangkap perubahan di database bisnis

Setelah danau data terpadu aliran dibangun, Anda dapat menguji kemampuannya dalam menangkap perubahan dari database bisnis.

  1. Masukkan data berikut ke database order_dw di MySQL.

    INSERT INTO orders VALUES
    (100008, 'user_001', 12345, 3, 3000, '2023-02-15 17:40:56', '2023-02-15 18:42:56', 1),
    (100009, 'user_002', 12348, 4, 1000, '2023-02-15 18:40:56', '2023-02-15 19:42:56', 1),
    (100010, 'user_003', 12348, 2, 2000, '2023-02-15 19:40:56', '2023-02-15 20:42:56', 1);
    
    INSERT INTO orders_pay VALUES
    (2008, 100008, 1, '2023-02-15 18:40:56'),
    (2009, 100009, 1, '2023-02-15 19:40:56'),
    (2010, 100010, 0, '2023-02-15 20:40:56');
  2. Lihat data di tabel dws_users dan dws_shops. Di tab Query Script halaman Data Development > Data Exploration di Konsol Realtime Compute for Apache Flink, salin kode berikut ke skrip kueri. Pilih potongan kode tersebut dan klik Run di pojok kanan atas.

    • tabel dws_users

      SELECT * FROM paimoncatalog.order_dw.dws_users ORDER BY user_id;

      Screenshot 2024-09-02 15

    • tabel dws_shops

      SELECT * FROM paimoncatalog.order_dw.dws_shops ORDER BY shop_id;

      Screenshot 2024-09-02 15

Gunakan danau data terpadu aliran

Bagian sebelumnya menunjukkan cara membuat Katalog Paimon dan menulis ke tabel Paimon di Flink. Bagian ini menunjukkan beberapa skenario sederhana untuk analitik data menggunakan StarRocks setelah Anda membangun danau data terpadu aliran.

Pertama, login ke instans StarRocks dan buat katalog untuk oss-paimon. Untuk informasi lebih lanjut, lihat Katalog Paimon.

CREATE EXTERNAL CATALOG paimon_catalog
PROPERTIES
(
    'type' = 'paimon',
    'paimon.catalog.type' = 'filesystem',
    'aliyun.oss.endpoint' = 'oss-cn-beijing-internal.aliyuncs.com',
    'paimon.catalog.warehouse' = 'oss://<bucket>/<object>'
);

Properti

Wajib

Keterangan

type

Ya

Jenis sumber data. Nilainya adalah paimon.

paimon.catalog.type

Ya

Jenis metastore yang digunakan oleh Paimon. Contoh ini menggunakan filesystem sebagai jenis metastore.

aliyun.oss.endpoint

Ya

Jika Anda menggunakan OSS atau OSS-HDFS sebagai gudang, Anda harus menentukan titik akhir yang sesuai.

paimon.catalog.warehouse

Ya

Formatnya adalah oss://<bucket>/<object>. Dalam format ini:

  • bucket: nama Bucket OSS yang Anda buat.

  • object: jalur tempat data Anda disimpan.

Anda dapat melihat nama bucket dan object Anda di Konsol OSS.

Kueri peringkat

Analisis tabel agregasi lapisan DWS. Contoh berikut menunjukkan cara menggunakan StarRocks untuk mengkueri tiga pedagang teratas dengan jumlah transaksi tertinggi pada 15 Februari 2023.

SELECT ROW_NUMBER() OVER (ORDER BY paid_buy_fee_sum DESC) AS rn, shop_id, paid_buy_fee_sum 
FROM dws_shops
WHERE ds = '20230215'
ORDER BY rn LIMIT 3;

image

Kueri detail

Contoh ini menunjukkan cara menganalisis tabel lebar lapisan DWD menggunakan StarRocks untuk mengkueri detail pesanan pelanggan tertentu yang membayar menggunakan platform pembayaran tertentu pada Februari 2023.

SELECT * FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00' AND order_create_time < '2023-03-01 00:00:00'
AND order_user_id = 'user_001'
AND pay_platform = 0
ORDER BY order_create_time;;

image

Laporan data

Contoh berikut menunjukkan cara menganalisis tabel lebar lapisan DWD. Anda dapat menggunakan StarRocks untuk mengkueri jumlah total pesanan dan jumlah total pesanan untuk setiap kategori produk pada Februari 2023.

SELECT
  order_create_time AS order_create_date,
  order_product_catalog_name,
  COUNT(*),
  SUM(order_fee)
FROM
  dwd_orders
WHERE
  order_create_time >= '2023-02-01 00:00:00'  and order_create_time < '2023-03-01 00:00:00'
GROUP BY
  order_create_date, order_product_catalog_name
ORDER BY
  order_create_date, order_product_catalog_name;

image

Referensi