全部产品
Search
文档中心

Realtime Compute for Apache Flink:Menulis data ke dan menggunakan data dari tabel Paimon

更新时间:Jan 14, 2026

Topik ini menjelaskan cara menyisipkan, memperbarui, menimpa, atau menghapus data di tabel Paimon menggunakan Konsol pengembangan Realtime Compute for Apache Flink. Topik ini juga menjelaskan cara mengonsumsi data dari tabel Paimon dan menentukan consumer offset.

Prasyarat

Anda telah membuat Paimon Catalog dan tabel Paimon. Untuk informasi selengkapnya, lihat Mengelola Paimon Catalog.

Batasan

Hanya Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 8.0.5 atau versi lebih baru yang mendukung tabel Paimon.

Menulis data ke tabel Paimon

Menyinkronkan perubahan data dan skema menggunakan pernyataan CTAS atau CDAS

Untuk informasi selengkapnya, lihat Mengelola katalog Paimon.

Menyisipkan atau memperbarui data menggunakan pernyataan INSERT INTO

Anda dapat menggunakan pernyataan INSERT INTO untuk menyisipkan atau memperbarui data di tabel Paimon.

Menimpa data menggunakan pernyataan INSERT OVERWRITE

Menimpa data berarti menghapus dan menulis ulang data tersebut. Anda dapat menggunakan pernyataan INSERT OVERWRITE untuk menimpa seluruh tabel Paimon atau partisi tertentu. Contoh berikut menunjukkan pernyataan SQL-nya.

Catatan
  • Pernyataan INSERT OVERWRITE hanya didukung dalam batch job.

  • Secara default, operasi INSERT OVERWRITE tidak menghasilkan data changelog. Data yang dihapus dan diimpor tidak dapat dikonsumsi oleh streaming job downstream. Untuk mengonsumsi data jenis ini, lihat Streaming dan mengonsumsi hasil pernyataan INSERT OVERWRITE.

  • Timpa seluruh tabel non-partisi my_table.

    INSERT OVERWRITE my_table SELECT ...;
  • Timpa partisi dt=20240108,hh=06 di tabel my_table.

    INSERT OVERWRITE my_table PARTITION (`dt` = '20240108', `hh` = '06') SELECT ...;
  • Timpa partisi secara dinamis di tabel my_table. Partisi yang muncul dalam hasil pernyataan SELECT akan ditimpa. Partisi lain tetap tidak berubah.

    INSERT OVERWRITE my_table SELECT ...;
  • Timpa seluruh tabel partisi my_table.

    INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite' = 'false') */ SELECT ...;

Menghapus data menggunakan pernyataan DELETE

Anda dapat menggunakan pernyataan DELETE untuk menghapus data dari tabel Paimon primary key. Pernyataan DELETE hanya dapat dieksekusi di Data Exploration.

-- Hapus semua data di mana currency = 'UNKNOWN' dari tabel my_table.
DELETE FROM my_table WHERE currency = 'UNKNOWN';

Memfilter pesan DELETE

Secara default, saat Anda menggunakan tabel Paimon primary key, pesan DELETE akan menghapus data yang memiliki primary key yang sesuai. Jika Anda tidak ingin tabel Paimon memproses pesan tersebut, Anda dapat menggunakan Petunjuk SQL untuk mengatur parameter berikut ke `true` agar memfilternya.

Parameter

Deskripsi

Tipe data

Nilai default

ignore-delete

Menentukan apakah pesan DELETE difilter.

Boolean

false

Menyesuaikan konkurensi tabel sink

Anda dapat menggunakan Petunjuk SQL untuk mengatur parameter berikut guna menyesuaikan konkurensi operator tabel sink secara manual.

Parameter

Deskripsi

Tipe data

Nilai default

sink.parallelism

Menetapkan konkurensi operator tabel sink Paimon secara manual.

Integer

None

Sebagai contoh, pernyataan SQL berikut menetapkan konkurensi operator tabel sink Paimon secara manual menjadi 10.

INSERT INTO t /*+ OPTIONS('sink.parallelism' = '10') */ SELECT * FROM s;

Menggunakan data dari tabel Paimon

Pekerjaan Streaming

Catatan

Untuk tabel Paimon primary key yang dikonsumsi oleh streaming job, Anda harus mengonfigurasi changelog producer.

Secara default, operator sumber Paimon dalam streaming job pertama kali menghasilkan data lengkap dari tabel Paimon saat job dimulai. Setelah itu, operator tersebut menghasilkan data inkremental dari tabel Paimon mulai dari titik tersebut.

Mengonsumsi data dari offset tertentu

Anda dapat mengonsumsi data dari tabel Paimon mulai dari offset tertentu dengan salah satu cara berikut:

  • Jika Anda hanya ingin mengonsumsi data inkremental berikutnya dan tidak mengonsumsi data lengkap di tabel Paimon saat job dimulai, atur 'scan.mode' = 'latest' menggunakan Petunjuk SQL.

    SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */;
  • Jika Anda tidak ingin mengonsumsi data lengkap dan hanya ingin mengonsumsi data inkremental mulai dari titik waktu tertentu, atur parameter scan.timestamp-millis menggunakan Petunjuk SQL. Parameter ini menentukan jumlah milidetik yang telah berlalu sejak Unix epoch (1970-01-01 00:00:00 UTC) hingga titik waktu yang diinginkan.

    SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
  • Jika Anda ingin mengonsumsi data lengkap yang ditulis setelah titik waktu tertentu dan terus mengonsumsi data inkremental berikutnya, Anda dapat melakukan salah satu dari dua operasi berikut.

    Catatan

    Metode konsumsi ini membaca file data yang dimodifikasi setelah titik waktu tertentu. Karena kompaksi file kecil, file data tersebut mungkin berisi sejumlah kecil data yang ditulis sebelum titik waktu tersebut. Anda dapat menambahkan kondisi filter WHERE pada job SQL untuk memfilter data sesuai kebutuhan.

    • Jangan mengatur Petunjuk SQL apa pun. Saat memulai job, pilih Specify Source Start Time dan tentukan informasi waktunya.image.png

    • Atur parameter scan.file-creation-time-millis menggunakan Petunjuk SQL.

      SELECT * FROM t /*+ OPTIONS('scan.file-creation-time-millis' = '1678883047356') */;
  • Jika Anda tidak ingin mengonsumsi data lengkap dan hanya ingin mengonsumsi data inkremental mulai dari snapshot tertentu, atur parameter scan.snapshot-id menggunakan Petunjuk SQL. Parameter ini menentukan ID snapshot tersebut.

    SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;
  • Jika Anda ingin mengonsumsi data lengkap dari snapshot tertentu dan terus mengonsumsi data inkremental berikutnya, atur parameter 'scan.mode' = 'from-snapshot-full' dan scan.snapshot-id menggunakan Petunjuk SQL. Parameter scan.snapshot-id menentukan ID snapshot tersebut.

    SELECT * FROM t /*+ OPTIONS('scan.mode' = 'from-snapshot-full', 'scan.snapshot-id' = '1') */;

Tentukan ID konsumen

ID konsumen menyimpan progres konsumsi untuk tabel Paimon dan digunakan dalam skenario utama berikut:

  • Jika Anda memodifikasi logika komputasi job SQL, topologi job mungkin berubah sehingga progres konsumsi tidak dapat dipulihkan dari state Flink. Jika Anda menetapkan ID konsumen, progres konsumsi untuk ID tersebut disimpan dalam file metadata tabel Paimon. Hal ini memungkinkan konsumsi dilanjutkan dari titik interupsi, bahkan jika job nantinya dijalankan dalam mode tanpa status.

  • Setelah ID konsumen ditetapkan, snapshot yang belum dikonsumsi tidak akan dihapus saat masa berlakunya habis. Hal ini mencegah error yang terjadi ketika kecepatan konsumsi tidak mampu mengimbangi kecepatan kedaluwarsa snapshot.

Anda dapat mengatur parameter consumer-id untuk memberikan ID konsumen kepada operator sumber Paimon dalam streaming job. Nilainya dapat berupa string apa pun. Saat ID konsumen dibuat pertama kali, offset konsumen awalnya ditentukan berdasarkan aturan di Mengonsumsi data dari offset tertentu. Setelah itu, jika Anda terus menggunakan ID konsumen yang sama, Anda dapat melanjutkan konsumsi dari tabel Paimon.

Sebagai contoh, pernyataan SQL berikut menunjukkan cara menetapkan ID konsumen bernama test-id untuk operator sumber Paimon. Jika Anda ingin mengatur ulang consumer offset yang sesuai dengan ID konsumen, Anda juga dapat mengatur 'consumer.ignore-progress' = 'true'.

SELECT * FROM t /*+ OPTIONS('consumer-id' = 'test-id') */;
Catatan

Karena snapshot yang belum dikonsumsi untuk suatu ID konsumen tidak dihapus saat masa berlakunya habis, snapshot tersebut beserta file data historisnya tidak akan pernah dihapus dan akan menghabiskan storage space jika Anda tidak membersihkan ID konsumen yang sudah usang secara tepat waktu. Anda dapat mengatur parameter tabel consumer.expiration-time untuk membersihkan ID konsumen yang tidak digunakan selama periode tertentu. Sebagai contoh, 'consumer.expiration-time' = '3d' membersihkan ID konsumen yang tidak digunakan selama tiga hari.

Streaming dan mengonsumsi hasil pernyataan INSERT OVERWRITE

Secara default, operasi INSERT OVERWRITE tidak menghasilkan data changelog. Data yang dihapus dan diimpor tidak dapat dikonsumsi oleh streaming job downstream. Untuk mengonsumsi data jenis ini, Anda dapat mengonfigurasi 'streaming-read-overwrite' = 'true' dalam streaming job menggunakan Petunjuk SQL.

SELECT * FROM t /*+ OPTIONS('streaming-read-overwrite' = 'true') */;

Pekerjaan batch

Secara default, operator sumber Paimon dalam batch job membaca snapshot terbaru untuk menghasilkan data status terkini dari tabel Paimon.

Batch perjalanan waktu

Anda dapat mengatur parameter scan.timestamp-millis menggunakan Petunjuk SQL untuk mengkueri status tabel Paimon pada titik waktu tersebut. Parameter ini menentukan jumlah milidetik yang telah berlalu sejak Unix epoch (1970-01-01 00:00:00 UTC) hingga titik waktu yang ditentukan.

SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;

Anda dapat mengatur parameter scan.snapshot-id menggunakan Petunjuk SQL untuk mengkueri status tabel Paimon saat snapshot tersebut dihasilkan. Parameter ini menentukan ID snapshot tersebut.

SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;

Mengkueri perubahan data antara dua snapshot

Jika Anda ingin mengkueri perubahan data di tabel Paimon antara dua snapshot, Anda dapat mengatur parameter incremental-between menggunakan Petunjuk SQL. Sebagai contoh, untuk melihat semua data yang berubah antara snapshot 20 dan snapshot 12, gunakan pernyataan SQL berikut.

SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20') */;
Catatan

Karena batch job tidak mendukung mengonsumsi pesan DELETE, pesan tersebut dibuang secara default. Jika Anda ingin mengonsumsi pesan DELETE dalam batch job, Anda dapat mengkueri tabel sistem Audit Log. Sebagai contoh, SELECT * FROM `t$audit_log ` /*+ OPTIONS('incremental-between' = '12,20') */;.

Menyesuaikan konkurensi tabel sumber

Secara default, Paimon secara otomatis melakukan inferensi konkurensi operator sumber berdasarkan informasi seperti jumlah partisi dan bucket. Anda dapat menggunakan Petunjuk SQL untuk mengatur parameter berikut guna menyesuaikan konkurensi operator sumber secara manual.

Parameter

Tipe data

Nilai default

Catatan

scan.parallelism

Integer

None

Menetapkan konkurensi operator sumber Paimon secara manual.

scan.infer-parallelism

Boolean

true

Menentukan apakah konkurensi operator sumber Paimon diinferensi secara otomatis.

scan.infer-parallelism.max

Integer

1024

Batas atas untuk konkurensi operator sumber Paimon yang diinferensi secara otomatis.

Berikut adalah contoh pernyataan SQL yang menetapkan konkurensi operator sumber Paimon secara manual menjadi 10.

SELECT * FROM t /*+ OPTIONS('scan.parallelism' = '10') */;

Menggunakan tabel Paimon sebagai tabel dimensi

Tabel Paimon juga dapat digunakan sebagai tabel dimensi. Untuk informasi selengkapnya tentang sintaks JOIN tabel dimensi, lihat Pernyataan JOIN tabel dimensi.

Menulis dan mengonsumsi tipe data semi-terstruktur VARIANT

Pada Ververica Runtime (VVR) 11.1 dan versi lebih baru, tabel Paimon memperkenalkan tipe data semi-terstruktur VARIANT. Tipe ini memungkinkan Anda mengonversi string JSON bertipe VARCHAR ke tipe data VARIANT menggunakan PARSE_JSON atau TRY_PARSE_JSON. Menulis dan mengonsumsi data bertipe VARIANT secara langsung secara signifikan meningkatkan performa kueri dan pemrosesan data JSON.

Berikut adalah contoh pernyataan SQL:

CREATE TABLE `my-catalog`.`my_db`.`my_tbl` (
  k BIGINT,
  info VARIANT
);

INSERT INTO `my-catalog`.`my_db`.`my_tbl` 
SELECT k, PARSE_JSON(jsonStr) FROM T;

Dokumen terkait

  • Saat menulis data ke atau mengonsumsi data dari tabel Paimon, Anda dapat menggunakan Petunjuk SQL untuk memodifikasi parameter tabel secara sementara. Untuk informasi selengkapnya, lihat Mengelola tabel Paimon.

  • Untuk informasi selengkapnya tentang atribut dasar dan fitur tabel Paimon primary key serta tabel append-only, lihat Tabel Paimon primary key dan append-only.

  • Untuk informasi selengkapnya tentang optimasi umum untuk tabel Paimon primary key dan append-only dalam berbagai skenario, lihat Optimasi performa Paimon.

  • Konsumsi tabel Paimon bergantung pada snapshot. Jika waktu kedaluwarsa snapshot terlalu singkat atau job konsumsi tidak efisien, snapshot yang sedang dikonsumsi mungkin dihapus saat masa berlakunya habis. Hal ini menyebabkan job konsumsi melaporkan error File xxx not found, Possible causes. Untuk informasi tentang cara mengatasi masalah ini, lihat Error "File xxx not found, Possible causes" dilaporkan untuk job yang membaca data dari tabel Paimon.