Konektor Hudi bawaan tidak lagi didukung di versi mendatang Ververica Runtime (VVR). Gunakan konektor kustom untuk menghubungkan Realtime Compute for Apache Flink ke Apache Hudi, atau migrasikan ke konektor Paimon guna memperoleh fitur dan performa yang lebih optimal.
Apache Hudi adalah framework data lake open-source yang mengelola data tabel yang disimpan di Object Storage Service (OSS) atau Hadoop Distributed File System (HDFS). Framework ini menyediakan jaminan ACID (Atomicity, Consistency, Isolation, Durability), operasi upsert dan penghapusan tingkat baris (row-level), manajemen file kecil otomatis, serta kueri time travel.
Fitur inti
| Feature | Description |
|---|---|
| Semantik ACID | Isolasi snapshot secara default, memastikan konsistensi data pada pembacaan dan penulisan konkuren. |
| Semantik UPSERT | Menggabungkan INSERT dan UPDATE: jika catatan belum ada, maka dimasukkan; jika sudah ada, maka diperbarui. Hal ini menyederhanakan kode pengembangan ETL. |
| Time travel | Mengakses versi data historis pada titik waktu tertentu, memungkinkan auditing data dan kontrol kualitas yang efisien. |
Skenario khas
| Scenario | Description |
|---|---|
| Akselerasi ingestion database | Menulis data Change Data Capture (CDC) (misalnya, log biner MySQL melalui konektor MySQL CDC) langsung ke tabel Hudi untuk ETL real-time downstream—lebih hemat biaya dibandingkan pemuatan bulk offline. |
| ETL inkremental | Mengekstraksi aliran data perubahan dari Hudi secara inkremental untuk ETL real-time ringan. Gunakan Apache Presto atau Apache Spark untuk OLAP downstream. |
| Antrian pesan | Gunakan Hudi sebagai pengganti antrian pesan ringan untuk skenario volume rendah, menyederhanakan arsitektur aplikasi. |
| Pengisian ulang data (data backfilling) | Gabungkan data lengkap dan data inkremental dari tabel Hudi di Hive metastore untuk menghasilkan tabel lebar dengan overhead komputasi minimal. |
Keunggulan dibandingkan Hudi open-source
-
Tanpa perawatan: Konektor Hudi bawaan mengurangi kompleksitas O&M dan memberikan jaminan SLA.
-
Konektivitas data yang lebih baik: Memisahkan data dari mesin komputasi, memungkinkan migrasi mulus antara Apache Flink, Apache Spark, Apache Presto, dan Apache Hive.
-
Ingestion database-to-lake yang disederhanakan: Bekerja sama dengan konektor Flink CDC untuk menyederhanakan pengembangan data.
-
Fitur kelas enterprise: Manajemen metadata terpadu melalui Data Lake Formation (DLF) dan perubahan skema ringan otomatis.
-
Penyimpanan hemat biaya: Data disimpan dalam format Apache Parquet atau Apache Avro di Alibaba Cloud OSS, dengan isolasi penyimpanan dan komputasi untuk penskalaan sumber daya yang fleksibel.
Konfigurasi yang didukung
| Item | Value |
|---|---|
| Jenis tabel | Tabel sumber, tabel sink |
| Mode eksekusi | Mode streaming, mode batch |
| Format data | N/A |
| Jenis API | DataStream API, SQL API |
| Pembaruan/penghapusan data di sink | Didukung |
| Versi VVR minimum | vvr-4.0.11-flink-1.13 |
| Sistem file yang didukung | OSS, HDFS, OSS-HDFS |
Metrik
| Jenis tabel | Metrik |
|---|---|
| Tabel sumber | numRecordsIn, numRecordsInPerSecond |
| Tabel sink | numRecordsOut, numRecordsOutPerSecond, currentSendTime |
Untuk definisi metrik, lihat Metrik.
Batasan
-
Versi mesin minimum: vvr-4.0.11-flink-1.13 atau lebih baru.
-
Sistem file yang didukung: hanya OSS, HDFS, atau OSS-HDFS.
-
Pekerjaan draft tidak dapat dijalankan pada kluster sesi.
-
Modifikasi bidang tidak didukung melalui konektor Hudi. Untuk memodifikasi bidang, gunakan pernyataan Spark SQL di konsol Data Lake Formation (DLF).
Sintaksis
CREATE TEMPORARY TABLE hudi_tbl (
uuid BIGINT,
data STRING,
ts TIMESTAMP(3),
PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
...
);
Parameter dalam klausa WITH
Parameter dasar
Parameter umum
| Parameter | Wajib | Default | Description |
|---|---|---|---|
connector |
Ya | — | Atur ke hudi. |
path |
Ya | — | Jalur penyimpanan tabel. Format yang didukung: OSS (oss://<bucket>/<user-defined-dir>), HDFS (hdfs://<user-defined-dir>), OSS-HDFS (oss://<bucket>.<oss-hdfs-endpoint>/<user-defined-dir>). Jalur OSS-HDFS memerlukan VVR 8.0.3 atau lebih baru. Temukan endpoint OSS-HDFS di bagian Port halaman Overview bucket OSS. |
hoodie.datasource.write.recordkey.field |
Tidak | uuid |
Bidang kunci primer. Pisahkan beberapa bidang dengan koma. Atau, gunakan sintaksis PRIMARY KEY dalam DDL. |
precombine.field |
Tidak | ts |
Bidang versi yang digunakan untuk menentukan urutan pembaruan. Jika tidak diatur, pembaruan mengikuti urutan pesan yang ditentukan oleh mesin. |
oss.endpoint |
Tidak | — | Diperlukan saat menyimpan data di OSS atau OSS-HDFS. Untuk endpoint OSS, lihat Wilayah dan endpoint. Untuk endpoint OSS-HDFS, lihat bagian Port halaman Overview bucket OSS. |
accessKeyId |
Tidak | — | ID AccessKey. Diperlukan untuk OSS dan OSS-HDFS. Simpan kredensial sebagai variabel, bukan hardcoding. Lihat Kelola variabel. |
accessKeySecret |
Tidak | — | Rahasia AccessKey. Diperlukan untuk OSS dan OSS-HDFS. |
Untuk melindungi pasangan AccessKey Anda, simpan ID AccessKey dan rahasia AccessKey sebagai variabel. Lihat Kelola variabel.
Parameter tabel sumber
| Parameter | Wajib | Default | Description |
|---|---|---|---|
read.streaming.enabled |
Tidak | false |
Atur ke true untuk mengaktifkan pembacaan streaming. Secara default, digunakan pembacaan snapshot, yang mengembalikan snapshot lengkap terbaru. |
read.start-commit |
Tidak | (kosong) | Offset awal untuk pembacaan streaming. Format: yyyyMMddHHmmss untuk waktu tertentu, atau earliest untuk membaca dari awal. Biarkan kosong untuk membaca dari commit terbaru. |
Sink Table Parameters
| Parameter | Wajib | Default | Description |
|---|---|---|---|
write.operation |
Tidak | UPSERT |
Mode penulisan. Nilai yang valid: insert (append), upsert (insert atau update), bulk_insert (append batch). |
hive_sync.enable |
Tidak | false |
Atur ke true untuk menyinkronkan metadata ke Apache Hive. |
hive_sync.mode |
Tidak | hms |
Mode sinkronisasi. hms menyinkronkan ke Hive metastore atau DLF. jdbc menyinkronkan melalui driver Java Database Connectivity (JDBC). |
hive_sync.db |
Tidak | default |
Nama database Hive tujuan. |
hive_sync.table |
Tidak | Nama tabel saat ini | Nama tabel Hive tujuan. Tidak boleh mengandung tanda hubung (-). |
dlf.catalog.region |
Tidak | — | Wilayah tempat DLF diaktifkan. Hanya berlaku saat hive_sync.mode adalah hms. Lihat Wilayah dan endpoint yang didukung. Harus sesuai dengan wilayah yang ditentukan oleh dlf.catalog.endpoint. |
dlf.catalog.endpoint |
Tidak | — | Endpoint DLF. Hanya berlaku saat hive_sync.mode adalah hms. Gunakan endpoint VPC untuk latensi lebih rendah—misalnya, dlf-vpc.cn-hangzhou.aliyuncs.com untuk wilayah China (Hangzhou). Lihat Wilayah dan endpoint yang didukung. Untuk akses lintas-VPC, lihat Bagaimana Realtime Compute for Apache Flink mengakses layanan lintas VPC? |
Parameter lanjutan
Parameter paralelisme
| Parameter | Default | Description |
|---|---|---|
write.tasks |
4 |
Paralelisme tugas penulisan. Setiap tugas menulis ke 1 atau lebih bucket secara berurutan. Meningkatkan nilai ini tidak meningkatkan jumlah file kecil. |
write.bucket_assign.tasks |
Paralelisme penyebaran | Paralelisme operator penugasan bucket. Meningkatkan nilai ini meningkatkan jumlah file kecil. |
write.index_bootstrap.tasks |
Paralelisme penyebaran | Paralelisme operator bootstrap indeks. Hanya berlaku saat index.bootstrap.enabled adalah true. Meningkatkan nilai ini meningkatkan throughput bootstrap, tetapi checkpointing mungkin diblokir selama bootstrap—tingkatkan toleransi kegagalan checkpoint jika diperlukan. |
read.tasks |
4 |
Paralelisme operator pembacaan streaming dan batch. |
compaction.tasks |
4 |
Paralelisme operator kompaksi online. Kompaksi online mengonsumsi lebih banyak sumber daya daripada kompaksi offline; lebih disarankan menggunakan kompaksi offline untuk workload produksi. |
Parameter kompaksi online
| Parameter | Default | Description |
|---|---|---|
compaction.schedule.enabled |
true |
Apakah akan menghasilkan rencana kompaksi sesuai jadwal. Pertahankan nilai ini true bahkan saat kompaksi asinkron dinonaktifkan—kompaksi offline kemudian dapat menjalankan rencana yang telah dijadwalkan. |
compaction.async.enabled |
true |
Apakah akan menjalankan kompaksi secara asinkron. Atur ke false untuk menonaktifkan kompaksi online sambil tetap mengaktifkan pembuatan rencana. |
compaction.tasks |
4 |
Paralelisme tugas kompaksi. |
compaction.trigger.strategy |
num_commits |
Strategi yang digunakan untuk memicu kompaksi. Nilai yang valid: num_commits, time_elapsed, num_and_time, num_or_time. |
compaction.delta_commits |
5 |
Jumlah commit yang diperlukan untuk memicu kompaksi. Digunakan bersama num_commits, num_and_time, atau num_or_time. |
compaction.delta_seconds |
3600 |
Interval dalam detik antara pemicu kompaksi. Digunakan bersama time_elapsed, num_and_time, atau num_or_time. |
compaction.max_memory |
100 MB |
Memori maksimum untuk hash map yang digunakan selama kompaksi dan deduplikasi. Tingkatkan hingga 1 GB jika sumber daya memungkinkan. |
compaction.target_io |
500 GB |
Throughput I/O maksimum per rencana kompaksi. |
Parameter ukuran file
Parameter ini mengontrol cara Hudi mengelola ukuran file untuk mencegah akumulasi file kecil.
| Parameter | Default | Description |
|---|---|---|
hoodie.parquet.max.file.size |
120 MB (120 × 1024 × 1024 byte) | Ukuran maksimum file Parquet. Data yang melebihi ambang batas ini ditulis ke grup file baru. |
hoodie.parquet.small.file.limit |
100 MB (104.857.600 byte) | File yang lebih kecil dari ambang batas ini dianggap sebagai file kecil. Selama penulisan, Hudi menambahkan data ke file kecil yang sudah ada, bukan membuat file baru. |
hoodie.copyonwrite.record.size.estimate |
1 KB (1.024 byte) | Perkiraan ukuran catatan. Jika tidak diatur, Hudi menghitungnya secara dinamis dari metadata yang telah dikomit. |
Parameter konfigurasi Hadoop
| Parameter | Default | Description |
|---|---|---|
hadoop.${option key} |
— | Item konfigurasi Hadoop, ditentukan dengan awalan hadoop.. Didukung di Hudi 0.12.0 dan versi lebih baru. Gunakan pernyataan DDL untuk menentukan konfigurasi Hadoop per pekerjaan dalam skenario lintas kluster. Beberapa item dapat ditentukan sekaligus. |
Parameter penulisan data
Penulisan batch
Gunakan penulisan batch untuk mengimpor data yang sudah ada dari sumber lain ke tabel Hudi.
bulk_insertmelewati serialisasi Avro, kompaksi, dan deduplikasi. Pastikan keunikan data sumber sebelum menggunakan mode ini.bulk_inserthanya berlaku dalam mode eksekusi batch.
| Parameter | Default | Description |
|---|---|---|
write.operation |
upsert |
Jenis penulisan. Atur ke bulk_insert untuk penulisan batch. |
write.tasks |
Paralelisme penyebaran | Paralelisme untuk tugas bulk_insert. Jumlah akhir file output lebih besar dari atau sama dengan nilai ini (data bergulir ke file baru saat mencapai batas Parquet 120 MB). |
write.bulk_insert.shuffle_input |
true |
Apakah akan melakukan shuffle data masukan berdasarkan bidang partisi sebelum menulis. Tersedia di Hudi 0.11.0 dan versi lebih baru. Mengurangi jumlah file kecil tetapi dapat menyebabkan kesenjangan data. |
write.bulk_insert.sort_input |
true |
Apakah akan mengurutkan data masukan berdasarkan bidang partisi sebelum menulis. Tersedia di Hudi 0.11.0 dan versi lebih baru. Mengurangi jumlah file kecil saat satu tugas menulis ke beberapa partisi. |
write.sort.memory |
128 |
Memori terkelola yang tersedia untuk operator pengurutan, dalam MB. |
Changelog Mode
Dalam mode changelog, Hudi menyimpan semua event perubahan—INSERT, UPDATE_BEFORE, UPDATE_AFTER, dan DELETE—memungkinkan warehouse data near-real-time end-to-end dengan komputasi stateful Flink. Tabel Merge On Read (MOR) mendukung mode ini.
Dalam mode non-changelog, perubahan dalam satu batch digabungkan. Pembacaan snapshot hanya mengembalikan hasil akhir yang telah digabungkan; status antara tidak terlihat terlepas dari jalur penulisan.
Setelah mengaktifkan mode changelog, tugas kompaksi asinkron tetap menggabungkan perubahan antara. Atur compaction.delta_commits=5 dan compaction.delta_seconds=3600 agar konsumen downstream memiliki cukup waktu untuk membaca catatan sebelum dikompaksi.
| Parameter | Default | Description |
|---|---|---|
changelog.enabled |
false |
Atur ke true untuk menyimpan semua event perubahan. Saat false, hanya catatan akhir yang telah digabungkan yang dijamin; perubahan antara mungkin digabungkan. |
Mode Penambahan
Didukung di Hudi 0.10.0 dan versi lebih baru.
-
Tabel MOR: Kebijakan file kecil berlaku. Data ditulis ke file log Apache Avro dalam mode append.
-
Tabel Copy On Write (COW): Kebijakan file kecil tidak berlaku. File Apache Parquet baru dibuat untuk setiap penulisan.
Parameter clustering
Hudi mendukung clustering untuk mengatasi akumulasi file kecil dalam mode INSERT.
Clustering inline (hanya untuk tabel COW)
| Parameter | Default | Description |
|---|---|---|
write.insert.cluster |
false |
Atur ke true untuk menggabungkan file kecil selama penulisan. Setiap operasi INSERT menggabungkan file kecil yang sudah ada, tetapi deduplikasi tidak dilakukan, dan throughput penulisan menurun. |
Clustering asinkron (Hudi 0.12.0 dan versi lebih baru)
| Parameter | Default | Description |
|---|---|---|
clustering.schedule.enabled |
false |
Atur ke true untuk menjadwalkan rencana clustering secara berkala. |
clustering.delta_commits |
4 |
Jumlah commit yang diperlukan untuk menghasilkan rencana clustering. Hanya berlaku saat clustering.schedule.enabled adalah true. |
clustering.async.enabled |
false |
Atur ke true untuk menjalankan rencana clustering secara asinkron dalam interval reguler. |
clustering.tasks |
4 |
Paralelisme tugas clustering. |
clustering.plan.strategy.target.file.max.bytes |
1 GiB (1.073.741.824 byte) | Ukuran file maksimum target untuk output clustering. |
clustering.plan.strategy.small.file.limit |
600 |
File yang lebih kecil dari ambang batas ini (dalam byte) memenuhi syarat untuk clustering. |
clustering.plan.strategy.sort.columns |
— | Kolom yang digunakan untuk mengurutkan data selama clustering. |
Strategi Perencanaan Pengelompokan
| Parameter | Default | Description |
|---|---|---|
clustering.plan.partition.filter.mode |
NONE |
Mode filter partisi. Nilai yang valid: NONE (semua partisi), RECENT_DAYS (partisi dari N hari terakhir), SELECTED_PARTITIONS (partisi tertentu). |
clustering.plan.strategy.daybased.lookback.partitions |
2 |
Jumlah hari terakhir untuk memilih partisi untuk clustering. Hanya berlaku saat filter.mode adalah RECENT_DAYS. |
clustering.plan.strategy.cluster.begin.partition |
— | Partisi awal untuk filter rentang. Hanya berlaku saat filter.mode adalah SELECTED_PARTITIONS. |
clustering.plan.strategy.cluster.end.partition |
— | Partisi akhir untuk filter rentang. Hanya berlaku saat filter.mode adalah SELECTED_PARTITIONS. |
clustering.plan.strategy.partition.regex.pattern |
— | Ekspresi reguler untuk memilih partisi. |
clustering.plan.strategy.partition.selected |
— | Daftar partisi yang dipilih, dipisahkan koma. |
Pilih jenis indeks
Hudi mendukung dua jenis indeks. Gunakan tabel berikut untuk memilih yang sesuai dengan workload Anda.
| Dimensi | FLINK_STATE | BUCKET |
|---|---|---|
| Overhead penyimpanan/komputasi | Ya (backend status) | Tidak ada |
| Performa | Tergantung backend status | Lebih baik (tanpa overhead status) |
| Fleksibilitas grup file | Menetapkan catatan secara dinamis berdasarkan ukuran file | Jumlah bucket tetap (tidak dapat ditambah setelah konfigurasi awal) |
| Perubahan lintas partisi | Didukung | Tidak didukung (pengecualian: input streaming Change Data Capture (CDC)) |
| Kapan digunakan | Tabel dengan kurang dari 500 juta catatan, atau workload yang memerlukan pembaruan lintas partisi | Tabel dengan lebih dari 500 juta catatan di mana overhead status menjadi bottleneck |
Saatindex.typediatur keBUCKET, pengaturanindex.global.enabled=truetidak berpengaruh—indeks bucket tidak mendukung deduplikasi lintas partisi.
| Parameter | Default | Description |
|---|---|---|
index.type |
FLINK_STATE |
Jenis indeks. Nilai yang valid: FLINK_STATE, BUCKET. |
hoodie.bucket.index.hash.field |
Kunci primer | Bidang kunci hash untuk indeks bucket. Dapat berupa subset dari kunci primer. |
hoodie.bucket.index.num.buckets |
4 |
Jumlah bucket per partisi. Tidak dapat diubah setelah tabel dibuat. |
Parameter indeks bucket didukung di Hudi 0.11.0 dan versi lebih baru.
Parameter pembacaan data
Hudi mendukung tiga pola pembacaan menggunakan parameter yang sama.
| Pattern | Configuration |
|---|---|
| Pembacaan streaming | Atur read.streaming.enabled=true dan opsional read.start-commit |
| Pembacaan batch inkremental | Atur kedua read.start-commit dan read.end-commit; intervalnya tertutup (inklusif di kedua ujung) |
| Time travel | Atur hanya read.end-commit; membaca snapshot pada commit spesifik tersebut |
Parameter pembacaan streaming
Secara default, pembacaan tabel Hudi menggunakan pembacaan snapshot—snapshot lengkap terbaru dikembalikan sekaligus. Atur read.streaming.enabled=true untuk beralih ke pembacaan streaming.
| Parameter | Default | Description |
|---|---|---|
read.streaming.enabled |
false |
Atur ke true untuk mengaktifkan pembacaan streaming. |
read.start-commit |
(kosong) | Offset awal. Format: yyyyMMddHHmmss untuk waktu tertentu, atau earliest untuk membaca dari awal. Biarkan kosong untuk memulai dari commit terbaru. |
clean.retain_commits |
30 |
Jumlah maksimum commit historis yang disimpan oleh cleaner. Commit yang melebihi batas ini akan dihapus. Misalnya, dengan interval checkpointing 5 menit, nilai default 30 menyimpan changelog setidaknya selama 150 menit. |
Pembacaan streaming changelog memerlukan Hudi 0.10.0 atau versi lebih baru. Tugas kompaksi dapat menggabungkan changelog, menghapus catatan antara, dan berpotensi memengaruhi perhitungan downstream.
Parameter pembacaan inkremental
| Parameter | Default | Description |
|---|---|---|
read.start-commit |
Commit terbaru | Awal rentang pembacaan, dalam format yyyyMMddHHmmss. |
read.end-commit |
Commit terbaru | Akhir rentang pembacaan, dalam format yyyyMMddHHmmss. Rentangnya tertutup (inklusif di kedua ujung). |
Contoh
Tabel sumber
CREATE TEMPORARY TABLE blackhole (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'blackhole'
);
CREATE TEMPORARY TABLE hudi_tbl (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'hudi',
'oss.endpoint' = '<yourOSSEndpoint>',
'accessKeyId' = '${secret_values.ak_id}',
'accessKeySecret' = '${secret_values.ak_secret}',
'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true'
);
-- Baca dari commit terbaru dalam mode streaming dan tulis ke Blackhole.
INSERT INTO blackhole SELECT * FROM hudi_tbl;
Tabel sink
CREATE TEMPORARY TABLE datagen (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '100'
);
CREATE TEMPORARY TABLE hudi_tbl (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'hudi',
'oss.endpoint' = '<yourOSSEndpoint>',
'accessKeyId' = '${secret_values.ak_id}',
'accessKeySecret' = '${secret_values.ak_secret}',
'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
'table.type' = 'MERGE_ON_READ'
);
INSERT INTO hudi_tbl SELECT * FROM datagen;
DataStream API
Untuk menggunakan DataStream API, konfigurasikan konektor DataStream untuk Realtime Compute for Apache Flink. Lihat Pengaturan konektor DataStream.
Dependensi Maven
Sesuaikan versi dependensi dengan versi VVR Anda.
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.15.4</flink.version>
<hudi.version>0.13.1</hudi.version>
</properties>
<dependencies>
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Hudi -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink1.15-bundle</artifactId>
<version>${hudi.version}</version>
<scope>provided</scope>
</dependency>
<!-- OSS -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aliyun</artifactId>
<version>3.3.2</version>
<scope>provided</scope>
</dependency>
<!-- DLF -->
<dependency>
<groupId>com.aliyun.datalake</groupId>
<artifactId>metastore-client-hive2</artifactId>
<version>0.2.14</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.5.1</version>
<scope>provided</scope>
</dependency>
</dependencies>
Dependensi DLF bentrok dengan versi Apache Hive open-source (hive-common, hive-exec). Untuk pengujian DLF lokal, unduh paket JAR kustom hive-common dan hive-exec, lalu impor secara manual di IntelliJ IDEA.
Menulis data ke Hudi
Contoh berikut menulis data ke tabel Hudi MOR di OSS dan secara opsional menyinkronkan metadata ke DLF.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;
import java.util.HashMap;
import java.util.Map;
public class FlinkHudiQuickStart {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String dbName = "test_db";
String tableName = "test_tbl";
String basePath = "oss://xxx";
Map<String, String> options = new HashMap<>();
// Konfigurasi Hudi
options.put(FlinkOptions.PATH.key(), basePath);
options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
options.put(FlinkOptions.DATABASE_NAME.key(), dbName);
options.put(FlinkOptions.TABLE_NAME.key(), tableName);
// Konfigurasi OSS
// Gunakan endpoint publik untuk debugging lokal (misalnya, oss-cn-hangzhou.aliyuncs.com)
// Gunakan endpoint internal untuk pengiriman kluster (misalnya, oss-cn-hangzhou-internal.aliyuncs.com)
options.put("hadoop.fs.oss.accessKeyId", "xxx");
options.put("hadoop.fs.oss.accessKeySecret", "xxx");
options.put("hadoop.fs.oss.endpoint", "xxx");
options.put("hadoop.fs.AbstractFileSystem.oss.impl", "org.apache.hadoop.fs.aliyun.oss.OSS");
options.put("hadoop.fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
// Konfigurasi DLF (opsional — hapus jika tidak menyinkronkan ke DLF)
// Gunakan endpoint publik untuk debugging lokal (misalnya, dlf.cn-hangzhou.aliyuncs.com)
// Gunakan endpoint VPC untuk pengiriman kluster (misalnya, dlf-vpc.cn-hangzhou.aliyuncs.com)
options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
options.put(FlinkOptions.HIVE_SYNC_DB.key(), dbName);
options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), tableName);
options.put("hadoop.dlf.catalog.id", "xxx");
options.put("hadoop.dlf.catalog.accessKeyId", "xxx");
options.put("hadoop.dlf.catalog.accessKeySecret", "xxx");
options.put("hadoop.dlf.catalog.region", "xxx");
options.put("hadoop.dlf.catalog.endpoint", "xxx");
options.put("hadoop.hive.imetastoreclient.factory.class",
"com.aliyun.datalake.metastore.hive2.DlfMetaStoreClientFactory");
DataStream<RowData> dataStream = env.fromElements(
GenericRowData.of(StringData.fromString("id1"), StringData.fromString("name1"), 22,
StringData.fromString("1001"), StringData.fromString("p1")),
GenericRowData.of(StringData.fromString("id2"), StringData.fromString("name2"), 32,
StringData.fromString("1002"), StringData.fromString("p2"))
);
HoodiePipeline.Builder builder = HoodiePipeline.builder(tableName)
.column("uuid string")
.column("name string")
.column("age int")
.column("ts string")
.column("`partition` string")
.pk("uuid")
.partition("partition")
.options(options);
// Parameter kedua: apakah aliran input dibatasi (true = batch, false = streaming)
builder.sink(dataStream, false);
env.execute("Flink_Hudi_Quick_Start");
}
}