All Products
Search
Document Center

Realtime Compute for Apache Flink:Manage Iceberg catalogs

Last Updated:Mar 26, 2026

Use Realtime Compute for Apache Flink to access Iceberg tables in Alibaba Cloud Data Lake Formation (DLF) by registering a DLF catalog in the Flink development console. After the catalog is set up, you can manage Iceberg databases and tables directly using SQL.

Prerequisites

Before you begin, ensure that you have:

  • A Flink workspace running Ververica Runtime (VVR) 11.1 or later

  • A DLF catalog in the same region as your Flink workspace. See Quick start with DLF

Limitations

Feature Supported Notes
Create Iceberg catalog Yes VVR 11.1+ required; only DLF catalogs are supported
Create database Yes A default database is created automatically; the default database cannot be deleted
Create table Yes
Alter table schema Yes Rename, add/drop columns, modify column order and comment
Drop table Yes
Drop catalog Yes Only removes the catalog record in Flink; does not affect data in DLF

Create an Iceberg DLF catalog

Creating an Iceberg catalog in Flink registers a mapping to your existing DLF catalog. It does not create or modify data in DLF. All tables created in the DLF catalog via Iceberg REST are Iceberg tables.

  1. Log in to the Realtime Compute for Apache Flink console.

  2. In the Actions column for your workspace, click Console.

  3. In the left navigation menu, go to Development > Scripts and create a new script.

  4. In the SQL editor, paste the following statement. In the lower-right corner, click Environment and select a session cluster running VVR 11.2.0 or later. Then run the statement.

    CREATE CATALOG `catalog_name`
    WITH (
        'type'              = 'iceberg',
        'catalog-type'      = 'rest',
        'token.provider'    = 'dlf',
        'uri'               = 'http://cn-hangzhou-vpc.dlf.aliyuncs.com/iceberg',
        'warehouse'         = 'iceberg_test',
        'rest.signing-region' = 'cn-hangzhou',
        'io-impl'           = 'org.apache.iceberg.rest.DlfFileIO'
    );

    Replace the values for uri, warehouse, and rest.signing-region with your own. See Iceberg REST endpoint for the list of available URIs by region. The following table describes each option.

    Option Required Description Example
    type Yes Set to iceberg. iceberg
    catalog-type Yes Set to rest. rest
    token.provider Yes Set to dlf. Configures DLF-based authentication. dlf
    uri Yes The Iceberg REST endpoint for your DLF catalog. The URI varies by region. See Iceberg REST endpoint. http://ap-southeast-1-vpc.dlf.aliyuncs.com/iceberg
    warehouse Yes The name of your DLF catalog. iceberg_test
    rest.signing-region Yes The region ID where DLF is deployed. cn-hangzhou
    io-impl Yes Set to org.apache.iceberg.rest.DlfFileIO. org.apache.iceberg.rest.DlfFileIO

View or delete a catalog

  1. On the Realtime Compute for Apache Flink console, click Console in the Actions column for your workspace.

  2. Go to the Data Management page and open Catalog List.

    • To view databases and tables in a catalog, click View next to the catalog name.

    • To delete a catalog, click Delete in the Actions column.

      Important

      Deleting a catalog only removes its record from Data Management in your Flink project. The underlying Iceberg data files in DLF are not affected. To reuse the tables, register the catalog again.

    Alternatively, run the following statement on the Data Query page:

    DROP CATALOG <catalog_name>;

Manage Iceberg databases

Run the following statements on the Data Query page: select the code and click Run.

Create a database

When you create an Iceberg catalog, a database named default is created automatically. To add another database:

-- Replace my-catalog with your catalog name.
USE CATALOG `my-catalog`;

-- Replace my_db with the database name you want to create.
CREATE DATABASE `my_db`;

Delete a database

Important

The default database in a DLF catalog cannot be deleted.

-- Replace my-catalog with your catalog name.
USE CATALOG `my-catalog`;

-- Delete a database only if it is empty.
DROP DATABASE `my_db`;

-- Delete a database and all tables inside it.
DROP DATABASE `my_db` CASCADE;

Manage Iceberg tables

Once an Iceberg catalog is registered, reference its tables in a job without declaring Data Definition Language (DDL). Use the full three-part name <catalog>.<database>.<table>, or first run USE CATALOG and USE <database>, then reference the table by name alone.

Create a table

On the Data Query page, select the code and click Run.

The following example creates a partitioned table in the my_db database of the my-catalog catalog. The partition key is dt, and the primary key is the combination of dt, shop_id, and user_id.

-- Replace my-catalog with your Iceberg catalog name.
-- Replace my_db with your database name.
-- Replace my_tbl with your table name.
CREATE TABLE `my-catalog`.`my_db`.`my_tbl` (
  dt           STRING,
  shop_id      BIGINT,
  user_id      BIGINT,
  num_orders   INT,
  total_amount INT,
  PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt);

Modify the table schema

Run the following statements on the Data Query page. All changes are persistent unless noted otherwise.

Rename a table

ALTER TABLE my_table RENAME TO my_table_new;

Add columns

-- Add c1 (INT) and c2 (STRING) to the end of the table.
ALTER TABLE my_table ADD (c1 INT, c2 STRING);

Rename a column

-- Rename column c0 to c1.
ALTER TABLE my_table RENAME c0 TO c1;

Delete columns

-- Delete columns c1 and c2.
ALTER TABLE my_table DROP (c1, c2);

Modify a column comment

ALTER TABLE my_table MODIFY buy_count BIGINT COMMENT 'this is buy count';

Reorder columns

-- Move col_a to the beginning of the table.
ALTER TABLE my_table MODIFY col_a DOUBLE FIRST;

-- Move col_a immediately after col_b.
ALTER TABLE my_table MODIFY col_a DOUBLE AFTER col_b;

Apply temporary parameters with SQL hints

Use a SQL hint to override table parameters for a single job without modifying the table definition permanently.

-- Temporarily enable upsert mode when writing to a table.
INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
SELECT ...;

-- Temporarily set the source read interval to 10 seconds.
SELECT * FROM my_table /*+ OPTIONS('monitor-interval'='10s') */;

Delete a table

On the Data Query page, select the code and click Run.

-- Replace my-catalog, my_db, and my_tbl with your values.
DROP TABLE `my-catalog`.`my_db`.`my_tbl`;

If the message The following statement has been executed successfully! appears, the Iceberg table has been deleted.