This topic describes the DDL syntax that is used to create an Apache Iceberg result table, the parameters in the WITH clause, the data type mappings, and an example.

What is Apache Iceberg?

Apache Iceberg is an open table format for data lakes. You can use Apache Iceberg to quickly build your own data lake storage service on Hadoop Distributed File System (HDFS) or Alibaba Cloud Object Storage Service (OSS). Then, you can use an open source big data compute engine such as Apache Flink, Apache Spark, Apache Hive, or Presto to analyze the data in your data lake. Apache Iceberg provides the following core capabilities:
  • Allows you to build a low-cost, lightweight data lake storage service based on HDFS or OSS.
  • Provides complete ACID (atomicity, consistency, isolation, and durability) semantics.
  • Supports historical version backtracking.
  • Supports efficient data filtering.
  • Supports schema evolution.
  • Supports partition evolution.
You can use the efficient fault tolerance and stream processing capabilities of Flink to import large volumes of behavioral log data into an Apache Iceberg data lake in real time. Then, you can use Flink or another analytics engine to extract value from the data.
Note Apache Iceberg connectors can be used to store data of result tables for Flink streaming jobs, and data of source tables and result tables for Flink batch jobs.

Limits

  • Apache Iceberg connectors are supported only by Flink that uses engine version vvr-4.0.8-flink-1.13 or later.
  • Apache Iceberg connectors can ingest only common log data into data lakes. They cannot ingest Change Data Capture (CDC) data or binary log data into data lakes.
  • Apache Iceberg connectors support only Data Lake Formation (DLF) as the catalog service and OSS as the file system. You cannot deploy Flink streaming jobs that use Hive metastores as the catalog service or HDFS as the file system.

DDL syntax

CREATE TABLE dlf_iceberg (
  id   BIGINT,
  data STRING
) WITH (
  'connector' = 'iceberg',
  -- Catalog settings.
  'catalog-name' = '<yourCatalogName>',
  'catalog-type' = 'custom',
  'catalog-database' = '<yourDatabaseName>',
  -- OSS file system access.
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',
  'access.key.id' = '<yourAccessKeyId>',
  'access.key.secret' = '<yourAccessKeySecret>',
  -- DLF catalog service settings.
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',
  'dlf.region-id' = '<yourDLFRegionId>'
);

Parameters in the WITH clause

Parameter | Description | Required | Remarks
connector | The type of the result table. | Yes | Set the value to iceberg.
catalog-name | The name of the catalog. | Yes | Set the value to a custom name.
catalog-type | The type of the catalog. | Yes | Set the value to custom.
catalog-database | The name of the database. | Yes | Set the value to the name of the database that is created on DLF. Example: dlf_db.
io-impl | The name of the implementation class of the distributed file system. | Yes | Set the value to org.apache.iceberg.aliyun.oss.OSSFileIO.
oss.endpoint | The endpoint of OSS. | Yes | For more information, see Regions and endpoints.
access.key.id | The AccessKey ID of your Alibaba Cloud account. | Yes | For more information about how to obtain the AccessKey ID, see Obtain an AccessKey pair.
access.key.secret | The AccessKey secret of your Alibaba Cloud account. | Yes | For more information about how to obtain the AccessKey secret, see Obtain an AccessKey pair.
catalog-impl | The class name of the catalog. | Yes | Set the value to org.apache.iceberg.aliyun.dlf.DlfCatalog.
warehouse | The OSS directory in which table data is stored. | Yes | N/A.
dlf.catalog-id | The ID of your Alibaba Cloud account. | Yes | To obtain the ID of your Alibaba Cloud account, go to the Security Settings page.
dlf.endpoint | The endpoint of the DLF service. | Yes | Make sure that the endpoint matches the region that you specify for dlf.region-id.
dlf.region-id | The region ID of the DLF service. | Yes | Make sure that the region matches the endpoint that you specify for dlf.endpoint.
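For reference, the following WITH clause shows the parameters filled in with sample values. All values are hypothetical placeholders for illustration (the catalog name, the cn-hangzhou endpoints, the warehouse path, and the account ID are not real); replace them with the values of your own environment.

CREATE TABLE dlf_iceberg (
  id   BIGINT,
  data STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = 'dlf_catalog',            -- hypothetical custom catalog name
  'catalog-type' = 'custom',
  'catalog-database' = 'dlf_db',             -- the DLF database used in the example below
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = 'oss-cn-hangzhou-internal.aliyuncs.com',  -- sample OSS endpoint
  'access.key.id' = '<yourAccessKeyId>',     -- keep credentials out of shared scripts
  'access.key.secret' = '<yourAccessKeySecret>',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = 'oss://iceberg-test/warehouse',     -- sample OSS directory
  'dlf.catalog-id' = '123456789012345678',          -- hypothetical Alibaba Cloud account ID
  'dlf.endpoint' = 'dlf.cn-hangzhou.aliyuncs.com',  -- sample DLF endpoint
  'dlf.region-id' = 'cn-hangzhou'                   -- must match the endpoint region
);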

Data type mapping

Data type of Apache Iceberg | Data type of Flink
BOOLEAN | BOOLEAN
INT | INT
LONG | BIGINT
FLOAT | FLOAT
DOUBLE | DOUBLE
DECIMAL(P,S) | DECIMAL(P,S)
DATE | DATE
TIME | TIME
TIMESTAMP | TIMESTAMP
TIMESTAMPTZ | TIMESTAMP_LTZ
STRING | STRING
FIXED(L) | BYTES
BINARY | VARBINARY
STRUCT<...> | ROW
LIST<E> | ARRAY
MAP<K, V> | MAP
Note Apache Iceberg timestamps are accurate to the microsecond, and Flink timestamps are accurate to the millisecond. When you use Flink to read Apache Iceberg data, the time precision is aligned to milliseconds.
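The mapping matters most for temporal and composite types. As a sketch, the following hypothetical table declares Flink columns from the right-hand column of the table above; each column corresponds to the Apache Iceberg type noted in the comment.

CREATE TABLE iceberg_type_demo (
  id       BIGINT,                  -- Iceberg LONG
  price    DECIMAL(10, 2),          -- Iceberg DECIMAL(10,2)
  event_ts TIMESTAMP,               -- Iceberg TIMESTAMP (microseconds in Iceberg, milliseconds in Flink)
  tags     ARRAY<STRING>,           -- Iceberg LIST<STRING>
  attrs    MAP<STRING, STRING>,     -- Iceberg MAP<STRING, STRING>
  point    ROW<x DOUBLE, y DOUBLE>  -- Iceberg STRUCT<x: DOUBLE, y: DOUBLE>
) WITH (
  'connector' = 'iceberg'
  -- Add the same catalog, OSS, and DLF parameters as in the DDL syntax section.
);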

Example

  1. Create a DLF database.
    Note When you select an OSS directory for your DLF database, make sure that the directory is in the ${warehouse}/${database_name}.db format. For example, if warehouse is oss://iceberg-test/warehouse and database_name is dlf_db, the OSS directory of the dlf_db database is oss://iceberg-test/warehouse/dlf_db.db.
  2. Write Flink streaming SQL statements to ingest the output data of streaming jobs into a data lake.
    1. In the left-side navigation pane, click Draft Editor.
    2. In the upper-left corner of the page, click New. In the New Draft dialog box, select STREAM / SQL from the Type drop-down list.
    3. In the script editor, write SQL statements to ingest the output data of streaming jobs into the data lake.
      CREATE TEMPORARY TABLE datagen(
        id    BIGINT,
        data  STRING
      ) WITH (
        -- The datagen connector generates random values for each column.
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE dlf_iceberg (
        id    BIGINT,
        data  STRING
      ) WITH (
        'connector' = 'iceberg',
        'catalog-name' = '<yourCatalogName>',
        'catalog-type' = 'custom',
        'catalog-database' = '<yourDatabaseName>',
        'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
        'oss.endpoint' = '<yourOSSEndpoint>',  
        'access.key.id' = '<yourAccessKeyId>',
        'access.key.secret' = '<yourAccessKeySecret>',
        'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
        'warehouse' = '<yourOSSWarehousePath>',
        'dlf.catalog-id' = '<yourCatalogId>',
        'dlf.endpoint' = '<yourDLFEndpoint>',  
        'dlf.region-id' = '<yourDLFRegionId>'
      );
      
      INSERT INTO dlf_iceberg SELECT * FROM datagen;
  3. On the right side of the Draft Editor page, click the Advanced tab and set Engine Version to vvr-4.0.8-flink-1.13.
  4. Click Validate.
  5. Click Publish.
  6. View the test data in the OSS console.
    You can view the test data that has been written after the first checkpointing operation is complete.
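Besides browsing the OSS directory, you can also verify the result by reading the table back in a Flink batch job, because Apache Iceberg connectors support source tables for batch jobs. The following is a minimal sketch that assumes the same dlf_iceberg table definition as in step 2:

-- Run as a Flink batch job. Reuse the CREATE TEMPORARY TABLE dlf_iceberg
-- statement from step 2, then query the table.
SELECT id, data
FROM dlf_iceberg
LIMIT 10;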