Topik ini menjelaskan cara menggunakan tabel materialized Flink untuk membangun data lakehouse terintegrasi aliran-batch. Topik ini juga mencakup cara menyesuaikan kesegaran tabel materialized untuk beralih dari mode eksekusi batch ke mode streaming, yang memungkinkan pembaruan data secara real-time.
Pengenalan tabel materialized
Tabel materialized adalah tipe tabel baru dalam Flink SQL yang bertujuan menyederhanakan pipeline data batch dan streaming untuk memberikan pengalaman pengembangan yang konsisten. Saat membuat tabel materialized, Anda tidak perlu mendeklarasikan bidang dan tipe. Sebagai gantinya, cukup tentukan tingkat kesegaran data yang diinginkan dan query SQL yang akan digunakan. Mesin Flink akan secara otomatis menurunkan skema untuk tabel materialized dan membuat pipeline pembaruan data yang sesuai untuk mencapai tingkat kesegaran yang ditentukan. Untuk informasi lebih lanjut, lihat Mengelola Tabel Materialized.
Pipeline data lakehouse real-time
Di lapisan Operational Data Store (ODS), Flink mengambil data dari sumber data ke Paimon.
Di lapisan Data Warehouse Detail (DWD), Flink menggabungkan tabel di lapisan ODS dan membuat tabel materialized dari data yang telah digabungkan.
Di lapisan Data Warehouse Service (DWS), Flink membuat beberapa tabel materialized dari data di lapisan DWD, masing-masing memenuhi persyaratan kesegaran tertentu yang ditentukan oleh pengguna. Tabel materialized ini melayani berbagai kasus penggunaan bisnis dan merespons permintaan eksternal.
Prasyarat
Ruang kerja Realtime Compute for Apache Flink telah dibuat. Untuk informasi lebih lanjut, lihat Aktifkan Realtime Compute for Apache Flink.
Pengguna RAM atau Peran RAM memiliki izin yang diperlukan untuk mengakses konsol Realtime Compute for Apache Flink. Untuk informasi lebih lanjut, lihat Manajemen Izin.
Langkah 1: Persiapkan data uji
Buat katalog Paimon.
Fitur tabel materialized bergantung pada Apache Paimon sebagai sistem penyimpanan. Oleh karena itu, Anda harus membuat katalog Apache Paimon dengan tipe metastore filesystem. Jika Anda sudah memiliki katalog Paimon yang ada, Anda dapat melewati langkah ini. Untuk informasi lebih lanjut, lihat Kelola Katalog Apache Paimon.
Buat tabel ods_user_log dan ods_dim_product di katalog Paimon.
Masuk ke Konsol Manajemen Realtime Compute for Apache Flink.
Temukan ruang kerja target dan klik Console di kolom Actions.
Di panel navigasi sebelah kiri, pilih .
Dalam contoh ini, tabel dibuat di database default dari katalog bernama paimon.
CREATE TABLE `paimon`.`default`.`ods_user_log` ( item_id INT NOT NULL, user_id INT NOT NULL, vtime TIMESTAMP(6), ds VARCHAR(10) ) PARTITIONED BY(ds) WITH ( 'bucket' = '4', -- Atur jumlah bucket menjadi 4. 'bucket-key' = 'item_id' -- Tentukan kolom kunci bucket. Data dengan kunci bucket yang sama (item_id) dimasukkan ke dalam bucket yang sama. ); CREATE TABLE `paimon`.`default`.`ods_dim_product` ( item_id INT NOT NULL, title VARCHAR(255), pict_url VARCHAR(255), brand_id INT, seller_id INT, PRIMARY KEY(item_id) NOT ENFORCED ) WITH ( 'bucket' = '4', 'bucket-key' = 'item_id' );Klik Run di pojok kanan atas untuk mengeksekusi potongan kode dan membuat tabel.
Di panel navigasi sebelah kiri, pilih Catalogs. Di panel Katalog, pilih katalog paimon, dan klik ikon Refresh untuk melihat tabel yang telah dibuat.
Hasilkan data uji menggunakan Faker Connector dan muat data ke tabel Paimon.
Di panel navigasi sebelah kiri, pilih .
Di pojok kiri atas panel editor SQL, klik New. Di tab Skrip SQL dari dialog Kotak Draft Baru, pilih Blank Stream Draft dan klik Next. Klik Create.
Salin potongan kode berikut ke editor SQL:
CREATE TEMPORARY TABLE `user_log` ( item_id INT, user_id INT, vtime TIMESTAMP, ds AS DATE_FORMAT(CURRENT_DATE,'yyyyMMdd') ) WITH ( 'connector' = 'faker', 'fields.item_id.expression'='#{number.numberBetween ''0'',''1000''}', -- Hasilkan angka acak antara 0 dan 1000. 'fields.user_id.expression'='#{number.numberBetween ''0'',''100''}', 'fields.vtime.expression'='#{date.past ''5'',''HOURS''}', -- Hasilkan nilai untuk bidang vtime dalam lima jam terakhir. 'rows-per-second' = '3' -- Hasilkan tiga rekaman per detik. ); CREATE TEMPORARY TABLE `dim_product` ( item_id INT NOT NULL, title VARCHAR(255), pict_url VARCHAR(255), brand_id INT, seller_id INT, PRIMARY KEY(item_id) NOT ENFORCED ) WITH ( 'connector' = 'faker', 'fields.item_id.expression'='#{number.numberBetween ''0'',''1000''}', 'fields.title.expression'='#{book.title}', 'fields.pict_url.expression'='#{internet.domainName}', 'fields.brand_id.expression'='#{number.numberBetween ''1000'',''10000''}', 'fields.seller_id.expression'='#{number.numberBetween ''1000'',''10000''}', 'rows-per-second' = '3' -- Hasilkan tiga rekaman per detik. ); BEGIN STATEMENT SET; INSERT INTO `paimon`.`default`.`ods_user_log` SELECT item_id, user_id, vtime, CAST(ds AS VARCHAR(10)) AS ds FROM `user_log`; INSERT INTO `paimon`.`default`.`ods_dim_product` SELECT item_id, title, pict_url, brand_id, seller_id FROM `dim_product`; END;Di pojok kanan atas panel editor SQL, klik Deploy.
Di panel navigasi sebelah kiri, pilih . Pada halaman Deployments, cari deployment target Anda dan klik Start di kolom Actions. Di panel Start Job yang muncul, pilih Initial Mode, lalu klik Start.
Kueri data uji.
Di panel navigasi sebelah kiri, pilih . Salin potongan kode berikut ke editor SQL dan klik Run di pojok kanan atas.
SELECT * FROM `paimon`.`default`.ods_dim_product LIMIT 10; SELECT * FROM `paimon`.`default`.ods_user_log LIMIT 10;
Langkah 2: Buat tabel materialized
Pada langkah ini, Anda akan membangun lapisan DWD dari data lakehouse dengan menggunakan Flink untuk menggabungkan dua tabel dasar Paimon dan membuat tabel materialized bernama dwd_user_log_product dari baris yang digabungkan. Selanjutnya, Anda akan membuat tabel materialized tambahan dari tabel materialized dwd_user_log_product untuk berbagai aplikasi bisnis, sehingga membangun lapisan DWS.
Buat tabel materialized dwd_user_log_product dan bangun lapisan DWD.
Di panel navigasi sebelah kiri, klik Catalogs. Di panel Katalog, klik katalog Paimon target.
Pilih database target ("default" dalam contoh ini) dan klik Create Materialized Table di panel kanan. Di panel samping, tempel potongan kode berikut, dan klik Create.
-- Bangun lapisan DWD. CREATE MATERIALIZED TABLE dwd_user_log_product( PRIMARY KEY (item_id) NOT ENFORCED ) PARTITIONED BY(ds) WITH ( 'partition.fields.ds.date-formatter' = 'yyyyMMdd' ) FRESHNESS = INTERVAL '1' HOUR -- Segarkan data setiap jam. AS SELECT l.ds, l.item_id, l.user_id, l.vtime, r.brand_id, r.seller_id FROM `paimon`.`default`.`ods_user_log` l INNER JOIN `paimon`.`default`.`ods_dim_product` r ON l.item_id = r.item_id;
Bangun lapisan DWS dengan membuat tabel materialized dari tabel materialized dwd_user_log_product.
Sebagai contoh, potongan kode berikut membuat tabel materialized dws_overall, yang menggabungkan jumlah halaman dilihat harian dan jumlah pengunjung unik, dipecah berdasarkan setiap jam. Untuk instruksi tentang cara membuat tabel materialized, lihat sub-langkah sebelumnya.
-- Agregasi jumlah halaman dilihat harian dan jumlah pengunjung unik. CREATE MATERIALIZED TABLE dws_overall( PRIMARY KEY(ds, hh) NOT ENFORCED ) PARTITIONED BY(ds) WITH ( 'partition.fields.ds.date-formatter' = 'yyyyMMdd' ) FRESHNESS = INTERVAL '1' HOUR -- Segarkan data setiap jam. AS SELECT ds, COALESCE(hh, 'day') AS hh, count(*) AS pv, count(distinct user_id) AS uv FROM (SELECT ds, date_format(vtime, 'HH') AS hh, user_id FROM `paimon`.`default`.`dwd_user_log_product`) tmp GROUP BY GROUPING SETS(ds, (ds, hh));
Langkah 3: Perbarui tabel materialized
Mulai memperbarui tabel materialized
Dalam contoh ini, kesegaran data diatur selama 1 jam. Setelah pekerjaan dimulai, data diperbarui di tabel materialized setidaknya satu jam lebih lambat daripada pembaruan di tabel dasar.
Di panel navigasi sebelah kiri, klik Data Lineage dan temukan tabel materialized target.

Pilih tabel materialized target, dan klik Start di pojok kanan bawah halaman.
Isi ulang data
Pengisian ulang data melibatkan penggantian data streaming lama di partisi atau tabel dengan data historis untuk memperbaiki ketidakakuratan dalam hasil pemrosesan aliran. Dalam konteks pekerjaan batch, pengisian ulang data memungkinkan penyegaran data segera, bukan menunggu interval penyegaran yang telah ditentukan.
Untuk mengganti data lama atau menyegarkan data segera, ikuti proses ini: Di panel Garis Data, pilih kotak berlabel dwd_user_log_product dan klik Trigger Update di pojok kanan bawah. Di kotak dialog Pemicu Pembaruan, masukkan tanggal hari ini di bidang ds (misalnya, 20241216), dan centang kotak Update Range untuk menyebarkan pembaruan ke tabel materialized hilir. Klik Confirm. Di kotak dialog Konfirmasi Pembaruan berikutnya, klik OK.

Ubah kesegaran
Anda dapat menyesuaikan pengaturan kesegaran tabel materialized untuk menyegarkan data pada interval satu atau lebih hari, jam, menit, atau detik, sesuai kebutuhan.
Ubah pengaturan kesegaran tabel materialized dwd_user_log_product dan dws_overall. Pilih kotak yang mewakili tabel materialized target. Klik Edit Freshness di pojok kanan bawah halaman. Di kotak dialog Edit Kesegaran, pilih Menit dari daftar drop-down, dan masukkan nilai yang diinginkan.

Langkah 4: Kueri tabel materialized
Pratinjau data
Anda dapat melihat pratinjau 100 rekaman terbaru di tabel materialized.
Di panel navigasi sebelah kiri, klik Data Lineage dan temukan tabel materialized target.
Pilih tabel materialized target, dan klik Details di pojok kanan bawah halaman.
Panel detail tabel materialized muncul. Di tab Data Preview, klik ikon Query di pojok kanan atas.

Kueri data
Di panel navigasi sebelah kiri, pilih . Salin potongan kode berikut ke editor SQL, lalu klik Run untuk mengkueri tabel materialized dws_overall.
SELECT * FROM `paimon`.`default`.dws_overall ORDER BY hh;
Parameter: