Topik ini menjelaskan cara menggunakan konektor Hudi.
Karena perkembangan ekologi pasar dan penyesuaian strategi layanan, konektor Hudi bawaan resmi tidak lagi didukung di versi Ververica Runtime (VVR) mendatang dari Realtime Compute for Apache Flink. Anda dapat menggunakan konektor kustom untuk menghubungkan Realtime Compute for Apache Flink ke Apache Hudi. Selain itu, kami menyarankan Anda melakukan migrasi data dan menggunakan konektor Paimon untuk mendapatkan fitur dan performa yang dioptimalkan.
Informasi latar belakang
Apache Hudi adalah kerangka kerja sumber terbuka yang mengelola data tabel dalam danau data. Apache Hudi mengatur tata letak file berdasarkan Alibaba Cloud Object Storage Service (OSS) atau Hadoop Distributed File System (HDFS) untuk memastikan atomicity, consistency, isolation, durability (ACID) dan mendukung pembaruan serta penghapusan data tingkat baris secara efisien. Hal ini menyederhanakan pengembangan extract, transform, dan load (ETL). Apache Hudi juga mendukung pengelolaan otomatis dan penggabungan file kecil untuk menjaga ukuran file tetap sesuai. Ini membantu mencegah penurunan performa query akibat file kecil yang berlebihan selama proses penyisipan dan pembaruan data serta menghilangkan beban operasional pemantauan dan penggabungan file kecil secara manual.
Item | Deskripsi |
Jenis tabel | Tabel sumber dan tabel hasil |
Mode operasi | Mode streaming dan mode batch |
Format data | Tidak tersedia |
Metrik |
Catatan Untuk informasi lebih lanjut tentang data deret waktu, lihat Data Deret Waktu. |
Jenis API | API DataStream dan API SQL |
Pembaruan atau penghapusan data dalam tabel sink | Didukung |
Fitur
Item | Deskripsi |
Fitur inti |
|
Skenario tipikal |
|
Keuntungan | Dibandingkan dengan komunitas Hudi open source, Hudi yang terintegrasi ke dalam Flink yang sepenuhnya dikelola memberikan lebih banyak keuntungan. Hudi yang terintegrasi ke dalam Flink yang sepenuhnya dikelola memberikan keuntungan berikut:
|
Batasan
Hanya Realtime Compute for Apache Flink dengan versi engine vvr-4.0.11-flink-1.13 atau lebih baru yang mendukung konektor Hudi.
Hanya HDFS, Alibaba Cloud OSS, atau OSS-HDFS yang dapat digunakan sebagai sistem file.
Anda tidak dapat mempublikasikan draf dalam cluster sesi.
Anda tidak dapat menggunakan konektor Hudi untuk memodifikasi bidang dalam tabel. Jika Anda ingin memodifikasi bidang dalam tabel, gunakan pernyataan SQL Spark di konsol DLF.
Sintaksis
CREATE TEMPORARY TABLE hudi_tbl (
uuid BIGINT,
data STRING,
ts TIMESTAMP(3),
PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
...
);Parameter dalam klausa WITH
Parameter dasar
Parameter Umum
Parameter
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
connector
Jenis tabel.
STRING
Ya
Tidak ada nilai default
Setel nilai menjadi hudi.
path
Path penyimpanan tabel.
STRING
Ya
Tidak ada nilai default
Tabel dapat disimpan di Bucket OSS, HDFS, atau OSS-HDFS.
OSS: Path dalam format
oss://<bucket>/<user-defined-dir>.HDFS: Path dalam format
hdfs://<user-defined-dir>.OSS-HDFS: Path dalam format
oss://<bucket>.<oss-hdfs-endpoint>/<user-defined-dir>.CatatanHanya Realtime Compute for Apache Flink yang menggunakan VVR 8.0.3 atau lebih baru yang memungkinkan Anda menyetel parameter ini ke path OSS-HDFS.
Parameter dalam path:
bucket: nama Bucket OSS yang Anda buat.
user-defined-dir: path tempat data disimpan.
oss-hdfs-endpoint: endpoint layanan OSS-HDFS.
Anda dapat melihat endpoint dari HDFS di bagian Port halaman Overview untuk Bucket OSS di konsol OSS.
hoodie.datasource.write.recordkey.field
Bidang kunci utama.
STRING
Tidak
uuid
Anda dapat menggunakan sintaksis PRIMARY KEY untuk menentukan bidang kunci utama.
Pisahkan beberapa bidang dengan koma (,).
precombine.field
Bidang versi.
STRING
Tidak
ts
Bidang ini digunakan untuk menentukan apakah pesan perlu diperbarui.
Jika Anda tidak mengonfigurasi parameter ini, sistem memperbarui data berdasarkan urutan pesan yang didefinisikan di mesin.
oss.endpoint
Endpoint Alibaba Cloud OSS atau OSS-HDFS.
STRING
Tidak
Tidak ada nilai default
Jika Anda menyimpan tabel di OSS atau OSS-HDFS, Anda harus menentukan parameter ini.
Jika Anda menyimpan tabel di OSS, setel parameter ini ke endpoint OSS. Untuk informasi lebih lanjut tentang endpoint OSS, lihat Wilayah dan endpoint.
Jika Anda menyimpan tabel di OSS-HDFS, setel parameter ini ke endpoint OSS-HDFS. Anda dapat melihat endpoint dari HDFS di bagian Port halaman Overview untuk Bucket OSS di konsol OSS.
accessKeyId
ID AccessKey akun Alibaba Cloud Anda.
STRING
Tidak
Tidak ada nilai default
Jika Anda menyimpan tabel di OSS atau OSS-HDFS, Anda harus menentukan parameter ini.
Untuk informasi lebih lanjut, lihat Operasi konsol.
PentingUntuk melindungi pasangan AccessKey Anda, kami sarankan Anda mengonfigurasi ID AccessKey dan Rahasia AccessKey menggunakan variabel. Untuk informasi lebih lanjut, lihat Kelola variabel.
accessKeySecret
Rahasia AccessKey akun Alibaba Cloud Anda.
STRING
Tidak
Tidak ada nilai default
Parameter Eksklusif untuk Tabel Sumber
Parameter
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
read.streaming.enabled
Menentukan apakah pembacaan streaming diaktifkan.
BOOLEAN
Tidak
false
Nilai valid:
true: Pembacaan streaming diaktifkan.
false: Pembacaan streaming dinonaktifkan.
read.start-commit
Offset dari mana konsumsi data dimulai.
STRING
Tidak
Dibiarkan kosong
Nilai valid:
Waktu dalam format yyyyMMddHHmmss: Data dikonsumsi dari waktu yang ditentukan.
earliest: Data dikonsumsi dari offset paling awal.
Jika parameter ini dibiarkan kosong, data dikonsumsi dari waktu terbaru.
Parameter Eksklusif untuk Tabel Sink
Parameter
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
write.operation
Mode operasi penulisan dilakukan.
STRING
Tidak
UPSERT
Nilai valid:
insert: Data ditulis ke tabel dalam mode append.
upsert: Data diperbarui.
bulk_insert: Data batch ditulis ke tabel dalam mode append.
hive_sync.enable
Menentukan apakah sinkronisasi metadata ke Hive diaktifkan.
BOOLEAN
Tidak
false
Nilai valid:
true: Sinkronisasi metadata ke Hive diaktifkan.
false: Sinkronisasi metadata ke Hive dinonaktifkan.
hive_sync.mode
Mode sinkronisasi data Hive.
STRING
Tidak
hms
Nilai valid:
hms: Jika Anda ingin menyinkronkan metadata ke metastore Hive atau DLF, setel parameter ini ke hms.
jdbc: Jika Anda ingin menyinkronkan metadata ke driver Java Database Connectivity (JDBC), setel parameter ini ke jdbc.
hive_sync.db
Nama database Hive tempat data disinkronkan.
STRING
Tidak
default
Tidak tersedia
hive_sync.table
Nama tabel Hive tempat data disinkronkan.
STRING
Tidak
Nama tabel saat ini
Nama tabel Hive tempat data disinkronkan dari Hudi tidak boleh mengandung tanda hubung (-).
dlf.catalog.region
Nama wilayah tempat layanan DLF diaktifkan.
STRING
Tidak
Tidak ada nilai default
Untuk informasi lebih lanjut, lihat Wilayah dan endpoint yang didukung.
CatatanParameter dlf.catalog.region berlaku hanya jika parameter hive_sync.mode diatur ke hms.
Pastikan nilai parameter ini sesuai dengan endpoint yang ditentukan oleh parameter dlf.catalog.endpoint.
dlf.catalog.endpoint
Endpoint DLF.
STRING
Tidak
Tidak ada nilai default
Untuk informasi lebih lanjut, lihat Wilayah dan endpoint yang didukung.
CatatanParameter dlf.catalog.endpoint berlaku hanya jika parameter hive_sync.mode diatur ke hms.
Kami sarankan Anda menyetel parameter dlf.catalog.endpoint ke endpoint virtual private cloud (VPC) dari DLF. Sebagai contoh, jika Anda memilih wilayah China (Hangzhou), setel parameter ini ke dlf-vpc.cn-hangzhou.aliyuncs.com.
Jika Anda ingin mengakses DLF lintas VPC, ikuti petunjuk yang dijelaskan dalam Bagaimana cara Realtime Compute for Apache Flink mengakses layanan lintas VPC?
Parameter lanjutan
Hudi mendukung berbagai skenario pembacaan dan penulisan. Tabel-tabel berikut menjelaskan parameter yang dapat dikonfigurasi dalam skenario berbeda.
Parameter untuk paralelisme
Parameter | Deskripsi | Nilai default | Catatan |
write.tasks | Paralelisme tugas penulisan. Setiap tugas penulisan menulis data ke 1 hingga N bucket secara berurutan. | 4 | Jika Anda meningkatkan paralelisme tugas penulisan, jumlah file kecil tetap tidak berubah. |
write.bucket_assign.tasks | Paralelisme operator pemberi tugas bucket. | Paralelisme penyebaran Realtime Compute for Apache Flink | Jika Anda meningkatkan paralelisme tugas penulisan dan paralelisme operator pemberi tugas bucket, jumlah file kecil bertambah. |
write.index_bootstrap.tasks | Paralelisme operator bootstrap indeks. | Paralelisme penyebaran Realtime Compute for Apache Flink |
|
read.tasks | Paralelisme operator baca streaming dan batch. | 4 | Tidak tersedia |
compaction.tasks | Paralelisme operator kompaksi online. | 4 | Kompaksi online mengonsumsi lebih banyak sumber daya daripada kompaksi offline. Kami sarankan Anda melakukan kompaksi offline. |
Parameter untuk kompaksi online
Parameter | Deskripsi | Nilai default | Catatan |
compaction.schedule.enabled | Menentukan apakah akan menghasilkan rencana kompaksi sesuai jadwal. | true | Nilai valid:
Catatan Kami sarankan Anda menyetel parameter ini ke true meskipun parameter compaction.async.enabled diatur ke false. |
compaction.async.enabled | Menentukan apakah kompaksi asinkron diaktifkan. | true | Nilai valid:
Catatan Anda dapat menyetel parameter ini ke false untuk menonaktifkan kompaksi online. Namun, kami sarankan Anda menyetel parameter compaction.schedule.enabled ke true. Dalam hal ini, Anda dapat melakukan kompaksi asinkron offline untuk menjalankan rencana kompaksi yang dihasilkan sesuai jadwal. |
compaction.tasks | Paralelisme tugas kompaksi. | 4 | Tidak tersedia |
compaction.trigger.strategy | Strategi yang digunakan untuk memicu kompaksi. | num_commits | Nilai valid:
|
compaction.delta_commits | Jumlah maksimum commit yang diperlukan untuk memicu kompaksi. | 5 | Tidak tersedia |
compaction.delta_seconds | Interval pemantik kompaksi. | 3600 | Satuan: detik. |
compaction.max_memory | Memori maksimum dari hash map yang digunakan untuk kompaksi dan deduplikasi. | 100 MB | Jika sumber daya cukup, kami sarankan Anda mengubah nilai parameter ini menjadi 1 GB. |
compaction.target_io | Throughput I/O maksimum untuk setiap rencana kompaksi. | 500 GB | Tidak tersedia |
Parameter terkait file
Parameter terkait file digunakan untuk mengelola ukuran file. Tabel berikut menjelaskan parameter yang didukung.
Parameter | Deskripsi | Nilai default | Catatan |
hoodie.parquet.max.file.size | Ukuran maksimum file Parquet tempat data dapat ditulis. Jika data yang akan ditulis ke file Parquet melebihi ukuran yang ditentukan oleh parameter ini, data tambahan ditulis ke grup file baru. | 120 * 1024 * 1024 byte (120 MB) | Satuan: byte. |
hoodie.parquet.small.file.limit | Ambang batas ukuran file kecil. File yang ukurannya kurang dari nilai parameter ini dianggap sebagai file kecil. | 104857600 byte (100 MB) |
|
hoodie.copyonwrite.record.size.estimate | Perkiraan ukuran rekaman. | 1024 byte (1 KB) |
|
Parameter terkait Hadoop
Parameter | Deskripsi | Nilai default | Catatan |
hadoop.${you option key} | Item konfigurasi Hadoop yang ditentukan menggunakan awalan hadoop. | Tidak ada nilai default | Anda dapat menentukan beberapa item konfigurasi Hadoop sekaligus. Catatan Parameter ini didukung di Hudi 0.12.0 dan versi lebih baru. Untuk memenuhi persyaratan commit lintas kluster dan eksekusi, Anda dapat menggunakan pernyataan DDL untuk menentukan konfigurasi Hadoop per-job. |
Parameter untuk penulisan data
Hudi mendukung berbagai metode penulisan, termasuk penulisan batch dan penulisan streaming. Hudi mendukung berbagai jenis data, termasuk data changelog dan data log. Hudi juga mendukung skema pengindeksan yang berbeda.
Parameter untuk Penulisan Batch
Jika Anda ingin mengimpor data yang ada dari sumber data lain ke Hudi, Anda dapat menggunakan fitur impor batch untuk mengimpor data yang ada ke tabel Hudi.
Parameter
Deskripsi
Nilai default
Catatan
write.operation
Jenis operasi penulisan.
upsert
Nilai valid:
upsert: Data dimasukkan dan diperbarui.
insert: Data dimasukkan.
bulk_insert: Data ditulis dalam batch.
CatatanJika Anda menyetel parameter ini ke bulk_insert, serialisasi data berbasis Avro dan kompaksi data tidak dilakukan. Deduplikasi tidak dilakukan setelah data diimpor. Jika Anda memiliki persyaratan tinggi pada keunikan data, pastikan data yang diimpor unik.
Anda dapat menyetel parameter write.operation ke bulk_insert hanya dalam mode eksekusi batch. Dalam mode ini, sistem mengurutkan data input berdasarkan nama partisi dan menulis data ke tabel Hudi secara default. Dengan cara ini, operasi penulisan tidak sering beralih di antara file yang berbeda dan performa sistem tidak menurun.
write.tasks
Paralelisme tugas bulk_insert tulis.
Paralelisme penyebaran Realtime Compute for Apache Flink
Paralelisme tugas bulk_insert tulis ditentukan oleh parameter write.tasks dan berdampak pada jumlah file kecil.
Secara teori, paralelisme tugas bulk_insert tulis sama dengan jumlah bucket. Jika data yang ditulis ke file di setiap bucket mencapai ukuran maksimum file, data bergulir ke handle file baru. Oleh karena itu, jumlah file akhir lebih besar atau sama dengan paralelisme tugas bulk_insert tulis. Ukuran maksimum file Parquet adalah 120 MB.
write.bulk_insert.shuffle_input
Menentukan apakah akan mengacak input data berdasarkan bidang partisi untuk tugas insert batch.
true
Di Hudi 0.11.0 dan versi lebih baru, Anda dapat menyetel parameter ini ke true untuk mengurangi jumlah file kecil. Namun, ini dapat menyebabkan skew data.
write.bulk_insert.sort_input
Menentukan apakah akan mengurutkan input data berdasarkan bidang partisi untuk tugas insert batch.
true
Di Hudi 0.11.0 dan versi lebih baru, jika Anda ingin menjalankan tugas penulisan untuk menulis data ke beberapa partisi, Anda dapat menyetel parameter ini ke true untuk mengurangi jumlah file kecil.
write.sort.memory
Memori terkelola yang tersedia untuk operator sortir.
128
Satuan: MB.
Parameter untuk Mengaktifkan Mode Changelog
Dalam mode changelog, Hudi mempertahankan semua perubahan (INSERT, UPDATE_BEFORE, UPDATE_AFTER, dan DELETE) pesan dan kemudian bekerja sama dengan komputasi stateful dari mesin Flink untuk mengimplementasikan gudang data near-real-time ujung ke ujung. Tabel Merge on Read (MOR) Hudi mendukung penyimpanan semua perubahan pesan berdasarkan penyimpanan berorientasi baris. Flink yang sepenuhnya dikelola dapat membaca tabel MOR dalam mode streaming untuk mengonsumsi semua catatan perubahan.
CatatanDalam mode non-changelog, perubahan perantara dalam dataset batch dalam pembacaan streaming tunggal dapat digabungkan. Semua hasil langsung digabungkan dalam pembacaan batch (snapshot read). Status perantara diabaikan terlepas dari apakah itu ditulis.
Parameter
Deskripsi
Nilai default
Catatan
changelog.enabled
Menentukan apakah akan mengonsumsi semua perubahan.
false
Nilai valid:
true: Semua perubahan dapat dikonsumsi.
false: Tidak semua perubahan dapat dikonsumsi. Semantik UPSERT digunakan. Hanya pesan tergabung terakhir di antara semua pesan yang dijamin, dan perubahan perantara mungkin digabungkan.
CatatanSetelah Anda menyetel parameter changelog.enabled ke true, tugas kompaksi asinkron masih menggabungkan perubahan perantara menjadi satu rekaman data. Jika data tidak dikonsumsi pada kesempatan paling awal selama pembacaan streaming, hanya rekaman data terakhir yang dapat dibaca setelah data dikompaksi. Anda dapat mengubah waktu buffer untuk kompaksi data untuk menyediakan waktu tertentu bagi pembaca untuk membaca dan mengonsumsi data. Sebagai contoh, Anda dapat menyetel parameter compaction.delta_commits ke 5 dan parameter compaction.delta_seconds ke 3600.
Mode Append (didukung di Hudi 0.10.0 dan versi lebih baru)
Dalam Mode Append:
Kebijakan file kecil diterapkan pada tabel MOR. Data ditulis ke file log Avro dalam mode append.
Kebijakan file kecil tidak diterapkan pada tabel COW. File Parquet baru dihasilkan setiap kali data ditulis ke tabel Copy on Write (COW).
Parameter untuk kebijakan pengelompokan
Hudi mendukung berbagai kebijakan pengelompokan untuk menyelesaikan masalah file kecil dalam mode INSERT.
Parameter untuk Pengelompokan Inline (hanya didukung untuk tabel COW)
Parameter
Deskripsi
Nilai default
Catatan
write.insert.cluster
Menentukan apakah akan menggabungkan file kecil selama penulisan data.
false
Nilai valid:
true: File kecil digabungkan selama penulisan data.
false: File kecil tidak digabungkan selama penulisan data.
CatatanSecara default, file kecil tidak digabungkan ketika operasi INSERT dilakukan pada tabel COW. Jika Anda menyetel parameter ini ke true, file kecil yang ada digabungkan setiap kali operasi INSERT dilakukan. Namun, deduplikasi tidak dilakukan. Akibatnya, throughput penulisan berkurang.
Parameter untuk Pengelompokan Asinkron (didukung di Hudi 0.12.0 dan versi lebih baru)
Parameter
Deskripsi
Nilai default
Catatan
clustering.schedule.enabled
Menentukan apakah akan menjadwalkan rencana pengelompokan selama penulisan data.
false
Jika Anda menyetel parameter ini ke true, rencana pengelompokan dijadwalkan secara berkala.
clustering.delta_commits
Jumlah commit yang diperlukan untuk menghasilkan rencana pengelompokan.
4
Jika parameter clustering.schedule.enabled disetel ke true, parameter ini berlaku.
clustering.async.enabled
Menentukan apakah akan menjalankan rencana pengelompokan secara asinkron.
false
Jika Anda menyetel parameter ini ke true, rencana pengelompokan dijalankan secara asinkron pada interval reguler untuk menggabungkan file kecil.
clustering.tasks
Paralelisme tugas pengelompokan.
4
Tidak tersedia
clustering.plan.strategy.target.file.max.bytes
Ukuran maksimum file untuk pengelompokan.
1024 * 1024 * 1024
Satuan: byte.
clustering.plan.strategy.small.file.limit
Ambang batas ukuran file kecil yang digunakan untuk pengelompokan.
600
Hanya file yang ukurannya kurang dari nilai parameter ini yang dapat digunakan untuk pengelompokan.
clustering.plan.strategy.sort.columns
Kolom berdasarkan mana data diurutkan untuk pengelompokan.
Tidak ada nilai default
Anda dapat menentukan bidang pengurutan khusus.
Parameter untuk Strategi Rencana Pengelompokan
Parameter
Deskripsi
Nilai default
Catatan
clustering.plan.partition.filter.mode
Mode filter partisi yang digunakan dalam pembuatan rencana pengelompokan.
NONE
Nilai valid:
NONE: Partisi tidak difilter. Semua partisi dipilih untuk pengelompokan.
RECENT_DAYS: Jika data dipartisi berdasarkan hari, partisi yang sesuai dengan jumlah hari terbaru yang ditentukan dipilih untuk pengelompokan.
SELECTED_PARTITIONS: Partisi yang ditentukan dipilih untuk pengelompokan.
clustering.plan.strategy.daybased.lookback.partitions
Jumlah hari terbaru berdasarkan mana partisi dipilih untuk pengelompokan ketika parameter clustering.plan.partition.filter.mode disetel ke RECENT_DAYS.
2
Parameter ini berlaku hanya jika parameter clustering.plan.partition.filter.mode disetel ke RECENT_DAYS.
clustering.plan.strategy.cluster.begin.partition
Partisi awal, yang digunakan untuk memfilter partisi.
Tidak ada nilai default
Parameter ini berlaku hanya jika parameter clustering.plan.partition.filter.mode disetel ke SELECTED_PARTITIONS.
clustering.plan.strategy.cluster.end.partition
Partisi akhir, yang digunakan untuk memfilter partisi.
Tidak ada nilai default
Parameter ini berlaku hanya jika parameter clustering.plan.partition.filter.mode disetel ke SELECTED_PARTITIONS.
clustering.plan.strategy.partition.regex.pattern
Ekspresi reguler yang digunakan untuk menentukan partisi.
Tidak ada nilai default
Tidak tersedia
clustering.plan.strategy.partition.selected
Partisi yang dipilih.
Tidak ada nilai default
Pisahkan beberapa partisi dengan koma (,).
Parameter Terkait Jenis Indeks Bucket
CatatanParameter dalam tabel berikut didukung di Hudi 0.11.0 dan versi lebih baru.
Parameter
Deskripsi
Nilai default
Catatan
index.type
Jenis indeks.
FLINK_STATE
Nilai valid:
FLINK_STATE: Jenis indeks state Flink digunakan.
BUCKET: Jenis indeks bucket digunakan.
Jika Anda mengonfigurasi
index.type=bucket, menyetel parameterindex.global.enabledke true tidak valid karena jenis indeks bucket tidak mendukung perubahan lintas partisi. Dalam hal ini, deduplikasi pada beberapa partisi tidak dapat dilakukan meskipun fitur indeks global diaktifkan.
Jika jumlah data besar seperti lebih dari 500 juta rekaman data dalam sebuah tabel, overhead penyimpanan state Flink dapat menjadi hambatan. Jenis indeks bucket menggunakan kebijakan hash tetap untuk mengalokasikan data yang mengandung kunci yang sama ke grup file yang sama. Ini membantu mencegah overhead penyimpanan dan query indeks. Jenis indeks bucket dan jenis indeks state Flink memiliki perbedaan berikut:
Dibandingkan dengan jenis indeks state Flink, jenis indeks bucket tidak memiliki overhead penyimpanan dan komputasi. Jenis indeks bucket memberikan performa lebih baik daripada jenis indeks state Flink.
Jika Anda menggunakan jenis indeks bucket, Anda tidak dapat meningkatkan jumlah bucket. Jika Anda menggunakan jenis indeks state Flink, Anda dapat secara dinamis meningkatkan jumlah file berdasarkan ukuran file.
Jenis indeks bucket tidak mendukung perubahan lintas partisi. Setelah data ditulis ke partisi, indeks bucket tidak berubah. Sebagai contoh, operasi penghapusan hanya memengaruhi data di partisi saat ini dan tidak memengaruhi data yang sama di partisi lain.
CatatanJika data input adalah data streaming CDC, batasan ini tidak berlaku.
hoodie.bucket.index.hash.field
Bidang kunci hash ketika jenis indeks bucket digunakan.
Kunci utama
Parameter ini dapat disetel ke subset dari kunci utama.
hoodie.bucket.index.num.buckets
Jumlah bucket ketika jenis indeks bucket digunakan.
4
Secara default, nilai parameter ini adalah jumlah bucket di setiap partisi. Anda tidak dapat mengubah nilai parameter ini setelah konfigurasi.
Parameter untuk pembacaan data
Hudi mendukung berbagai metode pembacaan, termasuk pembacaan batch, pembacaan streaming, dan pembacaan data inkremental. Hudi juga dapat mengonsumsi dan mentransfer changelog untuk mengimplementasikan ETL inkremental ujung ke ujung.
Parameter untuk Pembacaan Streaming
Secara default, pembacaan snapshot digunakan untuk tabel. Dalam hal ini, snapshot penuh terbaru dibaca dan dikembalikan sekaligus. Anda dapat menyetel parameter read.streaming.enabled ke true untuk mengaktifkan pembacaan streaming. Anda dapat mengonfigurasi parameter read.start-commit untuk menentukan offset awal untuk pembacaan streaming. Anda dapat menyetel parameter read.start-commit ke earliest untuk memungkinkan data dikonsumsi dari offset paling awal.
Parameter
Deskripsi
Nilai default
Catatan
read.streaming.enabled
Menentukan apakah pembacaan streaming diaktifkan.
false
Nilai valid:
true: Pembacaan streaming diaktifkan.
false: Pembacaan streaming dinonaktifkan.
read.start-commit
Offset awal untuk pembacaan streaming.
Dibiarkan kosong
Nilai valid:
Waktu dalam format yyyyMMddHHmmss: Data dikonsumsi dari waktu yang ditentukan.
earliest: Data dikonsumsi dari offset paling awal.
Jika parameter ini dibiarkan kosong, data dikonsumsi dari waktu terbaru.
clean.retain_commits
Jumlah maksimum commit historis yang dapat dipertahankan oleh cleaner.
30
Jika jumlah commit historis melebihi nilai parameter ini, commit historis yang berlebih dihapus. Dalam mode changelog, parameter ini dapat digunakan untuk mengontrol periode retensi changelog. Sebagai contoh, jika periode checkpointing adalah 5 menit, changelog dipertahankan setidaknya selama 150 menit secara default.
PentingPembacaan streaming changelog didukung hanya di Hudi 0.10.0 dan versi lebih baru. Dalam mode changelog, Hudi mempertahankan changelog selama periode tertentu untuk konsumen hilir mengonsumsinya.
Changelog mungkin digabungkan dalam tugas kompaksi. Dalam hal ini, catatan perantara dihapus, yang dapat memengaruhi hasil perhitungan.
Parameter untuk Pembacaan Data Inkremental (didukung di Hudi 0.10.0 dan versi lebih baru)
Flink yang sepenuhnya dikelola mendukung konsumsi inkremental menggunakan konektor DataStream, konsumsi batch inkremental, dan konsumsi batch data pada titik waktu tertentu menggunakan fitur time travel.
Parameter
Deskripsi
Nilai default
Catatan
read.start-commit
Offset dari mana konsumsi data dimulai.
Commit dari offset terbaru
Nilai parameter ini dalam format yyyyMMddHHmmss.
Intervalnya adalah interval tertutup, yang mencakup offset awal dan akhir.
read.end-commit
Offset di mana konsumsi data berakhir.
Commit dari offset terbaru
Kode contoh
Kode Contoh untuk Tabel Sumber
CREATE TEMPORARY TABLE blackhole (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'blackhole'
);
CREATE TEMPORARY TABLE hudi_tbl (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'hudi',
'oss.endpoint' = '<yourOSSEndpoint>',
'accessKeyId' = '${secret_values.ak_id}',
'accessKeySecret' = '${secret_values.ak_secret}',
'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true'
);
-- Baca data dari commit terbaru dalam mode streaming dan tulis data ke Blackhole.
INSERT INTO blackhole SELECT * from hudi_tbl;Kode Contoh untuk Tabel Sink
CREATE TEMPORARY TABLE datagen(
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'datagen' ,
'rows-per-second'='100'
);
CREATE TEMPORARY TABLE hudi_tbl (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'hudi',
'oss.endpoint' = '<yourOSSEndpoint>',
'accessKeyId' = '${secret_values.ak_id}',
'accessKeySecret' = '${secret_values.ak_secret}',
'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
'table.type' = 'MERGE_ON_READ'
);
INSERT INTO hudi_tbl SELECT * from datagen;API DataStream
Jika Anda ingin memanggil API DataStream untuk membaca atau menulis data, Anda harus menggunakan konektor DataStream tipe terkait untuk terhubung ke Realtime Compute for Apache Flink. Untuk informasi lebih lanjut tentang cara mengonfigurasi konektor DataStream, lihat Pengaturan Konektor DataStream.
Maven POM
Tentukan versi Realtime Compute for Apache Flink dan Hudi berdasarkan versi VVR yang digunakan.
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <flink.version>1.15.4</flink.version> <hudi.version>0.13.1</hudi.version> </properties> <dependencies> <!-- flink --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <!-- hudi --> <dependency> <groupId>org.apache.hudi</groupId> <artifactId>hudi-flink1.15-bundle</artifactId> <version>${hudi.version}</version> <scope>provided</scope> </dependency> <!-- oss --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.3.2</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-aliyun</artifactId> <version>3.3.2</version> <scope>provided</scope> </dependency> <!-- dlf --> <dependency> <groupId>com.aliyun.datalake</groupId> <artifactId>metastore-client-hive2</artifactId> <version>0.2.14</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>2.5.1</version> <scope>provided</scope> </dependency> </dependencies>PentingDependensi spesifik yang digunakan oleh DLF bertentangan dengan versi Hive open source, seperti
hive-commondanhive-exec. Jika Anda ingin menguji DLF di lingkungan lokal Anda, Anda dapat mengunduh paket JAR hive-common dan hive-exec, dan secara manual mengimpor paket tersebut di IntelliJ IDEA.Tulis Data ke Hudi
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.util.HoodiePipeline; import java.util.HashMap; import java.util.Map; public class FlinkHudiQuickStart { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String dbName = "test_db"; String tableName = "test_tbl"; String basePath = "oss://xxx"; Map<String, String> options = new HashMap<>(); // konfigurasi hudi options.put(FlinkOptions.PATH.key(), basePath); options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name()); options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts"); options.put(FlinkOptions.DATABASE_NAME.key(), dbName); options.put(FlinkOptions.TABLE_NAME.key(), tableName); // konfigurasi oss options.put("hadoop.fs.oss.accessKeyId", "xxx"); options.put("hadoop.fs.oss.accessKeySecret", "xxx"); // Gunakan endpoint publik untuk debugging lokal, seperti oss-cn-hangzhou.aliyuncs.com. Gunakan endpoint internal untuk pengiriman kluster, seperti oss-cn-hangzhou-internal.aliyuncs.com. options.put("hadoop.fs.oss.endpoint", "xxx"); options.put("hadoop.fs.AbstractFileSystem.oss.impl", "org.apache.hadoop.fs.aliyun.oss.OSS"); options.put("hadoop.fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem"); // konfigurasi dlf options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true"); // Anda dapat menentukan apakah akan menyinkronkan data Hudi ke DLF. options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms"); options.put(FlinkOptions.HIVE_SYNC_DB.key(), dbName); options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), tableName); options.put("hadoop.dlf.catalog.id", "xxx"); options.put("hadoop.dlf.catalog.accessKeyId", "xxx"); options.put("hadoop.dlf.catalog.accessKeySecret", "xxx"); options.put("hadoop.dlf.catalog.region", "xxx"); // Gunakan endpoint publik untuk debugging lokal, seperti dlf.cn-hangzhou.aliyuncs.com. Gunakan endpoint internal untuk pengiriman kluster, seperti dlf-vpc.cn-hangzhou.aliyuncs.com. options.put("hadoop.dlf.catalog.endpoint", "xxx"); options.put("hadoop.hive.imetastoreclient.factory.class", "com.aliyun.datalake.metastore.hive2.DlfMetaStoreClientFactory"); DataStream<RowData> dataStream = env.fromElements( GenericRowData.of(StringData.fromString("id1"), StringData.fromString("name1"), 22, StringData.fromString("1001"), StringData.fromString("p1")), GenericRowData.of(StringData.fromString("id2"), StringData.fromString("name2"), 32, StringData.fromString("1002"), StringData.fromString("p2")) ); HoodiePipeline.Builder builder = HoodiePipeline.builder(tableName) .column("uuid string") .column("name string") .column("age int") .column("ts string") .column("`partition` string") .pk("uuid") .partition("partition") .options(options); builder.sink(dataStream, false); // Parameter kedua menunjukkan apakah aliran data input dibatasi env.execute("Flink_Hudi_Quick_Start"); } }