全部产品
Search
文档中心

Realtime Compute for Apache Flink:Memulai dengan tabel materialized: Membangun data lakehouse terintegrasi aliran-batch

更新时间:Jul 02, 2025

Topik ini menjelaskan cara menggunakan tabel materialized Flink untuk membangun data lakehouse terintegrasi aliran-batch. Topik ini juga mencakup cara menyesuaikan kesegaran tabel materialized untuk beralih dari mode eksekusi batch ke mode streaming, yang memungkinkan pembaruan data secara real-time.

Pengenalan tabel materialized

Tabel materialized adalah tipe tabel baru dalam Flink SQL yang bertujuan menyederhanakan pipeline data batch dan streaming untuk memberikan pengalaman pengembangan yang konsisten. Saat membuat tabel materialized, Anda tidak perlu mendeklarasikan bidang dan tipe. Sebagai gantinya, cukup tentukan tingkat kesegaran data yang diinginkan dan query SQL yang akan digunakan. Mesin Flink akan secara otomatis menurunkan skema untuk tabel materialized dan membuat pipeline pembaruan data yang sesuai untuk mencapai tingkat kesegaran yang ditentukan. Untuk informasi lebih lanjut, lihat Mengelola Tabel Materialized.

Pipeline data lakehouse real-time

  1. Di lapisan Operational Data Store (ODS), Flink mengambil data dari sumber data ke Paimon.

  2. Di lapisan Data Warehouse Detail (DWD), Flink menggabungkan tabel di lapisan ODS dan membuat tabel materialized dari data yang telah digabungkan.

  3. Di lapisan Data Warehouse Service (DWS), Flink membuat beberapa tabel materialized dari data di lapisan DWD, masing-masing memenuhi persyaratan kesegaran tertentu yang ditentukan oleh pengguna. Tabel materialized ini melayani berbagai kasus penggunaan bisnis dan merespons permintaan eksternal.

Prasyarat

  • Ruang kerja Realtime Compute for Apache Flink telah dibuat. Untuk informasi lebih lanjut, lihat Aktifkan Realtime Compute for Apache Flink.

  • Pengguna RAM atau Peran RAM memiliki izin yang diperlukan untuk mengakses konsol Realtime Compute for Apache Flink. Untuk informasi lebih lanjut, lihat Manajemen Izin.

Langkah 1: Persiapkan data uji

  1. Buat katalog Paimon.

    Fitur tabel materialized bergantung pada Apache Paimon sebagai sistem penyimpanan. Oleh karena itu, Anda harus membuat katalog Apache Paimon dengan tipe metastore filesystem. Jika Anda sudah memiliki katalog Paimon yang ada, Anda dapat melewati langkah ini. Untuk informasi lebih lanjut, lihat Kelola Katalog Apache Paimon.

    Buat Katalog Apache Paimon

    1. Masuk ke Konsol Manajemen Realtime Compute for Apache Flink.

    2. Temukan ruang kerja target dan klik Console di kolom Actions.

    3. Di panel navigasi sebelah kiri, klik Catalogs. Pada halaman Daftar Katalog, klik Create Catalog. Di tab Katalog Bawaan pada wizard yang muncul, pilih Apache Paimon, lalu klik Next.

      imageParameter:

      Parameter

      Deskripsi

      Catatan

      metastore

      Jenis penyimpanan metadata.

      Dalam contoh ini, atur parameter metastore ke filesystem.

      nama katalog

      Nama katalog Apache Paimon.

      Masukkan nama kustom. Dalam contoh ini, paimon digunakan.

      warehouse

      Direktori gudang data yang ditentukan di OSS.

      Path harus dalam format oss://<bucket>/<object>. Deskripsi bidang:

      • <bucket>: nama Bucket OSS Anda.

      • <object>: path tempat data Anda disimpan.

      Anda dapat melihat nilai bidang bucket dan object di Konsol OSS.

      fs.oss.endpoint

      Titik akhir OSS.

      Jika OSS berada di wilayah yang sama dengan Realtime Compute for Apache Flink, gunakan titik akhir internal untuk wilayah tersebut. Jika tidak, gunakan titik akhir publik. Untuk informasi lebih lanjut, lihat Wilayah dan Titik Akhir.

      fs.oss.accessKeyId

      ID AccessKey akun Alibaba Cloud atau pengguna RAM yang memiliki izin baca dan tulis di OSS.

      Untuk informasi lebih lanjut tentang cara mendapatkan ID AccessKey, lihat Buat Pasangan AccessKey. Untuk mencegah risiko keamanan, gunakan variabel daripada pasangan AccessKey dalam teks biasa. Untuk informasi lebih lanjut, lihat Kelola Variabel

      fs.oss.accessKeySecret

      Rahasia AccessKey akun Alibaba Cloud atau pengguna RAM yang memiliki izin baca dan tulis di OSS.

  2. Buat tabel ods_user_log dan ods_dim_product di katalog Paimon.

    1. Masuk ke Konsol Manajemen Realtime Compute for Apache Flink.

    2. Temukan ruang kerja target dan klik Console di kolom Actions.

    3. Di panel navigasi sebelah kiri, pilih Development > Scripts.

      Dalam contoh ini, tabel dibuat di database default dari katalog bernama paimon.
      CREATE TABLE `paimon`.`default`.`ods_user_log` (
        item_id INT NOT NULL,
        user_id INT NOT NULL,
        vtime TIMESTAMP(6),
        ds VARCHAR(10)
      ) 
      PARTITIONED BY(ds)
      WITH (
        'bucket' = '4',            -- Atur jumlah bucket menjadi 4.
        'bucket-key' = 'item_id'   -- Tentukan kolom kunci bucket. Data dengan kunci bucket yang sama (item_id) dimasukkan ke dalam bucket yang sama.
      );
      
      CREATE TABLE `paimon`.`default`.`ods_dim_product` (
        item_id INT NOT NULL,
        title VARCHAR(255),
        pict_url VARCHAR(255), 
        brand_id INT,
        seller_id INT,
        PRIMARY KEY(item_id) NOT ENFORCED
      ) WITH (
        'bucket' = '4',
        'bucket-key' = 'item_id'
      );
    4. Klik Run di pojok kanan atas untuk mengeksekusi potongan kode dan membuat tabel.

    5. Di panel navigasi sebelah kiri, pilih Catalogs. Di panel Katalog, pilih katalog paimon, dan klik ikon Refresh untuk melihat tabel yang telah dibuat.

  3. Hasilkan data uji menggunakan Faker Connector dan muat data ke tabel Paimon.

    1. Di panel navigasi sebelah kiri, pilih Development > ETL.

    2. Di pojok kiri atas panel editor SQL, klik New. Di tab Skrip SQL dari dialog Kotak Draft Baru, pilih Blank Stream Draft dan klik Next. Klik Create.

    3. Salin potongan kode berikut ke editor SQL:

      CREATE TEMPORARY TABLE `user_log` (
        item_id INT,
        user_id INT,
        vtime TIMESTAMP,  
        ds AS DATE_FORMAT(CURRENT_DATE,'yyyyMMdd')
      ) WITH (
        'connector' = 'faker',
        'fields.item_id.expression'='#{number.numberBetween ''0'',''1000''}',    -- Hasilkan angka acak antara 0 dan 1000.
        'fields.user_id.expression'='#{number.numberBetween ''0'',''100''}',
        'fields.vtime.expression'='#{date.past ''5'',''HOURS''}',           -- Hasilkan nilai untuk bidang vtime dalam lima jam terakhir.
        'rows-per-second' = '3'   -- Hasilkan tiga rekaman per detik.
       );
        
       CREATE TEMPORARY TABLE `dim_product` (
        item_id INT NOT NULL,
        title VARCHAR(255),
        pict_url VARCHAR(255), 
        brand_id INT,
        seller_id INT,
        PRIMARY KEY(item_id) NOT ENFORCED
       ) WITH (
        'connector' = 'faker',
        'fields.item_id.expression'='#{number.numberBetween ''0'',''1000''}',
        'fields.title.expression'='#{book.title}',
        'fields.pict_url.expression'='#{internet.domainName}',
        'fields.brand_id.expression'='#{number.numberBetween ''1000'',''10000''}',   
        'fields.seller_id.expression'='#{number.numberBetween ''1000'',''10000''}',
        'rows-per-second' = '3'   -- Hasilkan tiga rekaman per detik.
       );
      
      BEGIN STATEMENT SET; 
      
      INSERT INTO `paimon`.`default`.`ods_user_log` 
        SELECT 
        item_id,
        user_id,
        vtime,
        CAST(ds AS VARCHAR(10)) AS ds
      FROM `user_log`;
      INSERT INTO `paimon`.`default`.`ods_dim_product`
        SELECT 
        item_id,
        title,
        pict_url,
        brand_id,
        seller_id
      FROM `dim_product`;
      
      END; 
    4. Di pojok kanan atas panel editor SQL, klik Deploy.

    5. Di panel navigasi sebelah kiri, pilih O&M > Deployments. Pada halaman Deployments, cari deployment target Anda dan klik Start di kolom Actions. Di panel Start Job yang muncul, pilih Initial Mode, lalu klik Start.

  4. Kueri data uji.

    Di panel navigasi sebelah kiri, pilih Development > Scripts. Salin potongan kode berikut ke editor SQL dan klik Run di pojok kanan atas.

    SELECT * FROM `paimon`.`default`.ods_dim_product LIMIT 10;
    
    SELECT * FROM `paimon`.`default`.ods_user_log LIMIT 10;

    image

Langkah 2: Buat tabel materialized

Pada langkah ini, Anda akan membangun lapisan DWD dari data lakehouse dengan menggunakan Flink untuk menggabungkan dua tabel dasar Paimon dan membuat tabel materialized bernama dwd_user_log_product dari baris yang digabungkan. Selanjutnya, Anda akan membuat tabel materialized tambahan dari tabel materialized dwd_user_log_product untuk berbagai aplikasi bisnis, sehingga membangun lapisan DWS.

  1. Buat tabel materialized dwd_user_log_product dan bangun lapisan DWD.

    1. Di panel navigasi sebelah kiri, klik Catalogs. Di panel Katalog, klik katalog Paimon target.

    2. Pilih database target ("default" dalam contoh ini) dan klik Create Materialized Table di panel kanan. Di panel samping, tempel potongan kode berikut, dan klik Create.

      -- Bangun lapisan DWD.
      CREATE MATERIALIZED TABLE dwd_user_log_product(
          PRIMARY KEY (item_id) NOT ENFORCED
      )
      PARTITIONED BY(ds)
      WITH (
        'partition.fields.ds.date-formatter' = 'yyyyMMdd'
      )
      FRESHNESS = INTERVAL '1' HOUR      -- Segarkan data setiap jam.
      AS SELECT
        l.ds,
        l.item_id,
        l.user_id,
        l.vtime,
        r.brand_id,
        r.seller_id
      FROM `paimon`.`default`.`ods_user_log` l INNER JOIN `paimon`.`default`.`ods_dim_product` r
      ON l.item_id = r.item_id;
  2. Bangun lapisan DWS dengan membuat tabel materialized dari tabel materialized dwd_user_log_product.

    Sebagai contoh, potongan kode berikut membuat tabel materialized dws_overall, yang menggabungkan jumlah halaman dilihat harian dan jumlah pengunjung unik, dipecah berdasarkan setiap jam. Untuk instruksi tentang cara membuat tabel materialized, lihat sub-langkah sebelumnya.

    -- Agregasi jumlah halaman dilihat harian dan jumlah pengunjung unik.
    CREATE MATERIALIZED TABLE dws_overall(
        PRIMARY KEY(ds, hh) NOT ENFORCED
    )
    PARTITIONED BY(ds)
    WITH (
      'partition.fields.ds.date-formatter' = 'yyyyMMdd'
    )
    FRESHNESS = INTERVAL '1' HOUR   -- Segarkan data setiap jam.
    AS SELECT 
        ds,
        COALESCE(hh, 'day') AS hh,
        count(*) AS pv,
        count(distinct user_id) AS uv
        FROM (SELECT ds, date_format(vtime, 'HH') AS hh, user_id 
    FROM `paimon`.`default`.`dwd_user_log_product`) tmp
    GROUP BY GROUPING SETS(ds, (ds, hh));

Langkah 3: Perbarui tabel materialized

Mulai memperbarui tabel materialized

Dalam contoh ini, kesegaran data diatur selama 1 jam. Setelah pekerjaan dimulai, data diperbarui di tabel materialized setidaknya satu jam lebih lambat daripada pembaruan di tabel dasar.

  1. Di panel navigasi sebelah kiri, klik Data Lineage dan temukan tabel materialized target.

    image

  2. Pilih tabel materialized target, dan klik Start di pojok kanan bawah halaman.

Isi ulang data

Pengisian ulang data melibatkan penggantian data streaming lama di partisi atau tabel dengan data historis untuk memperbaiki ketidakakuratan dalam hasil pemrosesan aliran. Dalam konteks pekerjaan batch, pengisian ulang data memungkinkan penyegaran data segera, bukan menunggu interval penyegaran yang telah ditentukan.

Untuk mengganti data lama atau menyegarkan data segera, ikuti proses ini: Di panel Garis Data, pilih kotak berlabel dwd_user_log_product dan klik Trigger Update di pojok kanan bawah. Di kotak dialog Pemicu Pembaruan, masukkan tanggal hari ini di bidang ds (misalnya, 20241216), dan centang kotak Update Range untuk menyebarkan pembaruan ke tabel materialized hilir. Klik Confirm. Di kotak dialog Konfirmasi Pembaruan berikutnya, klik OK.

image

Ubah kesegaran

Anda dapat menyesuaikan pengaturan kesegaran tabel materialized untuk menyegarkan data pada interval satu atau lebih hari, jam, menit, atau detik, sesuai kebutuhan.

Ubah pengaturan kesegaran tabel materialized dwd_user_log_product dan dws_overall. Pilih kotak yang mewakili tabel materialized target. Klik Edit Freshness di pojok kanan bawah halaman. Di kotak dialog Edit Kesegaran, pilih Menit dari daftar drop-down, dan masukkan nilai yang diinginkan.

image

Langkah 4: Kueri tabel materialized

Pratinjau data

Anda dapat melihat pratinjau 100 rekaman terbaru di tabel materialized.

  1. Di panel navigasi sebelah kiri, klik Data Lineage dan temukan tabel materialized target.

  2. Pilih tabel materialized target, dan klik Details di pojok kanan bawah halaman.

  3. Panel detail tabel materialized muncul. Di tab Data Preview, klik ikon Query di pojok kanan atas.

    image

Kueri data

Di panel navigasi sebelah kiri, pilih Development > Scripts. Salin potongan kode berikut ke editor SQL, lalu klik Run untuk mengkueri tabel materialized dws_overall.

SELECT * FROM `paimon`.`default`.dws_overall ORDER BY hh;

image

Referensi