Realtime Compute for Apache Flink: Apache Iceberg connector

Last Updated: Sep 08, 2023

This topic describes how to use the Apache Iceberg connector.

Background information

Apache Iceberg is an open table format for data lakes. You can use Apache Iceberg to quickly build your own data lake storage service on Hadoop Distributed File System (HDFS) or Alibaba Cloud Object Storage Service (OSS). Then, you can use a computing engine of the open source big data ecosystem, such as Apache Flink, Apache Spark, Apache Hive, or Apache Presto, to analyze data in your data lake.

| Item | Description |
| --- | --- |
| Table type | Source table and result table |
| Running mode | Batch mode and streaming mode |
| Data format | N/A |
| Metric | N/A |
| API type | SQL API |
| Data update or deletion in a result table | Supported |

Features

Apache Iceberg provides the following core capabilities:

  • Builds a low-cost, lightweight data lake storage service based on HDFS or OSS.

  • Provides comprehensive atomicity, consistency, isolation, and durability (ACID) semantics.

  • Supports historical version backtracking (see the time-travel sketch after this list).

  • Supports efficient data filtering.

  • Supports schema evolution.

  • Supports partition evolution.
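
Historical version backtracking corresponds to time-travel reads. In the open source Apache Iceberg connector for Flink, these are exposed as SQL hints; the following sketch assumes that the open source read options ('snapshot-id' and 'as-of-timestamp') are available in your VVR version, and the table name, snapshot ID, and timestamp are hypothetical placeholders.

-- Read the table as of a specific snapshot (the ID is a placeholder).
SELECT * FROM iceberg_table /*+ OPTIONS('snapshot-id'='3821550127947089987') */;

-- Read the table as of a point in time, in milliseconds since the epoch.
SELECT * FROM iceberg_table /*+ OPTIONS('as-of-timestamp'='1678000000000') */;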

Note

You can use the efficient fault tolerance and stream processing capabilities of Flink to import large amounts of behavioral log data into an Apache Iceberg data lake in real time. Then, you can use Flink or another analytics engine to extract value from the data.

Limits

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 4.0.8 or later supports the Apache Iceberg connector.

  • The Apache Iceberg connector supports only the Apache Iceberg table format of version 1. For more information, see Iceberg Table Spec.

Syntax

CREATE TABLE iceberg_table (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  ...
);

Parameters in the WITH clause

  • Common parameters

    | Parameter | Description | Data type | Required | Default value | Remarks |
    | --- | --- | --- | --- | --- | --- |
    | connector | The type of the table. | STRING | Yes | No default value | Set the value to iceberg. |
    | catalog-name | The name of the catalog. | STRING | Yes | No default value | Set the value to a custom name. |
    | catalog-database | The name of the database. | STRING | Yes | default | Set the value to the name of the database that is created on Data Lake Formation (DLF), such as dlf_db. If you have not created a DLF database, create one. |
    | io-impl | The name of the implementation class that is used to access the distributed file system. | STRING | Yes | No default value | Set the value to org.apache.iceberg.aliyun.oss.OSSFileIO. |
    | oss.endpoint | The endpoint of your OSS bucket. | STRING | No | No default value | For more information, see Regions and endpoints. We recommend that you set oss.endpoint to the virtual private cloud (VPC) endpoint of the bucket. For example, if the bucket resides in the China (Hangzhou) region, set oss.endpoint to oss-cn-hangzhou-internal.aliyuncs.com. If you want to access OSS across VPCs, see How does fully managed Flink access a service across VPCs? |
    | access.key.id | The AccessKey ID of your Alibaba Cloud account. | STRING | Yes | No default value | For more information, see How do I view information about the AccessKey ID and AccessKey secret of the account? To protect your AccessKey pair, we recommend that you specify the AccessKey ID by using the key management method. For more information, see Manage keys. |
    | access.key.secret | The AccessKey secret of your Alibaba Cloud account. | STRING | Yes | No default value | For more information, see How do I view information about the AccessKey ID and AccessKey secret of the account? To protect your AccessKey pair, we recommend that you specify the AccessKey secret by using the key management method. For more information, see Manage keys. |
    | catalog-impl | The class name of the catalog. | STRING | Yes | No default value | Set the value to org.apache.iceberg.aliyun.dlf.DlfCatalog. |
    | warehouse | The OSS directory in which table data is stored. | STRING | Yes | No default value | N/A. |
    | dlf.catalog-id | The ID of your Alibaba Cloud account. | STRING | Yes | No default value | You can go to the Security Settings page to obtain the account ID. |
    | dlf.endpoint | The endpoint of DLF. | STRING | Yes | No default value | We recommend that you set dlf.endpoint to the VPC endpoint of DLF. For example, if you select the China (Hangzhou) region, set dlf.endpoint to dlf-vpc.cn-hangzhou.aliyuncs.com. If you want to access DLF across VPCs, see How does fully managed Flink access a service across VPCs? |
    | dlf.region-id | The ID of the region in which the DLF service is activated. | STRING | Yes | No default value | Make sure that the region matches the endpoint that you specified for dlf.endpoint. |

  • Parameters only for result tables

    | Parameter | Description | Data type | Required | Default value | Remarks |
    | --- | --- | --- | --- | --- | --- |
    | write.operation | The write operation mode. | STRING | No | upsert | Valid values: upsert (default): existing data is updated; insert: data is written to the table in append mode; bulk_insert: a batch of data is written at a time, and existing data is not updated. For an upsert example, see the sketch after this table. |
    | hive_sync.enable | Specifies whether to enable synchronization of metadata to Hive. | BOOLEAN | No | false | Valid values: true: synchronization is enabled; false (default): synchronization is disabled. |
    | hive_sync.mode | The Hive data synchronization mode. | STRING | No | hms | Valid values: hms (default): retain this value if you use a Hive metastore or DLF catalog; jdbc: set this value if you use a Java Database Connectivity (JDBC) catalog. |
    | hive_sync.db | The name of the Hive database to which data is synchronized. | STRING | No | The database name of the current table in the catalog | N/A. |
    | hive_sync.table | The name of the Hive table to which data is synchronized. | STRING | No | The name of the current table | N/A. |
    | dlf.catalog.region | The ID of the region in which the DLF service is activated. | STRING | No | No default value | This parameter takes effect only when hive_sync.mode is set to hms. Make sure that the region matches the endpoint that you specified for dlf.catalog.endpoint. |
    | dlf.catalog.endpoint | The endpoint of DLF. | STRING | No | No default value | This parameter takes effect only when hive_sync.mode is set to hms. We recommend that you set dlf.catalog.endpoint to the VPC endpoint of DLF. For example, if you select the China (Hangzhou) region, set dlf.catalog.endpoint to dlf-vpc.cn-hangzhou.aliyuncs.com. If you want to access DLF across VPCs, see How does fully managed Flink access a service across VPCs? |
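
In the open source Apache Iceberg connector, upsert writes are resolved against the primary key of the table. The following sketch of an upsert result table assumes that the same convention applies here; the table and column names are illustrative, and the common parameters described above are elided with ... as in the Syntax section.

CREATE TEMPORARY TABLE iceberg_upsert_sink (
  id    BIGINT,
  data  STRING,
  PRIMARY KEY (id) NOT ENFORCED  -- rows that share an id overwrite each other
) WITH (
  'connector' = 'iceberg',
  'write.operation' = 'upsert',  -- the default mode, shown explicitly for clarity
  ...
);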

Data type mappings

| Data type of Apache Iceberg | Data type of Flink |
| --- | --- |
| BOOLEAN | BOOLEAN |
| INT | INT |
| LONG | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL(P,S) | DECIMAL(P,S) |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
| TIMESTAMPTZ | TIMESTAMP_LTZ |
| STRING | STRING |
| FIXED(L) | BYTES |
| BINARY | VARBINARY |
| STRUCT<...> | ROW |
| LIST<E> | LIST |
| MAP<K,V> | MAP |

Note

Apache Iceberg timestamps are accurate to the microsecond, whereas Flink timestamps are accurate to the millisecond. When you use Flink to read Apache Iceberg data, the time precision is aligned to milliseconds.
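
To make the mappings concrete, the following sketch declares a Flink table whose columns correspond to the Apache Iceberg types listed above. The table and column names are hypothetical, and the common parameters are elided with ... as in the Syntax section.

CREATE TEMPORARY TABLE typed_iceberg (
  flag      BOOLEAN,              -- stored as Iceberg BOOLEAN
  total     BIGINT,               -- stored as Iceberg LONG
  price     DECIMAL(10, 2),       -- stored as Iceberg DECIMAL(10,2)
  birthday  DATE,                 -- stored as Iceberg DATE
  event_ts  TIMESTAMP_LTZ,        -- stored as Iceberg TIMESTAMPTZ
  payload   VARBINARY,            -- stored as Iceberg BINARY
  tags      MAP<STRING, STRING>,  -- stored as Iceberg MAP<K,V>
  point     ROW<x INT, y INT>     -- stored as Iceberg STRUCT<...>
) WITH (
  'connector' = 'iceberg',
  ...
);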

Sample code

Before you run the sample code, make sure that a DLF database has been created. If you have not created one, create a DLF database first.

Note

When you select an OSS directory for your DLF database, make sure that the directory is in the ${warehouse}/${database_name}.db format. For example, if warehouse is oss://iceberg-test/warehouse and database_name is dlf_db, the OSS directory of the dlf_db database is oss://iceberg-test/warehouse/dlf_db.db.

Sample code for an Apache Iceberg result table

This example shows how to use the Datagen connector to randomly generate streaming data and write the data to an Apache Iceberg result table.

  1. Create an OSS bucket. For more information, see Create buckets.

  2. On the SQL Editor page in the console of fully managed Flink, create an SQL streaming deployment and enter the following code in the SQL editor.

    CREATE TEMPORARY TABLE datagen(
      id    BIGINT,
      data  STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE dlf_iceberg (
      id    BIGINT,
      data  STRING
    ) WITH (
      'connector' = 'iceberg',
      'catalog-name' = '<yourCatalogName>',
      'catalog-database' = '<yourDatabaseName>',
      'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
      'oss.endpoint' = '<yourOSSEndpoint>',  
      'access.key.id' = '${secret_values.ak_id}',
      'access.key.secret' = '${secret_values.ak_secret}',
      'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
      'warehouse' = '<yourOSSWarehousePath>',
      'dlf.catalog-id' = '<yourCatalogId>',
      'dlf.endpoint' = '<yourDLFEndpoint>',  
      'dlf.region-id' = '<yourDLFRegionId>'
    );
    
    INSERT INTO dlf_iceberg SELECT * FROM datagen;
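
  3. After the deployment has completed at least one checkpoint, verify the result. The Apache Iceberg sink commits data when a checkpoint completes, so rows become visible only afterward. As a minimal check, you can re-declare the dlf_iceberg table in a batch deployment and run a query such as the following sketch:

    SELECT * FROM dlf_iceberg;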

Sample code for an Apache Iceberg source table

This example writes a few rows to an Apache Iceberg table and then reads them from that table into a second Apache Iceberg table.

CREATE TEMPORARY TABLE src_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',  
  'access.key.id' = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',  
  'dlf.region-id' = '<yourDLFRegionId>'
);

CREATE TEMPORARY TABLE dst_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',  
  'access.key.id' = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',  
  'dlf.region-id' = '<yourDLFRegionId>'
);

BEGIN STATEMENT SET;

INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE');
INSERT INTO dst_iceberg SELECT * FROM src_iceberg;

END;
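
The statement set above runs as a bounded job. The open source Apache Iceberg connector also supports unbounded streaming reads through SQL hints; the following sketch assumes that the open source read options ('streaming' and 'monitor-interval') are available in your VVR version.

-- Continuously poll the table for newly committed snapshots.
SELECT * FROM src_iceberg /*+ OPTIONS('streaming'='true', 'monitor-interval'='30s') */;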