This topic provides the DDL syntax that is used to create an Apache Iceberg source table, describes the parameters in the WITH clause, and provides data type mappings and an example.

What is Apache Iceberg?

Apache Iceberg is an open table format for data lakes. You can use Apache Iceberg to quickly build your own data lake storage service on Hadoop Distributed File System (HDFS) or Alibaba Cloud Object Storage Service (OSS). Then, you can use an open source big data compute engine such as Apache Flink, Apache Spark, Apache Hive, or Apache Presto to analyze the data in your data lake. Apache Iceberg provides the following core capabilities:
  • Builds a low-cost lightweight data lake storage service based on HDFS or OSS.
  • Provides comprehensive atomicity, consistency, isolation, and durability (ACID) semantics.
  • Supports historical version backtracking.
  • Supports efficient data filtering.
  • Supports schema evolution.
  • Supports partition evolution.
You can use the efficient fault tolerance and stream processing capabilities of Flink to import large amounts of behavioral log data into an Apache Iceberg data lake in real time. Then, you can use Flink or another analytics engine to extract value from the data.
Note The Apache Iceberg connector can be used to store data in result tables of Flink streaming jobs, and in source tables and result tables of Flink batch jobs. This topic describes how to use an Apache Iceberg table as the source table of a Flink batch job.
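For example, a streaming job can continuously copy log data into an Apache Iceberg table. The following snippet is a minimal sketch, not a complete job: log_source is a hypothetical upstream source table (for example, a Kafka source table) that you must declare yourself, and dlf_iceberg is the Apache Iceberg table that is declared in the DDL syntax section below.

-- Minimal sketch of a streaming write into an Apache Iceberg result table.
-- log_source is a hypothetical source table that you must declare separately.
INSERT INTO dlf_iceberg
SELECT id, data
FROM log_source;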

Limits

  • Only the Flink compute engine of vvr-4.0.8-flink-1.13 or later supports the Apache Iceberg connector.
  • The Apache Iceberg connector supports only the Apache Iceberg table format of version 1. For more information, see Iceberg Table Spec.

DDL syntax

CREATE TABLE dlf_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-type' = 'custom',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',  
  'access.key.id' = '<yourAccessKeyId>',
  'access.key.secret' = '<yourAccessKeySecret>',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',  
  'dlf.region-id' = '<yourDLFRegionId>'
);

Parameters in the WITH clause

| Parameter | Description | Required | Remarks |
| --- | --- | --- | --- |
| connector | The type of the source table. | Yes | Set the value to iceberg. |
| catalog-name | The name of the catalog. | Yes | Set the value to a custom name. |
| catalog-type | The type of the catalog. | Yes | Set the value to custom. |
| catalog-database | The name of the database. | Yes | Set the value to the name of the database that is created in Data Lake Formation (DLF). Example: dlf_db. |
| io-impl | The implementation class that is used to access the distributed file system. | Yes | Set the value to org.apache.iceberg.aliyun.oss.OSSFileIO. |
| oss.endpoint | The endpoint of your OSS bucket. | No | For more information, see Regions and endpoints. |
| access.key.id | The AccessKey ID of your Alibaba Cloud account. | Yes | For more information about how to obtain the AccessKey ID, see Obtain an AccessKey pair. |
| access.key.secret | The AccessKey secret of your Alibaba Cloud account. | Yes | For more information about how to obtain the AccessKey secret, see Obtain an AccessKey pair. |
| catalog-impl | The class name of the catalog. | Yes | Set the value to org.apache.iceberg.aliyun.dlf.DlfCatalog. |
| warehouse | The OSS directory in which table data is stored. | Yes | N/A. |
| dlf.catalog-id | The ID of the Alibaba Cloud account that is used to access DLF. | Yes | To obtain the ID of your Alibaba Cloud account, go to the Security Settings page. |
| dlf.endpoint | The endpoint of the DLF service. | Yes | N/A. |
| dlf.region-id | The region ID of the DLF service. | Yes | Make sure that the region matches the endpoint that you specified for dlf.endpoint. |
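The following statement shows what a filled-in DDL might look like. This is a sketch only: the endpoint and region values assume a workspace in the China (Hangzhou) region, the catalog and database names are examples, and the warehouse path reuses the oss://iceberg-test/warehouse example from the section below. Replace all values with the values of your own environment.

CREATE TABLE dlf_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = 'iceberg_catalog',                -- example custom catalog name
  'catalog-type' = 'custom',
  'catalog-database' = 'dlf_db',                     -- example DLF database name
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = 'oss-cn-hangzhou.aliyuncs.com',   -- assumes China (Hangzhou)
  'access.key.id' = '<yourAccessKeyId>',
  'access.key.secret' = '<yourAccessKeySecret>',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = 'oss://iceberg-test/warehouse',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = 'dlf.cn-hangzhou.aliyuncs.com',   -- assumes China (Hangzhou)
  'dlf.region-id' = 'cn-hangzhou'
);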

Data type mappings

| Data type of Apache Iceberg | Data type of Flink |
| --- | --- |
| BOOLEAN | BOOLEAN |
| INT | INT |
| LONG | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL(P,S) | DECIMAL(P,S) |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
| TIMESTAMPTZ | TIMESTAMP_LTZ |
| STRING | STRING |
| FIXED(L) | BYTES |
| BINARY | VARBINARY |
| STRUCT<...> | ROW |
| LIST<E> | ARRAY<E> |
| MAP<K, V> | MAP |

Note Apache Iceberg timestamps are accurate to the microsecond, and Flink timestamps are accurate to the millisecond. When you use Flink to read Apache Iceberg data, the time precision is aligned to milliseconds.
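The following hypothetical DDL shows several of these mappings side by side. The Flink type of each column corresponds to the Iceberg type in the comment; the table and column names are examples only, and the WITH clause reuses the same placeholders as the DDL syntax above.

CREATE TABLE typed_iceberg (
  flag        BOOLEAN,                    -- Iceberg BOOLEAN
  event_count BIGINT,                     -- Iceberg LONG
  amount      DECIMAL(10, 2),             -- Iceberg DECIMAL(10,2)
  event_time  TIMESTAMP,                  -- Iceberg TIMESTAMP
  tags        ARRAY<STRING>,              -- Iceberg LIST<STRING>
  attributes  MAP<STRING, STRING>,        -- Iceberg MAP<STRING, STRING>
  address     ROW<city STRING, zip INT>   -- Iceberg STRUCT<city, zip>
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-type' = 'custom',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',
  'access.key.id' = '<yourAccessKeyId>',
  'access.key.secret' = '<yourAccessKeySecret>',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',
  'dlf.region-id' = '<yourDLFRegionId>'
);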

Example

The following example shows how to use a Flink batch job to import data from an Apache Iceberg source table into an Apache Iceberg result table. This implements a basic extract, transform, load (ETL) process.

  1. Confirm that a DLF database is created.
    If you have not created a DLF database, create one.
    Note When you select an OSS directory for your DLF database, make sure that the directory is in the ${warehouse}/${database_name}.db format. For example, if warehouse is oss://iceberg-test/warehouse and database_name is dlf_db, the OSS directory of the dlf_db database is oss://iceberg-test/warehouse/dlf_db.db.
  2. Write a Flink batch SQL job to generate test data and import the data to an Apache Iceberg table.
    1. In the left-side navigation pane of the console of fully managed Flink, click Draft Editor.
    2. In the upper-left corner of the page, click New. In the New Draft dialog box, select BATCH / SQL from the Type drop-down list.
    3. Write SQL statements in the text editor. The following code shows an example.
      CREATE TEMPORARY TABLE src_iceberg (
        id    BIGINT,
        data  STRING
      ) WITH (
        'connector' = 'iceberg',
        'catalog-name' = '<yourCatalogName>',
        'catalog-type' = 'custom',
        'catalog-database' = '<yourDatabaseName>',
        'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
        'oss.endpoint' = '<yourOSSEndpoint>',  
        'access.key.id' = '<yourAccessKeyId>',
        'access.key.secret' = '<yourAccessKeySecret>',
        'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
        'warehouse' = '<yourOSSWarehousePath>',
        'dlf.catalog-id' = '<yourCatalogId>',
        'dlf.endpoint' = '<yourDLFEndpoint>',  
        'dlf.region-id' = '<yourDLFRegionId>'
      );
      
      CREATE TEMPORARY TABLE dst_iceberg (
        id    BIGINT,
        data  STRING
      ) WITH (
        'connector' = 'iceberg',
        'catalog-name' = '<yourCatalogName>',
        'catalog-type' = 'custom',
        'catalog-database' = '<yourDatabaseName>',
        'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
        'oss.endpoint' = '<yourOSSEndpoint>',  
        'access.key.id' = '<yourAccessKeyId>',
        'access.key.secret' = '<yourAccessKeySecret>',
        'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
        'warehouse' = '<yourOSSWarehousePath>',
        'dlf.catalog-id' = '<yourCatalogId>',
        'dlf.endpoint' = '<yourDLFEndpoint>',  
        'dlf.region-id' = '<yourDLFRegionId>'
      );
      
      BEGIN STATEMENT SET;
      
      INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE');
      INSERT INTO dst_iceberg SELECT * FROM src_iceberg;
      
      END;
  3. On the Advanced tab of the Draft Editor page, set Engine Version to vvr-4.0.8-flink-1.13.
  4. Click Validate.
  5. Click Publish.
  6. Click OK.
  7. View the test data that was written.
    After the batch job is complete, you can view the data that was written to the dst_iceberg table in the OSS directory. Alternatively, you can query the table from a Flink batch job, as shown in the sketch after this list.
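As a sketch of the Flink-side check mentioned in step 7: after you re-declare dst_iceberg with the same CREATE TEMPORARY TABLE statement as in the job above, a short batch query returns the rows that were written.

SELECT * FROM dst_iceberg;
-- Expected result: the five rows that the batch job wrote:
-- (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE')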