Topik ini menjelaskan cara menggunakan Konektor Apache Iceberg.
Informasi latar belakang
Apache Iceberg adalah format tabel terbuka untuk danau data. Anda dapat menggunakan Apache Iceberg untuk membangun layanan penyimpanan danau data berbasis HDFS atau Alibaba Cloud Object Storage Service (OSS). Selanjutnya, Anda dapat menggunakan mesin komputasi dari ekosistem big data open source seperti Apache Flink, Apache Spark, Apache Hive, atau Apache Presto untuk menganalisis data di danau data Anda. Tabel berikut memberikan gambaran umum tentang Konektor Apache Iceberg:
Item | Deskripsi |
Tipe tabel | Tabel sumber dan tabel sink |
Mode operasi | Mode batch dan mode streaming |
Format data | Tidak tersedia |
Metrik | Tidak tersedia |
Tipe API | SQL API |
Pembaruan atau penghapusan data dalam tabel sink | Didukung |
Fitur
Konektor Apache Iceberg menyediakan kemampuan inti berikut:
Membangun layanan penyimpanan danau data ringan berbiaya rendah berbasis HDFS atau OSS.
Menyediakan semantik ACID (atomicity, consistency, isolation, durability) yang komprehensif.
Mendukung pelacakan versi historis.
Mendukung penyaringan data yang efisien.
Mendukung evolusi skema.
Mendukung evolusi partisi.
Mendukung penyimpanan data dalam metastore Hive yang dikelola sendiri. Untuk informasi lebih lanjut, lihat Buat dan gunakan tabel sumber Apache Iceberg.
Anda dapat menggunakan kemampuan toleransi kesalahan dan pemrosesan aliran yang kuat dari Flink untuk mengimpor sejumlah besar data perilaku log ke danau data Apache Iceberg secara real-time. Kemudian, Anda dapat menggunakan Flink atau mesin analitik lainnya untuk mengekstraksi nilai dari data Anda.
Batasan
Hanya Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 4.0.8 atau lebih baru yang mendukung Konektor Apache Iceberg. Konektor Apache Iceberg harus digunakan bersama dengan katalog Data Lake Formation (DLF). Untuk informasi lebih lanjut, lihat Kelola katalog DLF.
Konektor Apache Iceberg mendukung format tabel Apache Iceberg versi 1 dan versi 2. Untuk informasi lebih lanjut, lihat Spesifikasi Tabel Iceberg.
CatatanHanya Realtime Compute for Apache Flink yang menggunakan VVR 8.0.7 atau lebih baru yang mendukung format tabel Apache Iceberg versi 2.
Jika mode pembacaan streaming diaktifkan, hanya tabel Apache Iceberg di mana data ditulis dalam mode Append Only yang dapat digunakan sebagai tabel sumber.
Sintaksis
CREATE TABLE iceberg_table (
id BIGINT,
data STRING
PRIMARY KEY(`id`) NOT ENFORCED
)
PARTITIONED BY (data)
WITH (
'connector' = 'iceberg',
...
);Opsi konektor dalam klausa WITH
Opsi umum dan tabel sumber
Opsi | Deskripsi | Tipe data | Diperlukan | Nilai default | Catatan |
connector | Konektor yang akan digunakan. | STRING | Ya | Tidak ada nilai default | Atur nilainya menjadi |
catalog-name | Nama katalog. | STRING | Ya | Tidak ada nilai default | Masukkan nama katalog Anda. |
catalog-database | Nama database. | STRING | Ya | default | Atur nilainya menjadi nama database yang dibuat pada Data Lake Formation (DLF). Contoh: dlf_db. Catatan Jika Anda belum membuat database DLF, buat satu. |
io-impl | Nama kelas implementasi dalam sistem file terdistribusi. | STRING | Ya | Tidak ada nilai default | Atur nilainya menjadi |
oss.endpoint | Titik akhir bucket OSS Anda. | STRING | Tidak | Tidak ada nilai default | Untuk informasi lebih lanjut, lihat Wilayah dan titik akhir. Catatan
|
| ID AccessKey dari akun Alibaba Cloud Anda. | STRING | Ya | Tidak ada nilai default | Untuk informasi lebih lanjut, lihat Bagaimana cara melihat pasangan AccessKey dari sebuah akun? Penting Untuk meningkatkan keamanan kredensial Anda, hindari hardcoding pasangan AccessKey dalam teks biasa; gunakan variabel sebagai gantinya. Untuk informasi lebih lanjut, lihat Kelola kunci. |
| Rahasia AccessKey dari akun Alibaba Cloud Anda. | STRING | Ya | Tidak ada nilai default | |
catalog-impl | Nama kelas katalog. | STRING | Ya | Tidak ada nilai default | Atur nilainya menjadi org.apache.iceberg.aliyun.dlf.DlfCatalog. |
warehouse | Direktori OSS tempat data tabel disimpan. | STRING | Ya | Tidak ada nilai default | Tidak tersedia. |
dlf.catalog-id | ID akun Alibaba Cloud Anda. | STRING | Ya | Tidak ada nilai default | Anda dapat pergi ke halaman Pengaturan Keamanan untuk mendapatkan ID akun. |
dlf.endpoint | Titik akhir DLF | STRING | Ya | Tidak ada nilai default | Catatan
|
dlf.region-id | Nama wilayah di mana layanan DLF diaktifkan. | STRING | Ya | Tidak ada nilai default | Catatan Pastikan wilayah yang Anda pilih sesuai dengan titik akhir yang Anda pilih untuk dlf.endpoint. |
uri | URI thrift untuk metastore Hive Anda. | STRING | Tidak | Tidak ada nilai default | Digunakan bersama dengan metastore Hive yang dikelola sendiri. Catatan Opsi ini hanya diperlukan jika Anda menggunakan Katalog Hive. |
Opsi eksklusif sink
Opsi | Deskripsi | Tipe data | Diperlukan | Nilai default | Catatan |
write.operation | Mode operasi penulisan. | STRING | Tidak | upsert | Nilai valid:
|
hive_sync.enable | Menentukan apakah sinkronisasi metadata ke Hive diaktifkan. | BOOLEAN | Tidak | false | Nilai valid:
|
hive_sync.mode | Mode sinkronisasi data Hive. | STRING | Tidak | hms |
|
hive_sync.db | Nama database Hive ke mana data disinkronkan. | STRING | Tidak | Nama database tabel saat ini dalam katalog | Tidak tersedia. |
hive_sync.table | Nama tabel Hive ke mana data disinkronkan. | STRING | Tidak | Nama tabel saat ini | Tidak tersedia. |
dlf.catalog.region | Nama wilayah di mana layanan DLF diaktifkan. | STRING | Tidak | Tidak ada nilai default | Catatan
|
dlf.catalog.endpoint | Titik akhir DLF. | STRING | Tidak | Tidak ada nilai default | Catatan
|
Pemetaan tipe data
Tipe data Apache Iceberg | Tipe data Realtime Compute for Apache Flink |
BOOLEAN | BOOLEAN |
INT | INT |
LONG | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(P,S) | DECIMAL(P,S) |
DATE | DATE |
TIME | TIME Catatan Timestamp Apache Iceberg akurat hingga mikrodetik, dan timestamp Realtime Compute for Apache Flink akurat hingga milidetik. Saat menggunakan Realtime Compute for Apache Flink untuk membaca data Apache Iceberg, presisi waktu disesuaikan ke milidetik. |
TIMESTAMP | TIMESTAMP |
TIMESTAMPTZ | TIMESTAMP_LTZ |
STRING | STRING |
FIXED(L) | BYTES |
BINARY | VARBINARY |
STRUCT<...> | ROW |
LIST<E> | LIST |
MAP<K,V> | MAP |
Contoh kode
Pastikan bucket OSS dan database DLF telah dibuat. Untuk informasi lebih lanjut, lihat Membuat Bucket dan Tabel Database dan Fungsi.
Saat menentukan directory untuk database DLF Anda, kami merekomendasikan Anda memasukkan direktori dalam format ${warehouse}/${database_name}.db. Sebagai contoh, jika nilai opsi warehouse adalah oss://iceberg-test/warehouse dan nilai opsi database_name adalah dlf_db, atur direktori OSS dari database dlf_db menjadi oss://iceberg-test/warehouse/dlf_db.db.
Membuat dan menggunakan tabel sink Apache Iceberg
Hasilkan data streaming acak menggunakan konektor Datagen dan tulis data tersebut ke Apache Iceberg:
CREATE TEMPORARY TABLE datagen(
id BIGINT,
data STRING
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE dlf_iceberg (
id BIGINT,
data STRING
) WITH (
'connector' = 'iceberg',
'catalog-name' = '<yourCatalogName>',
'catalog-database' = '<yourDatabaseName>',
'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
'oss.endpoint' = '<yourOSSEndpoint>',
'access.key.id' = '${secret_values.ak_id}',
'access.key.secret' = '${secret_values.ak_secret}',
'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
'warehouse' = '<yourOSSWarehousePath>',
'dlf.catalog-id' = '<yourCatalogId>',
'dlf.endpoint' = '<yourDLFEndpoint>',
'dlf.region-id' = '<yourDLFRegionId>'
);
INSERT INTO dlf_iceberg SELECT * FROM datagen;Membuat dan menggunakan tabel sumber Apache Iceberg
Buat tabel Flink yang dipetakan ke tabel Iceberg, yang dikelola dalam katalog Hive dan disimpan dalam metastore Hive yang dikelola sendiri:
CatatanPastikan Anda telah membuat koneksi antara ruang kerja Flink dan kluster metastore Hive yang dikelola sendiri.
File data akan disimpan di direktori:
oss://<bucket>/<path>/<database-name>/flink_table.
CREATE TEMOPORY TABLE flink_table ( id BIGINT, data STRING ) WITH ( 'connector'='iceberg', 'catalog-name'='<yourCatalogName>', 'catalog-database'='<yourDatabaseName>', 'uri'='thrift://<ip>:<post>', 'warehouse'='oss://<bucket>/<path>', 'io-impl'='org.apache.iceberg.aliyun.oss.OSSFileIO', 'access-key-id'='<yourAccessKeyID>', 'access-key-secret'='<yourAccessKeySecret>', 'oss.endpoint'='<yourOSSEndpoint>' );Buat dua tabel Flink yang dipetakan ke tabel Iceberg yang dikelola dalam katalog DLF, lalu baca data dari satu tabel Flink dan tulis ke tabel lainnya:
CREATE TEMPORARY TABLE src_iceberg ( id BIGINT, data STRING ) WITH ( 'connector' = 'iceberg', 'catalog-name' = '<yourCatalogName>', 'catalog-database' = '<yourDatabaseName>', 'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO', 'oss.endpoint' = '<yourOSSEndpoint>', 'access.key.id' = '${secret_values.ak_id}', 'access.key.secret' = '${secret_values.ak_secret}', 'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog', 'warehouse' = '<yourOSSWarehousePath>', 'dlf.catalog-id' = '<yourCatalogId>', 'dlf.endpoint' = '<yourDLFEndpoint>', 'dlf.region-id' = '<yourDLFRegionId>' ); CREATE TEMPORARY TABLE dst_iceberg ( id BIGINT, data STRING ) WITH ( 'connector' = 'iceberg', 'catalog-name' = '<yourCatalogName>', 'catalog-database' = '<yourDatabaseName>', 'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO', 'oss.endpoint' = '<yourOSSEndpoint>', 'access.key.id' = '${secret_values.ak_id}', 'access.key.secret' = '${secret_values.ak_secret}', 'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog', 'warehouse' = '<yourOSSWarehousePath>', 'dlf.catalog-id' = '<yourCatalogId>', 'dlf.endpoint' = '<yourDLFEndpoint>', 'dlf.region-id' = '<yourDLFRegionId>' ); BEGIN STATEMENT SET; INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE'); INSERT INTO dst_iceberg SELECT * FROM src_iceberg; END;
Referensi
Untuk informasi lebih lanjut tentang konektor yang didukung oleh Realtime Compute for Apache Flink, lihat Konektor yang Didukung.