This topic describes how to use the Apache Iceberg connector.
Background information
Apache Iceberg is an open table format for data lakes. You can use Apache Iceberg to quickly build your own data lake storage service on Hadoop Distributed File System (HDFS) or Alibaba Cloud Object Storage Service (OSS). Then, you can use a compute engine of the open source big data ecosystem such as Apache Flink, Apache Spark, Apache Hive, or Apache Presto to analyze data in your data lake.
Category | Details |
Supported type | Source table and sink table |
Running mode | Batch mode and streaming mode |
Data format | N/A |
Metric | None |
API | SQL |
Data update or deletion in a sink table | Yes |
Features
Apache Iceberg provides the following core capabilities:
Builds a low-cost lightweight data lake storage service based on HDFS or object storage.
Provides comprehensive atomicity, consistency, isolation, durability (ACID) semantics.
Supports historical version backtracking.
Supports efficient data filtering.
Supports schema evolution.
Supports partition evolution.
Supports use with a self-managed Hive metastore. For more information, see Use the Hive catalog with a self-managed Hive Metastore (HMS).
You can use the efficient fault tolerance and stream processing capabilities of Flink to import a large amount of behavioral data in logs into an Apache Iceberg data lake in real time. Then, you can use Flink or another analytics engine to extract the value of your data.
Limits
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 4.0.8 or later supports the Apache Iceberg connector. The Apache Iceberg connector must be used with the DLF catalog. For more information, see Manage the DLF catalog.
The Apache Iceberg connector supports the Apache Iceberg table format of version 1 and version 2. For more information, see Iceberg Table Spec.
Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.7 or later supports the Apache Iceberg table format of version 2.
If the streaming read mode is enabled, only Apache Iceberg tables in which data is written in Append Only mode can be used as source tables.
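When a deployment reads such an append-only table in streaming mode, the streaming read is typically enabled through table hints. The following is only a sketch: the streaming and monitor-interval options come from the open source Apache Iceberg connector for Flink, and the table name iceberg_source is an example; verify that your VVR version supports these options before you rely on them.
-- Sketch: streaming read from an append-only Apache Iceberg table.
-- 'streaming' and 'monitor-interval' are open source Apache Iceberg options
-- and are assumed here, not confirmed options of this connector.
SELECT * FROM iceberg_source /*+ OPTIONS('streaming'='true', 'monitor-interval'='30s') */;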
Syntax
CREATE TABLE iceberg_table (
  id   BIGINT,
  data STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
)
PARTITIONED BY (data)
WITH (
  'connector' = 'iceberg',
  ...
);
Connector options in the WITH clause
Common options for source tables and sink tables
Parameter | Description | Data type | Required | Default value | Remarks |
connector | The type of the table. | String | Yes | None | Set the value to iceberg. |
catalog-name | The catalog name | String | Yes | None | The custom name. |
catalog-database | The database name | String | Yes | default | Set the value to the name of the database that is created on Data Lake Formation (DLF). Example: dlf_db. Note If you have not created a DLF database, create one. |
io-impl | The name of the implementation class in the distributed file system | String | Yes | None | Set the value to org.apache.iceberg.aliyun.oss.OSSFileIO. |
oss.endpoint | The endpoint of your OSS bucket | String | No | None | For more information, see Regions and endpoints. |
access.key.id | The AccessKey ID of your Alibaba Cloud account | String | Yes | None | For more information, see How do I view the AccessKey ID and AccessKey Secret? Important: To prevent the leakage of your AccessKey information, we recommend that you use variables to specify the AccessKey values. For more information, see Project variables. |
access.key.secret | The AccessKey Secret of your Alibaba Cloud account | String | Yes | None | The remarks for access.key.id also apply to this parameter. |
catalog-impl | The class name of the catalog | String | Yes | None | Set the value to org.apache.iceberg.aliyun.dlf.DlfCatalog. |
warehouse | The OSS directory in which table data is stored | String | Yes | None | None. |
dlf.catalog-id | The ID of your Alibaba Cloud account | String | Yes | None | You can obtain the account ID from the Account Management page. |
dlf.endpoint | The endpoint of DLF | String | Yes | None | Set the value to the DLF endpoint of the region in which the DLF service is activated. |
dlf.region-id | The name of the region in which the DLF service is activated | String | Yes | None | Note: Make sure that the region that you specify for the dlf.endpoint parameter is the same as the region that you specify for this parameter. |
uri | The Thrift URI of the Hive metastore | String | Required only when you use the Hive catalog | None | Used with a self-managed Hive Metastore. |
Options only for sink tables
Parameter | Description | Data type | Required | Default value | Remarks |
write.operation | The write operation mode | String | No | upsert | None. |
hive_sync.enable | Specifies whether to enable the synchronization of metadata to Hive | Boolean | No | false | Valid values: true (enable synchronization) and false (disable synchronization). |
hive_sync.mode | The Hive data synchronization mode | String | No | hms | None. |
hive_sync.db | The name of the Hive database to which data is synchronized | String | No | Database name of the current table in the catalog | None. |
hive_sync.table | The name of the Hive table to which data is synchronized | String | No | Name of the current table | None. |
dlf.catalog.region | The name of the region in which the DLF service is activated | String | No | None | Note: Make sure that the region that you specify for this parameter is the same as the region that you specify for the dlf.catalog.endpoint parameter. |
dlf.catalog.endpoint | The endpoint of DLF | String | No | None | Set the value to the DLF endpoint of the region in which the DLF service is activated. |
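The following DDL sketch shows how the sink-specific options in the preceding table can be combined with the common options. The empty values are placeholders that you must fill in for your environment, and enabling Hive synchronization here is only an illustration, not a requirement.
CREATE TEMPORARY TABLE dlf_iceberg_sink (
  id   BIGINT,
  data STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'iceberg',
  -- Common options: fill in the empty values for your environment.
  'catalog-name' = '',
  'catalog-database' = '',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '',
  'access.key.id' = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '',
  'dlf.catalog-id' = '',
  'dlf.endpoint' = '',
  'dlf.region-id' = '',
  -- Sink-specific options from the preceding table (illustrative values).
  'write.operation' = 'upsert',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.db' = '',
  'hive_sync.table' = ''
);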
Data type mappings
Apache Iceberg data type | Flink data type |
BOOLEAN | BOOLEAN |
INT | INT |
LONG | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(P,S) | DECIMAL(P,S) |
DATE | DATE |
TIME | TIME Note Apache Iceberg timestamps are accurate to the microsecond, and Flink timestamps are accurate to the millisecond. When you use Flink to read Apache Iceberg data, the time precision is aligned to milliseconds. |
TIMESTAMP | TIMESTAMP |
TIMESTAMPTZ | TIMESTAMP_LTZ |
STRING | STRING |
FIXED(L) | BYTES |
BINARY | VARBINARY |
STRUCT<...> | ROW |
LIST<E> | LIST |
MAP<K,V> | MAP |
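As an illustration of this mapping, the following DDL sketch declares Flink column types that correspond to several Apache Iceberg types. The table and column names are examples only, and the connector options are omitted; see the Syntax section.
CREATE TEMPORARY TABLE typed_iceberg (
  id          BIGINT,                        -- Apache Iceberg LONG
  price       DECIMAL(10, 2),                -- Apache Iceberg DECIMAL(10,2)
  created_at  TIMESTAMP,                     -- Apache Iceberg TIMESTAMP
  updated_at  TIMESTAMP_LTZ,                 -- Apache Iceberg TIMESTAMPTZ
  payload     VARBINARY,                     -- Apache Iceberg BINARY
  attributes  MAP<STRING, STRING>,           -- Apache Iceberg MAP<K,V>
  address     ROW<city STRING, zip STRING>   -- Apache Iceberg STRUCT<...>
) WITH (
  'connector' = 'iceberg',
  ...
);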
Examples
Confirm that an OSS bucket and a DLF database are created. For more information, see Create buckets and Databases, tables, and functions.
When you select a path for the DLF database, we recommend that you follow the format ${warehouse}/${database_name}.db. For example, if the warehouse address is oss://iceberg-test/warehouse and the database name is dlf_db, set the OSS path of dlf_db to oss://iceberg-test/warehouse/dlf_db.db.
Sample code for a sink table
This example shows how to use the Datagen connector to randomly generate streaming data and write the data to an Apache Iceberg sink table.
CREATE TEMPORARY TABLE datagen(
id BIGINT,
data STRING
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE dlf_iceberg (
id BIGINT,
data STRING
) WITH (
'connector' = 'iceberg',
'catalog-name' = '',
'catalog-database' = '',
'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
'oss.endpoint' = '',
'access.key.id' = '${secret_values.ak_id}',
'access.key.secret' = '${secret_values.ak_secret}',
'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
'warehouse' = '',
'dlf.catalog-id' = '',
'dlf.endpoint' = '',
'dlf.region-id' = ''
);
INSERT INTO dlf_iceberg SELECT * FROM datagen;
Sample code for a source table
Use the Hive catalog with a self-managed Hive Metastore (HMS).
You must ensure that Flink and the HMS cluster can communicate with each other. The table data is stored in the oss://///flink_table directory.
CREATE TEMPORARY TABLE flink_table (
  id   BIGINT,
  data STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '',
  'catalog-database' = '',
  'uri' = 'thrift://:',
  'warehouse' = 'oss:///',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'access-key-id' = '',
  'access-key-secret' = '',
  'oss.endpoint' = ''
);
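After the table is created, you can read from it with a regular query. The following sketch writes the rows to the built-in Print connector for inspection; the print_sink table name is an example only.
CREATE TEMPORARY TABLE print_sink (
  id   BIGINT,
  data STRING
) WITH (
  'connector' = 'print'
);

INSERT INTO print_sink SELECT * FROM flink_table;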
Use the DLF catalog to write data from an Apache Iceberg source table to an Apache Iceberg sink table.
CREATE TEMPORARY TABLE src_iceberg (
  id   BIGINT,
  data STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '',
  'catalog-database' = '',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '',
  'access.key.id' = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '',
  'dlf.catalog-id' = '',
  'dlf.endpoint' = '',
  'dlf.region-id' = ''
);

CREATE TEMPORARY TABLE dst_iceberg (
  id   BIGINT,
  data STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '',
  'catalog-database' = '',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '',
  'access.key.id' = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '',
  'dlf.catalog-id' = '',
  'dlf.endpoint' = '',
  'dlf.region-id' = ''
);

BEGIN STATEMENT SET;
INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE');
INSERT INTO dst_iceberg SELECT * FROM src_iceberg;
END;
References
For more information, see Supported connectors.