Topik ini menjelaskan cara melakukan operasi insert, update, overwrite, dan delete pada tabel Apache Paimon (Paimon) menggunakan konsol pengembangan Realtime Compute for Apache Flink. Topik ini juga mencakup cara menggunakan data dari tabel Paimon berdasarkan offset tertentu.
Prasyarat
Katalog Paimon dan tabel Paimon telah dibuat. Untuk informasi lebih lanjut, lihat Mengelola katalog Apache Paimon.
Batasan
Hanya Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 8.0.5 atau versi lebih baru yang mendukung tabel Paimon.
Menulis data ke tabel Paimon
Gunakan pernyataan CTAS atau CDAS untuk menyinkronkan perubahan data dan skema
Untuk informasi tentang cara menggunakan pernyataan CREATE TABLE AS (CTAS) atau CREATE DATABASE AS (CDAS), lihat Mengelola katalog Apache Paimon.
Gunakan pernyataan INSERT INTO untuk menyisipkan atau memperbarui data
Tabel kunci utama mendukung semua jenis perubahan (INSERT, UPDATE_BEFORE, UPDATE_AFTER, dan DELETE) sebagai input. Jika catatan data memiliki kunci utama yang sama, mereka akan digabungkan berdasarkan konfigurasi mesin penggabungan.
Tabel hanya-append (tanpa kunci utama) hanya mendukung perubahan tipe INSERT sebagai input.
Gunakan pernyataan INSERT OVERWRITE untuk menimpa data
Penimpaan adalah proses mengganti data lama dengan data baru. Data lama dihapus dan tidak dapat diakses lagi. Anda dapat menggunakan pernyataan INSERT OVERWRITE untuk menimpa sebagian atau seluruh tabel Paimon. Dalam contoh berikut, tabel bernama my_table digunakan.
Pernyataan INSERT OVERWRITE hanya berlaku untuk penyebaran batch.
Secara default, perubahan yang dihasilkan dari pernyataan INSERT OVERWRITE tidak dapat dikonsumsi oleh operator hilir dalam mode streaming. Untuk informasi tentang cara mengonsumsi perubahan dalam mode streaming, lihat Mengonsumsi hasil pernyataan INSERT OVERWRITE.
Timpa semua data dalam tabel yang tidak dipartisi.
INSERT OVERWRITE my_table SELECT ...;Timpa partisi tabel, seperti partisi
dt=20240108,hh=06.INSERT OVERWRITE my_table PARTITION (`dt` = '20240108', `hh` = '06') SELECT ...;Timpa beberapa partisi tabel. Tentukan partisi yang ingin Anda timpa dalam pernyataan SELECT.
INSERT OVERWRITE my_table SELECT ...;Timpa semua data dalam tabel yang dipartisi.
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite' = 'false') */ SELECT ...;
Gunakan pernyataan DELETE untuk menghapus data
Anda dapat menggunakan pernyataan DELETE untuk menghapus data dari tabel kunci utama. Anda harus mengeksekusi pernyataan DELETE dalam skrip.
-- Hapus semua catatan data yang bidang mata uangnya adalah 'UNKNOWN' dari tabel my_table.
DELETE FROM my_table WHERE currency = 'UNKNOWN';Abaikan perubahan DELETE
Jika tabel kunci utama menerima perubahan DELETE, data yang sesuai dihapus secara default. Untuk mengabaikan perubahan DELETE dan mencegah penghapusan data, gunakan petunjuk SQL untuk mengatur parameter ignore-delete menjadi true.
Parameter | Deskripsi | Tipe Data | Nilai default |
ignore-delete | Menentukan apakah akan mengabaikan perubahan DELETE dari upstream. | Boolean | false |
Atur paralelisme sink
Anda dapat menggunakan petunjuk SQL untuk secara manual mengubah paralelisme operator sink. Tabel berikut menjelaskan parameter tersebut.
Parameter | Deskripsi | Tipe Data | Nilai default |
sink.parallelism | Paralelisme operator sink. | Integer | N/A |
Dalam contoh berikut, paralelisme operator sink diatur menjadi 10.
INSERT INTO t /*+ OPTIONS('sink.parallelism' = '10') */ SELECT * FROM s;Menggunakan data dari tabel Paimon
Gunakan penyebaran streaming
Sebelum menggunakan penyebaran streaming untuk menggunakan data dari tabel kunci utama, lengkapi konfigurasi produsen changelog.
Secara default, operator sumber menghasilkan semua data dalam tabel Paimon pada waktu mulai penyebaran dan terus menghasilkan data tambahan dalam tabel Paimon.
Konfigurasikan offset konsumen
Untuk menggunakan data dari tabel Paimon berdasarkan offset tertentu, gunakan metode berikut:
Jika Anda ingin menggunakan hanya data tambahan setelah waktu mulai penyebaran, gunakan petunjuk SQL untuk menentukan
'scan.mode' = 'latest'.SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */;Jika Anda ingin menggunakan hanya data tambahan setelah titik waktu tertentu, gunakan petunjuk SQL untuk menentukan parameter
scan.timestamp-millis. Parameter ini menentukan jumlah milidetik yang telah berlalu sejak waktu epoch UTC 1970-01-01 00:00:00.SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;Jika Anda ingin menggunakan semua data yang ditulis dari titik waktu tertentu dan data tambahan berikutnya, gunakan metode berikut:
CatatanFile data yang dibaca oleh operator sumber mungkin berisi sejumlah kecil data yang ditulis sebelum titik waktu tertentu karena Paimon menggabungkan file kecil untuk konsumsi hilir. Anda dapat menggunakan kondisi WHERE dalam pernyataan SQL untuk memfilter data berdasarkan kebutuhan bisnis Anda.
Konfigurasikan parameter Tentukan waktu mulai sumber saat Anda memulai penyebaran di GUI.

Gunakan petunjuk SQL untuk menentukan parameter
scan.file-creation-time-millis.SELECT * FROM t /*+ OPTIONS('scan.file-creation-time-millis' = '1678883047356') */;
Jika Anda ingin menggunakan hanya data tambahan mulai dari file snapshot tertentu, gunakan petunjuk SQL untuk menentukan parameter
scan.snapshot-id. Parameter ini menentukan ID file snapshot yang ingin Anda gunakan.SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;Jika Anda ingin menggunakan semua data yang terkandung dalam file snapshot tertentu dan data tambahan berikutnya, gunakan petunjuk SQL untuk menyertakan konfigurasi
'scan.mode' = 'from-snapshot-full'dan tentukan parameterscan.snapshot-id. Parameterscan.snapshot-idmenentukan ID file snapshot yang ingin Anda gunakan.SELECT * FROM t /*+ OPTIONS('scan.mode' = 'from-snapshot-full', 'scan.snapshot-id' = '1') */;
Tentukan ID konsumen
ID konsumen digunakan untuk mencatat offset konsumen dalam tabel Paimon.
Jika Anda memodifikasi logika komputasi dalam pernyataan SQL, topologi penyebaran mungkin berubah dan offset konsumen mungkin gagal dipulihkan dari status yang disimpan oleh Realtime Compute for Apache Flink. Dalam hal ini, Anda dapat menentukan ID konsumen untuk menyimpan offset konsumen dalam file metadata tabel Paimon. Dengan cara ini, konsumen dapat melanjutkan dari offset sebelumnya meskipun penyebaran dimulai ulang secara stateless.
Jika file snapshot yang kedaluwarsa dihapus sebelum dikonsumsi, kesalahan terjadi. Anda dapat menentukan ID konsumen untuk mencegah penghapusan file snapshot yang kedaluwarsa yang belum dikonsumsi.
Untuk menentukan ID konsumen untuk operator sumber, atur parameter consumer-id ke string. Pertama kali Anda menentukan ID konsumen, offset konsumen ditentukan seperti yang dijelaskan dalam Konfigurasikan offset konsumen. Saat Anda menentukan ID konsumen yang sama dalam operasi berikutnya, konsumen dapat melanjutkan dari offset konsumen yang tersimpan dari tabel Paimon.
Sebagai contoh, Anda dapat menggunakan pernyataan SQL berikut untuk menentukan ID konsumen bernama test-id untuk operator sumber. Jika Anda ingin mengatur ulang offset konsumen dari ID konsumen, tentukan 'consumer.ignore-progress' = 'true'.
SELECT * FROM t /*+ OPTIONS('consumer-id' = 'test-id') */;Saat menggunakan ID konsumen, file snapshot dan file data historis yang sesuai tidak dihapus setelah kedaluwarsa. Ini dapat mengakibatkan pemborosan ruang penyimpanan. Untuk menyelesaikan masalah di atas, Anda dapat menentukan parameter consumer.expiration-time untuk menghapus ID konsumen yang tidak aktif. Sebagai contoh, 'consumer.expiration-time' = '3d' menentukan bahwa ID konsumen yang tidak digunakan selama tiga hari berturut-turut dihapus.
Mengonsumsi hasil pernyataan INSERT OVERWRITE
Secara default, perubahan yang dihasilkan dari pernyataan INSERT OVERWRITE tidak dikonsumsi oleh operator hilir dalam mode streaming. Jika Anda ingin mengonsumsi perubahan tersebut, gunakan petunjuk SQL untuk menentukan 'streaming-read-overwrite' = 'true'.
SELECT * FROM t /*+ OPTIONS('streaming-read-overwrite' = 'true') */;Gunakan penyebaran batch
Secara default, operator sumber penyebaran batch membaca file snapshot terbaru dari tabel Paimon untuk menghasilkan data status terbaru.
Batch Time Travel
Untuk menanyakan status tabel Paimon pada titik waktu tertentu, gunakan petunjuk SQL untuk menentukan parameter scan.timestamp-millis. Parameter ini menentukan jumlah milidetik yang telah berlalu sejak waktu epoch UTC 1970-01-01 00:00:00.
SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;Untuk menanyakan status tabel Paimon saat file snapshot tertentu dihasilkan, gunakan petunjuk SQL untuk menentukan parameter scan.snapshot-id. Parameter ini menentukan ID file snapshot yang ingin Anda gunakan.
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;Tanyakan perubahan tambahan antara dua snapshot
Untuk menanyakan perubahan tambahan ke tabel Paimon antara dua snapshot tertentu, gunakan petunjuk SQL untuk menentukan parameter incremental-between. Sebagai contoh, Anda dapat mengeksekusi pernyataan SQL berikut untuk menanyakan data yang berubah antara snapshot 20 dan snapshot 12.
SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20') */;Secara default, perubahan DELETE dibuang dalam penyebaran batch. Jika Anda ingin mengonsumsi perubahan DELETE dalam penyebaran batch, tanyakan tabel log audit yang disediakan oleh Paimon. Contoh: SELECT * FROM 't$audit_log ' /*+ OPTIONS('incremental-between' = '12,20') */;.
Atur paralelisme sumber
Secara default, Paimon secara otomatis menurunkan paralelisme operator sumber berdasarkan informasi seperti jumlah partisi dan bucket. Anda dapat menggunakan petunjuk SQL untuk secara manual mengubah paralelisme operator sumber.
Parameter | Tipe Data | Nilai default | Catatan |
scan.parallelism | Integer | N/A | Paralelisme operator sumber. |
scan.infer-parallelism | Boolean | true | Menentukan apakah akan secara otomatis menurunkan paralelisme operator sumber. |
scan.infer-parallelism.max | Integer | 1024 | Paralelisme maksimum yang diturunkan oleh operator sumber. |
Dalam contoh berikut, paralelisme operator sumber diatur menjadi 10.
SELECT * FROM t /*+ OPTIONS('scan.parallelism' = '10') */;Fungsi sebagai tabel dimensi
Anda dapat menggunakan tabel Paimon sebagai tabel dimensi. Untuk informasi tentang cara menulis klausa JOIN, lihat Sintaks.
Mengonsumsi dan menulis data VARIANT
Dalam VVR 11.1 atau versi lebih baru, tabel Paimon mendukung tipe semi-struktur VARIANT. Anda dapat menggunakan PARSE_JSON atau TRY_PARSE_JSON untuk mengonversi data VARCHAR berformat JSON menjadi VARIANT. Tipe VARIANT secara signifikan meningkatkan kinerja penanyakan dan pemrosesan data berformat JSON.
Contoh kode:
CREATE TABLE `my-catalog`.`my_db`.`my_tbl` (
k BIGINT,
info VARIANT
);
INSERT INTO `my-catalog`.`my_db`.`my_tbl`
SELECT k, PARSE_JSON(jsonStr) FROM T;Referensi
Saat melakukan operasi tulis atau baca dalam tabel Paimon, Anda dapat menggunakan petunjuk SQL untuk sementara memodifikasi parameter tabel. Untuk informasi lebih lanjut, lihat Mengelola tabel Apache Paimon.
Untuk informasi tentang fitur tabel kunci utama dan tabel hanya-append, lihat Tabel kunci utama dan tabel hanya-append.
Untuk informasi tentang cara mengoptimalkan tabel kunci utama dan tabel yang dapat diskalakan, lihat Optimasi Kinerja.
Konsumen menggunakan file snapshot untuk mengakses tabel Paimon. Jika file snapshot disimpan hanya dalam waktu singkat atau efisiensi konsumsi rendah, file snapshot yang sedang digunakan mungkin terhapus. Akibatnya, muncul pesan kesalahan
File xxx tidak ditemukan, Kemungkinan penyebab. Untuk informasi tentang cara menyelesaikan masalah ini, lihat Apa yang harus saya lakukan jika pesan kesalahan "File xxx tidak ditemukan, Kemungkinan penyebab" muncul saat membaca data dari tabel sumber Apache Paimon dalam penyebaran?