本文介紹如何使用Iceberg連接器。
背景資訊
Apache Iceberg是一種開放的資料湖表格格式。您可以藉助Apache Iceberg快速地在HDFS或者雲端OSS上構建自己的資料湖儲存服務,並藉助開源巨量資料生態的Flink、Spark、Hive、Presto等計算引擎來實現資料湖的分析。
類別 | 詳情 |
支援類型 | 源表和結果表 |
運行模式 | 批模式和流模式 |
資料格式 | 暫不適用 |
特有監控指標 | 暫無 |
API種類 | SQL |
是否支援更新或刪除結果表資料 | 是 |
特色功能
目前Apache Iceberg提供以下核心能力:
基於HDFS或者Object Storage Service構建低成本的輕量級資料湖儲存服務。
完善的ACID語義。
支援歷史版本回溯。
支援高效的資料過濾。
支援Schema Evolution。
支援Partition Evolution。
支援和自建Hive Metastore配合使用,詳情請參見使用Hive Catalog,配合自建Hive Metastore(HMS)使用。
您可以藉助Flink高效的容錯能力和流處理能力,把海量的日誌行為資料即時匯入到Apache Iceberg資料湖內,再藉助Flink或者其他分析引擎來實現資料價值的提取。
使用限制
僅Flink計算引擎VVR 4.0.8及以上版本支援Iceberg連接器。Iceberg連接器需要搭配DLF Catalog一起使用,詳情請參見管理DLF Catalog。
Iceberg連接器支援Apache Iceberg v1和v2表格式,詳情請參見Iceberg Table Spec。
說明僅Realtime Compute引擎VVR 8.0.7及以上版本支援v2表格式。
流讀模式下,僅支援將Append Only的Iceberg表作為源表。
文法結構
CREATE TABLE iceberg_table (
id BIGINT,
data STRING
PRIMARY KEY(`id`) NOT ENFORCED
)
PARTITIONED BY (data)
WITH (
'connector' = 'iceberg',
...
);WITH參數
通用(源表)
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
connector | 源表類型 | String | 是 | 無 | 固定值為 |
catalog-name | Catalog名稱 | String | 是 | 無 | 請填寫為自訂的英文名。 |
catalog-database | 資料庫名稱 | String | 是 | default | 對應在DLF上建立的資料庫名稱,例如dlf_db。 說明 如果您沒有建立對應的DLF資料庫,請建立DLF資料庫。 |
io-impl | Distributed File System的實作類別名 | String | 是 | 無 | 固定值為 |
oss.endpoint | 阿里雲Object Storage Service服務OSS的Endpoint | String | 否 | 無 | 請詳情參見OSS地區和訪問網域名稱。 說明
|
| 阿里雲帳號的AccessKey ID | String | 是 | 無 | 詳情請參見如何查看AccessKey ID和AccessKey Secret資訊? 重要 為了避免您的AK資訊泄露,建議您使用變數的方式填寫AccessKey取值,詳情請參見專案變數。 |
| 阿里雲帳號的AccessKey Secret | String | 是 | 無 | |
catalog-impl | Catalog的Class類名 | String | 是 | 無 | 固定值為 |
warehouse | 表資料存放在OSS的路徑 | String | 是 | 無 | 無。 |
dlf.catalog-id | 阿里雲帳號的帳號ID | String | 是 | 無 | 可通過使用者資訊頁面擷取帳號ID。 |
dlf.endpoint | DLF服務的Endpoint | String | 是 | 無 | 。 說明
|
dlf.region-id | DLF服務的地區名 | String | 是 | 無 | 。 說明 請和dlf.endpoint選擇的地區保持一致。 |
uri | Hive metastore的thrift URI | String | 僅當使用Hive Catalog時,必填。 | 無 | 配合自建Hive Metastore使用。 |
結果表專屬
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
write.operation | 寫入操作模式 | String | 否 | upsert |
|
hive_sync.enable | 是否開啟同步中繼資料到Hive功能 | boolean | 否 | false | 參數取值如下:
|
hive_sync.mode | Hive資料同步模式 | String | 否 | hms |
|
hive_sync.db | 同步到Hive的資料庫名稱 | String | 否 | 當前Table在Catalog中的資料庫名 | 無。 |
hive_sync.table | 同步到Hive的表名稱 | String | 否 | 當前Table名 | 無。 |
dlf.catalog.region | DLF服務的地區名 | String | 否 | 無 | 。 說明
|
dlf.catalog.endpoint | DLF服務的Endpoint | String | 否 | 無 | 。 說明
|
類型映射
Iceberg欄位類型 | Flink欄位類型 |
BOOLEAN | BOOLEAN |
INT | INT |
LONG | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(P,S) | DECIMAL(P,S) |
DATE | DATE |
TIME | TIME 說明 Iceberg時間戳記精度為微秒,Flink時間戳記精度為毫秒。在使用Flink讀取Iceberg資料時,時間精度會對齊到毫秒。 |
TIMESTAMP | TIMESTAMP |
TIMESTAMPTZ | TIMESTAMP_LTZ |
STRING | STRING |
FIXED(L) | BYTES |
BINARY | VARBINARY |
STRUCT<...> | ROW |
LIST<E> | LIST |
MAP<K,V> | MAP |
程式碼範例
請確認您已建立了OSS Bucket和DLF資料庫。詳情請參見控制台建立儲存空間和資料庫表及函數。
在建立DLF資料庫選擇路徑時,建議按照${warehouse}/${database_name}.db格式填寫。例如,如果warehouse地址為oss://iceberg-test/warehouse,資料庫的名稱為dlf_db,則dlf_db的OSS路徑需要設定為oss://iceberg-test/warehouse/dlf_db.db。
結果表示例
本樣本為您介紹如何通過Datagen連接器隨機產生流式資料寫入Iceberg表。
CREATE TEMPORARY TABLE datagen(
id BIGINT,
data STRING
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE dlf_iceberg (
id BIGINT,
data STRING
) WITH (
'connector' = 'iceberg',
'catalog-name' = '<yourCatalogName>',
'catalog-database' = '<yourDatabaseName>',
'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
'oss.endpoint' = '<yourOSSEndpoint>',
'access.key.id' = '${secret_values.ak_id}',
'access.key.secret' = '${secret_values.ak_secret}',
'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
'warehouse' = '<yourOSSWarehousePath>',
'dlf.catalog-id' = '<yourCatalogId>',
'dlf.endpoint' = '<yourDLFEndpoint>',
'dlf.region-id' = '<yourDLFRegionId>'
);
INSERT INTO dlf_iceberg SELECT * FROM datagen;源表示例
使用Hive Catalog,配合自建Hive Metastore(HMS)使用。
您需要保證Flink與HMS叢集網路互連,資料將儲存在
oss://<bucket>/<path>/<database-name>/flink_table目錄下。CREATE TEMOPORY TABLE flink_table ( id BIGINT, data STRING ) WITH ( 'connector'='iceberg', 'catalog-name'='<yourCatalogName>', 'catalog-database'='<yourDatabaseName>', 'uri'='thrift://<ip>:<post>', 'warehouse'='oss://<bucket>/<path>', 'io-impl'='org.apache.iceberg.aliyun.oss.OSSFileIO', 'access-key-id'='<aliyun ak>', 'access-key-secret'='<aliyun sk>', 'oss.endpoint'='<yourOSSEndpoint>' );使用DLF Catalog,將Iceberg源表資料寫入到Iceberg結果表中。
CREATE TEMPORARY TABLE src_iceberg ( id BIGINT, data STRING ) WITH ( 'connector' = 'iceberg', 'catalog-name' = '<yourCatalogName>', 'catalog-database' = '<yourDatabaseName>', 'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO', 'oss.endpoint' = '<yourOSSEndpoint>', 'access.key.id' = '${secret_values.ak_id}', 'access.key.secret' = '${secret_values.ak_secret}', 'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog', 'warehouse' = '<yourOSSWarehousePath>', 'dlf.catalog-id' = '<yourCatalogId>', 'dlf.endpoint' = '<yourDLFEndpoint>', 'dlf.region-id' = '<yourDLFRegionId>' ); CREATE TEMPORARY TABLE dst_iceberg ( id BIGINT, data STRING ) WITH ( 'connector' = 'iceberg', 'catalog-name' = '<yourCatalogName>', 'catalog-database' = '<yourDatabaseName>', 'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO', 'oss.endpoint' = '<yourOSSEndpoint>', 'access.key.id' = '${secret_values.ak_id}', 'access.key.secret' = '${secret_values.ak_secret}', 'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog', 'warehouse' = '<yourOSSWarehousePath>', 'dlf.catalog-id' = '<yourCatalogId>', 'dlf.endpoint' = '<yourDLFEndpoint>', 'dlf.region-id' = '<yourDLFRegionId>' ); BEGIN STATEMENT SET; INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE'); INSERT INTO dst_iceberg SELECT * FROM src_iceberg; END;
相關文檔
Flink支援的連接器,請參見支援的連接器。