全部产品
Search
文档中心

Realtime Compute for Apache Flink:Konektor Apache Iceberg

更新时间:Jul 02, 2025

Topik ini menjelaskan cara menggunakan Konektor Apache Iceberg.

Informasi latar belakang

Apache Iceberg adalah format tabel terbuka untuk danau data. Anda dapat menggunakan Apache Iceberg untuk membangun layanan penyimpanan danau data berbasis HDFS atau Alibaba Cloud Object Storage Service (OSS). Selanjutnya, Anda dapat menggunakan mesin komputasi dari ekosistem big data open source seperti Apache Flink, Apache Spark, Apache Hive, atau Apache Presto untuk menganalisis data di danau data Anda. Tabel berikut memberikan gambaran umum tentang Konektor Apache Iceberg:

Item

Deskripsi

Tipe tabel

Tabel sumber dan tabel sink

Mode operasi

Mode batch dan mode streaming

Format data

Tidak tersedia

Metrik

Tidak tersedia

Tipe API

SQL API

Pembaruan atau penghapusan data dalam tabel sink

Didukung

Fitur

Konektor Apache Iceberg menyediakan kemampuan inti berikut:

  • Membangun layanan penyimpanan danau data ringan berbiaya rendah berbasis HDFS atau OSS.

  • Menyediakan semantik ACID (atomicity, consistency, isolation, durability) yang komprehensif.

  • Mendukung pelacakan versi historis.

  • Mendukung penyaringan data yang efisien.

  • Mendukung evolusi skema.

  • Mendukung evolusi partisi.

  • Mendukung penyimpanan data dalam metastore Hive yang dikelola sendiri. Untuk informasi lebih lanjut, lihat Buat dan gunakan tabel sumber Apache Iceberg.

Catatan

Anda dapat menggunakan kemampuan toleransi kesalahan dan pemrosesan aliran yang kuat dari Flink untuk mengimpor sejumlah besar data perilaku log ke danau data Apache Iceberg secara real-time. Kemudian, Anda dapat menggunakan Flink atau mesin analitik lainnya untuk mengekstraksi nilai dari data Anda.

Batasan

  • Hanya Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 4.0.8 atau lebih baru yang mendukung Konektor Apache Iceberg. Konektor Apache Iceberg harus digunakan bersama dengan katalog Data Lake Formation (DLF). Untuk informasi lebih lanjut, lihat Kelola katalog DLF.

  • Konektor Apache Iceberg mendukung format tabel Apache Iceberg versi 1 dan versi 2. Untuk informasi lebih lanjut, lihat Spesifikasi Tabel Iceberg.

    Catatan

    Hanya Realtime Compute for Apache Flink yang menggunakan VVR 8.0.7 atau lebih baru yang mendukung format tabel Apache Iceberg versi 2.

  • Jika mode pembacaan streaming diaktifkan, hanya tabel Apache Iceberg di mana data ditulis dalam mode Append Only yang dapat digunakan sebagai tabel sumber.

Sintaksis

CREATE TABLE iceberg_table (
  id    BIGINT,
  data  STRING
  PRIMARY KEY(`id`) NOT ENFORCED
)
 PARTITIONED BY (data)
 WITH (
 'connector' = 'iceberg',
  ...
);

Opsi konektor dalam klausa WITH

Opsi umum dan tabel sumber

Opsi

Deskripsi

Tipe data

Diperlukan

Nilai default

Catatan

connector

Konektor yang akan digunakan.

STRING

Ya

Tidak ada nilai default

Atur nilainya menjadi iceberg.

catalog-name

Nama katalog.

STRING

Ya

Tidak ada nilai default

Masukkan nama katalog Anda.

catalog-database

Nama database.

STRING

Ya

default

Atur nilainya menjadi nama database yang dibuat pada Data Lake Formation (DLF). Contoh: dlf_db.

Catatan

Jika Anda belum membuat database DLF, buat satu.

io-impl

Nama kelas implementasi dalam sistem file terdistribusi.

STRING

Ya

Tidak ada nilai default

Atur nilainya menjadi org.apache.iceberg.aliyun.oss.OSSFileIO.

oss.endpoint

Titik akhir bucket OSS Anda.

STRING

Tidak

Tidak ada nilai default

Untuk informasi lebih lanjut, lihat Wilayah dan titik akhir.

Catatan
  • Kami merekomendasikan Anda mengatur opsi oss.endpoint ke titik akhir virtual private cloud (VPC) dari bucket OSS. Sebagai contoh, jika Anda memilih wilayah China (Hangzhou), atur oss.endpoint menjadi oss-cn-hangzhou-internal.aliyuncs.com.

  • Jika Anda ingin mengakses OSS lintas VPC, ikuti petunjuk yang dijelaskan dalam Bagaimana fully managed Flink mengakses layanan lintas VPC?

  • access.key.id: VVR 8.0.6 atau lebih lama.

  • access-key-id: VVR 8.0.7 atau lebih baru.

ID AccessKey dari akun Alibaba Cloud Anda.

STRING

Ya

Tidak ada nilai default

Untuk informasi lebih lanjut, lihat Bagaimana cara melihat pasangan AccessKey dari sebuah akun?

Penting

Untuk meningkatkan keamanan kredensial Anda, hindari hardcoding pasangan AccessKey dalam teks biasa; gunakan variabel sebagai gantinya. Untuk informasi lebih lanjut, lihat Kelola kunci.

  • access.key.secret: VVR 8.0.6 atau lebih lama.

  • access-key-secret: VVR 8.0.7 atau lebih baru.

Rahasia AccessKey dari akun Alibaba Cloud Anda.

STRING

Ya

Tidak ada nilai default

catalog-impl

Nama kelas katalog.

STRING

Ya

Tidak ada nilai default

Atur nilainya menjadi org.apache.iceberg.aliyun.dlf.DlfCatalog.

warehouse

Direktori OSS tempat data tabel disimpan.

STRING

Ya

Tidak ada nilai default

Tidak tersedia.

dlf.catalog-id

ID akun Alibaba Cloud Anda.

STRING

Ya

Tidak ada nilai default

Anda dapat pergi ke halaman Pengaturan Keamanan untuk mendapatkan ID akun.

dlf.endpoint

Titik akhir DLF

STRING

Ya

Tidak ada nilai default

Catatan
  • Kami merekomendasikan Anda mengatur dlf.endpoint ke titik akhir VPC dari DLF. Sebagai contoh, jika Anda memilih wilayah China (Hangzhou), atur opsi dlf.endpoint menjadi dlf-vpc.cn-hangzhou.aliyuncs.com.

  • Jika Anda ingin mengakses DLF lintas VPC, ikuti petunjuk yang dijelaskan dalam FAQ tentang manajemen dan operasi ruang kerja serta namespace

dlf.region-id

Nama wilayah di mana layanan DLF diaktifkan.

STRING

Ya

Tidak ada nilai default

Catatan

Pastikan wilayah yang Anda pilih sesuai dengan titik akhir yang Anda pilih untuk dlf.endpoint.

uri

URI thrift untuk metastore Hive Anda.

STRING

Tidak

Tidak ada nilai default

Digunakan bersama dengan metastore Hive yang dikelola sendiri.

Catatan

Opsi ini hanya diperlukan jika Anda menggunakan Katalog Hive.

Opsi eksklusif sink

Opsi

Deskripsi

Tipe data

Diperlukan

Nilai default

Catatan

write.operation

Mode operasi penulisan.

STRING

Tidak

upsert

Nilai valid:

  • upsert: Data diperbarui.

  • insert: Data ditulis ke tabel dalam mode tambahan.

  • bulk_insert: Sejumlah data tertentu ditulis sekaligus dan data yang ada tidak diperbarui.

hive_sync.enable

Menentukan apakah sinkronisasi metadata ke Hive diaktifkan.

BOOLEAN

Tidak

false

Nilai valid:

  • true: Sinkronisasi metadata ke Hive diaktifkan.

  • false: Sinkronisasi metadata ke Hive dinonaktifkan.

hive_sync.mode

Mode sinkronisasi data Hive.

STRING

Tidak

hms

  • hms: Jika Anda menggunakan metastore Hive atau katalog DLF, pertahankan nilai default.

  • jdbc: Jika katalog Java Database Connectivity (JDBC) digunakan, atur nilai ini menjadi jdbc.

hive_sync.db

Nama database Hive ke mana data disinkronkan.

STRING

Tidak

Nama database tabel saat ini dalam katalog

Tidak tersedia.

hive_sync.table

Nama tabel Hive ke mana data disinkronkan.

STRING

Tidak

Nama tabel saat ini

Tidak tersedia.

dlf.catalog.region

Nama wilayah di mana layanan DLF diaktifkan.

STRING

Tidak

Tidak ada nilai default

Catatan
  • Opsi dlf.catalog.region hanya berlaku ketika opsi hive_sync.mode diatur ke hms.

  • Pastikan nilai opsi ini sesuai dengan titik akhir yang ditentukan oleh opsi dlf.catalog.endpoint.

dlf.catalog.endpoint

Titik akhir DLF.

STRING

Tidak

Tidak ada nilai default

Catatan
  • Opsi dlf.catalog.endpoint hanya berlaku ketika opsi hive_sync.mode diatur ke hms.

  • Kami merekomendasikan Anda mengatur opsi dlf.catalog.endpoint ke titik akhir VPC dari DLF. Sebagai contoh, jika Anda memilih wilayah China (Hangzhou), atur opsi dlf.catalog.endpoint menjadi dlf-vpc.cn-hangzhou.aliyuncs.com.

  • Jika Anda ingin mengakses DLF lintas VPC, ikuti petunjuk yang dijelaskan dalam FAQ tentang manajemen dan operasi ruang kerja serta namespace

Pemetaan tipe data

Tipe data Apache Iceberg

Tipe data Realtime Compute for Apache Flink

BOOLEAN

BOOLEAN

INT

INT

LONG

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(P,S)

DECIMAL(P,S)

DATE

DATE

TIME

TIME

Catatan

Timestamp Apache Iceberg akurat hingga mikrodetik, dan timestamp Realtime Compute for Apache Flink akurat hingga milidetik. Saat menggunakan Realtime Compute for Apache Flink untuk membaca data Apache Iceberg, presisi waktu disesuaikan ke milidetik.

TIMESTAMP

TIMESTAMP

TIMESTAMPTZ

TIMESTAMP_LTZ

STRING

STRING

FIXED(L)

BYTES

BINARY

VARBINARY

STRUCT<...>

ROW

LIST<E>

LIST

MAP<K,V>

MAP

Contoh kode

Pastikan bucket OSS dan database DLF telah dibuat. Untuk informasi lebih lanjut, lihat Membuat Bucket dan Tabel Database dan Fungsi.

Catatan

Saat menentukan directory untuk database DLF Anda, kami merekomendasikan Anda memasukkan direktori dalam format ${warehouse}/${database_name}.db. Sebagai contoh, jika nilai opsi warehouse adalah oss://iceberg-test/warehouse dan nilai opsi database_name adalah dlf_db, atur direktori OSS dari database dlf_db menjadi oss://iceberg-test/warehouse/dlf_db.db.

Membuat dan menggunakan tabel sink Apache Iceberg

Hasilkan data streaming acak menggunakan konektor Datagen dan tulis data tersebut ke Apache Iceberg:

CREATE TEMPORARY TABLE datagen(
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE dlf_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',  
  'access.key.id' = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',  
  'dlf.region-id' = '<yourDLFRegionId>'
);

INSERT INTO dlf_iceberg SELECT * FROM datagen;

Membuat dan menggunakan tabel sumber Apache Iceberg

  • Buat tabel Flink yang dipetakan ke tabel Iceberg, yang dikelola dalam katalog Hive dan disimpan dalam metastore Hive yang dikelola sendiri:

    Catatan
    • Pastikan Anda telah membuat koneksi antara ruang kerja Flink dan kluster metastore Hive yang dikelola sendiri.

    • File data akan disimpan di direktori: oss://<bucket>/<path>/<database-name>/flink_table.

    CREATE TEMOPORY TABLE flink_table (
      id   BIGINT,
      data STRING
    ) WITH (
      'connector'='iceberg',
      'catalog-name'='<yourCatalogName>',
      'catalog-database'='<yourDatabaseName>',
      'uri'='thrift://<ip>:<post>',
      'warehouse'='oss://<bucket>/<path>',
      'io-impl'='org.apache.iceberg.aliyun.oss.OSSFileIO',
      'access-key-id'='<yourAccessKeyID>',
      'access-key-secret'='<yourAccessKeySecret>',
      'oss.endpoint'='<yourOSSEndpoint>'
    );
  • Buat dua tabel Flink yang dipetakan ke tabel Iceberg yang dikelola dalam katalog DLF, lalu baca data dari satu tabel Flink dan tulis ke tabel lainnya:

    CREATE TEMPORARY TABLE src_iceberg (
      id    BIGINT,
      data  STRING
    ) WITH (
      'connector' = 'iceberg',
      'catalog-name' = '<yourCatalogName>',
      'catalog-database' = '<yourDatabaseName>',
      'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
      'oss.endpoint' = '<yourOSSEndpoint>',  
      'access.key.id' = '${secret_values.ak_id}',
      'access.key.secret' = '${secret_values.ak_secret}',
      'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
      'warehouse' = '<yourOSSWarehousePath>',
      'dlf.catalog-id' = '<yourCatalogId>',
      'dlf.endpoint' = '<yourDLFEndpoint>',  
      'dlf.region-id' = '<yourDLFRegionId>'
    );
    
    CREATE TEMPORARY TABLE dst_iceberg (
      id    BIGINT,
      data  STRING
    ) WITH (
      'connector' = 'iceberg',
      'catalog-name' = '<yourCatalogName>',
      'catalog-database' = '<yourDatabaseName>',
      'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
      'oss.endpoint' = '<yourOSSEndpoint>',  
      'access.key.id' = '${secret_values.ak_id}',
      'access.key.secret' = '${secret_values.ak_secret}',
      'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
      'warehouse' = '<yourOSSWarehousePath>',
      'dlf.catalog-id' = '<yourCatalogId>',
      'dlf.endpoint' = '<yourDLFEndpoint>',  
      'dlf.region-id' = '<yourDLFRegionId>'
    );
    
    BEGIN STATEMENT SET;
    
    INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE');
    INSERT INTO dst_iceberg SELECT * FROM src_iceberg;
    
    END;

Referensi

Untuk informasi lebih lanjut tentang konektor yang didukung oleh Realtime Compute for Apache Flink, lihat Konektor yang Didukung.