You can use Flink to write data to an Apache Iceberg table that is stored in a Hive metastore or Data Lake Formation (DLF) catalog. This topic provides the DDL syntax that is used to write data to an Apache Iceberg result table, describes the parameters in the WITH clause, and provides data type mappings and an example.

Background information

Apache Iceberg is an open table format for data lakes. You can use Apache Iceberg to quickly build your own data lake storage service on Hadoop Distributed File System (HDFS) or Alibaba Cloud Object Storage Service (OSS). Then, you can use a compute engine of the open source big data ecosystem, such as Apache Flink, Apache Spark, Apache Hive, or Apache Presto, to analyze data in your data lake. Apache Iceberg provides the following core capabilities:
  • Builds a low-cost lightweight data lake storage service based on HDFS or OSS.
  • Provides comprehensive atomicity, consistency, isolation, durability (ACID) semantics.
  • Supports historical version backtracking.
  • Supports efficient data filtering.
  • Supports schema evolution.
  • Supports partition evolution.
You can use the efficient fault tolerance and stream processing capabilities of Flink to import large amounts of behavioral log data into an Apache Iceberg data lake in real time. Then, you can use Flink or another analytics engine to extract value from the data. You can use the Apache Iceberg connector to write data to an Apache Iceberg table in the following scenarios:
  • Write data to an Apache Iceberg table that is stored in a Hive metastore

    In most cases, an Apache Hive metastore is used as a unified metadata center for the open source big data ecosystem. If your metadata is maintained in an Apache Hive metastore, you can write the data to an Apache Iceberg table by using the Apache Iceberg connector and store the data in Alibaba Cloud OSS.

  • Write data to an Apache Iceberg table that is stored in a DLF catalog

    Alibaba Cloud Data Lake Formation (DLF) is a unified metadata center that is designed and developed based on Alibaba Cloud big data ecosystem products. Alibaba Cloud DLF allows you to use an open source big data compute engine, such as Apache Spark, Apache Hive, Apache Presto, or Apache Flink, to access data in the same data lake. If you want to store metadata in Alibaba Cloud DLF, you can use the Apache Iceberg connector.

Note In Flink streaming jobs, the Apache Iceberg connector can be used to store data only for result tables. In Flink batch jobs, the Apache Iceberg connector can be used to store data for both source tables and result tables.
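For example, in a batch job the same table declaration can serve as a source. The following is a minimal sketch; the table name iceberg_table is a placeholder for a table that is declared with the Apache Iceberg connector options described later in this topic.

  -- Sketch only: query an Apache Iceberg table as a batch source.
  -- iceberg_table is a placeholder; declare it with the WITH options described in this topic.
  SELECT id, data FROM iceberg_table;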

Prerequisites

Alibaba Cloud DLF is activated. This prerequisite applies only if you want to write data to an Apache Iceberg table that is stored in an Alibaba Cloud DLF catalog.

Limits

  • Only Flink whose engine version is vvr-4.0.8-flink-1.13 or later supports the Apache Iceberg connector.
  • The Apache Iceberg connector allows you to ingest only log data into data lakes and does not allow you to ingest Change Data Capture (CDC) data or binary log data into data lakes.
  • The Apache Iceberg connector allows you to store Apache Iceberg tables only in a Hive metastore or an Alibaba Cloud DLF catalog and use Alibaba Cloud OSS as a file system. You are not allowed to use open source HDFS as a file system for Apache Iceberg tables.
  • The Apache Iceberg connector supports only the Apache Iceberg table format of version 1. For more information, see Iceberg Table Spec.

Data type mappings

Data type of Apache Iceberg    Data type of Flink
BOOLEAN                        BOOLEAN
INT                            INT
LONG                           BIGINT
FLOAT                          FLOAT
DOUBLE                         DOUBLE
DECIMAL(P,S)                   DECIMAL(P,S)
DATE                           DATE
TIME                           TIME
TIMESTAMP                      TIMESTAMP
TIMESTAMPTZ                    TIMESTAMP_LTZ
STRING                         STRING
FIXED(L)                       BYTES
BINARY                         VARBINARY
STRUCT<...>                    ROW
LIST<E>                        LIST
MAP<K, V>                      MAP

Note Apache Iceberg timestamps are accurate to the microsecond, and Flink timestamps are accurate to the millisecond. When you use Flink to read Apache Iceberg data, the time precision is aligned to milliseconds.
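The following Flink DDL fragment is an illustrative sketch of these mappings. The table and column names are examples only, and the catalog and OSS options are omitted; see the WITH clause parameters later in this topic.

  CREATE TEMPORARY TABLE iceberg_types_example (
    id         BIGINT,                    -- stored as Apache Iceberg LONG
    amount     DECIMAL(10, 2),            -- stored as Apache Iceberg DECIMAL(10, 2)
    event_time TIMESTAMP_LTZ(3),          -- stored as Apache Iceberg TIMESTAMPTZ
    payload    ROW<k STRING, v DOUBLE>    -- stored as Apache Iceberg STRUCT<...>
  ) WITH (
    'connector' = 'iceberg'
    -- The remaining catalog and OSS options are omitted in this sketch.
  );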

Write data to an Apache Iceberg table that is stored in a Hive metastore

The following section describes how to write data to an Apache Iceberg table that is stored in a Hive metastore.

  1. Check whether Hadoop on which the Hive metastore depends can access data on Alibaba Cloud OSS.
    Regardless of whether you use a Hive metastore that is hosted on Alibaba Cloud E-MapReduce (EMR) or a self-managed Hive metastore, make sure that the Hadoop installation on which the Hive metastore depends can access data on Alibaba Cloud OSS. The verification method varies based on the type of your Hive metastore.
    • Hive metastore that is hosted on Alibaba Cloud EMR
      In most cases, a Hive metastore that is hosted on Alibaba Cloud EMR can directly access OSS data that belongs to the same Alibaba Cloud account. To check whether the access works, log on to the host on which the Hive metastore resides. For more information, see Log on to a cluster. Then, run the following command to check whether Hadoop can access OSS data. For example, if the OSS path is oss://table-format/, run the following command:
      hdfs dfs  -ls oss://table-format/
      Note oss://table-format/ is used in this example. During the verification, replace the value with your OSS bucket name and path within your Alibaba Cloud account.
    • Self-managed Hive metastore
      If you use a self-managed Hive metastore, you must manually configure Hadoop to ensure that Hadoop can access OSS data. To configure Hadoop and check whether Hadoop can access OSS data, perform the following steps:
      1. Enable the Alibaba Cloud OSS plug-in for Hadoop. This allows HADOOP_CLASSPATH to automatically load the dependencies that are related to Alibaba Cloud OSS. For more information about the Alibaba Cloud OSS plug-in for Hadoop, see Hadoop-Aliyun module. The required operation varies based on the Hadoop version.
        • Hadoop 3.x.x: Add the following content to the hadoop-env.sh file in the $HADOOP_HOME/etc/hadoop/ path.
          export HADOOP_OPTIONAL_TOOLS="hadoop-aliyun"
        • Hadoop 2.9.x: Add the following content to the hadoop-config.sh file in the $HADOOP_HOME/libexec/ path.
          CLASSPATH=${CLASSPATH}:${TOOL_PATH}
        • Hadoop 2.8.x and earlier: Hadoop cannot access Alibaba Cloud OSS.
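        After you enable the plug-in, one way to confirm that the OSS-related dependencies can be found is to inspect the Hadoop classpath. The following check is illustrative only; an empty result suggests that the plug-in is not enabled correctly.
          hadoop classpath | tr ':' '\n' | grep -i aliyun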
      2. Add the following content to the core-site.xml file in the $HADOOP_HOME/etc/hadoop/ path and replace the values of the fs.oss.endpoint, fs.oss.accessKeyId, and fs.oss.accessKeySecret parameters with actual values.
        <property>
            <name>fs.AbstractFileSystem.oss.impl</name>
            <value>org.apache.hadoop.fs.aliyun.oss.OSS</value>
        </property>
        
        <property>
            <name>fs.oss.impl</name>
            <value>org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem</value>
        </property>
        
        <property>
            <name>fs.oss.endpoint</name>
            <value>${YOUR_OSS_ENDPOINT}</value>
        </property>
        
        <property>
            <name>fs.oss.accessKeyId</name>
            <value>${YOUR_ACCESS_KEY_ID}</value>
        </property>
        
        <property>
            <name>fs.oss.accessKeySecret</name>
            <value>${YOUR_ACCESS_KEY_SECRET}</value>
        </property>
        The following list describes the parameters.
        • fs.oss.endpoint: The endpoint of Alibaba Cloud OSS. For more information, see Regions and endpoints.
        • fs.oss.accessKeyId: The AccessKey ID of your Alibaba Cloud account. For more information about how to obtain the AccessKey ID, see Obtain an AccessKey pair.
        • fs.oss.accessKeySecret: The AccessKey secret of your Alibaba Cloud account. For more information about how to obtain the AccessKey secret, see Obtain an AccessKey pair.
      3. Run the following hdfs command to check whether Hadoop can read data from OSS:
        hdfs dfs  -ls oss://table-format/
        Note oss://table-format/ is used in this example. During the verification, replace the value with your OSS bucket name and path within your Alibaba Cloud account.
  2. Configure the network so that Flink can access the port of the Hive metastore service.
    For more information, see Manage a Hive metastore.
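    As a simple connectivity check, you can test from a machine in the same network whether the thrift port of the Hive metastore is reachable. The host below is a placeholder, and 9083 is the default port:
    telnet <host> 9083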
  3. Write SQL statements and run a data ingestion job.
    CREATE TEMPORARY TABLE flink_source(
      id BIGINT,
      data STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE iceberg_sink (
      id BIGINT,
      data STRING
    ) WITH (
      'connector' = 'iceberg',
      'catalog-name' = 'hive_prod',
      'catalog-type' = 'hive',
      'engine.hive.enabled' = 'true',
      'uri' = 'thrift://<host>:<port>',
      'location' = 'oss://<oss-bucket>/<oss-object>',
      'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
      'oss.endpoint' = '<aliyun-oss-endpoint>',
      'access.key.id' = '<aliyun-oss-access-key>',
      'access.key.secret' = '<aliyun-oss-access-key-secret>'
    );
    
    INSERT INTO iceberg_sink SELECT * FROM flink_source;
    The following list describes the parameters in the WITH clause.
    • connector: The type of the result table. Required. Set the value to iceberg.
    • catalog-name: The name of the catalog. Required. Set the value to a custom name.
    • catalog-type: The type of the catalog. Required. Set the value to hive.
    • catalog-database: The name of the database. Required. Set the value to the name of the database that is created in the Hive metastore. Example: iceberg_db.
    • engine.hive.enabled: Specifies whether the Hive compute engine can read metadata from the Apache Iceberg table that is stored in the Hive metastore. Optional. Valid values:
      • true (recommended): The Hive compute engine can directly read the metadata.
      • false: The Hive compute engine cannot directly read the metadata.
    • uri: The Uniform Resource Identifier (URI) of the thrift server of the Hive metastore. Required. The URI is in the thrift://<host>:<port> format.
      • host: the IP address of the Hive metastore.
      • port: the port of the Hive metastore. Default value: 9083.
    • location: The OSS path in which the result table is stored. Required. The path must be in the oss://<bucket>/<object> format.
      • bucket: the name of the OSS bucket that you created.
      • object: the path in which your data is stored.
    • io-impl: The name of the implementation class of the distributed file system. Required. Set the value to org.apache.iceberg.aliyun.oss.OSSFileIO.
    • oss.endpoint: The endpoint of OSS. Required. For more information, see Regions and endpoints.
    • access.key.id: The AccessKey ID of your Alibaba Cloud account. Required. For more information about how to obtain the AccessKey ID, see Obtain an AccessKey pair.
    • access.key.secret: The AccessKey secret of your Alibaba Cloud account. Required. For more information about how to obtain the AccessKey secret, see Obtain an AccessKey pair.
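    If you need a partitioned result table, the open source Apache Iceberg integration for Flink supports a PARTITIONED BY clause in the CREATE TABLE statement. The following is a hedged sketch only; the partition column dt and all option values are placeholders, and you should verify that your connector version supports partitioned tables.
    CREATE TEMPORARY TABLE iceberg_sink_partitioned (
      id   BIGINT,
      data STRING,
      dt   STRING
    ) PARTITIONED BY (dt) WITH (
      'connector' = 'iceberg',
      'catalog-name' = 'hive_prod',
      'catalog-type' = 'hive',
      'engine.hive.enabled' = 'true',
      'uri' = 'thrift://<host>:<port>',
      'location' = 'oss://<oss-bucket>/<oss-object>',
      'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
      'oss.endpoint' = '<aliyun-oss-endpoint>',
      'access.key.id' = '<aliyun-oss-access-key>',
      'access.key.secret' = '<aliyun-oss-access-key-secret>'
    );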

Write data to an Apache Iceberg table that is stored in a DLF catalog

The following section describes how to use Flink to write data to an Apache Iceberg table on Alibaba Cloud DLF.

  • DDL syntax
    CREATE TABLE dlf_iceberg (
      id   BIGINT,
      data STRING
    ) WITH (
      'connector' = 'iceberg',
      'catalog-name' = '<yourCatalogName>',
      'catalog-type' = 'custom',
      'catalog-database' = '<yourDatabaseName>',
      'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
      'oss.endpoint' = '<yourOSSEndpoint>',  
      'access.key.id' = '<yourAccessKeyId>',
      'access.key.secret' = '<yourAccessKeySecret>',
      'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
      'warehouse' = '<yourOSSWarehousePath>',
      'dlf.catalog-id' = '<yourCatalogId>',
      'dlf.endpoint' = '<yourDLFEndpoint>',  
      'dlf.region-id' = '<yourDLFRegionId>'
    );
  • Parameters in the WITH clause
    • connector: The type of the result table. Required. Set the value to iceberg.
    • catalog-name: The name of the catalog. Required. Set the value to a custom name.
    • catalog-type: The type of the catalog. Required. Set the value to custom.
    • catalog-database: The name of the database. Required. Set the value to the name of the database that is created on DLF. Example: dlf_db.
    • io-impl: The name of the implementation class of the distributed file system. Required. Set the value to org.apache.iceberg.aliyun.oss.OSSFileIO.
    • oss.endpoint: The endpoint of OSS. Required. For more information, see Regions and endpoints.
    • access.key.id: The AccessKey ID of your Alibaba Cloud account. Required. For more information about how to obtain the AccessKey ID, see Obtain an AccessKey pair.
    • access.key.secret: The AccessKey secret of your Alibaba Cloud account. Required. For more information about how to obtain the AccessKey secret, see Obtain an AccessKey pair.
    • catalog-impl: The class name of the catalog. Required. Set the value to org.apache.iceberg.aliyun.dlf.DlfCatalog.
    • warehouse: The OSS path in which table data is stored. Required.
    • dlf.catalog-id: The ID of the Alibaba Cloud account that is used to access DLF. Required. To obtain the ID of your Alibaba Cloud account, go to the Security Settings page.
    • dlf.endpoint: The endpoint of the DLF service. Required.
    • dlf.region-id: The region name of the DLF service. Required. Make sure that this region matches the endpoint that you specify in dlf.endpoint.
  • Example
    1. Create a DLF database.
      Note If you select an OSS path for your DLF database, make sure that the path is in the ${warehouse}/${database_name}.db format. For example, if warehouse is oss://iceberg-test/warehouse and database_name is dlf_db, the OSS path of the dlf_db database is oss://iceberg-test/warehouse/dlf_db.db.
    2. On the Draft Editor page, write an SQL streaming job in the text editor.
      CREATE TEMPORARY TABLE datagen(
        id    BIGINT,
        data  STRING
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE dlf_iceberg (
        id    BIGINT,
        data  STRING
      ) WITH (
        'connector' = 'iceberg',
        'catalog-name' = '<yourCatalogName>',
        'catalog-type' = 'custom',
        'catalog-database' = '<yourDatabaseName>',
        'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
        'oss.endpoint' = '<yourOSSEndpoint>',  
        'access.key.id' = '<yourAccessKeyId>',
        'access.key.secret' = '<yourAccessKeySecret>',
        'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
        'warehouse' = '<yourOSSWarehousePath>',
        'dlf.catalog-id' = '<yourCatalogId>',
        'dlf.endpoint' = '<yourDLFEndpoint>',  
        'dlf.region-id' = '<yourDLFRegionId>'
      );
      
      INSERT INTO dlf_iceberg SELECT * FROM datagen;
    3. On the right side of the Draft Editor page in the console of fully managed Flink, click the Advanced tab and set Engine Version to vvr-4.0.8-flink-1.13.
    4. Click Validate.
    5. Click Publish.
    6. Click OK.
    7. View the test data that has been written in the OSS console.

      You can view the test data that has been written after the first checkpointing operation is complete.
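      Data that is written by the Apache Iceberg connector becomes visible only when a checkpoint is completed, so the checkpointing interval determines how soon the files appear in OSS. The following is a sketch only, assuming the interval is set in the Flink configuration of the deployment; the 60s value is an example:
      execution.checkpointing.interval: 60s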