全部产品
Search
文档中心

Realtime Compute for Apache Flink:Konektor Hudi (akan dipensiunkan)

更新时间:Jul 02, 2025

Topik ini menjelaskan cara menggunakan konektor Hudi.

Penting

Karena perkembangan ekologi pasar dan penyesuaian strategi layanan, konektor Hudi bawaan resmi tidak lagi didukung di versi Ververica Runtime (VVR) mendatang dari Realtime Compute for Apache Flink. Anda dapat menggunakan konektor kustom untuk menghubungkan Realtime Compute for Apache Flink ke Apache Hudi. Selain itu, kami menyarankan Anda melakukan migrasi data dan menggunakan konektor Paimon untuk mendapatkan fitur dan performa yang dioptimalkan.

Informasi latar belakang

Apache Hudi adalah kerangka kerja sumber terbuka yang mengelola data tabel dalam danau data. Apache Hudi mengatur tata letak file berdasarkan Alibaba Cloud Object Storage Service (OSS) atau Hadoop Distributed File System (HDFS) untuk memastikan atomicity, consistency, isolation, durability (ACID) dan mendukung pembaruan serta penghapusan data tingkat baris secara efisien. Hal ini menyederhanakan pengembangan extract, transform, dan load (ETL). Apache Hudi juga mendukung pengelolaan otomatis dan penggabungan file kecil untuk menjaga ukuran file tetap sesuai. Ini membantu mencegah penurunan performa query akibat file kecil yang berlebihan selama proses penyisipan dan pembaruan data serta menghilangkan beban operasional pemantauan dan penggabungan file kecil secara manual.

Item

Deskripsi

Jenis tabel

Tabel sumber dan tabel hasil

Mode operasi

Mode streaming dan mode batch

Format data

Tidak tersedia

Metrik

  • Data deret waktu untuk tabel sumber

    • numRecordsIn

    • numRecordsInPerSecond

  • Data deret waktu untuk tabel hasil:

    • numRecordsOut

    • numRecordsOutPerSecond

    • currentSendTime

Catatan

Untuk informasi lebih lanjut tentang data deret waktu, lihat Data Deret Waktu.

Jenis API

API DataStream dan API SQL

Pembaruan atau penghapusan data dalam tabel sink

Didukung

Fitur

Item

Deskripsi

Fitur inti

  • Mendukung semantik ACID. Secara default, isolasi snapshot disediakan.

  • Mendukung semantik UPSERT. Semantik UPSERT adalah kombinasi dari semantik INSERT dan UPDATE. Flink yang sepenuhnya dikelola menggunakan semantik UPSERT untuk menulis data ke tabel berdasarkan aturan berikut: Jika catatan data yang akan ditulis ke tabel tidak ada di tabel, Flink yang sepenuhnya dikelola menyisipkan catatan data ke tabel. Jika catatan data sudah ada di tabel, Flink yang sepenuhnya dikelola memperbarui catatan data. Pernyataan INSERT INTO dapat secara signifikan menyederhanakan kode pengembangan dan meningkatkan efisiensi pemrosesan data.

  • Menyediakan detail historis versi data pada titik waktu tertentu berdasarkan fitur time travel. Ini membantu Anda melakukan O&M data secara efisien dan meningkatkan kualitas data.

Skenario tipikal

  • Percepatan pemasukan data dari database ke danau data

    Dibandingkan dengan metode tradisional yang digunakan untuk memuat dan menggabungkan sejumlah besar data dalam mode offline, Hudi memungkinkan Anda memperbarui dan menulis data streaming ke dataset yang sangat besar secara real-time dengan cara yang lebih hemat biaya. Anda dapat langsung menulis data Change Data Capture (CDC) ke danau data untuk bisnis hilir dalam proses ETL real-time. Sebagai contoh, Anda dapat menggunakan konektor MySQL CDC dari Flink yang sepenuhnya dikelola untuk menulis data log biner dari sistem manajemen database relasional (RDBMS) seperti MySQL ke tabel Hudi.

  • ETL inkremental

    Anda dapat menggunakan metode ekstraksi inkremental ETL untuk mendapatkan aliran data perubahan dari Hudi secara real-time. Metode ini memberikan performa real-time yang lebih baik dan lebih ringan dibandingkan dengan penjadwalan ETL offline. Sebagai contoh, Anda dapat mengekstrak data bisnis online secara inkremental ke sistem penyimpanan offline. Dalam skenario tipikal ini, mesin Flink menulis data yang diekstraksi ke tabel Hudi, dan kemudian mesin Apache Presto atau Apache Spark digunakan untuk melakukan pemrosesan analitik online (OLAP) secara efisien.

  • Antrian pesan

    Dalam skenario di mana Anda hanya perlu memproses sejumlah kecil data, Hudi juga dapat digunakan sebagai layanan antrian pesan untuk menggantikan Kafka. Ini menyederhanakan arsitektur pengembangan aplikasi.

  • Pengisian ulang data

    Jika Anda ingin memperbarui data penuh historis di baris dan kolom tertentu dari sebuah tabel, Anda dapat menggunakan danau data. Ini sangat mengurangi konsumsi sumber daya komputasi dan meningkatkan performa ujung ke ujung. Sebagai contoh, Anda dapat menggunakan fitur ini untuk membaca data penuh dan data inkremental dari tabel Hudi dalam metastore Hive dan menggabungkan kedua tabel untuk menghasilkan tabel lebar.

Keuntungan

Dibandingkan dengan komunitas Hudi open source, Hudi yang terintegrasi ke dalam Flink yang sepenuhnya dikelola memberikan lebih banyak keuntungan. Hudi yang terintegrasi ke dalam Flink yang sepenuhnya dikelola memberikan keuntungan berikut:

  • Tanpa pemeliharaan berdasarkan integrasi antara platform dan Flink yang sepenuhnya dikelola

    Flink yang sepenuhnya dikelola menyediakan konektor Hudi bawaan. Ini mengurangi kompleksitas O&M dan memberikan jaminan service level agreement (SLA).

  • Peningkatan konektivitas data

    Konektor Hudi terhubung ke beberapa mesin komputasi dan analitik big data Alibaba Cloud. Dengan cara ini, data dipisahkan dari mesin komputasi dan dapat bermigrasi dengan mulus di antara Apache Flink, Apache Spark, Apache Presto, dan Apache Hive.

  • Pengoptimalan pemasukan data dari database ke danau data

    Konektor Hudi bekerja sama dengan konektor Flink CDC untuk menyederhanakan pengembangan data.

  • Fitur tingkat perusahaan

    Fitur tingkat perusahaan didukung, seperti tampilan metadata terpadu dari Data Lake Formation (DLF) dan perubahan skema tabel otomatis dan ringan.

  • Penyimpanan hemat biaya dan skalabilitas tinggi menggunakan Alibaba Cloud OSS

    Data disimpan dalam format Apache Parquet atau Apache Avro di Alibaba Cloud OSS. Penyimpanan dan komputasi diisolasi dan sumber daya dapat diskalakan secara fleksibel.

Batasan

  • Hanya Realtime Compute for Apache Flink dengan versi engine vvr-4.0.11-flink-1.13 atau lebih baru yang mendukung konektor Hudi.

  • Hanya HDFS, Alibaba Cloud OSS, atau OSS-HDFS yang dapat digunakan sebagai sistem file.

  • Anda tidak dapat mempublikasikan draf dalam cluster sesi.

  • Anda tidak dapat menggunakan konektor Hudi untuk memodifikasi bidang dalam tabel. Jika Anda ingin memodifikasi bidang dalam tabel, gunakan pernyataan SQL Spark di konsol DLF.

Sintaksis

CREATE TEMPORARY TABLE hudi_tbl (
  uuid BIGINT,
  data STRING,
  ts   TIMESTAMP(3),
  PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
  ...
);

Parameter dalam klausa WITH

Parameter dasar

  • Parameter Umum

    Parameter

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    connector

    Jenis tabel.

    STRING

    Ya

    Tidak ada nilai default

    Setel nilai menjadi hudi.

    path

    Path penyimpanan tabel.

    STRING

    Ya

    Tidak ada nilai default

    Tabel dapat disimpan di Bucket OSS, HDFS, atau OSS-HDFS.

    • OSS: Path dalam format oss://<bucket>/<user-defined-dir>.

    • HDFS: Path dalam format hdfs://<user-defined-dir>.

    • OSS-HDFS: Path dalam format oss://<bucket>.<oss-hdfs-endpoint>/<user-defined-dir>.

      Catatan

      Hanya Realtime Compute for Apache Flink yang menggunakan VVR 8.0.3 atau lebih baru yang memungkinkan Anda menyetel parameter ini ke path OSS-HDFS.

    Parameter dalam path:

    • bucket: nama Bucket OSS yang Anda buat.

    • user-defined-dir: path tempat data disimpan.

    • oss-hdfs-endpoint: endpoint layanan OSS-HDFS.

      Anda dapat melihat endpoint dari HDFS di bagian Port halaman Overview untuk Bucket OSS di konsol OSS.

    hoodie.datasource.write.recordkey.field

    Bidang kunci utama.

    STRING

    Tidak

    uuid

    • Anda dapat menggunakan sintaksis PRIMARY KEY untuk menentukan bidang kunci utama.

    • Pisahkan beberapa bidang dengan koma (,).

    precombine.field

    Bidang versi.

    STRING

    Tidak

    ts

    Bidang ini digunakan untuk menentukan apakah pesan perlu diperbarui.

    Jika Anda tidak mengonfigurasi parameter ini, sistem memperbarui data berdasarkan urutan pesan yang didefinisikan di mesin.

    oss.endpoint

    Endpoint Alibaba Cloud OSS atau OSS-HDFS.

    STRING

    Tidak

    Tidak ada nilai default

    Jika Anda menyimpan tabel di OSS atau OSS-HDFS, Anda harus menentukan parameter ini.

    • Jika Anda menyimpan tabel di OSS, setel parameter ini ke endpoint OSS. Untuk informasi lebih lanjut tentang endpoint OSS, lihat Wilayah dan endpoint.

    • Jika Anda menyimpan tabel di OSS-HDFS, setel parameter ini ke endpoint OSS-HDFS. Anda dapat melihat endpoint dari HDFS di bagian Port halaman Overview untuk Bucket OSS di konsol OSS.

    accessKeyId

    ID AccessKey akun Alibaba Cloud Anda.

    STRING

    Tidak

    Tidak ada nilai default

    Jika Anda menyimpan tabel di OSS atau OSS-HDFS, Anda harus menentukan parameter ini.

    Untuk informasi lebih lanjut, lihat Operasi konsol.

    Penting

    Untuk melindungi pasangan AccessKey Anda, kami sarankan Anda mengonfigurasi ID AccessKey dan Rahasia AccessKey menggunakan variabel. Untuk informasi lebih lanjut, lihat Kelola variabel.

    accessKeySecret

    Rahasia AccessKey akun Alibaba Cloud Anda.

    STRING

    Tidak

    Tidak ada nilai default

  • Parameter Eksklusif untuk Tabel Sumber

    Parameter

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    read.streaming.enabled

    Menentukan apakah pembacaan streaming diaktifkan.

    BOOLEAN

    Tidak

    false

    Nilai valid:

    • true: Pembacaan streaming diaktifkan.

    • false: Pembacaan streaming dinonaktifkan.

    read.start-commit

    Offset dari mana konsumsi data dimulai.

    STRING

    Tidak

    Dibiarkan kosong

    Nilai valid:

    • Waktu dalam format yyyyMMddHHmmss: Data dikonsumsi dari waktu yang ditentukan.

    • earliest: Data dikonsumsi dari offset paling awal.

    • Jika parameter ini dibiarkan kosong, data dikonsumsi dari waktu terbaru.

  • Parameter Eksklusif untuk Tabel Sink

    Parameter

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    write.operation

    Mode operasi penulisan dilakukan.

    STRING

    Tidak

    UPSERT

    Nilai valid:

    • insert: Data ditulis ke tabel dalam mode append.

    • upsert: Data diperbarui.

    • bulk_insert: Data batch ditulis ke tabel dalam mode append.

    hive_sync.enable

    Menentukan apakah sinkronisasi metadata ke Hive diaktifkan.

    BOOLEAN

    Tidak

    false

    Nilai valid:

    • true: Sinkronisasi metadata ke Hive diaktifkan.

    • false: Sinkronisasi metadata ke Hive dinonaktifkan.

    hive_sync.mode

    Mode sinkronisasi data Hive.

    STRING

    Tidak

    hms

    Nilai valid:

    • hms: Jika Anda ingin menyinkronkan metadata ke metastore Hive atau DLF, setel parameter ini ke hms.

    • jdbc: Jika Anda ingin menyinkronkan metadata ke driver Java Database Connectivity (JDBC), setel parameter ini ke jdbc.

    hive_sync.db

    Nama database Hive tempat data disinkronkan.

    STRING

    Tidak

    default

    Tidak tersedia

    hive_sync.table

    Nama tabel Hive tempat data disinkronkan.

    STRING

    Tidak

    Nama tabel saat ini

    Nama tabel Hive tempat data disinkronkan dari Hudi tidak boleh mengandung tanda hubung (-).

    dlf.catalog.region

    Nama wilayah tempat layanan DLF diaktifkan.

    STRING

    Tidak

    Tidak ada nilai default

    Untuk informasi lebih lanjut, lihat Wilayah dan endpoint yang didukung.

    Catatan
    • Parameter dlf.catalog.region berlaku hanya jika parameter hive_sync.mode diatur ke hms.

    • Pastikan nilai parameter ini sesuai dengan endpoint yang ditentukan oleh parameter dlf.catalog.endpoint.

    dlf.catalog.endpoint

    Endpoint DLF.

    STRING

    Tidak

    Tidak ada nilai default

    Untuk informasi lebih lanjut, lihat Wilayah dan endpoint yang didukung.

    Catatan
    • Parameter dlf.catalog.endpoint berlaku hanya jika parameter hive_sync.mode diatur ke hms.

    • Kami sarankan Anda menyetel parameter dlf.catalog.endpoint ke endpoint virtual private cloud (VPC) dari DLF. Sebagai contoh, jika Anda memilih wilayah China (Hangzhou), setel parameter ini ke dlf-vpc.cn-hangzhou.aliyuncs.com.

    • Jika Anda ingin mengakses DLF lintas VPC, ikuti petunjuk yang dijelaskan dalam Bagaimana cara Realtime Compute for Apache Flink mengakses layanan lintas VPC?

Parameter lanjutan

Hudi mendukung berbagai skenario pembacaan dan penulisan. Tabel-tabel berikut menjelaskan parameter yang dapat dikonfigurasi dalam skenario berbeda.

Parameter untuk paralelisme

Parameter

Deskripsi

Nilai default

Catatan

write.tasks

Paralelisme tugas penulisan. Setiap tugas penulisan menulis data ke 1 hingga N bucket secara berurutan.

4

Jika Anda meningkatkan paralelisme tugas penulisan, jumlah file kecil tetap tidak berubah.

write.bucket_assign.tasks

Paralelisme operator pemberi tugas bucket.

Paralelisme penyebaran Realtime Compute for Apache Flink

Jika Anda meningkatkan paralelisme tugas penulisan dan paralelisme operator pemberi tugas bucket, jumlah file kecil bertambah.

write.index_bootstrap.tasks

Paralelisme operator bootstrap indeks.

Paralelisme penyebaran Realtime Compute for Apache Flink

  • Parameter ini berlaku hanya jika parameter index.bootstrap.enabled diatur ke true.

  • Jika Anda meningkatkan paralelisme, efisiensi tahap bootstrap dapat ditingkatkan. Checkpointing mungkin terblokir pada tahap bootstrap. Untuk menyelesaikan masalah ini, Anda dapat menyetel jumlah toleransi kegagalan checkpoint menjadi nilai besar.

read.tasks

Paralelisme operator baca streaming dan batch.

4

Tidak tersedia

compaction.tasks

Paralelisme operator kompaksi online.

4

Kompaksi online mengonsumsi lebih banyak sumber daya daripada kompaksi offline. Kami sarankan Anda melakukan kompaksi offline.

Parameter untuk kompaksi online

Parameter

Deskripsi

Nilai default

Catatan

compaction.schedule.enabled

Menentukan apakah akan menghasilkan rencana kompaksi sesuai jadwal.

true

Nilai valid:

  • true: Rencana kompaksi dihasilkan sesuai jadwal.

  • false: Rencana kompaksi tidak dihasilkan sesuai jadwal.

Catatan

Kami sarankan Anda menyetel parameter ini ke true meskipun parameter compaction.async.enabled diatur ke false.

compaction.async.enabled

Menentukan apakah kompaksi asinkron diaktifkan.

true

Nilai valid:

  • true: Kompaksi asinkron diaktifkan.

  • false: Kompaksi asinkron dinonaktifkan.

Catatan

Anda dapat menyetel parameter ini ke false untuk menonaktifkan kompaksi online. Namun, kami sarankan Anda menyetel parameter compaction.schedule.enabled ke true. Dalam hal ini, Anda dapat melakukan kompaksi asinkron offline untuk menjalankan rencana kompaksi yang dihasilkan sesuai jadwal.

compaction.tasks

Paralelisme tugas kompaksi.

4

Tidak tersedia

compaction.trigger.strategy

Strategi yang digunakan untuk memicu kompaksi.

num_commits

Nilai valid:

  • num_commits: Kompaksi dipicu ketika jumlah commit mencapai nilai yang ditentukan.

  • time_elapsed: Kompaksi dipicu pada interval tertentu.

  • num_and_time: Kompaksi dipicu ketika baik jumlah commit maupun interval mencapai nilai yang ditentukan.

  • num_or_time: Kompaksi dipicu ketika jumlah commit atau interval mencapai nilai yang ditentukan.

compaction.delta_commits

Jumlah maksimum commit yang diperlukan untuk memicu kompaksi.

5

Tidak tersedia

compaction.delta_seconds

Interval pemantik kompaksi.

3600

Satuan: detik.

compaction.max_memory

Memori maksimum dari hash map yang digunakan untuk kompaksi dan deduplikasi.

100 MB

Jika sumber daya cukup, kami sarankan Anda mengubah nilai parameter ini menjadi 1 GB.

compaction.target_io

Throughput I/O maksimum untuk setiap rencana kompaksi.

500 GB

Tidak tersedia

Parameter terkait file

Parameter terkait file digunakan untuk mengelola ukuran file. Tabel berikut menjelaskan parameter yang didukung.

Parameter

Deskripsi

Nilai default

Catatan

hoodie.parquet.max.file.size

Ukuran maksimum file Parquet tempat data dapat ditulis.

Jika data yang akan ditulis ke file Parquet melebihi ukuran yang ditentukan oleh parameter ini, data tambahan ditulis ke grup file baru.

120 * 1024 * 1024 byte

(120 MB)

Satuan: byte.

hoodie.parquet.small.file.limit

Ambang batas ukuran file kecil. File yang ukurannya kurang dari nilai parameter ini dianggap sebagai file kecil.

104857600 byte (100 MB)

  • Satuan: byte.

  • Selama penulisan data, Hudi mencoba menulis data ke file kecil yang ada dalam mode append alih-alih menulis data ke file baru.

hoodie.copyonwrite.record.size.estimate

Perkiraan ukuran rekaman.

1024 byte (1 KB)

  • Satuan: byte.

  • Jika Anda tidak menentukan parameter ini, Hudi secara dinamis menghitung ukuran rekaman berdasarkan metadata yang telah dicommit.

Parameter terkait Hadoop

Parameter

Deskripsi

Nilai default

Catatan

hadoop.${you option key}

Item konfigurasi Hadoop yang ditentukan menggunakan awalan hadoop.

Tidak ada nilai default

Anda dapat menentukan beberapa item konfigurasi Hadoop sekaligus.

Catatan

Parameter ini didukung di Hudi 0.12.0 dan versi lebih baru. Untuk memenuhi persyaratan commit lintas kluster dan eksekusi, Anda dapat menggunakan pernyataan DDL untuk menentukan konfigurasi Hadoop per-job.

Parameter untuk penulisan data

Hudi mendukung berbagai metode penulisan, termasuk penulisan batch dan penulisan streaming. Hudi mendukung berbagai jenis data, termasuk data changelog dan data log. Hudi juga mendukung skema pengindeksan yang berbeda.

  • Parameter untuk Penulisan Batch

    Jika Anda ingin mengimpor data yang ada dari sumber data lain ke Hudi, Anda dapat menggunakan fitur impor batch untuk mengimpor data yang ada ke tabel Hudi.

    Parameter

    Deskripsi

    Nilai default

    Catatan

    write.operation

    Jenis operasi penulisan.

    upsert

    Nilai valid:

    • upsert: Data dimasukkan dan diperbarui.

    • insert: Data dimasukkan.

    • bulk_insert: Data ditulis dalam batch.

      Catatan
      • Jika Anda menyetel parameter ini ke bulk_insert, serialisasi data berbasis Avro dan kompaksi data tidak dilakukan. Deduplikasi tidak dilakukan setelah data diimpor. Jika Anda memiliki persyaratan tinggi pada keunikan data, pastikan data yang diimpor unik.

      • Anda dapat menyetel parameter write.operation ke bulk_insert hanya dalam mode eksekusi batch. Dalam mode ini, sistem mengurutkan data input berdasarkan nama partisi dan menulis data ke tabel Hudi secara default. Dengan cara ini, operasi penulisan tidak sering beralih di antara file yang berbeda dan performa sistem tidak menurun.

    write.tasks

    Paralelisme tugas bulk_insert tulis.

    Paralelisme penyebaran Realtime Compute for Apache Flink

    Paralelisme tugas bulk_insert tulis ditentukan oleh parameter write.tasks dan berdampak pada jumlah file kecil.

    Secara teori, paralelisme tugas bulk_insert tulis sama dengan jumlah bucket. Jika data yang ditulis ke file di setiap bucket mencapai ukuran maksimum file, data bergulir ke handle file baru. Oleh karena itu, jumlah file akhir lebih besar atau sama dengan paralelisme tugas bulk_insert tulis. Ukuran maksimum file Parquet adalah 120 MB.

    write.bulk_insert.shuffle_input

    Menentukan apakah akan mengacak input data berdasarkan bidang partisi untuk tugas insert batch.

    true

    Di Hudi 0.11.0 dan versi lebih baru, Anda dapat menyetel parameter ini ke true untuk mengurangi jumlah file kecil. Namun, ini dapat menyebabkan skew data.

    write.bulk_insert.sort_input

    Menentukan apakah akan mengurutkan input data berdasarkan bidang partisi untuk tugas insert batch.

    true

    Di Hudi 0.11.0 dan versi lebih baru, jika Anda ingin menjalankan tugas penulisan untuk menulis data ke beberapa partisi, Anda dapat menyetel parameter ini ke true untuk mengurangi jumlah file kecil.

    write.sort.memory

    Memori terkelola yang tersedia untuk operator sortir.

    128

    Satuan: MB.

  • Parameter untuk Mengaktifkan Mode Changelog

    Dalam mode changelog, Hudi mempertahankan semua perubahan (INSERT, UPDATE_BEFORE, UPDATE_AFTER, dan DELETE) pesan dan kemudian bekerja sama dengan komputasi stateful dari mesin Flink untuk mengimplementasikan gudang data near-real-time ujung ke ujung. Tabel Merge on Read (MOR) Hudi mendukung penyimpanan semua perubahan pesan berdasarkan penyimpanan berorientasi baris. Flink yang sepenuhnya dikelola dapat membaca tabel MOR dalam mode streaming untuk mengonsumsi semua catatan perubahan.

    Catatan

    Dalam mode non-changelog, perubahan perantara dalam dataset batch dalam pembacaan streaming tunggal dapat digabungkan. Semua hasil langsung digabungkan dalam pembacaan batch (snapshot read). Status perantara diabaikan terlepas dari apakah itu ditulis.

    Parameter

    Deskripsi

    Nilai default

    Catatan

    changelog.enabled

    Menentukan apakah akan mengonsumsi semua perubahan.

    false

    Nilai valid:

    • true: Semua perubahan dapat dikonsumsi.

    • false: Tidak semua perubahan dapat dikonsumsi. Semantik UPSERT digunakan. Hanya pesan tergabung terakhir di antara semua pesan yang dijamin, dan perubahan perantara mungkin digabungkan.

    Catatan

    Setelah Anda menyetel parameter changelog.enabled ke true, tugas kompaksi asinkron masih menggabungkan perubahan perantara menjadi satu rekaman data. Jika data tidak dikonsumsi pada kesempatan paling awal selama pembacaan streaming, hanya rekaman data terakhir yang dapat dibaca setelah data dikompaksi. Anda dapat mengubah waktu buffer untuk kompaksi data untuk menyediakan waktu tertentu bagi pembaca untuk membaca dan mengonsumsi data. Sebagai contoh, Anda dapat menyetel parameter compaction.delta_commits ke 5 dan parameter compaction.delta_seconds ke 3600.

  • Mode Append (didukung di Hudi 0.10.0 dan versi lebih baru)

    Dalam Mode Append:

    • Kebijakan file kecil diterapkan pada tabel MOR. Data ditulis ke file log Avro dalam mode append.

    • Kebijakan file kecil tidak diterapkan pada tabel COW. File Parquet baru dihasilkan setiap kali data ditulis ke tabel Copy on Write (COW).

Parameter untuk kebijakan pengelompokan

Hudi mendukung berbagai kebijakan pengelompokan untuk menyelesaikan masalah file kecil dalam mode INSERT.

  • Parameter untuk Pengelompokan Inline (hanya didukung untuk tabel COW)

    Parameter

    Deskripsi

    Nilai default

    Catatan

    write.insert.cluster

    Menentukan apakah akan menggabungkan file kecil selama penulisan data.

    false

    Nilai valid:

    • true: File kecil digabungkan selama penulisan data.

    • false: File kecil tidak digabungkan selama penulisan data.

    Catatan

    Secara default, file kecil tidak digabungkan ketika operasi INSERT dilakukan pada tabel COW. Jika Anda menyetel parameter ini ke true, file kecil yang ada digabungkan setiap kali operasi INSERT dilakukan. Namun, deduplikasi tidak dilakukan. Akibatnya, throughput penulisan berkurang.

  • Parameter untuk Pengelompokan Asinkron (didukung di Hudi 0.12.0 dan versi lebih baru)

    Parameter

    Deskripsi

    Nilai default

    Catatan

    clustering.schedule.enabled

    Menentukan apakah akan menjadwalkan rencana pengelompokan selama penulisan data.

    false

    Jika Anda menyetel parameter ini ke true, rencana pengelompokan dijadwalkan secara berkala.

    clustering.delta_commits

    Jumlah commit yang diperlukan untuk menghasilkan rencana pengelompokan.

    4

    Jika parameter clustering.schedule.enabled disetel ke true, parameter ini berlaku.

    clustering.async.enabled

    Menentukan apakah akan menjalankan rencana pengelompokan secara asinkron.

    false

    Jika Anda menyetel parameter ini ke true, rencana pengelompokan dijalankan secara asinkron pada interval reguler untuk menggabungkan file kecil.

    clustering.tasks

    Paralelisme tugas pengelompokan.

    4

    Tidak tersedia

    clustering.plan.strategy.target.file.max.bytes

    Ukuran maksimum file untuk pengelompokan.

    1024 * 1024 * 1024

    Satuan: byte.

    clustering.plan.strategy.small.file.limit

    Ambang batas ukuran file kecil yang digunakan untuk pengelompokan.

    600

    Hanya file yang ukurannya kurang dari nilai parameter ini yang dapat digunakan untuk pengelompokan.

    clustering.plan.strategy.sort.columns

    Kolom berdasarkan mana data diurutkan untuk pengelompokan.

    Tidak ada nilai default

    Anda dapat menentukan bidang pengurutan khusus.

  • Parameter untuk Strategi Rencana Pengelompokan

    Parameter

    Deskripsi

    Nilai default

    Catatan

    clustering.plan.partition.filter.mode

    Mode filter partisi yang digunakan dalam pembuatan rencana pengelompokan.

    NONE

    Nilai valid:

    • NONE: Partisi tidak difilter. Semua partisi dipilih untuk pengelompokan.

    • RECENT_DAYS: Jika data dipartisi berdasarkan hari, partisi yang sesuai dengan jumlah hari terbaru yang ditentukan dipilih untuk pengelompokan.

    • SELECTED_PARTITIONS: Partisi yang ditentukan dipilih untuk pengelompokan.

    clustering.plan.strategy.daybased.lookback.partitions

    Jumlah hari terbaru berdasarkan mana partisi dipilih untuk pengelompokan ketika parameter clustering.plan.partition.filter.mode disetel ke RECENT_DAYS.

    2

    Parameter ini berlaku hanya jika parameter clustering.plan.partition.filter.mode disetel ke RECENT_DAYS.

    clustering.plan.strategy.cluster.begin.partition

    Partisi awal, yang digunakan untuk memfilter partisi.

    Tidak ada nilai default

    Parameter ini berlaku hanya jika parameter clustering.plan.partition.filter.mode disetel ke SELECTED_PARTITIONS.

    clustering.plan.strategy.cluster.end.partition

    Partisi akhir, yang digunakan untuk memfilter partisi.

    Tidak ada nilai default

    Parameter ini berlaku hanya jika parameter clustering.plan.partition.filter.mode disetel ke SELECTED_PARTITIONS.

    clustering.plan.strategy.partition.regex.pattern

    Ekspresi reguler yang digunakan untuk menentukan partisi.

    Tidak ada nilai default

    Tidak tersedia

    clustering.plan.strategy.partition.selected

    Partisi yang dipilih.

    Tidak ada nilai default

    Pisahkan beberapa partisi dengan koma (,).

  • Parameter Terkait Jenis Indeks Bucket

    Catatan

    Parameter dalam tabel berikut didukung di Hudi 0.11.0 dan versi lebih baru.

    Parameter

    Deskripsi

    Nilai default

    Catatan

    index.type

    Jenis indeks.

    FLINK_STATE

    Nilai valid:

    • FLINK_STATE: Jenis indeks state Flink digunakan.

    • BUCKET: Jenis indeks bucket digunakan.

      Jika Anda mengonfigurasi index.type=bucket, menyetel parameter index.global.enabled ke true tidak valid karena jenis indeks bucket tidak mendukung perubahan lintas partisi. Dalam hal ini, deduplikasi pada beberapa partisi tidak dapat dilakukan meskipun fitur indeks global diaktifkan.

    Jika jumlah data besar seperti lebih dari 500 juta rekaman data dalam sebuah tabel, overhead penyimpanan state Flink dapat menjadi hambatan. Jenis indeks bucket menggunakan kebijakan hash tetap untuk mengalokasikan data yang mengandung kunci yang sama ke grup file yang sama. Ini membantu mencegah overhead penyimpanan dan query indeks. Jenis indeks bucket dan jenis indeks state Flink memiliki perbedaan berikut:

    • Dibandingkan dengan jenis indeks state Flink, jenis indeks bucket tidak memiliki overhead penyimpanan dan komputasi. Jenis indeks bucket memberikan performa lebih baik daripada jenis indeks state Flink.

    • Jika Anda menggunakan jenis indeks bucket, Anda tidak dapat meningkatkan jumlah bucket. Jika Anda menggunakan jenis indeks state Flink, Anda dapat secara dinamis meningkatkan jumlah file berdasarkan ukuran file.

    • Jenis indeks bucket tidak mendukung perubahan lintas partisi. Setelah data ditulis ke partisi, indeks bucket tidak berubah. Sebagai contoh, operasi penghapusan hanya memengaruhi data di partisi saat ini dan tidak memengaruhi data yang sama di partisi lain.

      Catatan

      Jika data input adalah data streaming CDC, batasan ini tidak berlaku.

    hoodie.bucket.index.hash.field

    Bidang kunci hash ketika jenis indeks bucket digunakan.

    Kunci utama

    Parameter ini dapat disetel ke subset dari kunci utama.

    hoodie.bucket.index.num.buckets

    Jumlah bucket ketika jenis indeks bucket digunakan.

    4

    Secara default, nilai parameter ini adalah jumlah bucket di setiap partisi. Anda tidak dapat mengubah nilai parameter ini setelah konfigurasi.

Parameter untuk pembacaan data

  • Hudi mendukung berbagai metode pembacaan, termasuk pembacaan batch, pembacaan streaming, dan pembacaan data inkremental. Hudi juga dapat mengonsumsi dan mentransfer changelog untuk mengimplementasikan ETL inkremental ujung ke ujung.

    • Parameter untuk Pembacaan Streaming

      Secara default, pembacaan snapshot digunakan untuk tabel. Dalam hal ini, snapshot penuh terbaru dibaca dan dikembalikan sekaligus. Anda dapat menyetel parameter read.streaming.enabled ke true untuk mengaktifkan pembacaan streaming. Anda dapat mengonfigurasi parameter read.start-commit untuk menentukan offset awal untuk pembacaan streaming. Anda dapat menyetel parameter read.start-commit ke earliest untuk memungkinkan data dikonsumsi dari offset paling awal.

      Parameter

      Deskripsi

      Nilai default

      Catatan

      read.streaming.enabled

      Menentukan apakah pembacaan streaming diaktifkan.

      false

      Nilai valid:

      • true: Pembacaan streaming diaktifkan.

      • false: Pembacaan streaming dinonaktifkan.

      read.start-commit

      Offset awal untuk pembacaan streaming.

      Dibiarkan kosong

      Nilai valid:

      • Waktu dalam format yyyyMMddHHmmss: Data dikonsumsi dari waktu yang ditentukan.

      • earliest: Data dikonsumsi dari offset paling awal.

      • Jika parameter ini dibiarkan kosong, data dikonsumsi dari waktu terbaru.

      clean.retain_commits

      Jumlah maksimum commit historis yang dapat dipertahankan oleh cleaner.

      30

      Jika jumlah commit historis melebihi nilai parameter ini, commit historis yang berlebih dihapus. Dalam mode changelog, parameter ini dapat digunakan untuk mengontrol periode retensi changelog. Sebagai contoh, jika periode checkpointing adalah 5 menit, changelog dipertahankan setidaknya selama 150 menit secara default.

      Penting
      • Pembacaan streaming changelog didukung hanya di Hudi 0.10.0 dan versi lebih baru. Dalam mode changelog, Hudi mempertahankan changelog selama periode tertentu untuk konsumen hilir mengonsumsinya.

      • Changelog mungkin digabungkan dalam tugas kompaksi. Dalam hal ini, catatan perantara dihapus, yang dapat memengaruhi hasil perhitungan.

    • Parameter untuk Pembacaan Data Inkremental (didukung di Hudi 0.10.0 dan versi lebih baru)

      Flink yang sepenuhnya dikelola mendukung konsumsi inkremental menggunakan konektor DataStream, konsumsi batch inkremental, dan konsumsi batch data pada titik waktu tertentu menggunakan fitur time travel.

      Parameter

      Deskripsi

      Nilai default

      Catatan

      read.start-commit

      Offset dari mana konsumsi data dimulai.

      Commit dari offset terbaru

      Nilai parameter ini dalam format yyyyMMddHHmmss.

      Intervalnya adalah interval tertutup, yang mencakup offset awal dan akhir.

      read.end-commit

      Offset di mana konsumsi data berakhir.

      Commit dari offset terbaru

Kode contoh

  • Kode Contoh untuk Tabel Sumber

CREATE TEMPORARY TABLE blackhole (
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'blackhole'      
);

CREATE TEMPORARY TABLE hudi_tbl (
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi', 
  'oss.endpoint' = '<yourOSSEndpoint>', 
  'accessKeyId' = '${secret_values.ak_id}', 
  'accessKeySecret' = '${secret_values.ak_secret}', 
  'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true'
);

-- Baca data dari commit terbaru dalam mode streaming dan tulis data ke Blackhole. 
INSERT INTO blackhole SELECT * from hudi_tbl;
  • Kode Contoh untuk Tabel Sink

CREATE TEMPORARY TABLE datagen(
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data  STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'datagen' ,
  'rows-per-second'='100' 
);

CREATE TEMPORARY TABLE hudi_tbl (
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi', 
  'oss.endpoint' = '<yourOSSEndpoint>', 
  'accessKeyId' = '${secret_values.ak_id}', 
  'accessKeySecret' = '${secret_values.ak_secret}', 
  'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
  'table.type' = 'MERGE_ON_READ'
);

INSERT INTO hudi_tbl SELECT * from datagen;

API DataStream

Penting

Jika Anda ingin memanggil API DataStream untuk membaca atau menulis data, Anda harus menggunakan konektor DataStream tipe terkait untuk terhubung ke Realtime Compute for Apache Flink. Untuk informasi lebih lanjut tentang cara mengonfigurasi konektor DataStream, lihat Pengaturan Konektor DataStream.

  • Maven POM

    Tentukan versi Realtime Compute for Apache Flink dan Hudi berdasarkan versi VVR yang digunakan.

    <properties>
      <maven.compiler.source>8</maven.compiler.source>
      <maven.compiler.target>8</maven.compiler.target>
      <flink.version>1.15.4</flink.version>
      <hudi.version>0.13.1</hudi.version>
    </properties>
    
    <dependencies>
      <!-- flink -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
    
      <!-- hudi -->
      <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-flink1.15-bundle</artifactId>
        <version>${hudi.version}</version>
        <scope>provided</scope>
      </dependency>
    
      <!-- oss -->
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.2</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aliyun</artifactId>
        <version>3.3.2</version>
        <scope>provided</scope>
      </dependency>
    
      <!-- dlf -->
      <dependency>
        <groupId>com.aliyun.datalake</groupId>
        <artifactId>metastore-client-hive2</artifactId>
        <version>0.2.14</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.5.1</version>
        <scope>provided</scope>
      </dependency>
    </dependencies>
    Penting

    Dependensi spesifik yang digunakan oleh DLF bertentangan dengan versi Hive open source, seperti hive-common dan hive-exec. Jika Anda ingin menguji DLF di lingkungan lokal Anda, Anda dapat mengunduh paket JAR hive-common dan hive-exec, dan secara manual mengimpor paket tersebut di IntelliJ IDEA.

  • Tulis Data ke Hudi

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.util.HoodiePipeline;
    
    import java.util.HashMap;
    import java.util.Map;
    
    public class FlinkHudiQuickStart {
    
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        String dbName = "test_db";
        String tableName = "test_tbl";
        String basePath = "oss://xxx";
    
        Map<String, String> options = new HashMap<>();
        // konfigurasi hudi
        options.put(FlinkOptions.PATH.key(), basePath);
        options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
        options.put(FlinkOptions.DATABASE_NAME.key(), dbName);
        options.put(FlinkOptions.TABLE_NAME.key(), tableName);
        // konfigurasi oss
        options.put("hadoop.fs.oss.accessKeyId", "xxx");
        options.put("hadoop.fs.oss.accessKeySecret", "xxx");
        // Gunakan endpoint publik untuk debugging lokal, seperti oss-cn-hangzhou.aliyuncs.com. Gunakan endpoint internal untuk pengiriman kluster, seperti oss-cn-hangzhou-internal.aliyuncs.com.
        options.put("hadoop.fs.oss.endpoint", "xxx");
        options.put("hadoop.fs.AbstractFileSystem.oss.impl", "org.apache.hadoop.fs.aliyun.oss.OSS");
        options.put("hadoop.fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
        // konfigurasi dlf
        options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true"); // Anda dapat menentukan apakah akan menyinkronkan data Hudi ke DLF.
        options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
        options.put(FlinkOptions.HIVE_SYNC_DB.key(), dbName);
        options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), tableName);
        options.put("hadoop.dlf.catalog.id", "xxx");
        options.put("hadoop.dlf.catalog.accessKeyId", "xxx");
        options.put("hadoop.dlf.catalog.accessKeySecret", "xxx");
        options.put("hadoop.dlf.catalog.region", "xxx");
        // Gunakan endpoint publik untuk debugging lokal, seperti dlf.cn-hangzhou.aliyuncs.com. Gunakan endpoint internal untuk pengiriman kluster, seperti dlf-vpc.cn-hangzhou.aliyuncs.com.
        options.put("hadoop.dlf.catalog.endpoint", "xxx");
        options.put("hadoop.hive.imetastoreclient.factory.class", "com.aliyun.datalake.metastore.hive2.DlfMetaStoreClientFactory");
    
        DataStream<RowData> dataStream = env.fromElements(
            GenericRowData.of(StringData.fromString("id1"), StringData.fromString("name1"), 22,
                StringData.fromString("1001"), StringData.fromString("p1")),
            GenericRowData.of(StringData.fromString("id2"), StringData.fromString("name2"), 32,
                StringData.fromString("1002"), StringData.fromString("p2"))
        );
    
        HoodiePipeline.Builder builder = HoodiePipeline.builder(tableName)
            .column("uuid string")
            .column("name string")
            .column("age int")
            .column("ts string")
            .column("`partition` string")
            .pk("uuid")
            .partition("partition")
            .options(options);
    
        builder.sink(dataStream, false); // Parameter kedua menunjukkan apakah aliran data input dibatasi
        env.execute("Flink_Hudi_Quick_Start");
      }
    }

FAQ