全部產品
Search
文件中心

Realtime Compute for Apache Flink:Iceberg

更新時間:Apr 15, 2025

本文介紹如何使用Iceberg連接器。

背景資訊

Apache Iceberg是一種開放的資料湖表格格式。您可以藉助Apache Iceberg快速地在HDFS或者雲端OSS上構建自己的資料湖儲存服務,並藉助開源巨量資料生態的Flink、Spark、Hive、Presto等計算引擎來實現資料湖的分析。

類別

詳情

支援類型

源表和結果表

運行模式

批模式和流模式

資料格式

暫不適用

特有監控指標

暫無

API種類

SQL

是否支援更新或刪除結果表資料

特色功能

目前Apache Iceberg提供以下核心能力:

  • 基於HDFS或者Object Storage Service構建低成本的輕量級資料湖儲存服務。

  • 完善的ACID語義。

  • 支援歷史版本回溯。

  • 支援高效的資料過濾。

  • 支援Schema Evolution。

  • 支援Partition Evolution。

  • 支援和自建Hive Metastore配合使用,詳情請參見使用Hive Catalog,配合自建Hive Metastore(HMS)使用。

說明

您可以藉助Flink高效的容錯能力和流處理能力,把海量的日誌行為資料即時匯入到Apache Iceberg資料湖內,再藉助Flink或者其他分析引擎來實現資料價值的提取。

使用限制

  • 僅Flink計算引擎VVR 4.0.8及以上版本支援Iceberg連接器。Iceberg連接器需要搭配DLF Catalog一起使用,詳情請參見管理DLF Catalog

  • Iceberg連接器支援Apache Iceberg v1和v2表格式,詳情請參見Iceberg Table Spec

    說明

    僅Realtime Compute引擎VVR 8.0.7及以上版本支援v2表格式。

  • 流讀模式下,僅支援將Append Only的Iceberg表作為源表。

文法結構

CREATE TABLE iceberg_table (
  id    BIGINT,
  data  STRING
  PRIMARY KEY(`id`) NOT ENFORCED
)
 PARTITIONED BY (data)
 WITH (
 'connector' = 'iceberg',
  ...
);

WITH參數

通用(源表)

參數

說明

資料類型

是否必填

預設值

備忘

connector

源表類型

String

固定值為iceberg

catalog-name

Catalog名稱

String

請填寫為自訂的英文名。

catalog-database

資料庫名稱

String

default

對應在DLF上建立的資料庫名稱,例如dlf_db。

說明

如果您沒有建立對應的DLF資料庫,請建立DLF資料庫。

io-impl

Distributed File System的實作類別名

String

固定值為org.apache.iceberg.aliyun.oss.OSSFileIO

oss.endpoint

阿里雲Object Storage Service服務OSS的Endpoint

String

請詳情參見OSS地區和訪問網域名稱

說明
  • 推薦您為oss.endpoint參數配置OSS的VPC Endpoint。例如,如果您選擇的地區為cn-hangzhou地區,則oss.endpoint需要配置為oss-cn-hangzhou-internal.aliyuncs.com。

  • 如果您需要跨VPC訪問OSS,則請參見如何訪問跨VPC的其他服務?

  • access.key.id:VVR 8.0.6及以下版本

  • access-key-id:VVR 8.0.7及以上版本

阿里雲帳號的AccessKey ID

String

詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?

重要

為了避免您的AK資訊泄露,建議您使用變數的方式填寫AccessKey取值,詳情請參見專案變數

  • access.key.secret:VVR 8.0.6及以下版本

  • access-key-secret:VVR 8.0.7及以上版本

阿里雲帳號的AccessKey Secret

String

catalog-impl

Catalog的Class類名

String

固定值為org.apache.iceberg.aliyun.dlf.DlfCatalog

warehouse

表資料存放在OSS的路徑

String

無。

dlf.catalog-id

阿里雲帳號的帳號ID

String

可通過使用者資訊頁面擷取帳號ID。

dlf.endpoint

DLF服務的Endpoint

String

說明
  • 推薦您為dlf.endpoint參數配置DLF的VPC Endpoint。例如,如果您選擇的地區為cn-hangzhou地區,則dlf.endpoint參數需要配置為dlf-vpc.cn-hangzhou.aliyuncs.com

  • 如果您需要跨VPC訪問DLF,則請參見空間管理與操作

dlf.region-id

DLF服務的地區名

String

說明

請和dlf.endpoint選擇的地區保持一致。

uri

Hive metastore的thrift URI

String

僅當使用Hive Catalog時,必填。

配合自建Hive Metastore使用。

結果表專屬

參數

說明

資料類型

是否必填

預設值

備忘

write.operation

寫入操作模式

String

upsert

  • upsert(預設):資料更新。

  • insert:資料追加寫入。

  • bulk_insert:批量寫入(不更新)。

hive_sync.enable

是否開啟同步中繼資料到Hive功能

boolean

false

參數取值如下:

  • true:開啟

  • false(預設值):不開啟。

hive_sync.mode

Hive資料同步模式

String

hms

  • hms(預設值):採用Hive Metastore或者DLF Catalog時,需要設定hms。

  • jdbc:採用jdbc Catalog時,需要設定為jdbc。

hive_sync.db

同步到Hive的資料庫名稱

String

當前Table在Catalog中的資料庫名

無。

hive_sync.table

同步到Hive的表名稱

String

當前Table名

無。

dlf.catalog.region

DLF服務的地區名

String

說明
  • 僅當hive_sync.mode設定為hms時,dlf.catalog.region參數設定才生效。

  • 請和dlf.catalog.endpoint選擇的地區保持一致。

dlf.catalog.endpoint

DLF服務的Endpoint

String

說明
  • 僅當hive_sync.mode設定為hms時,dlf.catalog.endpoint參數設定才生效。

  • 推薦您為dlf.catalog.endpoint參數配置DLF的VPC Endpoint。例如,如果您選擇的地區為cn-hangzhou地區,則dlf.catalog.endpoint參數需要配置為dlf-vpc.cn-hangzhou.aliyuncs.com

  • 如果您需要跨VPC訪問DLF,則請參見空間管理與操作

類型映射

Iceberg欄位類型

Flink欄位類型

BOOLEAN

BOOLEAN

INT

INT

LONG

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(P,S)

DECIMAL(P,S)

DATE

DATE

TIME

TIME

說明

Iceberg時間戳記精度為微秒,Flink時間戳記精度為毫秒。在使用Flink讀取Iceberg資料時,時間精度會對齊到毫秒。

TIMESTAMP

TIMESTAMP

TIMESTAMPTZ

TIMESTAMP_LTZ

STRING

STRING

FIXED(L)

BYTES

BINARY

VARBINARY

STRUCT<...>

ROW

LIST<E>

LIST

MAP<K,V>

MAP

程式碼範例

請確認您已建立了OSS Bucket和DLF資料庫。詳情請參見控制台建立儲存空間資料庫表及函數

說明

在建立DLF資料庫選擇路徑時,建議按照${warehouse}/${database_name}.db格式填寫。例如,如果warehouse地址為oss://iceberg-test/warehouse,資料庫的名稱為dlf_db,則dlf_db的OSS路徑需要設定為oss://iceberg-test/warehouse/dlf_db.db

結果表示例

本樣本為您介紹如何通過Datagen連接器隨機產生流式資料寫入Iceberg表。

CREATE TEMPORARY TABLE datagen(
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE dlf_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',  
  'access.key.id' = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',  
  'dlf.region-id' = '<yourDLFRegionId>'
);

INSERT INTO dlf_iceberg SELECT * FROM datagen;

源表示例

  • 使用Hive Catalog,配合自建Hive Metastore(HMS)使用。

    您需要保證Flink與HMS叢集網路互連,資料將儲存在oss://<bucket>/<path>/<database-name>/flink_table目錄下。

    CREATE TEMOPORY TABLE flink_table (
      id   BIGINT,
      data STRING
    ) WITH (
      'connector'='iceberg',
      'catalog-name'='<yourCatalogName>',
      'catalog-database'='<yourDatabaseName>',
      'uri'='thrift://<ip>:<post>',
      'warehouse'='oss://<bucket>/<path>',
      'io-impl'='org.apache.iceberg.aliyun.oss.OSSFileIO',
      'access-key-id'='<aliyun ak>',
      'access-key-secret'='<aliyun sk>',
      'oss.endpoint'='<yourOSSEndpoint>'
    );
  • 使用DLF Catalog,將Iceberg源表資料寫入到Iceberg結果表中。

    CREATE TEMPORARY TABLE src_iceberg (
      id    BIGINT,
      data  STRING
    ) WITH (
      'connector' = 'iceberg',
      'catalog-name' = '<yourCatalogName>',
      'catalog-database' = '<yourDatabaseName>',
      'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
      'oss.endpoint' = '<yourOSSEndpoint>',  
      'access.key.id' = '${secret_values.ak_id}',
      'access.key.secret' = '${secret_values.ak_secret}',
      'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
      'warehouse' = '<yourOSSWarehousePath>',
      'dlf.catalog-id' = '<yourCatalogId>',
      'dlf.endpoint' = '<yourDLFEndpoint>',  
      'dlf.region-id' = '<yourDLFRegionId>'
    );
    
    CREATE TEMPORARY TABLE dst_iceberg (
      id    BIGINT,
      data  STRING
    ) WITH (
      'connector' = 'iceberg',
      'catalog-name' = '<yourCatalogName>',
      'catalog-database' = '<yourDatabaseName>',
      'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
      'oss.endpoint' = '<yourOSSEndpoint>',  
      'access.key.id' = '${secret_values.ak_id}',
      'access.key.secret' = '${secret_values.ak_secret}',
      'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
      'warehouse' = '<yourOSSWarehousePath>',
      'dlf.catalog-id' = '<yourCatalogId>',
      'dlf.endpoint' = '<yourDLFEndpoint>',  
      'dlf.region-id' = '<yourDLFRegionId>'
    );
    
    BEGIN STATEMENT SET;
    
    INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE');
    INSERT INTO dst_iceberg SELECT * FROM src_iceberg;
    
    END;

相關文檔

Flink支援的連接器,請參見支援的連接器