全部产品
Search
文档中心

Realtime Compute for Apache Flink:Buat dan gunakan tabel materialized

更新时间:Jan 05, 2026

Topik ini menjelaskan cara membuat tabel materialized, melakukan pengisian ulang data historis, mengubah kesegaran data, serta melihat alur data dari tabel materialized tersebut.

Batasan

  • Fitur ini hanya didukung di Ververica Runtime (VVR) 8.0.10 dan versi yang lebih baru.

  • Fitur ini hanya mendukung Paimon Catalogs yang menggunakan metastore Filesystem atau DLF. Anda tidak dapat menggunakan Paimon Catalogs kustom untuk membuat tabel materialized.

  • Anda harus memiliki izin untuk mengembangkan dan menyebarkan pekerjaan. Untuk informasi selengkapnya, lihat Otorisasi Konsol pengembangan.

  • Referensi ke objek temporary, seperti tabel temporary, fungsi temporary, dan tampilan temporary, tidak didukung.

Buat tabel materialized

Sintaksis

CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
-- Primary key constraint
[([CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED)]

[COMMENT table_comment]
-- Partition key
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
-- WITH options
[WITH (key1=val1, key2=val2, ...)]
-- Data freshness
FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
-- Refresh mode
[REFRESH_MODE = { CONTINUOUS | FULL }]
AS  <select_statement>

Parameter

Parameter

Diperlukan

Deskripsi

FRESHNESS

Ya

Menentukan kesegaran data dari tabel materialized. Ini mendefinisikan keterlambatan maksimum untuk pembaruan data dari tabel sumber.

Catatan
  • Jika tabel dasar adalah tabel materialized, pastikan kesegaran data dari tabel downstream adalah 1 hingga N kali lipat dari tabel upstream, dengan N sebagai bilangan bulat positif.

  • Kesegaran data tidak boleh melebihi satu hari.

AS <select_statement>

Ya

Mendefinisikan kueri yang mengisi tabel materialized. Tabel upstream dapat berupa tabel materialized, tabel biasa, atau tampilan. Pernyataan SELECT mendukung semua kueri Flink SQL.

PRIMARY KEY

Tidak

Mendefinisikan kumpulan kolom opsional yang secara unik mengidentifikasi setiap baris dalam tabel. Kolom yang ditentukan tidak boleh bernilai null.

PARTITIONED BY

Tidak

Mendefinisikan kumpulan kolom opsional yang digunakan untuk mempartisi tabel materialized.

WITH Options

Tidak

Mendefinisikan properti tabel dan parameter format waktu untuk bidang partisi yang diperlukan untuk membuat tabel materialized.

Sebagai contoh, parameter format waktu untuk bidang partisi adalah WITH ('partition.fields.#.date-formatter' = 'yyyyMMdd'). Untuk informasi selengkapnya tentang penggunaan parameter, lihat contoh berikut.

REFRESH_MODE

Tidak

Menentukan mode refresh dari tabel materialized. Mode refresh yang ditentukan memiliki prioritas lebih tinggi daripada mode yang secara otomatis disimpulkan oleh framework dari kesegaran data. Hal ini memungkinkan Anda memenuhi persyaratan skenario tertentu.

  • CONTINUOUS: Pekerjaan aliran memperbarui tabel materialized secara inkremental. Data downstream terlihat segera atau setelah checkpoint selesai.

  • FULL: Alur kerja secara berkala memicu pembaruan tabel materialized. Mesin menentukan apakah akan melakukan pembaruan penuh atau inkremental. Untuk informasi selengkapnya, lihat Pembaruan inkremental tabel materialized. Siklus refresh data sesuai dengan kesegaran data. Secara default, pembaruan menimpa data pada level tabel. Jika ada bidang partisi, Anda dapat memilih untuk merefresh hanya partisi terbaru atau semua partisi.

Prosedur

  1. Masuk ke Konsol Realtime Compute for Apache Flink.

  2. Pada kolom Actions ruang kerja target, klik Console.

  3. Di panel navigasi sebelah kiri, pilih Data Management dan klik Paimon Catalog target.

  4. Klik database target, lalu klik Create Materialized Table.

    Asumsikan Anda memiliki tabel dasar bernama `orders` dengan primary key `order_id`, nama kategori `order_name`, dan bidang tanggal `ds`. Contoh berikut menunjukkan cara membuat tabel materialized berdasarkan tabel ini:

    • Buat tabel materialized bernama `mt_order` berdasarkan tabel `orders`. Tabel baru mencakup semua bidang dari tabel sumber dan memiliki kesegaran data 5 detik.

      CREATE MATERIALIZED TABLE mt_order
      FRESHNESS = INTERVAL '5' SECOND
      AS
      SELECT * FROM `paimon`.`db`.`orders`
      ;
    • Buat tabel materialized bernama `mt_id` berdasarkan tabel `mt_order`. Tabel baru mencakup bidang `order_id` dan `ds`. Tetapkan `order_id` sebagai primary key, `ds` sebagai bidang partisi, dan kesegaran data menjadi 30 menit.

      CREATE MATERIALIZED TABLE mt_id (
       PRIMARY KEY (order_id) NOT ENFORCED
      )
      PARTITIONED BY(ds)
      FRESHNESS = INTERVAL '30' MINUTE
      AS
      SELECT order_id,ds FROM mt_order
      ;
    • Buat tabel materialized bernama `mt_ds` berdasarkan tabel `mt_order`. Tentukan date-formatter (format waktu) untuk bidang partisi ds. Setiap kali pekerjaan dijadwalkan, sistem mengurangi nilai kesegaran dari waktu terjadwal dan mengonversi hasilnya ke nilai partisi ds yang sesuai. Sebagai contoh, jika kesegaran data adalah 1 jam dan waktu terjadwal adalah 2024-01-01 00:00:00, nilai `ds` yang dihitung adalah `20231231`. Hanya data dalam partisi ds = '20231231' yang direfresh. Jika waktu terjadwal adalah 2024-01-01 01:00:00, nilai `ds` yang dihitung adalah `20240101`, dan data dalam partisi ds = '20240101' direfresh.

      CREATE MATERIALIZED TABLE mt_ds
      PARTITIONED BY(ds)
      WITH (
          'partition.fields.ds.date-formatter' = 'yyyyMMdd'
      )
      FRESHNESS = INTERVAL '1' HOUR
      AS
      SELECT order_id,order_name,ds FROM mt_order
      ;
      Catatan
      • Dalam partition.fields.#.date-formatter, `#` harus merupakan bidang partisi yang valid bertipe string.

      • Parameter partition.fields.#.date-formatter menentukan format partisi waktu untuk tabel materialized. `#` merepresentasikan nama bidang partisi bertipe string. Parameter ini memberi tahu sistem partisi mana yang harus direfresh.

  5. Mulai atau hentikan pembaruan tabel materialized.

    1. Klik tabel materialized target di bawah katalog.

    2. Di pojok kanan atas, klik Start Update atau Stop Update.

      Catatan

      Jika pembaruan sedang berlangsung saat Anda mengklik Stop Update, proses akan berhenti setelah siklus pembaruan saat ini selesai.

  6. Lihat detail pekerjaan tabel materialized.

    Pada tab Table Schema Details, lihat bagian Basic Information. Klik ID pekerjaan di samping Data Update Job atau Workflow untuk melihat detailnya.

Ubah kueri tabel materialized

Batasan

  • Anda hanya dapat mengubah kueri untuk tabel materialized yang dibuat di Ververica Runtime (VVR) 11.1 atau versi yang lebih baru.

  • Saat mengubah kueri, Anda hanya dapat menambahkan kolom dan mengubah logika perhitungan. Anda tidak dapat mengubah urutan kolom yang sudah ada atau mengubah definisinya.

    Jenis Operasi

    Dukungan

    Deskripsi

    Tambahkan kolom baru

    Dukungan

    Anda dapat menambahkan kolom baru di akhir skema sambil mempertahankan urutan kolom yang sudah ada.

    Ubah logika perhitungan kolom yang sudah ada (tanpa mengubah nama dan tipe kolom)

    Didukung

    Sebagai contoh, Anda dapat mengubah logika perhitungan, tetapi nama kolom dan tipe data harus tetap sama.

    Ubah urutan kolom yang sudah ada

    Tidak

    Urutan kolom bersifat tetap. Anda harus menghapus dan membuat ulang tabel materialized.

    Ubah nama atau tipe data kolom yang sudah ada

    Tidak

    Anda harus menghapus dan membuat ulang tabel materialized.

Contoh modifikasi

  1. Klik Edit Table dan ubah kueri. Kode berikut memberikan contohnya:

    ALTER MATERIALIZED TABLE `paimon`.`default`.`mt-orders`
        AS
        SELECT
          *,
          price * quantity AS total_price
        FROM orders
        WHERE price * quantity > 1000
    ;
  2. Klik Preview untuk melihat perubahan.

    image

  3. Klik Confirm. Anda kemudian dapat melihat kolom dan logika kueri baru pada tab Table Schema Details.

Penting

Menambahkan bidang biasanya tidak memengaruhi node downstream. Namun, jika node downstream menggunakan parsing dinamis, seperti SELECT * atau pemetaan bidang otomatis, tugas sinkronisasi mungkin gagal atau melaporkan error ketidaksesuaian format data. Kami menyarankan agar Anda menghindari parsing dinamis dan menggunakan bidang tetap. Saat tabel upstream berubah, segera perbarui skema tabel downstream.

Pembaruan inkremental tabel materialized

Batasan

Fitur ini hanya didukung di Ververica Runtime (VVR) 8.0.11 dan versi yang lebih baru.

Mode pembaruan tabel materialized

Tabel materialized memiliki tiga mode pembaruan: pembaruan aliran, pembaruan batch penuh, dan pembaruan batch inkremental.

Mode aliran atau batch dari tabel materialized ditentukan oleh kesegaran datanya. Kesegaran kurang dari 30 menit menunjukkan mode aliran, sedangkan 30 menit atau lebih menunjukkan mode batch. Dalam mode batch, mesin secara otomatis menentukan apakah akan melakukan pembaruan penuh atau inkremental. Pembaruan inkremental hanya menghitung data yang berubah sejak pembaruan terakhir dan menggabungkannya ke dalam tabel materialized. Pembaruan penuh menghitung data untuk seluruh tabel atau partisi dan menimpa data di tabel materialized. Dalam mode batch, mesin memprioritaskan pembaruan inkremental dan hanya menggunakan pembaruan penuh jika pembaruan inkremental tidak didukung untuk tabel materialized tersebut.

Kondisi untuk pembaruan bertahap

Pembaruan inkremental hanya digunakan jika tabel materialized memenuhi kondisi berikut:

  • Parameter partition.fields.#.date-formatter tidak dikonfigurasi untuk bidang partisi apa pun dalam definisi tabel materialized.

  • Tabel sumber tidak memiliki primary key.

  • Kueri dalam tabel materialized mendukung pembaruan inkremental dalam kasus berikut:

    Pernyataan SQL

    Dukungan

    SELECT

    Mendukung pemilihan kolom dan ekspresi fungsi skalar, termasuk user-defined functions (UDFs). Fungsi agregat tidak didukung.

    FROM

    Mendukung nama tabel atau subkueri.

    WITH

    Mendukung common table expressions (CTEs).

    WHERE

    Mendukung kondisi filter yang mencakup berbagai ekspresi fungsi skalar, termasuk UDFs. Subkueri, seperti `WHERE [NOT] EXISTS <subquery>` dan `WHERE <column_name> [NOT] IN <subquery>`, tidak didukung.

    UNION

    Hanya UNION ALL yang didukung.

    JOIN

    • INNER JOIN didukung.

    • LEFT/RIGHT/FULL [OUTER] JOIN tidak didukung, kecuali untuk kasus LATERAL JOIN dan Lookup Join yang dijelaskan di bawah.

    • [LEFT [OUTER]] JOIN LATERAL dengan ekspresi fungsi tabel, termasuk user-defined table-valued functions (UDTFs), didukung.

    • Untuk Lookup Join, hanya `A [LEFT [OUTER]] JOIN B FOR SYSTEM_TIME AS OF PROCTIME()` yang didukung.

    Catatan
    • JOIN tanpa kata kunci `JOIN` didukung, seperti `SELECT * FROM a, b WHERE a.id = b.id`.

    • Saat ini, perhitungan inkremental dengan `INNER JOIN` tetap membaca seluruh data dari kedua tabel sumber.

    GROUP BY

    Tidak didukung.

Contoh pembaruan inkremental

Contoh 1: Proses data di tabel sumber `orders` menggunakan fungsi skalar.

CREATE MATERIALIZED TABLE mt_shipped_orders (
    PRIMARY KEY (order_id) NOT ENFORCED
)
FRESHNESS = INTERVAL '30' MINUTE
AS
SELECT 
    order_id,
    COALESCE(customer_id, 'Unknown') AS customer_id,
    CAST(order_amount AS DECIMAL(10, 2)) AS order_amount,
    CASE 
        WHEN status = 'shipped' THEN 'Completed'
        WHEN status = 'pending' THEN 'In Progress'
        ELSE 'Unknown'
    END AS order_status,
    DATE_FORMAT(order_ts, 'yyyyMMdd') AS order_date,
    UDSF_ProcessFunction(notes) AS notes
FROM 
    orders
WHERE
    status = 'shipped';

Contoh 2: Perkaya data di tabel sumber `orders` menggunakan LATERAL JOIN dan Lookup Join.

CREATE MATERIALIZED TABLE mt_enriched_orders (
    PRIMARY KEY (order_id, order_tag) NOT ENFORCED
)
FRESHNESS = INTERVAL '30' MINUTE
AS
WITH o AS (
    SELECT
        order_id,
        product_id,
        quantity,
        proc_time,
        e.tag AS order_tag
    FROM 
        orders,
        LATERAL TABLE(UDTF_StringSplitFunction(tags, ',')) AS e(tag))
SELECT 
    o.order_id,
    o.product_id,
    p.product_name,
    p.category,
    o.quantity,
    p.price,
    o.quantity * p.price AS total_amount,
    order_tag
FROM o 
LEFT JOIN 
    product_info FOR SYSTEM_TIME AS OF PROCTIME() AS p
ON 
    o.product_id = p.product_id;

Isi ulang data historis

Sebelumnya, setelah menjalankan pekerjaan aliran, Anda harus mengembangkan pekerjaan batch terpisah untuk merekonsiliasi hasil dengan dataset lengkap dari hari sebelumnya. Dengan tabel materialized, Anda dapat langsung memilih partisi data historis untuk mengisi ulang data. Perubahan ini mengurangi biaya pengembangan dan O&M serta menyediakan pemrosesan aliran dan batch terpadu.

  1. Klik tabel materialized target di bawah katalog.

  2. Pada tab Data Information, lakukan pengisian ulang data.

    Jika Anda mendeklarasikan bidang partisi saat membuat tabel materialized, maka itu adalah tabel partisi. Jika tidak, itu adalah tabel non-partisi.

    Tabel partisi

    Lihat Data Partitions. Jika ini adalah pengisian ulang pertama atau partisi yang diperlukan belum ada, klik Manual Update. Jika partisi sudah ada, pilih partisi yang akan diisi ulang dan klik Backfill.

    image

    image

    Parameter:

    • Bidang Partisi: Bidang partisi dari tabel. Misalnya, jika Anda memasukkan `20241201`, semua data dengan `ds=20241201` akan diisi ulang.

    • Nama Tugas: Nama tugas pengisian ulang data.

    • Lingkup Pembaruan (Opsional): Menentukan apakah pembaruan harus dikaskade ke tabel materialized downstream terkait. Pembaruan dikaskade dari tabel saat ini melalui semua tabel materialized dalam alur datanya. Jumlah maksimum lapisan downstream adalah 6.

      Catatan
      • Untuk pembaruan tabel partisi, bidang partisi dari tabel materialized downstream harus identik dengan tabel awal. Jika tidak, operasi pembaruan gagal.

      • Jika tabel materialized dalam alur data gagal diperbarui, semua node downstream juga gagal.

    • Target Penyebaran: Anda dapat memilih antrian atau kluster session. Nilai default-nya adalah `default-queue`.

    Tabel non-partisi

    Lihat Data Status dan klik Backfill.

    image

    Parameter:

    • Nama Tugas: Nama tugas pengisian ulang data.

    • Lingkup Pembaruan: Parameter ini tidak tersedia untuk tabel non-partisi.

      Catatan
      • Selama pembaruan, dilakukan pembaruan penuh pada data downstream.

      • Jika tabel materialized dalam alur data gagal diperbarui, semua node downstream juga gagal.

      • Jika sistem menentukan bahwa tabel awal adalah tugas aliran non-partisi berdasarkan kesegaran datanya, pembaruan kaskade tidak didukung.

    • Target Penyebaran: Anda dapat memilih antrian atau kluster session. Nilai default-nya adalah `default-queue`.

  3. Pengisian ulang terjadwal dan batch.

    Anda dapat menggunakan Task Orchestration untuk membuat alur kerja tabel materialized guna jadwal berulang. Hal ini memungkinkan pengisian ulang terjadwal. Anda juga dapat menggunakan fitur pengisian ulang data dari alur kerja untuk memilih rentang waktu guna pengisian ulang data batch.

Ubah kesegaran data

  1. Di katalog, klik database materialized table, lalu klik materialized table target.

  2. Di pojok kanan atas, klik Modify Data Freshness.

    • Jika tabel materialized tidak memiliki primary key, Anda tidak dapat mengubah mode pemrosesan pekerjaan antara aliran dan batch. Misalnya, Anda tidak dapat mengubah kesegaran data dari 2 detik (aliran) menjadi 1 jam (batch), atau sebaliknya. Pekerjaan dikategorikan sebagai pekerjaan aliran jika kesegarannya kurang dari 30 menit, dan pekerjaan batch jika kesegarannya 30 menit atau lebih.

    • Jika tabel dasar adalah tabel materialized, kesegaran data dari tabel downstream harus merupakan kelipatan bilangan bulat dari kesegaran data tabel upstream.

    • Kesegaran data tidak boleh melebihi satu hari.

Lihat garis keturunan data

Halaman alur data menampilkan hubungan alur antara semua tabel materialized. Dari halaman ini, Anda juga dapat melakukan operasi seperti Start/Stop Update dan Modify Data Freshness pada tabel materialized. Untuk melihat halaman detail tabel materialized tertentu, klik Details.

image

Referensi