Gunakan konektor Paimon bersama Katalog Paimon untuk hasil terbaik. Topik ini menjelaskan cara menggunakan konektor Paimon dalam skenario data lakehouse streaming.
Informasi latar belakang
Apache Paimon adalah format penyimpanan lake terpadu untuk aliran dan batch yang mendukung penulisan throughput tinggi serta kueri latensi rendah. Mesin komputasi utama di platform big data open source Alibaba Cloud E-MapReduce—termasuk Flink, Spark, Hive, dan Trino—terintegrasi secara optimal dengan Paimon. Anda dapat membangun layanan penyimpanan data lake sendiri secara cepat di HDFS atau Object Storage Service berbasis cloud, lalu menghubungkannya dengan mesin komputasi yang didukung untuk menganalisis data lake tersebut. Untuk informasi lebih lanjut, lihat Apache Paimon.
Kategori | Detail |
Jenis yang didukung | Tabel sumber, tabel dimensi, dan tabel sink (target ingesti data) |
Mode eksekusi | Mode streaming dan batch |
Format data | Tidak didukung |
Metrik pemantauan unik | Tidak ada |
Jenis API | Pekerjaan ingesti data berbasis SQL dan YAML |
Mendukung pembaruan atau penghapusan data di tabel sink | Ya |
Fitur utama
Apache Paimon menyediakan kemampuan inti berikut:
Membangun layanan penyimpanan data lake ringan dan berbiaya rendah di HDFS atau Object Storage Service.
Membaca dan menulis dataset skala besar dalam mode streaming maupun batch.
Menjalankan kueri batch dan OLAP dengan freshness data dalam hitungan menit hingga detik.
Mengingesti dan menghasilkan data inkremental, berfungsi sebagai penyimpanan di semua lapisan gudang data offline tradisional maupun gudang data streaming modern.
Pre-agregasi data untuk mengurangi biaya penyimpanan dan beban komputasi downstream.
Rollback ke versi historis.
Filter data secara efisien.
Dukungan evolusi skema.
Batasan dan rekomendasi
Konektor Paimon hanya didukung pada Ververica Runtime (VVR) versi 6.0.6 dan yang lebih baru.
Tabel berikut menunjukkan pemetaan antara versi komunitas Paimon dan versi mesin Realtime Compute for Apache Flink (VVR).
Versi Apache Paimon
Versi VVR
1.3
11.4
1.2
11.2 dan 11.3
1.1
11
1.0
8.0.11
0.9
8.0.7, 8.0.8, 8.0.9, dan 8.0.10
0.8
8.0.6
0.7
8.0.5
0.6
8.0.4
0.6
8.0.3
Rekomendasi penyimpanan untuk penulisan konkuren
Saat beberapa pekerjaan memperbarui tabel Paimon yang sama secara konkuren, penyimpanan OSS standar (oss://) dapat menyebabkan konflik commit langka atau kegagalan pekerjaan karena batasan atomicity pada operasi file.
Untuk memastikan penulisan stabil dan berkelanjutan, gunakan metadata atau layanan penyimpanan yang menjamin atomicity kuat. Kami merekomendasikan penggunaan Data Lake Formation (DLF). DLF menyatukan manajemen metadata dan penyimpanan Paimon. Alternatifnya, gunakan OSS-HDFS atau HDFS.
Aktivasi Pengaturan Parameter
Setelah mengubah parameter konfigurasi tabel Paimon, restart pekerjaan terkait agar pengaturan baru berlaku. Pekerjaan yang sedang berjalan tidak dapat mendeteksi atau memuat perubahan tersebut secara dinamis.
Penundaan pembersihan fisik setelah penghapusan partisi
Saat menjalankan DROP PARTITION, sistem tidak menghapus file data fisik yang mendasari secara langsung.
Operasi ini hanya melakukan penghapusan logis. Paimon menghapus metadata partisi dari snapshot terbaru. Karena Paimon mendukung time travel, snapshot historis tetap menyimpan referensi ke file data partisi tersebut. File fisik hanya dihapus setelah semua snapshot yang mereferensikan partisi tersebut kedaluwarsa dan dibersihkan oleh mekanisme kedaluwarsa snapshot Paimon.
SQL
Anda dapat menggunakan konektor Paimon dalam pekerjaan SQL sebagai tabel sumber atau tabel sink.
Sintaks
Jika Anda membuat tabel Paimon di Katalog Paimon, abaikan parameter
connector. Sintaksnya sebagai berikut.CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.paimon_table ( id BIGINT, data STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( ... );CatatanJika Anda telah membuat tabel Paimon di Katalog Paimon, gunakan langsung tanpa membuat ulang.
Jika Anda membuat tabel temporary Paimon di katalog lain, tentukan parameter connector dan path ke file tabel Paimon. Sintaksnya sebagai berikut.
CREATE TEMPORARY TABLE paimon_table ( id BIGINT, data STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'paimon', 'path' = '<path-to-paimon-table-files>', 'auto-create' = 'true', -- Membuat file secara otomatis jika path tidak berisi data tabel Paimon. ... );CatatanContoh path:
'path' = 'oss://<bucket>/test/order.db/orders'. Jangan menghilangkan akhiran.db. Paimon menggunakan akhiran ini untuk mengidentifikasi database.Beberapa pekerjaan yang menulis ke satu tabel harus berbagi path yang sama.
Path yang berbeda berarti tabel yang berbeda—meskipun mengarah ke lokasi fisik yang sama. Misalnya,
oss://b/testdanoss://b/test/hanya berbeda tanda slash di akhir tetapi merujuk ke tabel yang berbeda. Ketidaksesuaian konfigurasi katalog menyebabkan konflik penulisan konkuren, kegagalan kompaksi, atau kehilangan data.
DENGAN parameter
Parameter | Deskripsi | Tipe data | Wajib | Nilai default | Catatan |
connector | Jenis tabel. | String | Tidak | Tidak ada |
|
path | Path penyimpanan tabel. | String | Tidak | Tidak ada |
|
auto-create | Saat membuat tabel temporary Paimon, buat file secara otomatis jika path yang ditentukan tidak berisi file tabel Paimon. | Boolean | Tidak | false | Nilai valid:
|
file.format | Kelas penyimpanan untuk file data di tabel. | String | Tidak | parquet | Nilai valid:
|
bucket | Jumlah bucket per partisi. | Integer | Tidak | 1 | Data yang ditulis ke tabel Paimon didistribusikan ke bucket berdasarkan Catatan Kami merekomendasikan ukuran bucket tetap di bawah 5 GB. |
bucket-key | Kolom kunci bucket. | String | Tidak | Tidak ada | Tentukan kolom yang nilainya menentukan bagaimana data didistribusikan ke bucket. Pisahkan nama kolom dengan koma Inggris (,), misalnya, Catatan
|
changelog-producer | Mekanisme untuk menghasilkan data inkremental. | String | Tidak | none | Paimon dapat menghasilkan changelog lengkap (dengan catatan update_before dan update_after yang sesuai) untuk aliran data input apa pun. Ini membantu konsumen downstream memproses pembaruan dengan benar. Nilai valid:
Untuk panduan memilih produsen changelog, lihat Changelog producer. |
full-compaction.delta-commits | Interval maksimum antara full compaction. | Integer | Tidak | Tidak ada | Parameter ini menentukan berapa banyak commit snapshot yang terjadi sebelum full compaction dijalankan. |
lookup.cache-max-memory-size | Ukuran cache memori untuk tabel dimensi Paimon. | String | Tidak | 256 MB | Pengaturan ini mengontrol ukuran cache tabel dimensi dan ukuran cache produsen changelog lookup. |
merge-engine | Mekanisme untuk menggabungkan baris dengan kunci primer yang sama. | String | Tidak | deduplicate | Nilai valid:
Untuk detail tentang mesin merge, lihat Merge engine. |
partial-update.ignore-delete | Apakah mengabaikan pesan hapus (-D). | Boolean | Tidak | false | Nilai valid:
Catatan
|
ignore-delete | Apakah mengabaikan pesan hapus (-D). | Boolean | Tidak | false | Nilai valid yang sama seperti partial-update.ignore-delete. Catatan
|
partition.default-name | Nama partisi default. | String | Tidak | __DEFAULT_PARTITION__ | Digunakan sebagai nama partisi saat nilai kolom partisi null atau string kosong. |
partition.expiration-check-interval | Seberapa sering memeriksa partisi yang kedaluwarsa. | String | Tidak | 1h | Untuk detail, lihat Cara mengonfigurasi kedaluwarsa partisi otomatis? |
partition.expiration-time | Periode retensi partisi. | String | Tidak | Tidak ada | Partisi kedaluwarsa setelah durasi ini. Secara default, partisi tidak pernah kedaluwarsa. Durasi dihitung dari nilai partisi. Untuk detail, lihat Cara mengonfigurasi kedaluwarsa partisi otomatis? |
partition.timestamp-formatter | Mengonversi string waktu ke timestamp terformat. | String | Tidak | Tidak ada | Menentukan format yang digunakan untuk mengekstrak usia partisi dari nilai partisi. Untuk detail, lihat Cara mengonfigurasi kedaluwarsa partisi otomatis? |
partition.timestamp-pattern | String format untuk mengonversi nilai partisi ke string timestamp. | String | Tidak | Tidak ada | Menentukan format yang digunakan untuk mengekstrak usia partisi dari nilai partisi. Untuk detail, lihat Cara mengonfigurasi kedaluwarsa partisi otomatis? |
scan.bounded.watermark | Berhenti membaca dari tabel sumber Paimon saat watermark-nya melebihi nilai ini. | Long | Tidak | Tidak ada | Tidak ada. |
scan.mode | Offset konsumen untuk tabel sumber Paimon. | String | Tidak | default | Untuk detail, lihat Cara mengonfigurasi offset konsumen untuk tabel sumber Paimon? |
scan.snapshot-id | ID snapshot untuk mulai membaca. | Integer | Tidak | Tidak ada | Untuk detail, lihat Cara mengonfigurasi offset konsumen untuk tabel sumber Paimon? |
scan.timestamp-millis | Timestamp untuk mulai membaca. | Integer | Tidak | Tidak ada | Untuk detail, lihat Cara mengonfigurasi offset konsumen untuk tabel sumber Paimon? |
snapshot.num-retained.max | Jumlah maksimum snapshot terbaru yang dipertahankan. | Integer | Tidak | 2147483647 | Snapshot kedaluwarsa jika memenuhi batas ini atau snapshot.time-retained, dan juga memenuhi snapshot.num-retained.min. |
snapshot.num-retained.min | Jumlah minimum snapshot terbaru yang dipertahankan. | Integer | Tidak | 10 | Tidak ada. |
snapshot.time-retained | Berapa lama snapshot tetap ada sebelum kedaluwarsa. | String | Tidak | 1h | Snapshot kedaluwarsa jika memenuhi batas ini atau snapshot.num-retained.max, dan juga memenuhi snapshot.num-retained.min. |
write-mode | Mode penulisan untuk tabel Paimon. | String | Tidak | change-log | Nilai valid:
Untuk detail, lihat Write mode. |
scan.infer-parallelism | Apakah inferensi paralelisme untuk tabel sumber Paimon dilakukan secara otomatis. | Boolean | Tidak | true | Nilai valid:
|
scan.parallelism | Paralelisme untuk tabel sumber Paimon. | Integer | Tidak | Tidak ada | Catatan Parameter ini tidak berpengaruh saat resource allocation mode diatur ke Mode Ahli pada tab . |
sink.parallelism | Paralelisme untuk tabel sink Paimon. | Integer | Tidak | Tidak ada | Catatan Parameter ini tidak berpengaruh saat resource allocation mode diatur ke Mode Ahli pada tab . |
sink.clustering.by-columns | Kolom pengelompokan untuk tabel sink Paimon. | String | Tidak | Tidak ada | Untuk tabel append-only Paimon (tabel non-kunci-primer), konfigurasikan parameter ini dalam pekerjaan batch untuk mengaktifkan penulisan pengelompokan. Pengelompokan meningkatkan kecepatan kueri dengan mengelompokkan data berdasarkan rentang nilai pada kolom yang ditentukan. Pisahkan nama kolom dengan koma (,). Contoh: Untuk detail, lihat dokumentasi Apache Paimon. |
sink.delete-strategy | Strategi validasi untuk memastikan penanganan pesan retract (-D/-U) yang benar. | Enum | Tidak | NONE | Strategi valid dan perilaku sink yang diperlukan:
Catatan
|
Untuk opsi konfigurasi lainnya, lihat dokumentasi Apache Paimon.
Detail fitur
Jaminan freshness dan konsistensi data
Tabel sink Paimon menggunakan protokol two-phase commit untuk melakukan commit data selama setiap checkpoint pekerjaan Flink. Oleh karena itu, freshness data sesuai dengan interval checkpoint pekerjaan Flink. Setiap commit menghasilkan hingga dua snapshot.
Saat dua pekerjaan Flink menulis secara simultan ke tabel Paimon yang sama, konsistensi serializable dijamin jika data mereka ditulis ke bucket yang berbeda. Namun, jika data mereka ditulis ke bucket yang sama, hanya konsistensi isolasi snapshot yang dapat dijamin. Artinya, data tabel mungkin merupakan campuran hasil dari kedua pekerjaan, tetapi tidak akan terjadi kehilangan data.
Mesin penggabungan
Saat tabel sink Paimon menerima beberapa baris dengan kunci primer yang sama, tabel tersebut menggabungkannya menjadi satu baris untuk menjaga keunikan. Atur parameter merge-engine untuk mengontrol cara penggabungan bekerja. Tabel berikut menjelaskan setiap opsi.
Merge engine | Detail |
Deduplicate | Deduplicate adalah mesin merge default. Untuk baris dengan kunci primer yang sama, tabel sink Paimon hanya menyimpan baris terbaru dan membuang sisanya. Catatan Jika baris terbaru adalah pesan hapus, semua baris dengan kunci primer tersebut dibuang. |
Partial update | Dengan partial update, Anda dapat memperbarui data secara inkremental melalui beberapa pesan. Baris baru dengan kunci primer yang sama menimpa yang ada, tetapi kolom null tetap tidak berubah. Misalnya, anggap tabel sink Paimon menerima baris-baris berikut secara berurutan:
Jika kolom pertama adalah kunci primer, hasil akhirnya adalah <1, 25.2, 10, 'This is a book'>. Catatan
|
Aggregation | Dalam beberapa kasus, Anda hanya membutuhkan nilai agregat. Dengan aggregation, Paimon menggabungkan baris dengan kunci primer yang sama menggunakan fungsi agregat yang Anda tentukan. Untuk setiap kolom non-kunci-primer, definisikan fungsi agregat menggunakan Kolom price melakukan agregasi menggunakan max; sales menggunakan sum. Diberikan input <1, 23.0, 15> dan <1, 30.2, 20>, hasilnya adalah <1, 30.2, 35>. Fungsi agregat dan tipe data yang didukung:
Catatan
|
Mekanisme generasi data inkremental
Atur parameter changelog-producer untuk menghasilkan changelog lengkap (dengan catatan update_before dan update_after yang sesuai) untuk aliran data input apa pun. Tabel berikut mencantumkan semua produsen changelog yang tersedia. Untuk detail lebih lanjut, lihat dokumentasi Apache Paimon.
Mekanisme | Detail |
None | Saat Misalnya, anggap konsumen downstream menghitung jumlah suatu kolom. Jika hanya melihat nilai terbaru 5, konsumen tidak dapat memutuskan cara memperbarui jumlah tersebut. Jika nilai sebelumnya 4, konsumen harus menambahkan 1. Jika 6, konsumen harus mengurangi 1. Konsumen yang sensitif terhadap data update_before harus menghindari none. Namun, produsen lain menimbulkan overhead performa. Catatan Jika konsumen downstream Anda—seperti database—tidak memerlukan data update_before, none dapat diterima. Pilih berdasarkan kebutuhan Anda. |
Input | Atur Oleh karena itu, gunakan ini hanya saat aliran input itu sendiri merupakan changelog lengkap—seperti data CDC. |
Lookup | Saat Anda mengatur Dibandingkan Full Compaction, Lookup memberikan changelog yang lebih segar tetapi mengonsumsi lebih banyak sumber daya. Kami merekomendasikan fitur ini untuk data inkremental yang memerlukan freshness tinggi, seperti pembaruan tingkat menit. |
Full Compaction | Setelah Anda mengatur Dibandingkan Lookup, Full Compaction memberikan changelog yang kurang segar tetapi menggunakan lebih sedikit sumber daya karena memanfaatkan pekerjaan kompaksi yang sudah ada. Pendekatan ini direkomendasikan saat persyaratan freshness untuk data inkremental tidak ketat, seperti saat pembaruan per jam dapat diterima. |
Mode penulisan
Tabel Paimon mendukung mode penulisan berikut.
Mode | Detail |
Change-log | Change-log adalah mode penulisan default. Mode ini mendukung operasi insert, delete, dan update berdasarkan kunci primer. Anda juga dapat menggunakan mesin merge dan produsen changelog dengan mode ini. |
Append-only | Mode append-only hanya mendukung insert dan tidak menggunakan kunci primer. Mode ini lebih efisien daripada mode change-log. Gunakan sebagai alternatif antrian pesan saat freshness data moderat dapat diterima—seperti tingkat menit. Untuk detail, lihat dokumentasi Apache Paimon. Saat menggunakan mode append-only, perhatikan hal berikut:
|
Target untuk CTAS dan CDAS
Tabel Paimon mendukung sinkronisasi real-time tabel tunggal atau seluruh database. Perubahan skema di tabel upstream disinkronkan ke tabel Paimon secara real-time. Untuk detail, lihat Kelola tabel Paimon dan Kelola Katalog Paimon.
Ingesti Data
Anda dapat menggunakan konektor Paimon dalam pekerjaan ingesti data berbasis YAML sebagai sink.
Sintaks
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: /path/warehouseOpsi konfigurasi
Parameter | Deskripsi | Wajib | Tipe data | Nilai default | Catatan |
type | Jenis konektor. | Ya | STRING | Tidak ada | Nilai tetap: |
name | Nama sink. | Tidak | STRING | Tidak ada | Nama sink. |
catalog.properties.metastore | Jenis Katalog Paimon. | Tidak | STRING | filesystem | Nilai valid:
|
catalog.properties.* | Parameter untuk membuat Katalog Paimon. | Tidak | STRING | Tidak ada | Untuk detail, lihat Kelola Katalog Paimon. |
table.properties.* | Parameter untuk membuat tabel Paimon. | Tidak | STRING | Tidak ada | Untuk detail, lihat Opsi tabel Paimon. |
catalog.properties.warehouse | Direktori root untuk penyimpanan file. | Tidak | STRING | Tidak ada | Parameter ini hanya berlaku saat |
commit.user-prefix | Awalan username yang digunakan saat melakukan commit file data. | Tidak | STRING | Tidak ada | Catatan Gunakan username berbeda untuk pekerjaan berbeda untuk membantu mengidentifikasi pekerjaan yang bertentangan selama kegagalan commit. |
partition.key | Bidang partisi untuk setiap tabel partisi. | Tidak | STRING | Tidak ada | Tabel berbeda dipisahkan dengan |
sink.cross-partition-upsert.tables | Tabel yang memerlukan upsert lintas partisi (kunci primer tidak mencakup semua bidang partisi). | Tidak | STRING | Tidak ada | Berlaku untuk tabel yang memerlukan upsert lintas partisi.
Penting
|
sink.commit.parallelism | Paralelisme untuk operator commit. | Tidak | INTEGER | Tidak ada | Tingkatkan nilai ini untuk meningkatkan performa saat operator commit menjadi bottleneck. Hanya didukung di VVR 11.6 dan yang lebih baru. Catatan Mengatur parameter ini mengubah paralelisme operator. Saat merestart pekerjaan stateful, aktifkan AllowNonRestoredState. |
Contoh penggunaan
Konfigurasikan konektor Paimon sebagai sink ingesti data berdasarkan jenis Katalog Paimon Anda.
Contoh konfigurasi untuk Katalog Paimon sebagai filesystem, menulis ke OSS Alibaba Cloud:
source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: paimon name: Paimon Sink catalog.properties.metastore: filesystem catalog.properties.warehouse: oss://default/test catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com catalog.properties.fs.oss.accessKeyId: xxxxxxxx catalog.properties.fs.oss.accessKeySecret: xxxxxxxxUntuk makna parameter yang diawali catalog.properties, lihat Buat Katalog Filesystem Paimon.
Contoh: Katalog REST menulis ke Data Lake Formation Alibaba Cloud.
source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: paimon name: Paimon Sink catalog.properties.metastore: rest catalog.properties.uri: dlf_uri catalog.properties.warehouse: your_warehouse catalog.properties.token.provider: dlf # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca. table.properties.deletion-vectors.enabled: trueUntuk makna parameter yang diawali catalog.properties, lihat Parameter konfigurasi katalog Flink CDC.
Evolusi skema
Paimon mendukung peristiwa evolusi skema berikut saat digunakan sebagai sink ingesti data:
CREATE TABLE EVENT
ADD COLUMN EVENT
ALTER COLUMN TYPE EVENT (tidak mendukung perubahan tipe kolom kunci primer)
RENAME COLUMN EVENT
DROP COLUMN EVENT
TRUNCATE TABLE EVENT
DROP TABLE EVENT
Jika tabel Paimon downstream sudah ada, Paimon menggunakan skema yang ada untuk penulisan dan tidak mencoba membuat ulang tabel.
FAQ
Bagaimana cara mengonfigurasi offset konsumen untuk tabel sumber Paimon?
Mengapa pekerjaan Paimon saya gagal dengan “Heartbeat of TaskManager timed out”?
Mengapa pekerjaan Paimon saya gagal dengan “Sink materializer must not be used with Paimon sink”?
Mengapa pekerjaan Paimon saya gagal dengan “File xxx not found, Possible causes”?
Bagaimana cara menangani jumlah file Paimon yang besar di OSS?
Apakah visibilitas data konektor Paimon terkait dengan interval checkpoint?