您可以通过Flink将数据写入Hive Metastore或者DLF Catalog下的Iceberg表。本文为您介绍如何将数据写入到数据湖Iceberg结果,包括DDL定义、WITH参数、类型映射和代码示例等。

背景信息

Apache Iceberg是一种开放的数据湖表格格式。您可以借助Apache Iceberg快速地在HDFS或者云端OSS上构建自己的数据湖存储服务,并借助开源大数据生态的FlinkSparkHivePresto等计算引擎来实现数据湖的分析。目前Apache Iceberg提供以下核心能力:
  • 基于HDFS或者对象存储构建低成本的轻量级数据湖存储服务。
  • 完善的ACID语义。
  • 支持历史版本回溯。
  • 支持高效的数据过滤。
  • 支持Schema Evolution。
  • 支持Partition Evolution。
您可以借助Flink高效的容错能力和流处理能力,把海量的日志行为数据实时导入到Apache Iceberg数据湖内,再借助Flink或者其他分析引擎来实现数据价值的提取。您可以通过以下任意一种方式来使用Iceberg Connector:
  • 将数据写入至Hive Metastore下的Iceberg表

    Apache Hive Metastore通常被作为开源大数据的统一元数据中心。如果您的元数据被维护在Apache Hive Metastore,希望通过Flink将数据写入Iceberg表内,然后将数据存储在阿里云OSS上,可以采用该方式来使用Iceberg Connector。

  • 将数据写入至DLF Catalog下的Iceberg表

    是基于阿里云大数据生态产品设计和研发的统一元数据中心。借助该元数据中心您可以方便地采用开源大数据计算引擎(Spark、Hive、Presto或Flink)访问同一份数据湖内的数据。如果您希望元数据被存储在阿里云DLF上,可以采用该方式来使用Iceberg Connector。

说明 Iceberg Connector既可以作为Flink Stream作业的结果表,也可以作为Flink批作业的源表和结果表。

前提条件

将数据写入至DLF Catalog下的Iceberg表前,需要已开通阿里云DLF。

使用限制

  • 仅Flink计算引擎vvr-4.0.8-flink-1.13及以上版本支持Iceberg Connector。
  • Iceberg Connector仅支持日志数据入湖,暂不支持CDC或者Binlog数据入湖。
  • Iceberg Connector仅支持采用Hive Metastore和阿里云DLF作为Catalog,文件系统采用阿里云OSS服务。暂不支持采用开源HDFS作为Iceberg表的文件系统。

类型映射

Iceberg字段类型 Flink字段类型
BOOLEAN BOOLEAN
INT INT
LONG BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
DECIMAL(P,S) DECIMAL(P,S)
DATE DATE
TIME TIME
说明 Iceberg时间戳精度为微秒,Flink时间戳精度为毫秒。在使用Flink读取Iceberg数据时,时间精度会对齐到毫秒。
TIMESTAMP TIMESTAMP
TIMESTAMPTZ TIMESTAMP_LTZ
STRING STRING
FIXED(L) BYTES
BINARY VARBINARY
STRUCT<...> ROW
LIST<E> LIST
MAP<K, V> MAP

将数据写入至Hive Metastore下的Iceberg表

下面为您介绍如何通过Flink将数据写入到Apache Hive Metastore上的Iceberg表。

  1. 验证Hive Metastore依赖的Hadoop能否正确访问阿里云OSS数据。
    无论您使用的是阿里云EMR上的Hive Metastore,还是自建的Hive Metastore,都需要确保Hive Metastore所依赖的Hadoop能正确访问阿里云OSS上的数据。具体的确认方法如下:
    • 阿里云EMR上的Hive Metastore。
      通常,阿里云EMR上的Hive Metastore是可以直接访问该账号下的OSS数据的。如果您需要确认,则可以登录到Hive Metastore所在的主机,详情请参见登录集群。然后通过以下命令来验证Hadoop能否正确访问OSS数据。假如您需要访问oss://table-format/路径,则验证命令如下。
      hdfs dfs  -ls oss://table-format/
      说明 oss://table-format/为示例值。在实际验证时,需要替换为您账号下的Bucket名称和OSS路径。
    • 自建的Hive Metastore。
      如果您使用的是自建Hive Metastore,则需要手动配置Hadoop以便能正确地访问OSS数据。操作步骤详情如下。
      1. 打开Hadoop的阿里云OSS插件开关,让Hadoop classpath可以自动加载阿里云OSS的相关依赖。阿里云OSS插件开关详情请参见Hadoop阿里云模块。不同Hadoop版本的操作方式如下表所示。
        Hadoop版本 操作方式
        hadoop 3.x.x $HADOOP_HOME/etc/hadoop/hadoop-env.sh中新增如下内容。
        export HADOOP_OPTIONAL_TOOLS="hadoop-aliyun"
        hadoop 2.9.x $HADOOP_HOME/libexec/hadoop-config.sh中新增如下内容。
        CLASSPATH=${CLASSPATH}:${TOOL_PATH}
        hadoop 2.8.x及以下版本 不支持访问阿里云OSS服务。
      2. $HADOOP_HOME/etc/hadoop/core-site.xml中新增如下内容,并将fs.oss.endpointfs.oss.accessKeyIdfs.oss.accessKeySecret参数值替换为您的实际值。
        <property>
            <name>fs.AbstractFileSystem.oss.impl</name>
            <value>org.apache.hadoop.fs.aliyun.oss.OSS</value>
        </property>
        
        <property>
            <name>fs.oss.impl</name>
            <value>org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem</value>
        </property>
        
        <property>
            <name>fs.oss.endpoint</name>
            <value>${YOUR_OSS_ENDPOINT}</value>
        </property>
        
        <property>
            <name>fs.oss.accessKeyId</name>
            <value>${YOUR_ACCESS_KEY_ID}</value>
        </property>
        
        <property>
            <name>fs.oss.accessKeySecret</name>
            <value>${YOUR_ACCESS_KEY_SECRET}</value>
        </property>
        其中涉及的参数解释如下表所示。
        参数 说明
        fs.oss.endpoint 阿里云对象存储OSS的域名,详情请参见访问域名和数据中心
        fs.oss.accessKeyId 阿里云账号的Access Key。获取方法请参见获取AccessKey
        fs.oss.accessKeySecret 阿里云账号的AccessKey Secret。获取方法请参见获取AccessKey
      3. 通过hdfs命令行来验证是否可以正确读取OSS数据。
        hdfs dfs  -ls oss://table-format/
        说明 oss://table-format/为示例值。在实际验证时,需要替换为您账号下的Bucket名称和OSS路径。
  2. 配置网络以便Flink可以正确地访问Hive Metastore端口。
    配置和验证的方式,请参见管理Hive Metastore
  3. 编写SQL并执行入湖作业。
    CREATE TEMPORARY TABLE flink_source(
      id BIGINT,
      data STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE iceberg_sink (
      id BIGINT,
      data STRING
    ) WITH (
      'connector' = 'iceberg',
      'catalog-name' = 'hive_prod',
      'catalog-type' = 'hive',
      'engine.hive.enabled' = 'true',
      'uri' = 'thrift://<host>:<port>',
      'location' = 'oss://<oss-bucket>/<oss-object>',
      'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
      'oss.endpoint' = '<aliyun-oss-endpoint>',
      'access.key.id' = '<aliyun-oss-access-key>',
      'access.key.secret' = '<aliyun-oss-access-key-secret>'
    );
    
    INSERT INTO iceberg_sink SELECT * FROM flink_source;
    WITH参数解释如下表所示。
    参数 说明 是否必选 备注
    connector 结果表类型。 固定值为iceberg
    catalog-name Catalog名称。 请填写为自定义的英文名。
    catalog-type Catalog类型。 固定值为hive。
    catalog-database 数据库名称。 对应用户在Hive Metastore上创建的数据库名称,例如iceberg_db。
    engine.hive.enabled 存放在Hive Metastore中的Iceberg表的元数据是否能被Hive计算引擎读取。 参数取值如下:
    • true(推荐值):可以被Hive计算引擎直接读取。
    • false:不可以被Hive计算引擎直接读取。
    uri Hive Metastore的thrift server的地址。 格式为thrift://<host>:<port>。其中:
    • host表示Hive Metastore的IP地址。
    • port表示Hive Metstore的端口,默认值为9083。
    location 结果表存放的OSS路径。 格式为oss://<bucket>/<object>。其中:
    • bucket表示您创建的OSS Bucket名称。
    • object表示您存放数据的路径。
    io-impl 分布式文件系统的实现类名称。 固定值为org.apache.iceberg.aliyun.oss.OSSFileIO。
    oss.endpoint 阿里云对象存储服务OSS的Endpoint。 请详情参见访问域名和数据中心
    说明
    • 推荐您为oss.endpoint参数配置OSS的VPC Endpoint。例如,如果您选择的地域为cn-hangzhou地域,则oss.endpoint需要配置为oss-cn-hangzhou-internal.aliyuncs.com。
    • 如果您需要跨VPC访问OSS,则请参见如何访问跨VPC里的存储资源?
    access.key.id 阿里云账号的AccessKey ID。 获取方法请参见获取AccessKey
    access.key.secret 阿里云账号的AccessKey Secret。 获取方法请参见获取AccessKey

将数据写入至DLF Catalog下的Iceberg表

下面为您介绍如何通过Flink将数据写入到阿里云DLF上的Iceberg表。

  • DDL定义
    CREATE TABLE dlf_iceberg (
      id   BIGINT,
      data STRING
    ) WITH (
      'connector' = 'iceberg',
      'catalog-name' = '<yourCatalogName>',
      'catalog-type' = 'custom',
      'catalog-database' = '<yourDatabaseName>',
      'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
      'oss.endpoint' = '<yourOSSEndpoint>',  
      'access.key.id' = '<yourAccessKeyId>',
      'access.key.secret' = '<yourAccessKeySecret>',
      'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
      'warehouse' = '<yourOSSWarehousePath>',
      'dlf.catalog-id' = '<yourCatalogId>',
      'dlf.endpoint' = '<yourDLFEndpoint>',  
      'dlf.region-id' = '<yourDLFRegionId>'
    );
  • WITH参数说明
    参数 说明 是否必选 备注
    connector 结果表类型。 固定值为iceberg
    catalog-name Catalog名称。 请填写为自定义的英文名。
    catalog-type Catalog类型。 固定值为custom。
    catalog-database 数据库名称。 对应用户在DLF上创建的数据库名称,例如dlf_db。
    io-impl 分布式文件系统的实现类名。 固定值为org.apache.iceberg.aliyun.oss.OSSFileIO
    oss.endpoint 阿里云对象存储服务OSS的Endpoint。 请详情参见访问域名和数据中心
    说明
    • 推荐您为oss.endpoint参数配置OSS的VPC Endpoint。例如,如果您选择的地域为cn-hangzhou地域,则oss.endpoint需要配置为oss-cn-hangzhou-internal.aliyuncs.com。
    • 如果您需要跨VPC访问OSS,则请参见如何访问跨VPC里的存储资源?
    access.key.id 阿里云账号的AccessKey ID。 获取方法请参见获取AccessKey
    access.key.secret 阿里云账号的Access Secret。 获取方法请参见获取AccessKey
    catalog-impl Catalog的Class类名。 固定值为org.apache.iceberg.aliyun.dlf.DlfCatalog
    warehouse 表数据存放在OSS的路径。 无。
    dlf.catalog-id 阿里云账号的账号ID。 登录账号信息,请通过用户信息页面获取。获取登录账号
    dlf.endpoint DLF服务的Endpoint。
    说明
    • 推荐您为dlf.endpoint参数配置DLF的VPC Endpoint。例如,如果您选择的地域为cn-hangzhou地域,则dlf.endpoint参数需要配置为dlf-vpc.cn-hangzhou.aliyuncs.com。
    • 如果您需要跨VPC访问DLF,则请参见如何访问跨VPC里的存储资源?
    dlf.region-id DLF服务的地域名。
    说明 请和dlf.endpoint选择的地域保持一致。
  • 代码示例
    1. 创建DLF数据库。
      说明 创建数据库选择OSS路径时,请将DLF数据库OSS路径选择为${warehouse}/${database_name}.db。例如,如果您将warehouse地址设置为oss://iceberg-test/warehouse,数据库的名称设置为dlf_db,则dlf_db的OSS路径需要选择为oss://iceberg-test/warehouse/dlf_db.db
    2. 作业开发页面,在文本编辑区域,编写SQL设计流作业。
      CREATE TEMPORARY TABLE datagen(
        id    BIGINT,
        data  STRING
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE dlf_iceberg (
        id    BIGINT,
        data  STRING
      ) WITH (
        'connector' = 'iceberg',
        'catalog-name' = '<yourCatalogName>',
        'catalog-type' = 'custom',
        'catalog-database' = '<yourDatabaseName>',
        'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
        'oss.endpoint' = '<yourOSSEndpoint>',  
        'access.key.id' = '<yourAccessKeyId>',
        'access.key.secret' = '<yourAccessKeySecret>',
        'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
        'warehouse' = '<yourOSSWarehousePath>',
        'dlf.catalog-id' = '<yourCatalogId>',
        'dlf.endpoint' = '<yourDLFEndpoint>',  
        'dlf.region-id' = '<yourDLFRegionId>'
      );
      
      INSERT INTO dlf_iceberg SELECT * FROM datagen;
    3. 在作业开发页面右侧高级配置面版中,引擎版本配置为vvr-4.0.8-flink-1.13引擎版本
    4. 单击验证
    5. 单击上线
    6. 在OSS控制台查看写入的测试数据。

      等第一次Checkpoint完成之后,您将能看到写入的测试数据了。