
Realtime Compute for Apache Flink: Apache Iceberg

Last Updated: Mar 26, 2026

Use the Iceberg connector to read from and write to Apache Iceberg tables backed by Object Storage Service (OSS) and Data Lake Formation (DLF). Apache Iceberg is an open table format for data lakes that supports ACID transactions, schema evolution, and partition evolution on HDFS or OSS. You can analyze data in your data lake using compute engines such as Flink, Spark, Hive, and Presto.

Connector overview

Category | Details
Supported table types | Source table, sink table, data ingestion destination
Runtime modes | Batch mode, stream mode
Data format | Not applicable
Specific monitoring metrics | None
API types | SQL, YAML jobs (data ingestion only)
Supports updates and deletes in sink tables | Yes
Supported Iceberg table formats | v1, v2 (v2 requires Ververica Runtime (VVR) 8.0.7 or later)
Minimum VVR version | 4.0.8

Features

Apache Iceberg provides the following core features:

  • A lightweight, cost-effective data lake storage service built on HDFS or OSS.

  • Full support for ACID semantics (atomicity, consistency, isolation, and durability).

  • Support for historical version rollbacks.

  • Efficient data filtering.

  • Schema evolution.

  • Partition evolution.

  • Compatibility with a self-managed Hive Metastore.

You can use Flink's fault tolerance and stream processing capabilities to ingest large volumes of log and behavioral data into an Apache Iceberg data lake in real time. You can then use Flink or other analytics engines to extract value from that data.

Limitations

  • Requires a Data Lake Formation (DLF) Catalog. For more information, see Manage DLF-Legacy Catalogs.

  • In stream mode, only append-only Iceberg tables are supported as source tables.

  • The v2 table format requires VVR 8.0.7 or later.

Syntax

CREATE TABLE iceberg_table (
  id    BIGINT,
  data  STRING,
  PRIMARY KEY(`id`) NOT ENFORCED
)
PARTITIONED BY (data)
WITH (
  'connector' = 'iceberg',
  ...
);

WITH parameters

DLF Catalog parameters (recommended)

Use these parameters when connecting to an Iceberg table managed by a DLF Catalog.

Parameter | Required | Default | Description
connector | Yes | - | Must be iceberg.
catalog-name | Yes | - | A custom catalog name in English.
catalog-database | Yes | default | The DLF database name, for example, dlf_db. Create the database in DLF before using this parameter.
catalog-impl | Yes | - | Must be org.apache.iceberg.aliyun.dlf.DlfCatalog.
io-impl | Yes | - | Must be org.apache.iceberg.aliyun.oss.OSSFileIO.
warehouse | Yes | - | The OSS path where table data is stored.
dlf.catalog-id | Yes | - | Your Alibaba Cloud account ID. Get it from the User Information page.
dlf.endpoint | Yes | - | The DLF endpoint. Use the VPC endpoint to avoid public network traffic, for example, dlf-vpc.cn-hangzhou.aliyuncs.com for the China (Hangzhou) region. See Endpoints.
dlf.region-id | Yes | - | The DLF region. Must match the region in dlf.endpoint.
oss.endpoint | No | - | The OSS endpoint. Use the VPC endpoint, for example, oss-cn-hangzhou-internal.aliyuncs.com. See Regions and endpoints. To access OSS across VPCs, see How do I access other services across VPCs?
access.key.id (VVR 8.0.6 and earlier) / access-key-id (VVR 8.0.7 and later) | Yes | - | Your AccessKey ID. See How do I view the AccessKey ID and AccessKey secret? Store this value as a project variable to avoid leaking credentials. See Project variables.
access.key.secret (VVR 8.0.6 and earlier) / access-key-secret (VVR 8.0.7 and later) | Yes | - | Your AccessKey secret.
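
As a sketch of how these parameters fit together on VVR 8.0.7 or later, the following DDL uses the hyphenated credential keys (access-key-id and access-key-secret) listed in the table above. The table name dlf_iceberg_v807 is hypothetical, every value in angle brackets is a placeholder, and the ${secret_values.*} variables assume that the AccessKey pair is stored as project variables.

```sql
-- Sketch only: DLF Catalog parameters with the VVR 8.0.7+ credential keys.
-- dlf_iceberg_v807 is a hypothetical table name; replace all <...> placeholders.
CREATE TEMPORARY TABLE dlf_iceberg_v807 (
  id    BIGINT,
  data  STRING
) WITH (
  'connector'         = 'iceberg',
  'catalog-name'      = '<yourCatalogName>',
  'catalog-database'  = '<yourDatabaseName>',
  'catalog-impl'      = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'io-impl'           = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'warehouse'         = '<yourOSSWarehousePath>',
  'dlf.catalog-id'    = '<yourCatalogId>',
  'dlf.endpoint'      = '<yourDLFEndpoint>',
  'dlf.region-id'     = '<yourDLFRegionId>',
  'oss.endpoint'      = '<yourOSSEndpoint>',
  -- On VVR 8.0.6 and earlier, use access.key.id / access.key.secret instead.
  'access-key-id'     = '${secret_values.ak_id}',
  'access-key-secret' = '${secret_values.ak_secret}'
);
```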

Hive Catalog parameters

Use these parameters when connecting to an Iceberg table managed by a self-managed Hive Metastore (HMS). Make sure the Flink cluster can reach the HMS cluster over the network.

Parameter | Required | Default | Description
connector | Yes | - | Must be iceberg.
catalog-name | Yes | - | A custom catalog name in English.
catalog-database | Yes | default | The database name in HMS.
uri | Yes | - | The Thrift URI of the Hive Metastore, for example, thrift://<ip>:<port>.
warehouse | Yes | - | The OSS path where table data is stored.
io-impl | Yes | - | Must be org.apache.iceberg.aliyun.oss.OSSFileIO.
oss.endpoint | No | - | The OSS endpoint. Use the VPC endpoint.
access-key-id | Yes | - | Your AccessKey ID.
access-key-secret | Yes | - | Your AccessKey secret.

Sink-specific parameters

Parameter | Required | Default | Description
write.operation | No | upsert | Write mode: upsert (update existing rows), insert (append only), or bulk_insert (bulk append without updates).
hive_sync.enable | No | false | Specifies whether to sync metadata to Hive.
hive_sync.mode | No | hms | Sync mode: hms (for Hive Metastore or DLF Catalog) or jdbc (for JDBC Catalog).
hive_sync.db | No | Current catalog database name | The Hive database to sync metadata to.
hive_sync.table | No | Current table name | The Hive table to sync metadata to.
dlf.catalog.region | No | - | The DLF region for Hive sync. Takes effect only when hive_sync.mode is hms. Must match the region in dlf.catalog.endpoint.
dlf.catalog.endpoint | No | - | The DLF endpoint for Hive sync. Takes effect only when hive_sync.mode is hms. Use the VPC endpoint, for example, dlf-vpc.cn-hangzhou.aliyuncs.com. To access DLF across VPCs, see Storage space management and operations.
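
The following sketch shows where a sink-specific parameter would sit in a table definition, assuming the option behaves as described in the table above. The table name iceberg_upsert_sink is hypothetical, the primary key declaration follows the Syntax section, and all values in angle brackets are placeholders.

```sql
-- Sketch only: a DLF-backed sink table that sets the write mode explicitly.
-- iceberg_upsert_sink is a hypothetical name; replace all <...> placeholders.
CREATE TEMPORARY TABLE iceberg_upsert_sink (
  id    BIGINT,
  data  STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'         = 'iceberg',
  'catalog-name'      = '<yourCatalogName>',
  'catalog-database'  = '<yourDatabaseName>',
  'catalog-impl'      = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'io-impl'           = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'warehouse'         = '<yourOSSWarehousePath>',
  'dlf.catalog-id'    = '<yourCatalogId>',
  'dlf.endpoint'      = '<yourDLFEndpoint>',
  'dlf.region-id'     = '<yourDLFRegionId>',
  -- Credential key names for VVR 8.0.7 and later.
  'access-key-id'     = '${secret_values.ak_id}',
  'access-key-secret' = '${secret_values.ak_secret}',
  -- upsert is the documented default; it is spelled out here for clarity.
  'write.operation'   = 'upsert'
);
```

Switching the value to insert would make the sink append-only, per the parameter description.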

Type mapping

Iceberg type | Flink type
BOOLEAN | BOOLEAN
INT | INT
LONG | BIGINT
FLOAT | FLOAT
DOUBLE | DOUBLE
DECIMAL(P,S) | DECIMAL(P,S)
DATE | DATE
TIME | TIME (Iceberg uses microsecond precision; Flink uses milliseconds. Precision is aligned to milliseconds when reading.)
TIMESTAMP | TIMESTAMP
TIMESTAMPTZ | TIMESTAMP_LTZ
STRING | STRING
FIXED(L) | BYTES
BINARY | VARBINARY
STRUCT<...> | ROW
LIST<E> | ARRAY
MAP<K,V> | MAP
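
To make the mapping concrete, here is a sketch of a Flink DDL that declares one column per mapped type. The table name, column names, and the DECIMAL precision are hypothetical, and the WITH clause reuses the DLF Catalog placeholders from the parameter tables. Note that Flink SQL declares list columns as ARRAY<...>.

```sql
-- Sketch only: one Flink column per Iceberg type in the mapping table.
-- iceberg_types_demo and all column names are hypothetical.
CREATE TEMPORARY TABLE iceberg_types_demo (
  flag        BOOLEAN,                        -- Iceberg BOOLEAN
  qty         INT,                            -- Iceberg INT
  id          BIGINT,                         -- Iceberg LONG
  ratio       FLOAT,                          -- Iceberg FLOAT
  score       DOUBLE,                         -- Iceberg DOUBLE
  amount      DECIMAL(10, 2),                 -- Iceberg DECIMAL(10,2)
  birthday    DATE,                           -- Iceberg DATE
  start_time  TIME,                           -- Iceberg TIME
  created_at  TIMESTAMP,                      -- Iceberg TIMESTAMP
  updated_at  TIMESTAMP_LTZ,                  -- Iceberg TIMESTAMPTZ
  title       STRING,                         -- Iceberg STRING
  digest      BYTES,                          -- Iceberg FIXED(L)
  payload     VARBINARY,                      -- Iceberg BINARY
  address     ROW<city STRING, zip STRING>,   -- Iceberg STRUCT<...>
  tags        ARRAY<STRING>,                  -- Iceberg LIST<E>
  attrs       MAP<STRING, STRING>             -- Iceberg MAP<K,V>
) WITH (
  'connector'         = 'iceberg',
  'catalog-name'      = '<yourCatalogName>',
  'catalog-database'  = '<yourDatabaseName>',
  'catalog-impl'      = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'io-impl'           = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'warehouse'         = '<yourOSSWarehousePath>',
  'dlf.catalog-id'    = '<yourCatalogId>',
  'dlf.endpoint'      = '<yourDLFEndpoint>',
  'dlf.region-id'     = '<yourDLFRegionId>',
  'access-key-id'     = '${secret_values.ak_id}',
  'access-key-secret' = '${secret_values.ak_secret}'
);
```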

Examples

Prerequisites

Before running the examples, complete the following:

When creating the DLF database, set the Path using the format ${warehouse}/${database_name}.db. For example, if your warehouse is oss://iceberg-test/warehouse and the database name is dlf_db, set the path to oss://iceberg-test/warehouse/dlf_db.db.

Write streaming data to an Iceberg sink table

This example uses the Datagen connector to generate random streaming data and write it to an Iceberg table via a DLF Catalog.

CREATE TEMPORARY TABLE datagen (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE dlf_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector'        = 'iceberg',
  'catalog-name'     = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl'          = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint'     = '<yourOSSEndpoint>',
  'access.key.id'    = '${secret_values.ak_id}',
  'access.key.secret'= '${secret_values.ak_secret}',
  'catalog-impl'     = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse'        = '<yourOSSWarehousePath>',
  'dlf.catalog-id'   = '<yourCatalogId>',
  'dlf.endpoint'     = '<yourDLFEndpoint>',
  'dlf.region-id'    = '<yourDLFRegionId>'
);

INSERT INTO dlf_iceberg SELECT * FROM datagen;

Read from an Iceberg source table (Hive Catalog)

This example reads from an Iceberg table managed by a self-managed Hive Metastore. The table data is stored at oss://<bucket>/<path>/<database-name>/flink_table.

CREATE TEMPORARY TABLE flink_table (
  id   BIGINT,
  data STRING
) WITH (
  'connector'        = 'iceberg',
  'catalog-name'     = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'uri'              = 'thrift://<ip>:<port>',
  'warehouse'        = 'oss://<bucket>/<path>',
  'io-impl'          = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'access-key-id'    = '<yourAccessKeyId>',
  'access-key-secret'= '<yourAccessKeySecret>',
  'oss.endpoint'     = '<yourOSSEndpoint>'
);
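
The table definition above only declares the source. As a minimal sketch of actually reading from it, the following statements route every row to Flink's print connector; the sink name print_sink is hypothetical.

```sql
-- Sketch only: send all rows of flink_table (defined above) to the print connector.
-- print_sink is a hypothetical name; output appears in the TaskManager logs.
CREATE TEMPORARY TABLE print_sink (
  id   BIGINT,
  data STRING
) WITH (
  'connector' = 'print'
);

INSERT INTO print_sink SELECT id, data FROM flink_table;
```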

Copy data between two Iceberg tables (DLF Catalog)

This example inserts seed data into a source Iceberg table and copies it to a destination table, both using a DLF Catalog.

CREATE TEMPORARY TABLE src_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector'        = 'iceberg',
  'catalog-name'     = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl'          = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint'     = '<yourOSSEndpoint>',
  'access.key.id'    = '${secret_values.ak_id}',
  'access.key.secret'= '${secret_values.ak_secret}',
  'catalog-impl'     = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse'        = '<yourOSSWarehousePath>',
  'dlf.catalog-id'   = '<yourCatalogId>',
  'dlf.endpoint'     = '<yourDLFEndpoint>',
  'dlf.region-id'    = '<yourDLFRegionId>'
);

CREATE TEMPORARY TABLE dst_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector'        = 'iceberg',
  'catalog-name'     = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl'          = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint'     = '<yourOSSEndpoint>',
  'access.key.id'    = '${secret_values.ak_id}',
  'access.key.secret'= '${secret_values.ak_secret}',
  'catalog-impl'     = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse'        = '<yourOSSWarehousePath>',
  'dlf.catalog-id'   = '<yourCatalogId>',
  'dlf.endpoint'     = '<yourDLFEndpoint>',
  'dlf.region-id'    = '<yourDLFRegionId>'
);

BEGIN STATEMENT SET;

INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE');
INSERT INTO dst_iceberg SELECT * FROM src_iceberg;

END;

Data ingestion

Use the Iceberg connector as a sink in YAML jobs to ingest data into an Iceberg table.

Syntax

sink:
  type: iceberg
  name: Iceberg Sink
  catalog.properties.rest.signing-region: cn-beijing
  catalog.properties.uri: http://cn-beijing-vpc.dlf.aliyuncs.com/iceberg
  catalog.properties.warehouse: flink_iceberg
  catalog.properties.type: rest
  catalog.properties.io-impl: org.apache.iceberg.rest.DlfFileIO

Configuration parameters

Parameter | Required | Default | Description
type | Yes | - | Must be iceberg.
name | No | - | A display name for the sink.
catalog.properties.rest.signing-region | Yes | - | The DLF region ID. See Endpoints.
catalog.properties.uri | Yes | - | The DLF REST Catalog URI. See Iceberg REST.
catalog.properties.warehouse | Yes | - | The DLF Catalog name.
catalog.properties.type | Yes | rest | Must be rest.
catalog.properties.io-impl | Yes | org.apache.iceberg.rest.DlfFileIO | Must be org.apache.iceberg.rest.DlfFileIO.
partition.key | No | - | Partition fields per table. Separate multiple tables with semicolons (;) and multiple partition keys with commas (,). Example: testdb.table1:id1,id2;testdb.table2:name. Supports implicit transform functions: truncate[10](id), hour(create_time), day(create_time), month(create_time), year(create_time), bucket[10](create_time).
table.properties.* | No | - | Iceberg table creation parameters. See Iceberg table options.

Example: MySQL to Iceberg via DLF REST Catalog

The following example ingests data from MySQL into Data Lake Formation using a DLF REST Catalog. For all catalog.properties.* parameters, see Create an Iceberg DLF Catalog.

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: ${mysql.source.table}
  server-id: 8601-8604

sink:
  type: iceberg
  name: Iceberg Sink
  catalog.properties.rest.signing-region: cn-beijing
  catalog.properties.uri: http://cn-beijing-vpc.dlf.aliyuncs.com/iceberg
  catalog.properties.warehouse: flink_iceberg
  catalog.properties.type: rest
  catalog.properties.io-impl: org.apache.iceberg.rest.DlfFileIO

Schema evolution

When using Iceberg as a data ingestion sink, the following schema change events are automatically propagated:

  • CREATE TABLE

  • ADD COLUMN

  • ALTER COLUMN TYPE (modifying the type of a primary key column is not supported)

  • RENAME COLUMN

  • DROP COLUMN

  • TRUNCATE TABLE

  • DROP TABLE

If the downstream Iceberg table already exists, data is written using the existing table schema. The system does not attempt to recreate the table.
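
For illustration, the statements below are the kind of upstream MySQL DDL that would produce each of the events listed above. The table testdb.orders and its columns are hypothetical, and whether a particular type change (here, widening a DECIMAL) is accepted by the sink is an assumption to verify in your environment.

```sql
-- Sketch only: hypothetical upstream MySQL statements, one per schema change event.
CREATE TABLE testdb.orders (
  id      BIGINT PRIMARY KEY,
  amount  DECIMAL(10, 2)
);                                                               -- CREATE TABLE

ALTER TABLE testdb.orders ADD COLUMN note VARCHAR(255);          -- ADD COLUMN
ALTER TABLE testdb.orders MODIFY COLUMN amount DECIMAL(20, 4);   -- ALTER COLUMN TYPE (non-key column)
ALTER TABLE testdb.orders RENAME COLUMN note TO remark;          -- RENAME COLUMN (MySQL 8.0 or later)
ALTER TABLE testdb.orders DROP COLUMN remark;                    -- DROP COLUMN
TRUNCATE TABLE testdb.orders;                                    -- TRUNCATE TABLE
DROP TABLE testdb.orders;                                        -- DROP TABLE
```

Changing the type of id in this example would not be propagated, because id is a primary key column.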
