本文介绍如何使用Iceberg连接器。
背景信息
Apache Iceberg是一种开放的数据湖表格格式。您可以借助Apache Iceberg快速地在HDFS或者云端OSS上构建自己的数据湖存储服务,并借助开源大数据生态的Flink、Spark、Hive、Presto等计算引擎来实现数据湖的分析。
类别 | 详情 |
---|---|
支持类型 | 源表和结果表 |
运行模式 | 批模式和流模式 |
数据格式 | 暂不适用 |
特有监控指标 | 暂无 |
API种类 | SQL |
是否支持更新或删除结果表数据 | 是 |
特色功能
目前Apache Iceberg提供以下核心能力:
- 基于HDFS或者对象存储构建低成本的轻量级数据湖存储服务。
- 完善的ACID语义。
- 支持历史版本回溯。
- 支持高效的数据过滤。
- 支持Schema Evolution。
- 支持Partition Evolution。
说明 您可以借助Flink高效的容错能力和流处理能力,把海量的日志行为数据实时导入到Apache Iceberg数据湖内,再借助Flink或者其他分析引擎来实现数据价值的提取。
使用限制
- 仅Flink计算引擎VVR 4.0.8及以上版本支持Iceberg连接器。
- Iceberg连接器仅支持Apache Iceberg v1表格式,详情请参见Iceberg Table Spec。
语法结构
CREATE TABLE iceberg_table (
id BIGINT,
data STRING
) WITH (
'connector' = 'iceberg',
...
);
WITH参数
- 通用
参数 说明 数据类型 是否必填 默认值 备注 connector 源表类型 String 是 无 固定值为 iceberg
。catalog-name Catalog名称 String 是 无 请填写为自定义的英文名。 catalog-type Catalog类型 String 是 无 固定值为 custom
。catalog-database 数据库名称 String 是 default 对应在DLF上创建的数据库名称,例如dlf_db。 说明 如果您没有创建对应的DLF数据库,请创建DLF数据库。io-impl 分布式文件系统的实现类名 String 是 无 固定值为 org.apache.iceberg.aliyun.oss.OSSFileIO
。oss.endpoint 阿里云对象存储服务OSS的Endpoint String 否 无 请详情参见访问域名和数据中心。 说明- 推荐您为oss.endpoint参数配置OSS的VPC Endpoint。例如,如果您选择的地域为cn-hangzhou地域,则oss.endpoint需要配置为oss-cn-hangzhou-internal.aliyuncs.com。
- 如果您需要跨VPC访问OSS,则请参见如何访问跨VPC的其他服务?
access.key.id 阿里云账号的AccessKey ID String 是 无 获取方法请参见创建AccessKey。 access.key.secret 阿里云账号的AccessKey Secret String 是 无 获取方法请参见创建AccessKey。 catalog-impl Catalog的Class类名 String 是 无 固定值为 org.apache.iceberg.aliyun.dlf.DlfCatalog
。warehouse 表数据存放在OSS的路径 String 是 无 无。 dlf.catalog-id 阿里云账号的账号ID String 是 无 可通过用户信息页面获取账号ID。 dlf.endpoint DLF服务的Endpoint String 是 无 。 说明- 推荐您为dlf.endpoint参数配置DLF的VPC Endpoint。例如,如果您选择的地域为cn-hangzhou地域,则dlf.endpoint参数需要配置为dlf-vpc.cn-hangzhou.aliyuncs.com。
- 如果您需要跨VPC访问DLF,则请参见如何访问跨VPC的其他服务?
dlf.region-id DLF服务的地域名 String 是 无 。 说明 请和dlf.endpoint选择的地域保持一致。 - 结果表独有
参数 说明 数据类型 是否必填 默认值 备注 write.operation 写入操作模式 String 否 upsert - upsert(默认):数据更新。
- insert:数据追加写入。
- bulk_insert:批量写入(不更新)。
hive_sync.enable 是否开启同步元数据到Hive功能 boolean 否 false 参数取值如下: - true:开启
- false(默认值):不开启。
hive_sync.mode Hive数据同步模式 String 否 hms - hms(默认值):采用Hive Metastore或者DLF Catalog时,需要设置hms。
- jdbc:采用jdbc Catalog时,需要设置为jdbc。
hive_sync.db 同步到Hive的数据库名称 String 否 当前Table在Catalog中的数据库名 无。 hive_sync.table 同步到Hive的表名称 String 否 当前Table名 无。 dlf.catalog.region DLF服务的地域名 String 否 无 。 说明- 仅当hive_sync.mode设置为
hms
时,dlf.catalog.region参数设置才生效。 - 请和dlf.catalog.endpoint选择的地域保持一致。
dlf.catalog.endpoint DLF服务的Endpoint String 否 无 。 说明- 仅当hive_sync.mode设置为hms时,dlf.catalog.endpoint参数设置才生效。
- 推荐您为dlf.catalog.endpoint参数配置DLF的VPC Endpoint。例如,如果您选择的地域为cn-hangzhou地域,则dlf.catalog.endpoint参数需要配置为dlf-vpc.cn-hangzhou.aliyuncs.com。
- 如果您需要跨VPC访问DLF,则请参见如何访问跨VPC的其他服务?
类型映射
Iceberg字段类型 | Flink字段类型 |
---|---|
BOOLEAN | BOOLEAN |
INT | INT |
LONG | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(P,S) | DECIMAL(P,S) |
DATE | DATE |
TIME | TIME 说明 Iceberg时间戳精度为微秒,Flink时间戳精度为毫秒。在使用Flink读取Iceberg数据时,时间精度会对齐到毫秒。 |
TIMESTAMP | TIMESTAMP |
TIMESTAMPTZ | TIMESTAMP_LTZ |
STRING | STRING |
FIXED(L) | BYTES |
BINARY | VARBINARY |
STRUCT<...> | ROW |
LIST<E> | LIST |
MAP<K,V> | MAP |
代码示例
请先确认您已成功创建了DLF数据库。如果您没有创建对应的DLF数据库,请创建DLF数据库。
说明 创建数据库选择OSS路径时,请将名称设置为${warehouse}/${database_name}.db。例如,如果您将warehouse地址设置为oss://iceberg-test/warehouse,数据库的名称设置为dlf_db,则dlf_db的OSS路径需要设置为oss://iceberg-test/warehouse/dlf_db.db。
结果表示例
本示例为您介绍如何通过Datagen连接器随机生成流式数据写入Iceberg表。
- 创建OSS Bucket,详情请参见控制台创建存储空间。
- 在作业开发页面的目标作业文本编辑区域,编写SQL流作业。
CREATE TEMPORARY TABLE datagen( id BIGINT, data STRING ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE dlf_iceberg ( id BIGINT, data STRING ) WITH ( 'connector' = 'iceberg', 'catalog-name' = '<yourCatalogName>', 'catalog-type' = 'custom', 'catalog-database' = '<yourDatabaseName>', 'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO', 'oss.endpoint' = '<yourOSSEndpoint>', 'access.key.id' = '<yourAccessKeyId>', 'access.key.secret' = '<yourAccessKeySecret>', 'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog', 'warehouse' = '<yourOSSWarehousePath>', 'dlf.catalog-id' = '<yourCatalogId>', 'dlf.endpoint' = '<yourDLFEndpoint>', 'dlf.region-id' = '<yourDLFRegionId>' ); INSERT INTO dlf_iceberg SELECT * FROM datagen;
源表示例
CREATE TEMPORARY TABLE src_iceberg (
id BIGINT,
data STRING
) WITH (
'connector' = 'iceberg',
'catalog-name' = '<yourCatalogName>',
'catalog-type' = 'custom',
'catalog-database' = '<yourDatabaseName>',
'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
'oss.endpoint' = '<yourOSSEndpoint>',
'access.key.id' = '<yourAccessKeyId>',
'access.key.secret' = '<yourAccessKeySecret>',
'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
'warehouse' = '<yourOSSWarehousePath>',
'dlf.catalog-id' = '<yourCatalogId>',
'dlf.endpoint' = '<yourDLFEndpoint>',
'dlf.region-id' = '<yourDLFRegionId>'
);
CREATE TEMPORARY TABLE dst_iceberg (
id BIGINT,
data STRING
) WITH (
'connector' = 'iceberg',
'catalog-name' = '<yourCatalogName>',
'catalog-type' = 'custom',
'catalog-database' = '<yourDatabaseName>',
'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
'oss.endpoint' = '<yourOSSEndpoint>',
'access.key.id' = '<yourAccessKeyId>',
'access.key.secret' = '<yourAccessKeySecret>',
'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
'warehouse' = '<yourOSSWarehousePath>',
'dlf.catalog-id' = '<yourCatalogId>',
'dlf.endpoint' = '<yourDLFEndpoint>',
'dlf.region-id' = '<yourDLFRegionId>'
);
BEGIN STATEMENT SET;
INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE');
INSERT INTO dst_iceberg SELECT * FROM src_iceberg;
END;