Use the Iceberg connector to read from and write to Apache Iceberg tables backed by Object Storage Service (OSS) and Data Lake Formation (DLF). Apache Iceberg is an open table format for data lakes that supports ACID transactions, schema evolution, and partition evolution on HDFS or OSS. You can analyze data in your data lake using compute engines such as Flink, Spark, Hive, and Presto.
Connector overview
| Category | Details |
|---|---|
| Supported table types | Source table, sink table, data ingestion destination |
| Runtime modes | Batch mode, stream mode |
| Data format | Not applicable |
| Specific monitoring metrics | None |
| API types | SQL, YAML jobs (data ingestion only) |
| Supports updates and deletes in sink tables | Yes |
| Supported Iceberg table formats | v1, v2 (v2 requires Ververica Runtime (VVR) 8.0.7 or later) |
| Minimum VVR version | 4.0.8 |
Features
Apache Iceberg provides the following core features:
- Lightweight, cost-effective data lake storage built on HDFS or OSS.
- Full ACID semantics (atomicity, consistency, isolation, and durability).
- Rollback to historical versions.
- Efficient data filtering.
- Schema evolution.
- Partition evolution.
- Compatibility with a self-managed Hive Metastore.
You can use Flink's fault tolerance and stream processing capabilities to ingest large volumes of log and behavioral data into an Apache Iceberg data lake in real time. You can then use Flink or other analytics engines to extract value from that data.
Limitations
- Requires a Data Lake Formation (DLF) Catalog. For more information, see Manage DLF-Legacy Catalogs.
- In stream mode, only append-only Iceberg tables are supported as source tables.
- The v2 table format requires VVR 8.0.7 or later.
Syntax
```sql
CREATE TABLE iceberg_table (
  id BIGINT,
  data STRING,
  PRIMARY KEY(`id`) NOT ENFORCED
)
PARTITIONED BY (data)
WITH (
  'connector' = 'iceberg',
  ...
);
```
WITH parameters
DLF Catalog parameters (recommended)
Use these parameters when connecting to an Iceberg table managed by a DLF Catalog.
| Parameter | Required | Default | Description |
|---|---|---|---|
| connector | Yes | — | Must be iceberg. |
| catalog-name | Yes | — | A custom catalog name. Use English characters. |
| catalog-database | Yes | default | The DLF database name, for example, dlf_db. Create the database in DLF before you use this parameter. |
| catalog-impl | Yes | — | Must be org.apache.iceberg.aliyun.dlf.DlfCatalog. |
| io-impl | Yes | — | Must be org.apache.iceberg.aliyun.oss.OSSFileIO. |
| warehouse | Yes | — | The OSS path where table data is stored. |
| dlf.catalog-id | Yes | — | Your Alibaba Cloud account ID. You can find it on the User Information page. |
| dlf.endpoint | Yes | — | The DLF endpoint. Use the VPC endpoint to avoid public network traffic, for example, dlf-vpc.cn-hangzhou.aliyuncs.com for the China (Hangzhou) region. See Endpoints. |
| dlf.region-id | Yes | — | The DLF region ID. Must match the region in dlf.endpoint. |
| oss.endpoint | No | — | The OSS endpoint. Use the VPC endpoint, for example, oss-cn-hangzhou-internal.aliyuncs.com. See Regions and endpoints. To access OSS across VPCs, see How do I access other services across VPCs? |
| access.key.id (VVR 8.0.6 and earlier) / access-key-id (VVR 8.0.7 and later) | Yes | — | Your AccessKey ID. See How do I view the AccessKey ID and AccessKey secret? To avoid leaking credentials, store this value as a project variable. See Project variables. |
| access.key.secret (VVR 8.0.6 and earlier) / access-key-secret (VVR 8.0.7 and later) | Yes | — | Your AccessKey secret. Store it as a project variable as well. |
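
The examples later in this topic use the credential key names for VVR 8.0.6 and earlier. The following sketch shows a DLF Catalog table definition for VVR 8.0.7 and later, which uses the hyphenated key names. It sets every required parameter from the table above. The table name dlf_iceberg_new_keys is hypothetical, and the Hangzhou endpoints are the sample values from the table. The variable names ak_id and ak_secret are assumed to be variables you have already defined.

```sql
-- Sketch for VVR 8.0.7 and later (hyphenated credential keys).
-- dlf_iceberg_new_keys is a hypothetical name; <...> values are placeholders.
CREATE TEMPORARY TABLE dlf_iceberg_new_keys (
  id   BIGINT,
  data STRING
) WITH (
  'connector'         = 'iceberg',
  'catalog-name'      = '<yourCatalogName>',
  'catalog-database'  = '<yourDatabaseName>',   -- must already exist in DLF
  'catalog-impl'      = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'io-impl'           = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'warehouse'         = '<yourOSSWarehousePath>',
  'dlf.catalog-id'    = '<yourAlibabaCloudAccountId>',
  -- Region ID and endpoint must refer to the same region.
  'dlf.endpoint'      = 'dlf-vpc.cn-hangzhou.aliyuncs.com',
  'dlf.region-id'     = 'cn-hangzhou',
  'oss.endpoint'      = 'oss-cn-hangzhou-internal.aliyuncs.com',
  -- Credentials are read from variables instead of being written in plain text.
  'access-key-id'     = '${secret_values.ak_id}',
  'access-key-secret' = '${secret_values.ak_secret}'
);
```

Apart from the two credential keys, the options are the same as in the examples for older VVR versions.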
Hive Catalog parameters
Use these parameters when connecting to an Iceberg table managed by a self-managed Hive Metastore (HMS). Make sure the Flink cluster can reach the HMS cluster over the network.
| Parameter | Required | Default | Description |
|---|---|---|---|
| connector | Yes | — | Must be iceberg. |
| catalog-name | Yes | — | A custom catalog name. Use English characters. |
| catalog-database | Yes | default | The database name in HMS. |
| uri | Yes | — | The Thrift URI of the Hive Metastore, for example, `thrift://<ip>:<port>`. |
| warehouse | Yes | — | The OSS path where table data is stored. |
| io-impl | Yes | — | Must be org.apache.iceberg.aliyun.oss.OSSFileIO. |
| oss.endpoint | No | — | The OSS endpoint. Use the VPC endpoint. |
| access-key-id | Yes | — | Your AccessKey ID. |
| access-key-secret | Yes | — | Your AccessKey secret. |
Sink-specific parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| write.operation | No | upsert | The write mode: upsert (insert new rows and update existing rows), insert (append only), or bulk_insert (bulk append without updates). |
| hive_sync.enable | No | false | Specifies whether to sync metadata to Hive. |
| hive_sync.mode | No | hms | The sync mode: hms (for a Hive Metastore or DLF Catalog) or jdbc (for a JDBC Catalog). |
| hive_sync.db | No | Current catalog database name | The Hive database to sync metadata to. |
| hive_sync.table | No | Current table name | The Hive table to sync metadata to. |
| dlf.catalog.region | No | — | The DLF region for Hive sync. Takes effect only when hive_sync.mode is hms. Must match the region in dlf.catalog.endpoint. |
| dlf.catalog.endpoint | No | — | The DLF endpoint for Hive sync. Takes effect only when hive_sync.mode is hms. Use the VPC endpoint, for example, dlf-vpc.cn-hangzhou.aliyuncs.com. To access DLF across VPCs, see Storage space management and operations. |
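
The following sketch shows how the sink-specific parameters combine with the DLF Catalog parameters. It defines an append-only sink that also syncs its metadata to Hive through DLF. The table name iceberg_sink_with_sync is hypothetical, and all <...> values are placeholders. The credential keys use the VVR 8.0.6 and earlier names, as in the examples below. Whether a particular combination of write.operation and Hive sync is supported may depend on your VVR version, so treat this as an illustration rather than a tested configuration.

```sql
-- Hypothetical append-only Iceberg sink with Hive metadata sync via DLF.
CREATE TEMPORARY TABLE iceberg_sink_with_sync (
  id   BIGINT,
  data STRING
) WITH (
  -- DLF Catalog parameters
  'connector'         = 'iceberg',
  'catalog-name'      = '<yourCatalogName>',
  'catalog-database'  = '<yourDatabaseName>',
  'catalog-impl'      = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'io-impl'           = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'warehouse'         = '<yourOSSWarehousePath>',
  'dlf.catalog-id'    = '<yourCatalogId>',
  'dlf.endpoint'      = '<yourDLFEndpoint>',
  'dlf.region-id'     = '<yourDLFRegionId>',
  'oss.endpoint'      = '<yourOSSEndpoint>',
  'access.key.id'     = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  -- Sink-specific parameters
  'write.operation'      = 'insert',  -- append only; existing rows are not updated
  'hive_sync.enable'     = 'true',    -- sync table metadata to Hive
  'hive_sync.mode'       = 'hms',     -- dlf.catalog.* options take effect only in hms mode
  'dlf.catalog.region'   = '<yourDLFRegionId>',
  'dlf.catalog.endpoint' = '<yourDLFEndpoint>'
  -- hive_sync.db and hive_sync.table are omitted, so they default to
  -- the current catalog database name and the current table name.
);
```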
Type mapping
| Iceberg type | Flink type |
|---|---|
| BOOLEAN | BOOLEAN |
| INT | INT |
| LONG | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL(P,S) | DECIMAL(P,S) |
| DATE | DATE |
| TIME | TIME (Iceberg uses microsecond precision and Flink uses millisecond precision. Values are aligned to milliseconds when read.) |
| TIMESTAMP | TIMESTAMP |
| TIMESTAMPTZ | TIMESTAMP_LTZ |
| STRING | STRING |
| FIXED(L) | BYTES |
| BINARY | VARBINARY |
| STRUCT<...> | ROW |
| LIST<E> | ARRAY |
| MAP<K,V> | MAP |
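
The following sketch shows the mapping from the Flink side. It declares one column for each Flink type in the table above. The table and column names are hypothetical, and the catalog options are the same placeholders used in the DLF Catalog examples. The precision of 6 on the timestamp columns assumes Iceberg's microsecond precision. If the table already exists in the catalog, the declared types are assumed to need to match the mapping for its existing Iceberg schema.

```sql
-- Hypothetical table: one column per Flink type from the mapping table.
CREATE TEMPORARY TABLE iceberg_type_demo (
  c_boolean BOOLEAN,                   -- Iceberg BOOLEAN
  c_int     INT,                       -- Iceberg INT
  c_bigint  BIGINT,                    -- Iceberg LONG
  c_float   FLOAT,                     -- Iceberg FLOAT
  c_double  DOUBLE,                    -- Iceberg DOUBLE
  c_decimal DECIMAL(10, 2),            -- Iceberg DECIMAL(10,2)
  c_date    DATE,                      -- Iceberg DATE
  c_time    TIME,                      -- Iceberg TIME, read at millisecond precision
  c_ts      TIMESTAMP(6),              -- Iceberg TIMESTAMP (without time zone)
  c_ts_ltz  TIMESTAMP_LTZ(6),          -- Iceberg TIMESTAMPTZ
  c_string  STRING,                    -- Iceberg STRING
  c_bytes   BYTES,                     -- Iceberg FIXED(L); BYTES is VARBINARY(2147483647)
  c_row     ROW<name STRING, age INT>, -- Iceberg STRUCT<name: STRING, age: INT>
  c_array   ARRAY<STRING>,             -- Iceberg LIST<STRING>
  c_map     MAP<STRING, BIGINT>        -- Iceberg MAP<STRING, LONG>
) WITH (
  'connector'         = 'iceberg',
  'catalog-name'      = '<yourCatalogName>',
  'catalog-database'  = '<yourDatabaseName>',
  'catalog-impl'      = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'io-impl'           = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'warehouse'         = '<yourOSSWarehousePath>',
  'dlf.catalog-id'    = '<yourCatalogId>',
  'dlf.endpoint'      = '<yourDLFEndpoint>',
  'dlf.region-id'     = '<yourDLFRegionId>',
  'oss.endpoint'      = '<yourOSSEndpoint>',
  'access.key.id'     = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}'
);
```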
Examples
Prerequisites
Before you run the examples, prepare the following:
- An OSS bucket. See Create a bucket in the console.
- A DLF database. See Databases, tables, and functions.

When you create the DLF database, set Path in the format ${warehouse}/${database_name}.db. For example, if your warehouse is oss://iceberg-test/warehouse and the database name is dlf_db, set the path to oss://iceberg-test/warehouse/dlf_db.db.
Write streaming data to an Iceberg sink table
This example uses the Datagen connector to generate random streaming data and write it to an Iceberg table via a DLF Catalog.
```sql
-- Source: random rows generated by the built-in Datagen connector.
CREATE TEMPORARY TABLE datagen (
  id BIGINT,
  data STRING
) WITH (
  'connector' = 'datagen'
);

-- Sink: an Iceberg table in a DLF Catalog.
CREATE TEMPORARY TABLE dlf_iceberg (
  id BIGINT,
  data STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',
  -- Key names for VVR 8.0.6 and earlier; on VVR 8.0.7 and later, use access-key-id and access-key-secret.
  'access.key.id' = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',
  'dlf.region-id' = '<yourDLFRegionId>'
);

INSERT INTO dlf_iceberg SELECT * FROM datagen;
```
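
The datagen table above produces an unbounded stream at the connector's default rate. For a quick test, you might prefer a small, throttled input. The following sketch uses standard Flink Datagen options (rows-per-second, fields.#.kind, fields.#.start, fields.#.end, and fields.#.length) to produce exactly 1,000 rows with unique IDs. The table name datagen_bounded is hypothetical. The query writes into the dlf_iceberg table defined above.

```sql
-- Hypothetical throttled, finite variant of the datagen source.
CREATE TEMPORARY TABLE datagen_bounded (
  id   BIGINT,
  data STRING
) WITH (
  'connector'          = 'datagen',
  'rows-per-second'    = '10',        -- emit at most 10 rows per second
  'fields.id.kind'     = 'sequence',  -- ids 1, 2, ..., 1000 with no repeats
  'fields.id.start'    = '1',
  'fields.id.end'      = '1000',      -- the source finishes when the sequence is exhausted
  'fields.data.length' = '10'         -- random strings of length 10
);

INSERT INTO dlf_iceberg SELECT * FROM datagen_bounded;
```

Because the id column is a sequence, the source ends after 1,000 rows, which takes roughly 100 seconds at this rate.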
Read from an Iceberg source table (Hive Catalog)
This example reads from an Iceberg table managed by a self-managed Hive Metastore. The table data is stored at oss://<bucket>/<path>/<database-name>/flink_table.
```sql
CREATE TEMPORARY TABLE flink_table (
  id BIGINT,
  data STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'uri' = 'thrift://<ip>:<port>',
  'warehouse' = 'oss://<bucket>/<path>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'access-key-id' = '<yourAccessKeyId>',
  'access-key-secret' = '<yourAccessKeySecret>',
  'oss.endpoint' = '<yourOSSEndpoint>'
);
```
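
The DDL above only registers the source table. To read it, a job needs a query that consumes it. The following minimal sketch assumes a hypothetical print_sink table that uses Flink's built-in print connector, which writes each row to the TaskManager standard output. The WHERE clause is optional and illustrates the kind of filter the connector may use to skip data. Whether a given predicate is pushed down depends on the connector version. In stream mode, this works only if flink_table is append-only, as noted under Limitations.

```sql
-- Hypothetical sink that prints every row it receives to TaskManager stdout.
CREATE TEMPORARY TABLE print_sink (
  id   BIGINT,
  data STRING
) WITH (
  'connector' = 'print'
);

-- Read from the Hive-managed Iceberg table defined above.
INSERT INTO print_sink
SELECT id, data
FROM flink_table
WHERE id > 100;  -- optional filter on a table column
```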
Copy data between two Iceberg tables (DLF Catalog)
This example inserts seed data into a source Iceberg table and copies it to a destination table, both using a DLF Catalog.
```sql
-- Source Iceberg table in the DLF Catalog.
CREATE TEMPORARY TABLE src_iceberg (
  id BIGINT,
  data STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',
  -- Key names for VVR 8.0.6 and earlier; on VVR 8.0.7 and later, use access-key-id and access-key-secret.
  'access.key.id' = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',
  'dlf.region-id' = '<yourDLFRegionId>'
);

-- Destination Iceberg table in the same DLF Catalog.
CREATE TEMPORARY TABLE dst_iceberg (
  id BIGINT,
  data STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',
  'access.key.id' = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',
  'dlf.region-id' = '<yourDLFRegionId>'
);

-- A statement set submits both INSERT statements as a single job.
BEGIN STATEMENT SET;
INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE');
INSERT INTO dst_iceberg SELECT * FROM src_iceberg;
END;
```
Data ingestion
Use the Iceberg connector as a sink in YAML jobs to ingest data into an Iceberg table.
Syntax
```yaml
sink:
  type: iceberg
  name: Iceberg Sink
  catalog.properties.rest.signing-region: cn-beijing
  catalog.properties.uri: http://cn-beijing-vpc.dlf.aliyuncs.com/iceberg
  catalog.properties.warehouse: flink_iceberg
  catalog.properties.type: rest
  catalog.properties.io-impl: org.apache.iceberg.rest.DlfFileIO
```
Configuration parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| type | Yes | — | Must be iceberg. |
| name | No | — | A display name for the sink. |
| catalog.properties.rest.signing-region | Yes | — | The DLF region ID. See Endpoints. |
| catalog.properties.uri | Yes | — | The DLF REST Catalog URI. See Iceberg REST. |
| catalog.properties.warehouse | Yes | — | The DLF Catalog name. |
| catalog.properties.type | Yes | rest | Must be rest. |
| catalog.properties.io-impl | Yes | org.apache.iceberg.rest.DlfFileIO | Must be org.apache.iceberg.rest.DlfFileIO. |
| partition.key | No | — | The partition fields for each table. Separate tables with semicolons (;) and the partition keys of one table with commas (,), for example, `testdb.table1:id1,id2;testdb.table2:name`. The following partition transforms are also supported: `truncate[10](id)`, `hour(create_time)`, `day(create_time)`, `month(create_time)`, `year(create_time)`, and `bucket[10](create_time)`. |
| table.properties.* | No | — | Iceberg table properties to apply when tables are created. See Iceberg table options. |
Example: MySQL to Iceberg via DLF REST Catalog
The following example ingests data from MySQL into Data Lake Formation using a DLF REST Catalog. For all catalog.properties.* parameters, see Create an Iceberg DLF Catalog.
```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: ${mysql.source.table}
  server-id: 8601-8604

sink:
  type: iceberg
  name: Iceberg Sink
  catalog.properties.rest.signing-region: cn-beijing
  catalog.properties.uri: http://cn-beijing-vpc.dlf.aliyuncs.com/iceberg
  catalog.properties.warehouse: flink_iceberg
  catalog.properties.type: rest
  catalog.properties.io-impl: org.apache.iceberg.rest.DlfFileIO
```
Schema evolution
When using Iceberg as a data ingestion sink, the following schema change events are automatically propagated:
- CREATE TABLE
- ADD COLUMN
- ALTER COLUMN TYPE (changing the type of a primary key column is not supported)
- RENAME COLUMN
- DROP COLUMN
- TRUNCATE TABLE
- DROP TABLE
If the downstream Iceberg table already exists, data is written using the existing table schema. The system does not attempt to recreate the table.
Related topics
- For a full list of connectors that Flink supports, see Supported connectors.
- To manage DLF catalogs, see Manage DLF-Legacy Catalogs.
- For DLF endpoint details, see Endpoints.