This topic provides the DDL syntax that is used to create an Apache Iceberg source table, describes the parameters in the WITH clause, and provides data type mappings and an example.

What is Apache Iceberg?

Apache Iceberg is an open table format for data lakes. You can use Apache Iceberg to quickly build your own data lake storage service on Hadoop Distributed File System (HDFS) or Alibaba Cloud Object Storage Service (OSS). Then, you can use a compute engine from the open source big data ecosystem, such as Apache Flink, Apache Spark, Apache Hive, or Apache Presto, to analyze the data in your data lake. Apache Iceberg provides the following core capabilities:
  • Builds a low-cost, lightweight data lake storage service based on HDFS or OSS.
  • Provides comprehensive ACID (atomicity, consistency, isolation, and durability) semantics.
  • Supports historical version backtracking (time travel); see the query sketch after this overview.
  • Supports efficient data filtering.
  • Supports schema evolution.
  • Supports partition evolution.
You can use the efficient fault tolerance and stream processing capabilities of Flink to import large volumes of behavioral log data into an Apache Iceberg data lake in real time. Then, you can use Flink or another analytics engine to extract value from the data.
Note The Apache Iceberg connector can be used for result tables in Flink streaming jobs, and for both source tables and result tables in Flink batch jobs. This topic describes how to use an Apache Iceberg table as the source table of a Flink batch job.
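The historical version backtracking capability is exposed through read options in Flink SQL. As a minimal sketch, assuming the snapshot-id and as-of-timestamp read options of the open source Apache Iceberg Flink runtime are available in your engine version, you can pin a batch read to a specific table snapshot with SQL hints. The query below uses the dlf_iceberg table from the DDL syntax section; the snapshot ID and timestamp are hypothetical placeholders.

-- Read the table as of a specific snapshot (hypothetical snapshot ID).
SELECT * FROM dlf_iceberg /*+ OPTIONS('snapshot-id'='3821550127947089987') */;

-- Read the table as of a point in time (Unix timestamp in milliseconds).
SELECT * FROM dlf_iceberg /*+ OPTIONS('as-of-timestamp'='1640966400000') */;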

Limits

The Apache Iceberg connector is supported only by Flink that uses engine version vvr-4.0.8-flink-1.13 or later.

DDL syntax

CREATE TABLE dlf_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-type' = 'custom',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',  
  'access.key.id' = '<yourAccessKeyId>',
  'access.key.secret' = '<yourAccessKeySecret>',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',  
  'dlf.region-id' = '<yourDLFRegionId>'
);

Parameters in the WITH clause

| Parameter | Description | Required | Remarks |
| --- | --- | --- | --- |
| connector | The type of the source table. | Yes | Set the value to iceberg. |
| catalog-name | The name of the catalog. | Yes | Set the value to a custom name. |
| catalog-type | The type of the catalog. | Yes | Set the value to custom. |
| catalog-database | The name of the database. | Yes | Set the value to the name of the database that is created in Data Lake Formation (DLF). Example: dlf_db. |
| io-impl | The implementation class of the distributed file system. | Yes | Set the value to org.apache.iceberg.aliyun.oss.OSSFileIO. |
| oss.endpoint | The endpoint of OSS. | No | For more information, see Regions and endpoints. |
| access.key.id | The AccessKey ID of your Alibaba Cloud account. | Yes | For more information about how to obtain the AccessKey ID, see Obtain an AccessKey pair. |
| access.key.secret | The AccessKey secret of your Alibaba Cloud account. | Yes | For more information about how to obtain the AccessKey secret, see Obtain an AccessKey pair. |
| catalog-impl | The class name of the catalog. | Yes | Set the value to org.apache.iceberg.aliyun.dlf.DlfCatalog. |
| warehouse | The OSS directory in which table data is stored. | Yes | N/A. |
| dlf.catalog-id | The ID of your Alibaba Cloud account. | Yes | To obtain the ID of your Alibaba Cloud account, go to the Security Settings page. |
| dlf.endpoint | The endpoint of the DLF service. | Yes | N/A. |
| dlf.region-id | The region ID of the DLF service. | Yes | Make sure that the region matches the endpoint that you selected for dlf.endpoint. |
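For reference, the following sketch fills the WITH clause with sample values. All values are hypothetical placeholders: the endpoints assume the China (Hangzhou) region, and you must replace the AccessKey pair, warehouse path, and catalog ID with your own.

CREATE TABLE dlf_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = 'dlf',                 -- custom catalog name
  'catalog-type' = 'custom',
  'catalog-database' = 'dlf_db',          -- database created in DLF
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = 'oss-cn-hangzhou-internal.aliyuncs.com',
  'access.key.id' = '<yourAccessKeyId>',
  'access.key.secret' = '<yourAccessKeySecret>',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = 'oss://iceberg-test/warehouse',
  'dlf.catalog-id' = '1234567890123456',  -- your Alibaba Cloud account ID
  'dlf.endpoint' = 'dlf.cn-hangzhou.aliyuncs.com',
  'dlf.region-id' = 'cn-hangzhou'
);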

Data type mapping

| Data type of Apache Iceberg | Data type of Flink |
| --- | --- |
| BOOLEAN | BOOLEAN |
| INT | INT |
| LONG | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL(P,S) | DECIMAL(P,S) |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
| TIMESTAMPTZ | TIMESTAMP_LTZ |
| STRING | STRING |
| FIXED(L) | BYTES |
| BINARY | VARBINARY |
| STRUCT<...> | ROW |
| LIST<E> | ARRAY |
| MAP<K, V> | MAP |
Note Apache Iceberg timestamps are accurate to the microsecond, and Flink timestamps are accurate to the millisecond. When you use Flink to read Apache Iceberg data, the time precision is aligned to milliseconds.
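To make the mapping concrete, the following hypothetical DDL declares Flink columns whose types correspond to common Apache Iceberg types. The table and column names are made up for illustration, and the WITH clause is abbreviated; use the full set of options from the DDL syntax section.

CREATE TEMPORARY TABLE iceberg_types_demo (
  is_valid   BOOLEAN,                      -- Iceberg BOOLEAN
  event_id   BIGINT,                       -- Iceberg LONG
  amount     DECIMAL(10, 2),               -- Iceberg DECIMAL(10,2)
  event_day  DATE,                         -- Iceberg DATE
  event_ts   TIMESTAMP,                    -- Iceberg TIMESTAMP (precision aligned to milliseconds)
  payload    STRING,                       -- Iceberg STRING
  checksum   BYTES,                        -- Iceberg FIXED(L)
  tags       ARRAY<STRING>,                -- Iceberg LIST<STRING>
  attributes MAP<STRING, STRING>,          -- Iceberg MAP<STRING, STRING>
  location   ROW<lat DOUBLE, lon DOUBLE>   -- Iceberg STRUCT<lat, lon>
) WITH (
  'connector' = 'iceberg'
  -- ...remaining catalog, OSS, and DLF options as shown in the DDL syntax section
);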

Example

The following example shows how to use a Flink batch job to import data from an Apache Iceberg source table into an Apache Iceberg result table, implementing a basic extract, transform, load (ETL) workflow.

  1. Confirm that a DLF database is created.
    If you have not created a DLF database, create one first.
    Note When you select an OSS directory for your DLF database, make sure that the directory is in the ${warehouse}/${database_name}.db format. For example, if warehouse is oss://iceberg-test/warehouse and database_name is dlf_db, the OSS directory of the dlf_db database is oss://iceberg-test/warehouse/dlf_db.db.
  2. Write a Flink batch SQL job to generate test data and import the data to an Apache Iceberg table.
    1. In the left-side navigation pane, click Draft Editor.
    2. In the upper-left corner of the page, click New. In the New Draft dialog box, select BATCH / SQL from the Type drop-down list.
    3. Write SQL statements in the text editor. The following code shows an example.
      CREATE TEMPORARY TABLE src_iceberg (
        id    BIGINT,
        data  STRING
      ) WITH (
        'connector' = 'iceberg',
        'catalog-name' = '<yourCatalogName>',
        'catalog-type' = 'custom',
        'catalog-database' = '<yourDatabaseName>',
        'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
        'oss.endpoint' = '<yourOSSEndpoint>',  
        'access.key.id' = '<yourAccessKeyId>',
        'access.key.secret' = '<yourAccessKeySecret>',
        'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
        'warehouse' = '<yourOSSWarehousePath>',
        'dlf.catalog-id' = '<yourCatalogId>',
        'dlf.endpoint' = '<yourDLFEndpoint>',  
        'dlf.region-id' = '<yourDLFRegionId>'
      );
      
      CREATE TEMPORARY TABLE dst_iceberg (
        id    BIGINT,
        data  STRING
      ) WITH (
        'connector' = 'iceberg',
        'catalog-name' = '<yourCatalogName>',
        'catalog-type' = 'custom',
        'catalog-database' = '<yourDatabaseName>',
        'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
        'oss.endpoint' = '<yourOSSEndpoint>',  
        'access.key.id' = '<yourAccessKeyId>',
        'access.key.secret' = '<yourAccessKeySecret>',
        'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
        'warehouse' = '<yourOSSWarehousePath>',
        'dlf.catalog-id' = '<yourCatalogId>',
        'dlf.endpoint' = '<yourDLFEndpoint>',  
        'dlf.region-id' = '<yourDLFRegionId>'
      );
      
      BEGIN STATEMENT SET;
      
      INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE');
      INSERT INTO dst_iceberg SELECT * FROM src_iceberg;
      
      END;
  3. On the right side of the Draft Editor page, click the Advanced tab and set Engine Version to vvr-4.0.8-flink-1.13.
  4. Click Validate.
  5. Click Publish.
  6. View the test data that is written in the OSS console.
    After the batch job is complete, you can view the data that was written to the dst_iceberg table in its OSS directory.
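You can also verify the result with a batch SQL query. The following sketch assumes that you create a new batch job and re-declare the dst_iceberg temporary table with the same WITH clause as in step 2, because temporary tables are scoped to a single job.

-- Re-declare dst_iceberg with the WITH clause from step 2, then run:
SELECT * FROM dst_iceberg;
-- Expected result: the five rows (1, 'AAA') through (5, 'EEE') written by the batch job.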