全部产品
Search
文档中心

Realtime Compute for Apache Flink:Menulis data ke dan menggunakan data dari tabel Paimon

更新时间:Jul 02, 2025

Topik ini menjelaskan cara melakukan operasi insert, update, overwrite, dan delete pada tabel Apache Paimon (Paimon) menggunakan konsol pengembangan Realtime Compute for Apache Flink. Topik ini juga mencakup cara menggunakan data dari tabel Paimon berdasarkan offset tertentu.

Prasyarat

Katalog Paimon dan tabel Paimon telah dibuat. Untuk informasi lebih lanjut, lihat Mengelola katalog Apache Paimon.

Batasan

Hanya Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 8.0.5 atau versi lebih baru yang mendukung tabel Paimon.

Menulis data ke tabel Paimon

Gunakan pernyataan CTAS atau CDAS untuk menyinkronkan perubahan data dan skema

Untuk informasi tentang cara menggunakan pernyataan CREATE TABLE AS (CTAS) atau CREATE DATABASE AS (CDAS), lihat Mengelola katalog Apache Paimon.

Gunakan pernyataan INSERT INTO untuk menyisipkan atau memperbarui data

Gunakan pernyataan INSERT OVERWRITE untuk menimpa data

Penimpaan adalah proses mengganti data lama dengan data baru. Data lama dihapus dan tidak dapat diakses lagi. Anda dapat menggunakan pernyataan INSERT OVERWRITE untuk menimpa sebagian atau seluruh tabel Paimon. Dalam contoh berikut, tabel bernama my_table digunakan.

Catatan
  • Pernyataan INSERT OVERWRITE hanya berlaku untuk penyebaran batch.

  • Secara default, perubahan yang dihasilkan dari pernyataan INSERT OVERWRITE tidak dapat dikonsumsi oleh operator hilir dalam mode streaming. Untuk informasi tentang cara mengonsumsi perubahan dalam mode streaming, lihat Mengonsumsi hasil pernyataan INSERT OVERWRITE.

  • Timpa semua data dalam tabel yang tidak dipartisi.

    INSERT OVERWRITE my_table SELECT ...;
  • Timpa partisi tabel, seperti partisi dt=20240108,hh=06.

    INSERT OVERWRITE my_table PARTITION (`dt` = '20240108', `hh` = '06') SELECT ...;
  • Timpa beberapa partisi tabel. Tentukan partisi yang ingin Anda timpa dalam pernyataan SELECT.

    INSERT OVERWRITE my_table SELECT ...;
  • Timpa semua data dalam tabel yang dipartisi.

    INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite' = 'false') */ SELECT ...;

Gunakan pernyataan DELETE untuk menghapus data

Anda dapat menggunakan pernyataan DELETE untuk menghapus data dari tabel kunci utama. Anda harus mengeksekusi pernyataan DELETE dalam skrip.

-- Hapus semua catatan data yang bidang mata uangnya adalah 'UNKNOWN' dari tabel my_table.
DELETE FROM my_table WHERE currency = 'UNKNOWN';

Abaikan perubahan DELETE

Jika tabel kunci utama menerima perubahan DELETE, data yang sesuai dihapus secara default. Untuk mengabaikan perubahan DELETE dan mencegah penghapusan data, gunakan petunjuk SQL untuk mengatur parameter ignore-delete menjadi true.

Parameter

Deskripsi

Tipe Data

Nilai default

ignore-delete

Menentukan apakah akan mengabaikan perubahan DELETE dari upstream.

Boolean

false

Atur paralelisme sink

Anda dapat menggunakan petunjuk SQL untuk secara manual mengubah paralelisme operator sink. Tabel berikut menjelaskan parameter tersebut.

Parameter

Deskripsi

Tipe Data

Nilai default

sink.parallelism

Paralelisme operator sink.

Integer

N/A

Dalam contoh berikut, paralelisme operator sink diatur menjadi 10.

INSERT INTO t /*+ OPTIONS('sink.parallelism' = '10') */ SELECT * FROM s;

Menggunakan data dari tabel Paimon

Gunakan penyebaran streaming

Catatan

Sebelum menggunakan penyebaran streaming untuk menggunakan data dari tabel kunci utama, lengkapi konfigurasi produsen changelog.

Secara default, operator sumber menghasilkan semua data dalam tabel Paimon pada waktu mulai penyebaran dan terus menghasilkan data tambahan dalam tabel Paimon.

Konfigurasikan offset konsumen

Untuk menggunakan data dari tabel Paimon berdasarkan offset tertentu, gunakan metode berikut:

  • Jika Anda ingin menggunakan hanya data tambahan setelah waktu mulai penyebaran, gunakan petunjuk SQL untuk menentukan 'scan.mode' = 'latest'.

    SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */;
  • Jika Anda ingin menggunakan hanya data tambahan setelah titik waktu tertentu, gunakan petunjuk SQL untuk menentukan parameter scan.timestamp-millis. Parameter ini menentukan jumlah milidetik yang telah berlalu sejak waktu epoch UTC 1970-01-01 00:00:00.

    SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
  • Jika Anda ingin menggunakan semua data yang ditulis dari titik waktu tertentu dan data tambahan berikutnya, gunakan metode berikut:

    Catatan

    File data yang dibaca oleh operator sumber mungkin berisi sejumlah kecil data yang ditulis sebelum titik waktu tertentu karena Paimon menggabungkan file kecil untuk konsumsi hilir. Anda dapat menggunakan kondisi WHERE dalam pernyataan SQL untuk memfilter data berdasarkan kebutuhan bisnis Anda.

    • Konfigurasikan parameter Tentukan waktu mulai sumber saat Anda memulai penyebaran di GUI. image.png

    • Gunakan petunjuk SQL untuk menentukan parameter scan.file-creation-time-millis.

      SELECT * FROM t /*+ OPTIONS('scan.file-creation-time-millis' = '1678883047356') */;
  • Jika Anda ingin menggunakan hanya data tambahan mulai dari file snapshot tertentu, gunakan petunjuk SQL untuk menentukan parameter scan.snapshot-id. Parameter ini menentukan ID file snapshot yang ingin Anda gunakan.

    SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;
  • Jika Anda ingin menggunakan semua data yang terkandung dalam file snapshot tertentu dan data tambahan berikutnya, gunakan petunjuk SQL untuk menyertakan konfigurasi 'scan.mode' = 'from-snapshot-full' dan tentukan parameter scan.snapshot-id. Parameter scan.snapshot-id menentukan ID file snapshot yang ingin Anda gunakan.

    SELECT * FROM t /*+ OPTIONS('scan.mode' = 'from-snapshot-full', 'scan.snapshot-id' = '1') */;

Tentukan ID konsumen

ID konsumen digunakan untuk mencatat offset konsumen dalam tabel Paimon.

  • Jika Anda memodifikasi logika komputasi dalam pernyataan SQL, topologi penyebaran mungkin berubah dan offset konsumen mungkin gagal dipulihkan dari status yang disimpan oleh Realtime Compute for Apache Flink. Dalam hal ini, Anda dapat menentukan ID konsumen untuk menyimpan offset konsumen dalam file metadata tabel Paimon. Dengan cara ini, konsumen dapat melanjutkan dari offset sebelumnya meskipun penyebaran dimulai ulang secara stateless.

  • Jika file snapshot yang kedaluwarsa dihapus sebelum dikonsumsi, kesalahan terjadi. Anda dapat menentukan ID konsumen untuk mencegah penghapusan file snapshot yang kedaluwarsa yang belum dikonsumsi.

Untuk menentukan ID konsumen untuk operator sumber, atur parameter consumer-id ke string. Pertama kali Anda menentukan ID konsumen, offset konsumen ditentukan seperti yang dijelaskan dalam Konfigurasikan offset konsumen. Saat Anda menentukan ID konsumen yang sama dalam operasi berikutnya, konsumen dapat melanjutkan dari offset konsumen yang tersimpan dari tabel Paimon.

Sebagai contoh, Anda dapat menggunakan pernyataan SQL berikut untuk menentukan ID konsumen bernama test-id untuk operator sumber. Jika Anda ingin mengatur ulang offset konsumen dari ID konsumen, tentukan 'consumer.ignore-progress' = 'true'.

SELECT * FROM t /*+ OPTIONS('consumer-id' = 'test-id') */;
Catatan

Saat menggunakan ID konsumen, file snapshot dan file data historis yang sesuai tidak dihapus setelah kedaluwarsa. Ini dapat mengakibatkan pemborosan ruang penyimpanan. Untuk menyelesaikan masalah di atas, Anda dapat menentukan parameter consumer.expiration-time untuk menghapus ID konsumen yang tidak aktif. Sebagai contoh, 'consumer.expiration-time' = '3d' menentukan bahwa ID konsumen yang tidak digunakan selama tiga hari berturut-turut dihapus.

Mengonsumsi hasil pernyataan INSERT OVERWRITE

Secara default, perubahan yang dihasilkan dari pernyataan INSERT OVERWRITE tidak dikonsumsi oleh operator hilir dalam mode streaming. Jika Anda ingin mengonsumsi perubahan tersebut, gunakan petunjuk SQL untuk menentukan 'streaming-read-overwrite' = 'true'.

SELECT * FROM t /*+ OPTIONS('streaming-read-overwrite' = 'true') */;

Gunakan penyebaran batch

Secara default, operator sumber penyebaran batch membaca file snapshot terbaru dari tabel Paimon untuk menghasilkan data status terbaru.

Batch Time Travel

Untuk menanyakan status tabel Paimon pada titik waktu tertentu, gunakan petunjuk SQL untuk menentukan parameter scan.timestamp-millis. Parameter ini menentukan jumlah milidetik yang telah berlalu sejak waktu epoch UTC 1970-01-01 00:00:00.

SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;

Untuk menanyakan status tabel Paimon saat file snapshot tertentu dihasilkan, gunakan petunjuk SQL untuk menentukan parameter scan.snapshot-id. Parameter ini menentukan ID file snapshot yang ingin Anda gunakan.

SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;

Tanyakan perubahan tambahan antara dua snapshot

Untuk menanyakan perubahan tambahan ke tabel Paimon antara dua snapshot tertentu, gunakan petunjuk SQL untuk menentukan parameter incremental-between. Sebagai contoh, Anda dapat mengeksekusi pernyataan SQL berikut untuk menanyakan data yang berubah antara snapshot 20 dan snapshot 12.

SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20') */;
Catatan

Secara default, perubahan DELETE dibuang dalam penyebaran batch. Jika Anda ingin mengonsumsi perubahan DELETE dalam penyebaran batch, tanyakan tabel log audit yang disediakan oleh Paimon. Contoh: SELECT * FROM 't$audit_log ' /*+ OPTIONS('incremental-between' = '12,20') */;.

Atur paralelisme sumber

Secara default, Paimon secara otomatis menurunkan paralelisme operator sumber berdasarkan informasi seperti jumlah partisi dan bucket. Anda dapat menggunakan petunjuk SQL untuk secara manual mengubah paralelisme operator sumber.

Parameter

Tipe Data

Nilai default

Catatan

scan.parallelism

Integer

N/A

Paralelisme operator sumber.

scan.infer-parallelism

Boolean

true

Menentukan apakah akan secara otomatis menurunkan paralelisme operator sumber.

scan.infer-parallelism.max

Integer

1024

Paralelisme maksimum yang diturunkan oleh operator sumber.

Dalam contoh berikut, paralelisme operator sumber diatur menjadi 10.

SELECT * FROM t /*+ OPTIONS('scan.parallelism' = '10') */;

Fungsi sebagai tabel dimensi

Anda dapat menggunakan tabel Paimon sebagai tabel dimensi. Untuk informasi tentang cara menulis klausa JOIN, lihat Sintaks.

Mengonsumsi dan menulis data VARIANT

Dalam VVR 11.1 atau versi lebih baru, tabel Paimon mendukung tipe semi-struktur VARIANT. Anda dapat menggunakan PARSE_JSON atau TRY_PARSE_JSON untuk mengonversi data VARCHAR berformat JSON menjadi VARIANT. Tipe VARIANT secara signifikan meningkatkan kinerja penanyakan dan pemrosesan data berformat JSON.

Contoh kode:

CREATE TABLE `my-catalog`.`my_db`.`my_tbl` (
  k BIGINT,
  info VARIANT
);

INSERT INTO `my-catalog`.`my_db`.`my_tbl` 
SELECT k, PARSE_JSON(jsonStr) FROM T;

Referensi