This topic provides the DDL syntax that is used to create a full Apache Iceberg source table, describes the parameters in the WITH clause, and provides data type mappings and an example.
What is Apache Iceberg?
Apache Iceberg is an open data lake table format. You can use Apache Iceberg to quickly build your
own data lake storage service on Hadoop Distributed File System (HDFS) or Alibaba
Cloud Object Storage Service (OSS). Then, you can use an open source big data compute
engine such as Apache Flink, Apache Spark, Apache Hive, or Apache Presto to analyze
the data in your data lake. Apache Iceberg provides the following core capabilities:
- Builds a low-cost lightweight data lake storage service based on HDFS or OSS.
- Provides complete atomicity, consistency, isolation, and durability (ACID) semantics.
- Supports historical version backtracking.
- Supports efficient data filtering.
- Supports schema evolution.
- Supports partition evolution.
You can use the efficient fault tolerance and stream processing capabilities of Flink
to import large amounts of behavioral log data into an Apache Iceberg data lake in
real time. Then, you can use Flink or another analytics engine to extract value from
the data.
Note The Apache Iceberg connector can be used for the result tables of Flink streaming
jobs, and for the source tables and result tables of Flink batch jobs. This topic
describes how to use an Apache Iceberg table as the source table of a Flink batch job.
Limits
- Only Flink compute engines of vvr-4.0.8-flink-1.13 or later support the Apache Iceberg connector.
- The Apache Iceberg connector supports only the Apache Iceberg table format of version 1. For more information, see Iceberg Table Spec.
DDL syntax
CREATE TABLE dlf_iceberg (
  id BIGINT,
  data STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-type' = 'custom',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',
  'access.key.id' = '<yourAccessKeyId>',
  'access.key.secret' = '<yourAccessKeySecret>',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',
  'dlf.region-id' = '<yourDLFRegionId>'
);
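After the table is declared, a Flink batch job can query it like any other table. The following is a minimal sketch that assumes the DDL above has already been executed; the print result table and its name print_sink are illustrative and serve only as a destination for the query.

-- Run the job in batch mode, because the Apache Iceberg table is used
-- as the source table of a Flink batch job.
SET 'execution.runtime-mode' = 'batch';

-- A print result table used only as a demo destination (hypothetical name).
CREATE TEMPORARY TABLE print_sink (
  id BIGINT,
  data STRING
) WITH (
  'connector' = 'print'
);

-- Read all rows from the Apache Iceberg source table.
INSERT INTO print_sink
SELECT id, data FROM dlf_iceberg;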
Parameters in the WITH clause
Parameter | Description | Required | Remarks |
---|---|---|---|
connector | The type of the source table. | Yes | Set the value to iceberg. |
catalog-name | The name of the catalog. | Yes | Set the value to a custom name. |
catalog-type | The type of the catalog. | Yes | Set the value to custom. |
catalog-database | The name of the database. | Yes | Set the value to the name of the database that is created on Data Lake Formation (DLF). Example: dlf_db. |
io-impl | The name of the implementation class in the distributed file system. | Yes | Set the value to org.apache.iceberg.aliyun.oss.OSSFileIO. |
oss.endpoint | The endpoint of your OSS bucket. | No | For more information, see Regions and endpoints. |
access.key.id | The AccessKey ID of your Alibaba Cloud account. | Yes | For more information about how to obtain the AccessKey ID, see Obtain an AccessKey pair. |
access.key.secret | The AccessKey secret of your Alibaba Cloud account. | Yes | For more information about how to obtain the AccessKey secret, see Obtain an AccessKey pair. |
catalog-impl | The class name of the catalog. | Yes | Set the value to org.apache.iceberg.aliyun.dlf.DlfCatalog. |
warehouse | The OSS directory in which table data is stored. | Yes | N/A. |
dlf.catalog-id | The ID of your Alibaba Cloud account that is used to access DLF. | Yes | To obtain the ID of your Alibaba Cloud account, go to the Security Settings page. |
dlf.endpoint | The endpoint of the DLF service. | Yes | |
dlf.region-id | The region name of the DLF service. | Yes | Note Make sure that the region matches the endpoint that you specify for dlf.endpoint. |
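For reference, the following sketch shows the WITH clause with sample values filled in. All values are illustrative: a deployment in the China (Hangzhou) region is assumed, and the catalog name, database name, and warehouse path are placeholders that you must replace with your own information. The AccessKey pair and catalog ID remain placeholders because they are specific to your account.

CREATE TABLE dlf_iceberg (
  id BIGINT,
  data STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = 'iceberg_catalog',
  'catalog-type' = 'custom',
  'catalog-database' = 'dlf_db',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = 'oss-cn-hangzhou.aliyuncs.com',
  'access.key.id' = '<yourAccessKeyId>',
  'access.key.secret' = '<yourAccessKeySecret>',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = 'oss://my-iceberg-bucket/warehouse',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = 'dlf.cn-hangzhou.aliyuncs.com',
  'dlf.region-id' = 'cn-hangzhou'
);

Note that dlf.region-id must match the region that appears in dlf.endpoint.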
Data type mappings
Data type of Apache Iceberg | Data type of Flink |
---|---|
BOOLEAN | BOOLEAN |
INT | INT |
LONG | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(P,S) | DECIMAL(P,S) |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
TIMESTAMPTZ | TIMESTAMP_LTZ |
STRING | STRING |
FIXED(L) | BYTES |
BINARY | VARBINARY |
STRUCT<...> | ROW |
LIST<E> | ARRAY |
MAP<K, V> | MAP |
Note Apache Iceberg timestamps are accurate to the microsecond, whereas Flink timestamps are accurate to the millisecond. When you use Flink to read Apache Iceberg data, the time precision is aligned to milliseconds.
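To illustrate how the mapping applies, the following hypothetical DDL declares Flink columns for an Apache Iceberg table whose schema uses LONG, DECIMAL, TIMESTAMPTZ, BINARY, LIST, and MAP fields. The table and column names are made up, and the WITH clause is abbreviated to the connector parameter only.

CREATE TABLE iceberg_typed_source (
  id        BIGINT,                -- Apache Iceberg LONG
  price     DECIMAL(10, 2),        -- Apache Iceberg DECIMAL(10,2)
  event_ts  TIMESTAMP_LTZ,         -- Apache Iceberg TIMESTAMPTZ
  payload   VARBINARY,             -- Apache Iceberg BINARY
  tags      ARRAY<STRING>,         -- Apache Iceberg LIST<STRING>
  attrs     MAP<STRING, STRING>    -- Apache Iceberg MAP<STRING, STRING>
) WITH (
  'connector' = 'iceberg'
  -- the remaining catalog, OSS, and DLF parameters are the same as in the DDL syntax section
);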
Example
The following example shows how to use a Flink job to import data from an Apache Iceberg source table into an Apache Iceberg result table. This process implements basic extract, transform, load (ETL) operations.
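A minimal sketch of such a job follows. The table names src_iceberg and dst_iceberg and the filter condition are illustrative, and each WITH clause is abbreviated; in a real job, it must contain the full set of parameters described above.

-- Hypothetical Apache Iceberg source table.
CREATE TEMPORARY TABLE src_iceberg (
  id BIGINT,
  data STRING
) WITH (
  'connector' = 'iceberg'
  -- catalog, OSS, and DLF parameters omitted for brevity
);

-- Hypothetical Apache Iceberg result table.
CREATE TEMPORARY TABLE dst_iceberg (
  id BIGINT,
  data STRING
) WITH (
  'connector' = 'iceberg'
  -- catalog, OSS, and DLF parameters omitted for brevity
);

-- The ETL step: read from the source table, filter, and write to the result table.
INSERT INTO dst_iceberg
SELECT id, data
FROM src_iceberg
WHERE id > 0;  -- illustrative transformation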