All Products
Search
Document Center

Realtime Compute for Apache Flink:Apache Iceberg

Last Updated:Mar 10, 2026

Topik ini menjelaskan cara menggunakan konektor Iceberg.

Informasi latar belakang

Apache Iceberg adalah format tabel terbuka untuk data lake. Anda dapat menggunakan Apache Iceberg untuk membangun layanan penyimpanan data lake yang hemat biaya dan dapat diskalakan di Hadoop Distributed File System (HDFS) atau Object Storage Service (OSS). Data tersebut kemudian dapat dianalisis menggunakan mesin komputasi dari ekosistem big data open source, seperti Flink, Spark, Hive, dan Presto.

Kategori

Detail

Jenis yang didukung

Tabel sumber, tabel sink, dan tujuan ingesti data

Mode runtime

Mode batch dan mode stream

Format data

Tidak berlaku

Metrik pemantauan khusus

Tidak ada

Jenis API

Pekerjaan SQL, YAML untuk ingesti data

Mendukung pembaruan atau penghapusan data di tabel sink

Ya

Fitur

Apache Iceberg menyediakan fitur-fitur inti berikut:

  • Layanan penyimpanan data lake yang ringan dan hemat biaya berbasis HDFS atau OSS.

  • Dukungan penuh terhadap semantik atomicity, consistency, isolation, dan durability (ACID).

  • Dukungan rollback ke versi historis.

  • Penyaringan data yang efisien.

  • Evolusi skema.

  • Evolusi partisi.

  • Kompatibilitas dengan Hive Metastore yang dikelola sendiri. Untuk informasi selengkapnya, lihat Gunakan Hive Catalog dengan Hive Metastore (HMS) yang dikelola sendiri.

Catatan

Anda dapat memanfaatkan kemampuan toleransi kesalahan dan pemrosesan aliran Flink untuk mengingesti volume besar data log dan data perilaku ke dalam data lake Apache Iceberg secara real time, lalu mengekstraksi nilai dari data tersebut menggunakan Flink atau mesin analitik lainnya.

Batasan

  • Konektor Iceberg hanya didukung pada mesin komputasi Flink Ververica Runtime (VVR) 4.0.8 dan versi yang lebih baru. Anda harus menggunakan konektor Iceberg bersama Katalog Data Lake Formation (DLF). Untuk informasi selengkapnya, lihat Kelola Katalog DLF-Legacy.

  • Konektor Iceberg mendukung format tabel Apache Iceberg v1 dan v2. Untuk informasi selengkapnya, lihat Spesifikasi Tabel Iceberg.

    Catatan

    Format tabel v2 hanya didukung pada mesin komputasi waktu nyata VVR 8.0.7 dan versi yang lebih baru.

  • Dalam mode pembacaan stream, hanya tabel Iceberg append-only yang didukung sebagai tabel sumber.

Sintaksis

CREATE TABLE iceberg_table (
  id    BIGINT,
  data  STRING
  PRIMARY KEY(`id`) NOT ENFORCED
)
 PARTITIONED BY (data)
 WITH (
 'connector' = 'iceberg',
  ...
);

Parameter WITH

Parameter umum (untuk tabel sumber)

Parameter

Deskripsi

Tipe data

Wajib

Nilai default

Keterangan

connector

Jenis tabel sumber.

String

Ya

Tidak ada

Nilainya harus iceberg.

catalog-name

Nama katalog.

String

Ya

Tidak ada

Masukkan nama kustom dalam bahasa Inggris.

catalog-database

Nama database.

String

Ya

default

Nama database yang telah Anda buat di DLF, misalnya dlf_db.

Catatan

Jika Anda belum membuat database DLF, buatlah satu.

io-impl

Kelas implementasi sistem file terdistribusi.

String

Ya

Tidak ada

Nilainya harus org.apache.iceberg.aliyun.oss.OSSFileIO.

oss.endpoint

Titik akhir Alibaba Cloud Object Storage Service (OSS).

String

Tidak

Tidak ada

Untuk informasi selengkapnya, lihat Wilayah dan titik akhir.

Catatan
  • Kami menyarankan agar Anda mengatur parameter oss.endpoint ke titik akhir VPC OSS. Misalnya, jika Anda memilih wilayah Tiongkok (Hangzhou), atur oss.endpoint ke oss-cn-hangzhou-internal.aliyuncs.com.

  • Untuk mengakses OSS lintas VPC, lihat Bagaimana cara mengakses layanan lain lintas VPC?

  • access.key.id: VVR 8.0.6 dan versi sebelumnya

  • access-key-id: VVR 8.0.7 dan versi setelahnya

ID AccessKey akun Alibaba Cloud Anda.

String

Ya

Tidak ada

Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan rahasia AccessKey?

Penting

Untuk mencegah kebocoran informasi AccessKey Anda, kami menyarankan agar Anda menggunakan variabel untuk nilai AccessKey. Untuk informasi selengkapnya, lihat Variabel proyek.

  • access.key.secret: VVR 8.0.6 dan versi sebelumnya

  • access-key-secret: VVR 8.0.7 dan versi setelahnya

Rahasia AccessKey akun Alibaba Cloud Anda.

String

Ya

Tidak ada

catalog-impl

Nama kelas katalog.

String

Ya

Tidak ada

Nilainya harus org.apache.iceberg.aliyun.dlf.DlfCatalog.

warehouse

Jalur di OSS tempat data tabel disimpan.

String

Ya

Tidak ada

Tidak ada.

dlf.catalog-id

ID akun Alibaba Cloud Anda.

String

Ya

Tidak ada

Anda dapat memperoleh ID akun dari halaman Informasi Pengguna.

dlf.endpoint

Titik akhir layanan DLF.

String

Ya

Tidak ada

.

Catatan
  • Kami menyarankan agar Anda mengatur parameter dlf.endpoint ke titik akhir VPC DLF. Misalnya, jika Anda memilih wilayah Tiongkok (Hangzhou), atur parameter dlf.endpoint ke dlf-vpc.cn-hangzhou.aliyuncs.com.

  • Untuk mengakses DLF lintas VPC, lihat Manajemen dan operasi ruang penyimpanan

dlf.region-id

Wilayah layanan DLF.

String

Ya

Tidak ada

.

Catatan

Wilayah ini harus sama dengan wilayah yang dipilih untuk dlf.endpoint.

uri

URI Thrift Hive metastore.

String

Wajib hanya saat Anda menggunakan Hive Catalog.

Tidak ada

Gunakan parameter ini dengan Hive Metastore yang dikelola sendiri.

Parameter khusus untuk tabel sink

Parameter

Deskripsi

Tipe data

Wajib

Nilai default

Keterangan

write.operation

Mode operasi penulisan.

String

Tidak

upsert

  • upsert (default): memperbarui data.

  • insert: menambahkan data.

  • bulk_insert: melakukan penyisipan massal tanpa pembaruan.

hive_sync.enable

Menentukan apakah akan mengaktifkan sinkronisasi metadata ke Hive.

boolean

Tidak

false

Nilai yang valid:

  • true: mengaktifkan fitur.

  • false (default): menonaktifkan fitur.

hive_sync.mode

Mode sinkronisasi data Hive.

String

Tidak

hms

  • hms (default): Atur parameter ini ke hms saat Anda menggunakan Hive Metastore atau Katalog DLF.

  • jdbc: Atur parameter ini ke jdbc saat Anda menggunakan Katalog JDBC.

hive_sync.db

Nama database Hive tempat Anda ingin menyinkronkan metadata.

String

Tidak

Nama database tempat tabel saat ini berada di katalog.

Tidak ada.

hive_sync.table

Nama tabel Hive tempat Anda ingin menyinkronkan metadata.

String

Tidak

Nama tabel saat ini.

Tidak ada.

dlf.catalog.region

Wilayah layanan DLF.

String

Tidak

Tidak ada

.

Catatan
  • Parameter dlf.catalog.region hanya berlaku saat hive_sync.mode diatur ke hms.

  • Wilayah ini harus sama dengan wilayah yang dipilih untuk dlf.catalog.endpoint.

dlf.catalog.endpoint

Titik akhir layanan DLF.

String

Tidak

Tidak ada

.

Catatan
  • Parameter dlf.catalog.endpoint hanya berlaku saat hive_sync.mode diatur ke hms.

  • Kami menyarankan agar Anda mengatur parameter dlf.catalog.endpoint ke titik akhir VPC DLF. Misalnya, jika Anda memilih wilayah Tiongkok (Hangzhou), atur parameter dlf.catalog.endpoint ke dlf-vpc.cn-hangzhou.aliyuncs.com.

  • Untuk mengakses DLF lintas VPC, lihat Manajemen dan operasi ruang penyimpanan

Pemetaan tipe

Tipe bidang Iceberg

Tipe bidang Flink

BOOLEAN

BOOLEAN

INT

INT

LONG

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(P,S)

DECIMAL(P,S)

DATE

DATE

TIME

TIME

Catatan

Presisi timestamp Iceberg adalah mikrodetik, sedangkan presisi timestamp Flink adalah milidetik. Saat Anda menggunakan Flink untuk membaca data dari Iceberg, presisi waktu disejajarkan ke milidetik.

TIMESTAMP

TIMESTAMP

TIMESTAMPTZ

TIMESTAMP_LTZ

STRING

STRING

FIXED(L)

BYTES

BINARY

VARBINARY

STRUCT<...>

ROW

LIST<E>

LIST

MAP<K,V>

MAP

Contoh kode

Pastikan Anda telah membuat bucket OSS dan database DLF. Untuk informasi selengkapnya, lihat Buat bucket di konsol dan Database, tabel, dan fungsi.

Catatan

Saat membuat database DLF dan mengatur Path, disarankan menggunakan format ${warehouse}/${database_name}.db. Misalnya, jika alamat warehouse adalah oss://iceberg-test/warehouse dan nama database adalah dlf_db, atur jalur OSS untuk dlf_db ke oss://iceberg-test/warehouse/dlf_db.db.

Contoh tabel sink

Contoh ini menunjukkan cara menggunakan konektor Datagen untuk menghasilkan data streaming secara acak dan menulisnya ke tabel Iceberg.

CREATE TEMPORARY TABLE datagen(
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE dlf_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',  
  'access.key.id' = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',  
  'dlf.region-id' = '<yourDLFRegionId>'
);

INSERT INTO dlf_iceberg SELECT * FROM datagen;

Contoh tabel sumber

  • Gunakan Hive Catalog dengan Hive Metastore (HMS) yang dikelola sendiri.

    Pastikan kluster Flink dapat berkomunikasi dengan kluster HMS melalui jaringan. Data disimpan di direktori oss://<bucket>/<path>/<database-name>/flink_table.

    CREATE TEMOPORY TABLE flink_table (
      id   BIGINT,
      data STRING
    ) WITH (
      'connector'='iceberg',
      'catalog-name'='<yourCatalogName>',
      'catalog-database'='<yourDatabaseName>',
      'uri'='thrift://<ip>:<port>',
      'warehouse'='oss://<bucket>/<path>',
      'io-impl'='org.apache.iceberg.aliyun.oss.OSSFileIO',
      'access-key-id'='<yourAccessKeyId>',
      'access-key-secret'='<yourAccessKeySecret>',
      'oss.endpoint'='<yourOSSEndpoint>'
    );
  • Gunakan Katalog DLF untuk menulis data dari tabel sumber Iceberg ke tabel sink Iceberg.

    CREATE TEMPORARY TABLE src_iceberg (
      id    BIGINT,
      data  STRING
    ) WITH (
      'connector' = 'iceberg',
      'catalog-name' = '<yourCatalogName>',
      'catalog-database' = '<yourDatabaseName>',
      'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
      'oss.endpoint' = '<yourOSSEndpoint>',  
      'access.key.id' = '${secret_values.ak_id}',
      'access.key.secret' = '${secret_values.ak_secret}',
      'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
      'warehouse' = '<yourOSSWarehousePath>',
      'dlf.catalog-id' = '<yourCatalogId>',
      'dlf.endpoint' = '<yourDLFEndpoint>',  
      'dlf.region-id' = '<yourDLFRegionId>'
    );
    
    CREATE TEMPORARY TABLE dst_iceberg (
      id    BIGINT,
      data  STRING
    ) WITH (
      'connector' = 'iceberg',
      'catalog-name' = '<yourCatalogName>',
      'catalog-database' = '<yourDatabaseName>',
      'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
      'oss.endpoint' = '<yourOSSEndpoint>',  
      'access.key.id' = '${secret_values.ak_id}',
      'access.key.secret' = '${secret_values.ak_secret}',
      'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
      'warehouse' = '<yourOSSWarehousePath>',
      'dlf.catalog-id' = '<yourCatalogId>',
      'dlf.endpoint' = '<yourDLFEndpoint>',  
      'dlf.region-id' = '<yourDLFRegionId>'
    );
    
    BEGIN STATEMENT SET;
    
    INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE');
    INSERT INTO dst_iceberg SELECT * FROM src_iceberg;
    
    END;

Ingesti Data

Anda dapat menggunakan konektor Iceberg sebagai sink untuk menulis data dalam pekerjaan YAML guna mengingesti data.

Sintaksis

sink:
  type: iceberg
  name: Iceberg Sink
  catalog.properties.rest.signing-region: cn-beijing
  catalog.properties.uri: http://cn-beijing-vpc.dlf.aliyuncs.com/iceberg
  catalog.properties.warehouse: flink_iceberg
  catalog.properties.type: rest
  catalog.properties.io-impl: org.apache.iceberg.rest.DlfFileIO

Item konfigurasi

Parameter

Deskripsi

Wajib

Tipe data

Nilai default

Keterangan

type

Jenis konektor.

Ya

STRING

Tidak ada

Nilainya harus iceberg.

name

Nama sink.

Tidak

STRING

Tidak ada

Nama sink.

catalog.properties.rest.signing-region

ID wilayah DLF. Untuk informasi selengkapnya, lihat Titik akhir.

Ya

STRING

Tidak ada

Tidak ada

catalog.properties.uri

URI yang digunakan untuk mengakses Katalog REST DLF. Untuk informasi selengkapnya, lihat Iceberg REST.

Ya

STRING

Tidak ada

Tidak ada

catalog.properties.warehouse

Nama Katalog DLF.

Ya

STRING

Tidak ada

Tidak ada

catalog.properties.warehouse

Direktori root untuk penyimpanan file.

Tidak

STRING

Tidak ada

Tidak ada

catalog.properties.type

Jenis katalog. Nilainya harus rest.

Ya

STRING

rest

Tidak ada

catalog.properties.io-impl

Nilainya harus org.apache.iceberg.rest.DlfFileIO.

Ya

STRING

org.apache.iceberg.rest.DlfFileIO

Tidak ada

partition.key

Bidang partisi untuk setiap tabel partisi.

Tidak

STRING

Tidak ada

Kunci partisi untuk setiap tabel partisi. Anda dapat mengatur beberapa kunci primer untuk beberapa tabel. Gunakan titik koma (;) untuk memisahkan tabel dan koma (,) untuk memisahkan kunci partisi. Misalnya, Anda dapat menggunakan testdb.table1:id1,id2;testdb.table2:name untuk mengatur bidang partisi untuk tabel testdb.table1 ke id1 dan bidang partisi untuk tabel testdb.table2 ke name.

Untuk partisi yang memerlukan transformasi implisit, Anda dapat menambahkan fungsi transformasi implisit langsung ke bidang partisi. Contoh: testdb.table1:truncate[10](id);testdb.table2:hour(create_time);testdb.table3:day(create_time);testdb.table4:month(create_time);testdb.table5:year(create_time);testdb.table6:bucket[10](create_time).

table.properties.*

Parameter untuk membuat tabel Iceberg.

Tidak

String

Tidak ada

Untuk informasi selengkapnya, lihat Opsi tabel Iceberg.

Contoh penggunaan

Kode berikut memberikan contoh konfigurasi penulisan data ke Data Lake Formation Alibaba Cloud saat Katalog Iceberg merupakan Katalog DLF:

  • source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: iceberg
      name: Iceberg Sink
      catalog.properties.rest.signing-region: cn-beijing
      catalog.properties.uri: http://cn-beijing-vpc.dlf.aliyuncs.com/iceberg
      catalog.properties.warehouse: flink_iceberg
      catalog.properties.type: rest
      catalog.properties.io-impl: org.apache.iceberg.rest.DlfFileIO

    Untuk informasi tentang parameter yang memiliki awalan catalog.properties, lihat Buat Katalog Iceberg DLF.

Evolusi skema

Saat ini, Iceberg sebagai sink untuk ingesti data mendukung event evolusi skema berikut:

  • CREATE TABLE EVENT

  • EVENT ADD COLUMN

  • EVENT ALTER COLUMN TYPE (Memodifikasi tipe kolom kunci primer tidak didukung)

  • EVENT RENAME COLUMN

  • DROP COLUMN EVENT

  • TRUNCATE TABLE EVENT

  • DROP TABLE EVENT

Catatan

Jika tabel Iceberg downstream sudah ada, data ditulis berdasarkan skema tabel yang ada. Sistem tidak mencoba membuat ulang tabel tersebut.

Referensi

Untuk informasi tentang konektor yang didukung Flink, lihat Konektor yang Didukung.