
Realtime Compute for Apache Flink: Iceberg

Last Updated: Apr 21, 2025

This topic describes how to use the Apache Iceberg connector.

Background information

Apache Iceberg is an open table format for data lakes. You can use Apache Iceberg to quickly build your own data lake storage service on Hadoop Distributed File System (HDFS) or Alibaba Cloud Object Storage Service (OSS). Then, you can use a compute engine of the open source big data ecosystem such as Apache Flink, Apache Spark, Apache Hive, or Apache Presto to analyze data in your data lake.

| Category | Details |
| --- | --- |
| Supported type | Source table and sink table |
| Running mode | Batch mode and streaming mode |
| Data format | N/A |
| Metric | None |
| API | SQL |
| Data update or deletion in a sink table | Yes |

Features

Apache Iceberg provides the following core capabilities:

  • Builds a low-cost lightweight data lake storage service based on HDFS or object storage.

  • Provides complete atomicity, consistency, isolation, and durability (ACID) semantics.

  • Supports historical version backtracking.

  • Supports efficient data filtering.

  • Supports schema evolution.

  • Supports partition evolution.

  • Supports use with a self-managed Hive metastore. For more information, see Use the Hive catalog with a self-managed Hive Metastore (HMS).

Note

You can use the efficient fault tolerance and stream processing capabilities of Flink to import a large amount of behavioral data in logs into an Apache Iceberg data lake in real time. Then, you can use Flink or another analytics engine to extract the value of your data.

Limits

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 4.0.8 or later supports the Apache Iceberg connector. The Apache Iceberg connector must be used with the DLF catalog. For more information, see Manage the DLF catalog.

  • The Apache Iceberg connector supports the Apache Iceberg table format of version 1 and version 2. For more information, see Iceberg Table Spec.

    Note

    Only Realtime Compute for Apache Flink that uses VVR 8.0.7 or later supports the Apache Iceberg table format of version 2.

  • If the streaming read mode is enabled, only Apache Iceberg tables to which data is written in Append Only mode can be used as source tables. A sketch of how a streaming read can be enabled follows this list.
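The following is a minimal sketch of a streaming read, assuming a source table named iceberg_table such as the one declared in the Syntax section below. The 'streaming' and 'monitor-interval' read options come from the open source Apache Iceberg connector and are assumptions here; confirm that your VVR version supports them before use.

-- Sketch: continuously read new snapshots from an append-only Iceberg table.
-- 'streaming' and 'monitor-interval' are assumed from the open source connector.
SELECT * FROM iceberg_table
/*+ OPTIONS('streaming' = 'true', 'monitor-interval' = '30s') */;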

Syntax

CREATE TABLE iceberg_table (
  id    BIGINT,
  data  STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) PARTITIONED BY (data)
WITH (
  'connector' = 'iceberg',
  ...
);
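The primary key in this statement enables updates in upsert mode. If a table only receives appended data, it can be declared without a primary key. The following variant is a sketch under that assumption, combined with the insert write mode described in the sink table options below; treat the combination as an assumption rather than a verbatim example from this topic.

-- Sketch: an append-only table without a primary key.
-- 'write.operation' = 'insert' is described in the sink table options.
CREATE TABLE iceberg_append_table (
  id    BIGINT,
  data  STRING
) PARTITIONED BY (data)
WITH (
  'connector' = 'iceberg',
  'write.operation' = 'insert',
  ...
);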

Connector options in the WITH clause

Common options for source tables

| Parameter | Description | Data type | Required | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| connector | The type of the source table. | String | Yes | None | Set the value to iceberg. |
| catalog-name | The catalog name. | String | Yes | None | A custom name. |
| catalog-database | The database name. | String | Yes | default | Set the value to the name of the database that is created on Data Lake Formation (DLF), such as dlf_db. Note: If you have not created a DLF database, create one first. |
| io-impl | The name of the implementation class of the distributed file system. | String | Yes | None | Set the value to org.apache.iceberg.aliyun.oss.OSSFileIO. |
| oss.endpoint | The endpoint of your OSS bucket. | String | No | None | For more information, see Regions and endpoints. Note: We recommend that you configure a virtual private cloud (VPC) endpoint for this parameter. For example, in the China (Hangzhou) region, set it to oss-cn-hangzhou-internal.aliyuncs.com. If you need to access OSS across VPCs, see How do I access other services across VPCs? |
| access.key.id (VVR 8.0.6 and earlier) or access-key-id (VVR 8.0.7 and later) | The AccessKey ID of your Alibaba Cloud account. | String | Yes | None | For more information, see How do I view the AccessKey ID and AccessKey Secret? Important: To prevent the leakage of your AccessKey information, we recommend that you use variables to specify the AccessKey values. For more information, see Project variables. |
| access.key.secret (VVR 8.0.6 and earlier) or access-key-secret (VVR 8.0.7 and later) | The AccessKey Secret of your Alibaba Cloud account. | String | Yes | None | None. |
| catalog-impl | The class name of the catalog. | String | Yes | None | Set the value to org.apache.iceberg.aliyun.dlf.DlfCatalog. |
| warehouse | The OSS directory in which table data is stored. | String | Yes | None | None. |
| dlf.catalog-id | The ID of your Alibaba Cloud account. | String | Yes | None | You can obtain the account ID from the Account Management page. |
| dlf.endpoint | The endpoint of DLF. | String | Yes | None | Note: We recommend that you configure a VPC endpoint for this parameter. For example, in the China (Hangzhou) region, set it to dlf-vpc.cn-hangzhou.aliyuncs.com. If you need to access DLF across VPCs, see Space management and operations. |
| dlf.region-id | The region in which the DLF service is activated. | String | Yes | None | Note: Make sure that this region is the same as the region that you specify for the dlf.endpoint parameter. |
| uri | The Thrift URI of the Hive metastore. | String | Only when the Hive catalog is used | None | Used with a self-managed Hive metastore. |

Options for sink tables

| Parameter | Description | Data type | Required | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| write.operation | The write operation mode. | String | No | upsert | Valid values: upsert: data is updated (default); insert: data is written to the table in append mode; bulk_insert: a batch of data is written at a time, and existing data is not updated. |
| hive_sync.enable | Specifies whether to synchronize metadata to Hive. | boolean | No | false | Valid values: true: metadata is synchronized to Hive; false: metadata is not synchronized to Hive (default). |
| hive_sync.mode | The Hive data synchronization mode. | String | No | hms | Valid values: hms: retain the default value if you use a Hive metastore or a DLF catalog (default); jdbc: set this value if you use a Java Database Connectivity (JDBC) catalog. |
| hive_sync.db | The name of the Hive database to which data is synchronized. | String | No | The database name of the current table in the catalog | None. |
| hive_sync.table | The name of the Hive table to which data is synchronized. | String | No | The name of the current table | None. |
| dlf.catalog.region | The region in which the DLF service is activated. | String | No | None | Note: This parameter takes effect only when the hive_sync.mode parameter is set to hms. Make sure that this region is the same as the region that you specify for the dlf.catalog.endpoint parameter. |
| dlf.catalog.endpoint | The endpoint of DLF. | String | No | None | Note: This parameter takes effect only when the hive_sync.mode parameter is set to hms. We recommend that you configure a VPC endpoint for this parameter, such as dlf-vpc.cn-hangzhou.aliyuncs.com in the China (Hangzhou) region. If you need to access DLF across VPCs, see Space management and operations. |
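To show how these options combine, the following sketch writes in append mode and synchronizes metadata to Hive. The names my_hive_db and my_hive_table are hypothetical placeholders, and the elided options stand for the catalog, OSS, and DLF settings described in the source table options above.

CREATE TEMPORARY TABLE iceberg_sink (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  ...,                                  -- catalog, OSS, and DLF options
  'write.operation' = 'insert',         -- append instead of upsert
  'hive_sync.enable' = 'true',          -- synchronize metadata to Hive
  'hive_sync.mode' = 'hms',             -- Hive metastore or DLF catalog
  'hive_sync.db' = 'my_hive_db',        -- hypothetical Hive database name
  'hive_sync.table' = 'my_hive_table'   -- hypothetical Hive table name
);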

Data type mappings

| Apache Iceberg data type | Flink data type |
| --- | --- |
| BOOLEAN | BOOLEAN |
| INT | INT |
| LONG | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL(P,S) | DECIMAL(P,S) |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
| TIMESTAMPTZ | TIMESTAMP_LTZ |
| STRING | STRING |
| FIXED(L) | BYTES |
| BINARY | VARBINARY |
| STRUCT<...> | ROW |
| LIST<E> | LIST |
| MAP<K,V> | MAP |

Note

Apache Iceberg timestamps are accurate to the microsecond, and Flink timestamps are accurate to the millisecond. When you use Flink to read Apache Iceberg data, the time precision is aligned to milliseconds.
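As an illustration of these mappings, the following sketch declares a Flink table over an Iceberg schema that uses several of the types above. The column names and types are hypothetical and chosen only to demonstrate the mapping.

-- Hypothetical Flink DDL; each Flink type corresponds to the Iceberg type
-- noted in the comment, according to the mapping table above.
CREATE TEMPORARY TABLE typed_iceberg (
  id         BIGINT,              -- Iceberg LONG
  amount     DECIMAL(10, 2),      -- Iceberg DECIMAL(10,2)
  created_at TIMESTAMP_LTZ,       -- Iceberg TIMESTAMPTZ (precision aligned to milliseconds)
  payload    VARBINARY,           -- Iceberg BINARY
  tags       MAP<STRING, STRING>  -- Iceberg MAP<K,V>
) WITH (
  'connector' = 'iceberg',
  ...
);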

Examples

Confirm that an OSS bucket and a DLF database are created. For more information, see Create buckets and Databases, tables, and functions.

Note

When you select a path for the DLF database, we recommend that you follow the format ${warehouse}/${database_name}.db. For example, if the warehouse address is oss://iceberg-test/warehouse and the database name is dlf_db, set the OSS path of dlf_db to oss://iceberg-test/warehouse/dlf_db.db.

Sample code for a sink table

This example shows how to use the Datagen connector to randomly generate streaming data and write the data to an Apache Iceberg sink table.

CREATE TEMPORARY TABLE datagen(
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE dlf_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '',
  'catalog-database' = '',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '',  
  'access.key.id' = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '',
  'dlf.catalog-id' = '',
  'dlf.endpoint' = '',  
  'dlf.region-id' = ''
);

INSERT INTO dlf_iceberg SELECT * FROM datagen;

Sample code for a source table

  • Use the Hive catalog with a self-managed Hive Metastore (HMS).

    You must ensure that Flink and the HMS cluster can communicate with each other. The data will be stored in the oss://///flink_table directory.

    CREATE TEMPORARY TABLE flink_table (
      id   BIGINT,
      data STRING
    ) WITH (
      'connector'='iceberg',
      'catalog-name'='',
      'catalog-database'='',
      'uri'='thrift://:',
      'warehouse'='oss:///',
      'io-impl'='org.apache.iceberg.aliyun.oss.OSSFileIO',
      'access-key-id'='',
      'access-key-secret'='',
      'oss.endpoint'=''
    );
  • Use the DLF catalog to write data from an Apache Iceberg source table to an Apache Iceberg sink table.

    CREATE TEMPORARY TABLE src_iceberg (
      id    BIGINT,
      data  STRING
    ) WITH (
      'connector' = 'iceberg',
      'catalog-name' = '',
      'catalog-database' = '',
      'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
      'oss.endpoint' = '',  
      'access.key.id' = '${secret_values.ak_id}',
      'access.key.secret' = '${secret_values.ak_secret}',
      'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
      'warehouse' = '',
      'dlf.catalog-id' = '',
      'dlf.endpoint' = '',  
      'dlf.region-id' = ''
    );
    
    CREATE TEMPORARY TABLE dst_iceberg (
      id    BIGINT,
      data  STRING
    ) WITH (
      'connector' = 'iceberg',
      'catalog-name' = '',
      'catalog-database' = '',
      'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
      'oss.endpoint' = '',  
      'access.key.id' = '${secret_values.ak_id}',
      'access.key.secret' = '${secret_values.ak_secret}',
      'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
      'warehouse' = '',
      'dlf.catalog-id' = '',
      'dlf.endpoint' = '',  
      'dlf.region-id' = ''
    );
    
    BEGIN STATEMENT SET;
    
    INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE');
    INSERT INTO dst_iceberg SELECT * FROM src_iceberg;
    
    END;
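    After the statement set finishes, a quick way to verify the copy is a batch query against the sink table. A minimal sketch:

    -- Sketch: run in batch execution mode.
    -- Expected output: the five rows that were inserted into src_iceberg.
    SELECT * FROM dst_iceberg;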

References

For more information, see Supported connectors.