本文介绍如何使用Iceberg连接器。

背景信息

Apache Iceberg是一种开放的数据湖表格格式。您可以借助Apache Iceberg快速地在HDFS或者云端OSS上构建自己的数据湖存储服务,并借助开源大数据生态的Flink、Spark、Hive、Presto等计算引擎来实现数据湖的分析。
类别详情
支持类型源表和结果表
运行模式批模式和流模式
数据格式暂不适用
特有监控指标暂无
API种类SQL
是否支持更新或删除结果表数据

特色功能

目前Apache Iceberg提供以下核心能力:
  • 基于HDFS或者对象存储构建低成本的轻量级数据湖存储服务。
  • 完善的ACID语义。
  • 支持历史版本回溯。
  • 支持高效的数据过滤。
  • 支持Schema Evolution。
  • 支持Partition Evolution。
说明 您可以借助Flink高效的容错能力和流处理能力,把海量的日志行为数据实时导入到Apache Iceberg数据湖内,再借助Flink或者其他分析引擎来实现数据价值的提取。

使用限制

  • 仅Flink计算引擎VVR 4.0.8及以上版本支持Iceberg连接器。
  • Iceberg连接器仅支持Apache Iceberg v1表格式,详情请参见Iceberg Table Spec

语法结构

CREATE TABLE iceberg_table (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  ...
);

WITH参数

  • 通用
    参数说明数据类型是否必填默认值备注
    connector源表类型String固定值为iceberg
    catalog-nameCatalog名称String请填写为自定义的英文名。
    catalog-typeCatalog类型String固定值为custom
    catalog-database数据库名称Stringdefault对应在DLF上创建的数据库名称,例如dlf_db。
    说明 如果您没有创建对应的DLF数据库,请创建DLF数据库。
    io-impl分布式文件系统的实现类名String固定值为org.apache.iceberg.aliyun.oss.OSSFileIO
    oss.endpoint阿里云对象存储服务OSS的EndpointString请详情参见访问域名和数据中心
    说明
    • 推荐您为oss.endpoint参数配置OSS的VPC Endpoint。例如,如果您选择的地域为cn-hangzhou地域,则oss.endpoint需要配置为oss-cn-hangzhou-internal.aliyuncs.com。
    • 如果您需要跨VPC访问OSS,则请参见如何访问跨VPC的其他服务?
    access.key.id阿里云账号的AccessKey IDString获取方法请参见创建AccessKey
    access.key.secret阿里云账号的AccessKey SecretString获取方法请参见创建AccessKey
    catalog-implCatalog的Class类名String固定值为org.apache.iceberg.aliyun.dlf.DlfCatalog
    warehouse表数据存放在OSS的路径String无。
    dlf.catalog-id阿里云账号的账号IDString可通过用户信息页面获取账号ID。
    dlf.endpointDLF服务的EndpointString
    说明
    • 推荐您为dlf.endpoint参数配置DLF的VPC Endpoint。例如,如果您选择的地域为cn-hangzhou地域,则dlf.endpoint参数需要配置为dlf-vpc.cn-hangzhou.aliyuncs.com
    • 如果您需要跨VPC访问DLF,则请参见如何访问跨VPC的其他服务?
    dlf.region-idDLF服务的地域名String
    说明 请和dlf.endpoint选择的地域保持一致。
  • 结果表独有
    参数说明数据类型是否必填默认值备注
    write.operation写入操作模式Stringupsert
    • upsert(默认):数据更新。
    • insert:数据追加写入。
    • bulk_insert:批量写入(不更新)。
    hive_sync.enable是否开启同步元数据到Hive功能booleanfalse参数取值如下:
    • true:开启
    • false(默认值):不开启。
    hive_sync.modeHive数据同步模式Stringhms
    • hms(默认值):采用Hive Metastore或者DLF Catalog时,需要设置hms。
    • jdbc:采用jdbc Catalog时,需要设置为jdbc。
    hive_sync.db同步到Hive的数据库名称String当前Table在Catalog中的数据库名无。
    hive_sync.table同步到Hive的表名称String当前Table名无。
    dlf.catalog.regionDLF服务的地域名String
    说明
    • 仅当hive_sync.mode设置为hms时,dlf.catalog.region参数设置才生效。
    • 请和dlf.catalog.endpoint选择的地域保持一致。
    dlf.catalog.endpointDLF服务的EndpointString
    说明
    • 仅当hive_sync.mode设置为hms时,dlf.catalog.endpoint参数设置才生效。
    • 推荐您为dlf.catalog.endpoint参数配置DLF的VPC Endpoint。例如,如果您选择的地域为cn-hangzhou地域,则dlf.catalog.endpoint参数需要配置为dlf-vpc.cn-hangzhou.aliyuncs.com
    • 如果您需要跨VPC访问DLF,则请参见如何访问跨VPC的其他服务?

类型映射

Iceberg字段类型Flink字段类型
BOOLEANBOOLEAN
INTINT
LONGBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DECIMAL(P,S)DECIMAL(P,S)
DATEDATE
TIMETIME
说明 Iceberg时间戳精度为微秒,Flink时间戳精度为毫秒。在使用Flink读取Iceberg数据时,时间精度会对齐到毫秒。
TIMESTAMPTIMESTAMP
TIMESTAMPTZTIMESTAMP_LTZ
STRINGSTRING
FIXED(L)BYTES
BINARYVARBINARY
STRUCT<...>ROW
LIST<E>LIST
MAP<K,V>MAP

代码示例

请先确认您已成功创建了DLF数据库。如果您没有创建对应的DLF数据库,请创建DLF数据库。
说明 创建数据库选择OSS路径时,请将名称设置为${warehouse}/${database_name}.db。例如,如果您将warehouse地址设置为oss://iceberg-test/warehouse,数据库的名称设置为dlf_db,则dlf_db的OSS路径需要设置为oss://iceberg-test/warehouse/dlf_db.db

结果表示例

本示例为您介绍如何通过Datagen连接器随机生成流式数据写入Iceberg表。
  1. 创建OSS Bucket,详情请参见控制台创建存储空间
  2. 作业开发页面的目标作业文本编辑区域,编写SQL流作业。
    CREATE TEMPORARY TABLE datagen(
      id    BIGINT,
      data  STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE dlf_iceberg (
      id    BIGINT,
      data  STRING
    ) WITH (
      'connector' = 'iceberg',
      'catalog-name' = '<yourCatalogName>',
      'catalog-type' = 'custom',
      'catalog-database' = '<yourDatabaseName>',
      'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
      'oss.endpoint' = '<yourOSSEndpoint>',  
      'access.key.id' = '<yourAccessKeyId>',
      'access.key.secret' = '<yourAccessKeySecret>',
      'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
      'warehouse' = '<yourOSSWarehousePath>',
      'dlf.catalog-id' = '<yourCatalogId>',
      'dlf.endpoint' = '<yourDLFEndpoint>',  
      'dlf.region-id' = '<yourDLFRegionId>'
    );
    
    INSERT INTO dlf_iceberg SELECT * FROM datagen;

源表示例

CREATE TEMPORARY TABLE src_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-type' = 'custom',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',  
  'access.key.id' = '<yourAccessKeyId>',
  'access.key.secret' = '<yourAccessKeySecret>',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',  
  'dlf.region-id' = '<yourDLFRegionId>'
);

CREATE TEMPORARY TABLE dst_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-type' = 'custom',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',  
  'access.key.id' = '<yourAccessKeyId>',
  'access.key.secret' = '<yourAccessKeySecret>',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',  
  'dlf.region-id' = '<yourDLFRegionId>'
);

BEGIN STATEMENT SET;

INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE');
INSERT INTO dst_iceberg SELECT * FROM src_iceberg;

END;