Topik ini menjelaskan cara menggunakan konektor Iceberg.
Informasi latar belakang
Apache Iceberg adalah format tabel terbuka untuk data lake. Anda dapat menggunakan Apache Iceberg untuk membangun layanan penyimpanan data lake yang hemat biaya dan dapat diskalakan di Hadoop Distributed File System (HDFS) atau Object Storage Service (OSS). Data tersebut kemudian dapat dianalisis menggunakan mesin komputasi dari ekosistem big data open source, seperti Flink, Spark, Hive, dan Presto.
|
Kategori |
Detail |
|
Jenis yang didukung |
Tabel sumber, tabel sink, dan tujuan ingesti data |
|
Mode runtime |
Mode batch dan mode stream |
|
Format data |
Tidak berlaku |
|
Metrik pemantauan khusus |
Tidak ada |
|
Jenis API |
Pekerjaan SQL, YAML untuk ingesti data |
|
Mendukung pembaruan atau penghapusan data di tabel sink |
Ya |
Fitur
Apache Iceberg menyediakan fitur-fitur inti berikut:
-
Layanan penyimpanan data lake yang ringan dan hemat biaya berbasis HDFS atau OSS.
-
Dukungan penuh terhadap semantik atomicity, consistency, isolation, dan durability (ACID).
-
Dukungan rollback ke versi historis.
-
Penyaringan data yang efisien.
-
Evolusi skema.
-
Evolusi partisi.
-
Kompatibilitas dengan Hive Metastore yang dikelola sendiri. Untuk informasi selengkapnya, lihat Gunakan Hive Catalog dengan Hive Metastore (HMS) yang dikelola sendiri.
Anda dapat memanfaatkan kemampuan toleransi kesalahan dan pemrosesan aliran Flink untuk mengingesti volume besar data log dan data perilaku ke dalam data lake Apache Iceberg secara real time, lalu mengekstraksi nilai dari data tersebut menggunakan Flink atau mesin analitik lainnya.
Batasan
-
Konektor Iceberg hanya didukung pada mesin komputasi Flink Ververica Runtime (VVR) 4.0.8 dan versi yang lebih baru. Anda harus menggunakan konektor Iceberg bersama Katalog Data Lake Formation (DLF). Untuk informasi selengkapnya, lihat Kelola Katalog DLF-Legacy.
-
Konektor Iceberg mendukung format tabel Apache Iceberg v1 dan v2. Untuk informasi selengkapnya, lihat Spesifikasi Tabel Iceberg.
CatatanFormat tabel v2 hanya didukung pada mesin komputasi waktu nyata VVR 8.0.7 dan versi yang lebih baru.
-
Dalam mode pembacaan stream, hanya tabel Iceberg append-only yang didukung sebagai tabel sumber.
Sintaksis
CREATE TABLE iceberg_table (
id BIGINT,
data STRING
PRIMARY KEY(`id`) NOT ENFORCED
)
PARTITIONED BY (data)
WITH (
'connector' = 'iceberg',
...
);
Parameter WITH
Parameter umum (untuk tabel sumber)
|
Parameter |
Deskripsi |
Tipe data |
Wajib |
Nilai default |
Keterangan |
|
connector |
Jenis tabel sumber. |
String |
Ya |
Tidak ada |
Nilainya harus |
|
catalog-name |
Nama katalog. |
String |
Ya |
Tidak ada |
Masukkan nama kustom dalam bahasa Inggris. |
|
catalog-database |
Nama database. |
String |
Ya |
default |
Nama database yang telah Anda buat di DLF, misalnya dlf_db. Catatan
Jika Anda belum membuat database DLF, buatlah satu. |
|
io-impl |
Kelas implementasi sistem file terdistribusi. |
String |
Ya |
Tidak ada |
Nilainya harus |
|
oss.endpoint |
Titik akhir Alibaba Cloud Object Storage Service (OSS). |
String |
Tidak |
Tidak ada |
Untuk informasi selengkapnya, lihat Wilayah dan titik akhir. Catatan
|
|
ID AccessKey akun Alibaba Cloud Anda. |
String |
Ya |
Tidak ada |
Untuk informasi selengkapnya, lihat Bagaimana cara melihat ID AccessKey dan rahasia AccessKey? Penting
Untuk mencegah kebocoran informasi AccessKey Anda, kami menyarankan agar Anda menggunakan variabel untuk nilai AccessKey. Untuk informasi selengkapnya, lihat Variabel proyek. |
|
Rahasia AccessKey akun Alibaba Cloud Anda. |
String |
Ya |
Tidak ada |
|
|
catalog-impl |
Nama kelas katalog. |
String |
Ya |
Tidak ada |
Nilainya harus |
|
warehouse |
Jalur di OSS tempat data tabel disimpan. |
String |
Ya |
Tidak ada |
Tidak ada. |
|
dlf.catalog-id |
ID akun Alibaba Cloud Anda. |
String |
Ya |
Tidak ada |
Anda dapat memperoleh ID akun dari halaman Informasi Pengguna. |
|
dlf.endpoint |
Titik akhir layanan DLF. |
String |
Ya |
Tidak ada |
. Catatan
|
|
dlf.region-id |
Wilayah layanan DLF. |
String |
Ya |
Tidak ada |
. Catatan
Wilayah ini harus sama dengan wilayah yang dipilih untuk dlf.endpoint. |
|
uri |
URI Thrift Hive metastore. |
String |
Wajib hanya saat Anda menggunakan Hive Catalog. |
Tidak ada |
Gunakan parameter ini dengan Hive Metastore yang dikelola sendiri. |
Parameter khusus untuk tabel sink
|
Parameter |
Deskripsi |
Tipe data |
Wajib |
Nilai default |
Keterangan |
|
write.operation |
Mode operasi penulisan. |
String |
Tidak |
upsert |
|
|
hive_sync.enable |
Menentukan apakah akan mengaktifkan sinkronisasi metadata ke Hive. |
boolean |
Tidak |
false |
Nilai yang valid:
|
|
hive_sync.mode |
Mode sinkronisasi data Hive. |
String |
Tidak |
hms |
|
|
hive_sync.db |
Nama database Hive tempat Anda ingin menyinkronkan metadata. |
String |
Tidak |
Nama database tempat tabel saat ini berada di katalog. |
Tidak ada. |
|
hive_sync.table |
Nama tabel Hive tempat Anda ingin menyinkronkan metadata. |
String |
Tidak |
Nama tabel saat ini. |
Tidak ada. |
|
dlf.catalog.region |
Wilayah layanan DLF. |
String |
Tidak |
Tidak ada |
. Catatan
|
|
dlf.catalog.endpoint |
Titik akhir layanan DLF. |
String |
Tidak |
Tidak ada |
. Catatan
|
Pemetaan tipe
|
Tipe bidang Iceberg |
Tipe bidang Flink |
|
BOOLEAN |
BOOLEAN |
|
INT |
INT |
|
LONG |
BIGINT |
|
FLOAT |
FLOAT |
|
DOUBLE |
DOUBLE |
|
DECIMAL(P,S) |
DECIMAL(P,S) |
|
DATE |
DATE |
|
TIME |
TIME Catatan
Presisi timestamp Iceberg adalah mikrodetik, sedangkan presisi timestamp Flink adalah milidetik. Saat Anda menggunakan Flink untuk membaca data dari Iceberg, presisi waktu disejajarkan ke milidetik. |
|
TIMESTAMP |
TIMESTAMP |
|
TIMESTAMPTZ |
TIMESTAMP_LTZ |
|
STRING |
STRING |
|
FIXED(L) |
BYTES |
|
BINARY |
VARBINARY |
|
STRUCT<...> |
ROW |
|
LIST<E> |
LIST |
|
MAP<K,V> |
MAP |
Contoh kode
Pastikan Anda telah membuat bucket OSS dan database DLF. Untuk informasi selengkapnya, lihat Buat bucket di konsol dan Database, tabel, dan fungsi.
Saat membuat database DLF dan mengatur Path, disarankan menggunakan format ${warehouse}/${database_name}.db. Misalnya, jika alamat warehouse adalah oss://iceberg-test/warehouse dan nama database adalah dlf_db, atur jalur OSS untuk dlf_db ke oss://iceberg-test/warehouse/dlf_db.db.
Contoh tabel sink
Contoh ini menunjukkan cara menggunakan konektor Datagen untuk menghasilkan data streaming secara acak dan menulisnya ke tabel Iceberg.
CREATE TEMPORARY TABLE datagen(
id BIGINT,
data STRING
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE dlf_iceberg (
id BIGINT,
data STRING
) WITH (
'connector' = 'iceberg',
'catalog-name' = '<yourCatalogName>',
'catalog-database' = '<yourDatabaseName>',
'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
'oss.endpoint' = '<yourOSSEndpoint>',
'access.key.id' = '${secret_values.ak_id}',
'access.key.secret' = '${secret_values.ak_secret}',
'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
'warehouse' = '<yourOSSWarehousePath>',
'dlf.catalog-id' = '<yourCatalogId>',
'dlf.endpoint' = '<yourDLFEndpoint>',
'dlf.region-id' = '<yourDLFRegionId>'
);
INSERT INTO dlf_iceberg SELECT * FROM datagen;
Contoh tabel sumber
-
Gunakan Hive Catalog dengan Hive Metastore (HMS) yang dikelola sendiri.
Pastikan kluster Flink dapat berkomunikasi dengan kluster HMS melalui jaringan. Data disimpan di direktori
oss://<bucket>/<path>/<database-name>/flink_table.CREATE TEMOPORY TABLE flink_table ( id BIGINT, data STRING ) WITH ( 'connector'='iceberg', 'catalog-name'='<yourCatalogName>', 'catalog-database'='<yourDatabaseName>', 'uri'='thrift://<ip>:<port>', 'warehouse'='oss://<bucket>/<path>', 'io-impl'='org.apache.iceberg.aliyun.oss.OSSFileIO', 'access-key-id'='<yourAccessKeyId>', 'access-key-secret'='<yourAccessKeySecret>', 'oss.endpoint'='<yourOSSEndpoint>' ); -
Gunakan Katalog DLF untuk menulis data dari tabel sumber Iceberg ke tabel sink Iceberg.
CREATE TEMPORARY TABLE src_iceberg ( id BIGINT, data STRING ) WITH ( 'connector' = 'iceberg', 'catalog-name' = '<yourCatalogName>', 'catalog-database' = '<yourDatabaseName>', 'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO', 'oss.endpoint' = '<yourOSSEndpoint>', 'access.key.id' = '${secret_values.ak_id}', 'access.key.secret' = '${secret_values.ak_secret}', 'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog', 'warehouse' = '<yourOSSWarehousePath>', 'dlf.catalog-id' = '<yourCatalogId>', 'dlf.endpoint' = '<yourDLFEndpoint>', 'dlf.region-id' = '<yourDLFRegionId>' ); CREATE TEMPORARY TABLE dst_iceberg ( id BIGINT, data STRING ) WITH ( 'connector' = 'iceberg', 'catalog-name' = '<yourCatalogName>', 'catalog-database' = '<yourDatabaseName>', 'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO', 'oss.endpoint' = '<yourOSSEndpoint>', 'access.key.id' = '${secret_values.ak_id}', 'access.key.secret' = '${secret_values.ak_secret}', 'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog', 'warehouse' = '<yourOSSWarehousePath>', 'dlf.catalog-id' = '<yourCatalogId>', 'dlf.endpoint' = '<yourDLFEndpoint>', 'dlf.region-id' = '<yourDLFRegionId>' ); BEGIN STATEMENT SET; INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE'); INSERT INTO dst_iceberg SELECT * FROM src_iceberg; END;
Ingesti Data
Anda dapat menggunakan konektor Iceberg sebagai sink untuk menulis data dalam pekerjaan YAML guna mengingesti data.
Sintaksis
sink:
type: iceberg
name: Iceberg Sink
catalog.properties.rest.signing-region: cn-beijing
catalog.properties.uri: http://cn-beijing-vpc.dlf.aliyuncs.com/iceberg
catalog.properties.warehouse: flink_iceberg
catalog.properties.type: rest
catalog.properties.io-impl: org.apache.iceberg.rest.DlfFileIO
Item konfigurasi
|
Parameter |
Deskripsi |
Wajib |
Tipe data |
Nilai default |
Keterangan |
|
type |
Jenis konektor. |
Ya |
STRING |
Tidak ada |
Nilainya harus |
|
name |
Nama sink. |
Tidak |
STRING |
Tidak ada |
Nama sink. |
|
catalog.properties.rest.signing-region |
ID wilayah DLF. Untuk informasi selengkapnya, lihat Titik akhir. |
Ya |
STRING |
Tidak ada |
Tidak ada |
|
catalog.properties.uri |
URI yang digunakan untuk mengakses Katalog REST DLF. Untuk informasi selengkapnya, lihat Iceberg REST. |
Ya |
STRING |
Tidak ada |
Tidak ada |
|
catalog.properties.warehouse |
Nama Katalog DLF. |
Ya |
STRING |
Tidak ada |
Tidak ada |
|
catalog.properties.warehouse |
Direktori root untuk penyimpanan file. |
Tidak |
STRING |
Tidak ada |
Tidak ada |
|
catalog.properties.type |
Jenis katalog. Nilainya harus rest. |
Ya |
STRING |
rest |
Tidak ada |
|
catalog.properties.io-impl |
Nilainya harus org.apache.iceberg.rest.DlfFileIO. |
Ya |
STRING |
org.apache.iceberg.rest.DlfFileIO |
Tidak ada |
|
partition.key |
Bidang partisi untuk setiap tabel partisi. |
Tidak |
STRING |
Tidak ada |
Kunci partisi untuk setiap tabel partisi. Anda dapat mengatur beberapa kunci primer untuk beberapa tabel. Gunakan titik koma ( Untuk partisi yang memerlukan transformasi implisit, Anda dapat menambahkan fungsi transformasi implisit langsung ke bidang partisi. Contoh: |
|
table.properties.* |
Parameter untuk membuat tabel Iceberg. |
Tidak |
String |
Tidak ada |
Untuk informasi selengkapnya, lihat Opsi tabel Iceberg. |
Contoh penggunaan
Kode berikut memberikan contoh konfigurasi penulisan data ke Data Lake Formation Alibaba Cloud saat Katalog Iceberg merupakan Katalog DLF:
source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: iceberg name: Iceberg Sink catalog.properties.rest.signing-region: cn-beijing catalog.properties.uri: http://cn-beijing-vpc.dlf.aliyuncs.com/iceberg catalog.properties.warehouse: flink_iceberg catalog.properties.type: rest catalog.properties.io-impl: org.apache.iceberg.rest.DlfFileIOUntuk informasi tentang parameter yang memiliki awalan catalog.properties, lihat Buat Katalog Iceberg DLF.
Evolusi skema
Saat ini, Iceberg sebagai sink untuk ingesti data mendukung event evolusi skema berikut:
-
CREATE TABLE EVENT
-
EVENT ADD COLUMN
-
EVENT ALTER COLUMN TYPE (Memodifikasi tipe kolom kunci primer tidak didukung)
-
EVENT RENAME COLUMN
-
DROP COLUMN EVENT
-
TRUNCATE TABLE EVENT
-
DROP TABLE EVENT
Jika tabel Iceberg downstream sudah ada, data ditulis berdasarkan skema tabel yang ada. Sistem tidak mencoba membuat ulang tabel tersebut.
Referensi
Untuk informasi tentang konektor yang didukung Flink, lihat Konektor yang Didukung.