Realtime Compute for Apache Flink:Manage Iceberg Catalogs

Last Updated:Dec 22, 2025

After you configure an Iceberg catalog, you can use Realtime Compute for Apache Flink to directly access Iceberg tables in Alibaba Cloud Data Lake Formation (DLF). This topic describes how to create, view, and delete Iceberg catalogs, and how to manage Iceberg databases and tables in the development console.

Notes

  • Only Ververica Runtime (VVR) 11.1 and later support creating and configuring Iceberg catalogs and Iceberg tables.

  • Only DLF catalogs are supported.

Create an Iceberg DLF Catalog

  1. Create a catalog in DLF. For more information, see Quick start with DLF.

    1. The DLF catalog must be in the same region as the Flink workspace. Otherwise, you cannot associate them in the subsequent steps.

  2. Create an Iceberg catalog in the development console of Realtime Compute for Apache Flink.

    Note
    • This operation creates a mapping to your DLF catalog. Creating or deleting the catalog in Flink does not affect actual data in DLF.

    • All tables created in the DLF catalog via Iceberg REST are Iceberg tables.

    1. Log on to the Realtime Compute for Apache Flink Management Console.

    2. In the Actions column of your workspace, click Console.

    3. In the left navigation menu, click Development > Scripts.

    4. Create a script. In the SQL editor, paste the following SQL statement. In the lower-right corner, click Environment, select a session cluster that runs VVR 11.2.0 or later, and then run the statement to register a DLF catalog via Iceberg REST.

      CREATE CATALOG `catalog_name` WITH (
          'type' = 'iceberg',
          'catalog-type' = 'rest',
          'token.provider' = 'dlf',
          'uri' = 'http://cn-hangzhou-vpc.dlf.aliyuncs.com/iceberg',
          'warehouse' = 'iceberg_test',
          'rest.signing-region' = 'cn-hangzhou',
          'io-impl' = 'org.apache.iceberg.rest.DlfFileIO'
      );

      The following table describes the options.

      | Option | Description | Required | Example |
      | ------ | ----------- | -------- | ------- |
      | type | The type. Set this to iceberg. | Yes | iceberg |
      | catalog-type | The catalog type. Set this to rest. | Yes | rest |
      | token.provider | The token provider. Set this to dlf. | Yes | dlf |
      | uri | The URI used to access the DLF catalog via Iceberg REST. For more information, see Iceberg REST. | Yes | http://ap-southeast-1-vpc.dlf.aliyuncs.com/iceberg |
      | warehouse | The name of your DLF catalog. | Yes | iceberg_test |
      | rest.signing-region | The region ID of DLF. For more information, see Endpoints. | Yes | ap-southeast-1 |
      | io-impl | Set this to org.apache.iceberg.rest.DlfFileIO. | Yes | org.apache.iceberg.rest.DlfFileIO |
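
      After the catalog is registered, you can run a quick sanity check from the same SQL editor. The sketch below assumes the catalog was created as `catalog_name`; the statements are standard Flink SQL.

```sql
-- List all registered catalogs; the new catalog should appear.
SHOW CATALOGS;

-- Switch to the new catalog and list its databases.
-- A DLF catalog contains a database named `default` by default.
USE CATALOG `catalog_name`;
SHOW DATABASES;
```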

Manage Iceberg databases

On the Data Query page, enter the following commands in the text editor, select the code, and click Run.

  • Create a database

    After you create an Iceberg catalog, a database named default is automatically created in the catalog.

    -- Replace my-catalog with the name of your Catalog.
    USE CATALOG `my-catalog`;
    
    -- Replace my_db with a custom database name.
    CREATE DATABASE `my_db`;
  • Delete a database

    Important

    You cannot delete the default database from a DLF catalog.

    -- Replace my-catalog with the name of your Catalog.
    USE CATALOG `my-catalog`;
    
    -- Replace my_db with the name of the database that you want to delete.
    DROP DATABASE `my_db`; -- Deletes a database only if it contains no tables.
    DROP DATABASE `my_db` CASCADE; -- Deletes a database and all tables within it.
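
    Before you drop a database without CASCADE, you can check whether it still contains tables. A minimal sketch, assuming the catalog and database names from the examples above:

```sql
USE CATALOG `my-catalog`;

-- List the tables in my_db. DROP DATABASE without CASCADE
-- succeeds only if this returns no rows.
SHOW TABLES IN `my_db`;
```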
    

Manage Iceberg tables

Create a table

Note

After you configure an Iceberg catalog, you can reference its tables in a job. When you use an Iceberg table as a source, sink, or dimension table, you do not need to declare its Data Definition Language (DDL). You can reference a table by its full name: ${Iceberg-catalog-name}.${Iceberg-db-name}.${Iceberg-table-name}. Alternatively, first run the USE CATALOG ${Iceberg-catalog-name}; and USE ${Iceberg-db-name}; statements, and then reference the table by its name alone: ${Iceberg-table-name}.
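
The two referencing styles described above can be sketched as follows; `my-catalog`, `my_db`, and `my_tbl` are placeholder names:

```sql
-- Style 1: fully qualified name, no USE statements needed.
SELECT * FROM `my-catalog`.`my_db`.`my_tbl`;

-- Style 2: set the current catalog and database first,
-- then reference the table by its short name.
USE CATALOG `my-catalog`;
USE `my_db`;
SELECT * FROM `my_tbl`;
```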

  • Create a table using a CREATE TABLE statement

    On the Data Query page, enter the following command into the text editor, select it, and then click Run.

    The following example shows how to create a partitioned table in the `my_db` database of the `my-catalog` catalog. The partition key is `dt`, and the primary key consists of `dt`, `shop_id`, and `user_id`.

    -- Replace my-catalog with the name of your Iceberg Catalog.
    -- Replace my_db with the name of the database that you want to use.
    -- You can also replace my_tbl with a custom table name.
    CREATE TABLE `my-catalog`.`my_db`.`my_tbl` (
      dt STRING,
      shop_id BIGINT,
      user_id BIGINT,
      num_orders INT,
      total_amount INT,
      PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
    ) PARTITIONED BY (dt)
    ;
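
    As a quick follow-up, the sketch below writes a few rows into the table created above and reads them back. The values are illustrative only:

```sql
-- Insert sample rows; rows are routed to partitions by dt.
INSERT INTO `my-catalog`.`my_db`.`my_tbl`
VALUES ('2025-01-01', 1001, 1, 5, 500),
       ('2025-01-02', 1001, 2, 3, 300);

-- Read the data back, filtering on the partition key.
SELECT * FROM `my-catalog`.`my_db`.`my_tbl` WHERE dt = '2025-01-01';
```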

Modify the table schema

On the Data Query page, enter the following commands in the text editor, select the code, and then click Run.

Temporarily modify table parameters

You can temporarily modify table parameters when reading from or writing to a table by adding an SQL hint after the table name. The temporary parameters take effect only for the current SQL job.

  • Temporarily set upsert-enabled to true when writing data to the my_table table.

    INSERT INTO my_table /*+ OPTIONS('upsert-enabled'='true') */
    SELECT ...;
  • Temporarily set monitor-interval to 10s when consuming data from the my_table table.

    SELECT * FROM my_table /*+ OPTIONS('monitor-interval'='10s') */;

Rename a table

Rename the my_table table to my_table_new.

ALTER TABLE my_table RENAME TO my_table_new;

Add new columns

  • Add a column named c1 of the INT type and a column named c2 of the STRING type to the end of the my_table table.

    ALTER TABLE my_table ADD (c1 INT, c2 STRING);

Rename a column

Rename the c0 column in the my_table table to c1.

ALTER TABLE my_table RENAME c0 TO c1;

Delete columns

Delete the c1 and c2 columns from the my_table table.

ALTER TABLE my_table DROP (c1, c2);

Modify a column comment

Change the comment of the buy_count column in the my_table table to 'this is buy count'.

ALTER TABLE my_table MODIFY buy_count BIGINT COMMENT 'this is buy count';

Modify the column order

  • Move the col_a column of the DOUBLE type to the beginning of the my_table table.

    ALTER TABLE my_table MODIFY col_a DOUBLE FIRST;
  • Move the col_a column of the DOUBLE type after the col_b column in the my_table table.

    ALTER TABLE my_table MODIFY col_a DOUBLE AFTER col_b;
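
After any of the ALTER TABLE statements above, you can confirm the resulting schema with DESCRIBE, a standard Flink SQL statement:

```sql
-- Show the current columns, types, and constraints of the table.
DESCRIBE my_table;
```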

Delete a table

On the Data Query page, enter the following command in the text editor, select it, and then click Run.

-- Replace my-catalog with the name of your Iceberg Catalog.
-- Replace my_db with the name of the database that you want to use.
-- Replace my_tbl with the name of the table that you want to delete.
DROP TABLE `my-catalog`.`my_db`.`my_tbl`;

If the message The following statement has been executed successfully! appears, the Iceberg table has been deleted.

View or delete an Iceberg Catalog

  1. On the Realtime Compute for Apache Flink console, click Console in the Actions column for the target workspace.

  2. On the Data Management page, you can view or delete the Iceberg Catalog.

    • On the Catalog List page, you can view the Catalog Name and Type. To view the databases and tables in a catalog, click View.

    • Delete: On the Catalog List page, find the catalog that you want to delete and click Delete in the Actions column.

      Note

      Deleting an Iceberg catalog only removes the record from Data Management in the Flink project. It does not affect the data files of the Iceberg tables. After you delete a catalog, you can reuse the Iceberg tables in it by creating the Iceberg catalog again.

      Alternatively, you can enter DROP CATALOG <catalog name>; in the text editor on the Data Query page, select the code, and then click Run.