すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Iceberg カタログの管理

最終更新日:Dec 23, 2025

Iceberg カタログを設定すると、Realtime Compute for Apache Flink を使用して Alibaba Cloud Data Lake Formation (DLF) の Iceberg テーブルに直接アクセスできます。このトピックでは、Iceberg カタログの作成、表示、削除方法、および開発コンソールで Iceberg データベースとテーブルを管理する方法について説明します。

注意事項

  • Ververica Runtime (VVR) 11.1 以降のバージョンのみが、Iceberg カタログと Iceberg テーブルの作成と設定をサポートします。

  • DLF カタログのみがサポートされています。

Iceberg DLF カタログの作成

  1. DLF でカタログを作成します。詳細については、「DLF クイックスタート」をご参照ください。

    1. DLF カタログは Flink ワークスペースと同じリージョンにある必要があります。そうでない場合、後続のステップでそれらを関連付けることはできません。

  2. Realtime Compute for Apache Flink の開発コンソールで Iceberg カタログを作成します。

    説明
    • この操作は、ご利用の DLF カタログへのマッピングを作成します。Flink でカタログを作成または削除しても、DLF 内の実際のデータには影響しません。

    • Iceberg REST 経由で DLF カタログに作成されたすべてのテーブルはIceberg テーブルです。

    1. Realtime Compute for Apache Flink の管理コンソールにログインします。

    2. ご利用のワークスペースの [操作] 列で、[コンソール] をクリックします。

    3. 左側のナビゲーションメニューで、[開発] > [スクリプト] をクリックします。

    4. 新しいスクリプトを作成します。SQL エディターで、次の SQL 文をコピーして貼り付けます。右下隅にある [環境] をクリックし、VVR 11.2.0 以降のセッションクラスターを選択し、SQL 文を実行して Iceberg REST 経由で DLF カタログを登録します。

      CREATE CATALOG `catalog_name` 
       WITH (
          'type' = 'iceberg',
          'catalog-type' = 'rest',
          'uri' = 'http://cn-hangzhou-vpc.dlf.aliyuncs.com/iceberg',
          'warehouse' = 'iceberg_test',
          'rest.signing-region' = 'cn-hangzhou',
          'io-impl' = 'org.apache.iceberg.rest.DlfFileIO'
      );

      次の表にオプションを説明します。

      オプション

      説明

      必須

      type

      タイプ。これを iceberg に設定します。

      はい

      iceberg

      catalog-type

      カタログタイプ。これを rest に設定します。

      はい

      rest

      token.provider

      トークンプロバイダー。これを dlf に設定します。

      はい

      dlf

      uri

      Iceberg REST 経由で DLF カタログにアクセスするために使用される URI。詳細については、「Iceberg REST」をご参照ください。

      はい

      http://ap-southeast-1-vpc.dlf.aliyuncs.com/iceberg

      warehouse

      ご利用の DLF カタログの名前。

      はい

      iceberg_test

      rest.signing-region

      DLF のリージョン ID。詳細については、「エンドポイント」をご参照ください。

      はい

      ap-southeast-1

      io-impl

      これを org.apache.iceberg.rest.DlfFileIO に設定します。

      はい

      org.apache.iceberg.rest.DlfFileIO

Iceberg データベースの管理

[データクエリ] ページで、テキストエディターに次のコマンドを入力し、コードを選択して [実行] をクリックします。

  • データベースの作成

    Iceberg カタログを作成すると、default という名前のデータベースがカタログ内に自動的に作成されます。

    -- my-catalog をご利用のカタログ名に置き換えます。
    USE CATALOG `my-catalog`;
    
    -- my_db をカスタムデータベース名に置き換えます。
    CREATE DATABASE `my_db`;
  • データベースの削除

    重要

    DLF カタログからデフォルトのデータベースを削除することはできません。

    -- my-catalog をご利用のカタログ名に置き換えます。
    USE CATALOG `my-catalog`;
    
    -- my_db を削除したいデータベースの名前に置き換えます。
    DROP DATABASE `my_db`; -- テーブルが含まれていない場合にのみデータベースを削除します。
    DROP DATABASE `my_db` CASCADE; -- データベースとその中のすべてのテーブルを削除します。
    

Iceberg テーブルの管理

テーブルの作成

説明

Iceberg カタログを設定した後、ジョブでそのテーブルを参照できます。Iceberg テーブルをソース、シンク、またはディメンションテーブルとして使用する場合、そのデータ定義言語 (DDL) を宣言する必要はありません。テーブルは、そのフルネーム ${Iceberg-catalog-name}.${Iceberg-db-name}.${Iceberg-table-name} を使用して参照できます。または、最初に use catalog ${Iceberg-catalog-name} 文と use ${Iceberg-db-name} 文を実行することもできます。その後、テーブル名のみ ${Iceberg-table-name} を使用してテーブルを参照できます。

  • CREATE TABLE 文を使用したテーブルの作成

    [データクエリ] ページで、テキストエディターに次のコマンドを入力し、それを選択してから [実行] をクリックします。

    次の例は、`my-catalog` カタログの `my_db` データベースにパーティションテーブルを作成する方法を示しています。パーティションキーは `dt` で、プライマリキーは `dt`、`shop_id`、`user_id` で構成されます。

    -- my-catalog をご利用の Iceberg カタログの名前に置き換えます。
    -- my_db を使用したいデータベースの名前に置き換えます。
    -- my_tbl をカスタムテーブル名に置き換えることもできます。
    CREATE TABLE `my-catalog`.`my_db`.`my_tbl` (
      dt STRING,
      shop_id BIGINT,
      user_id BIGINT,
      num_orders INT,
      total_amount INT,
      PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
    ) PARTITIONED BY (dt)
    ;

テーブルスキーマの変更

[データクエリ] ページで、テキストエディターに次のコマンドを入力し、コードを選択してから [実行] をクリックします。

操作

コード例

テーブルパラメーターの一時的な変更

テーブルに書き込むときに、テーブル名の後に SQL ヒントを追加することで、テーブルパラメーターを一時的に変更できます。一時的なパラメーターは、現在の SQL ジョブに対してのみ有効です。

  • my_table テーブルにデータを書き込む際に、upsert-enabled を一時的に true に設定します。

    INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
    SELECT ...;
  • my_table テーブルからデータを読み取る際に、monitor-interval を一時的に 10s に設定します。

    SELECT * FROM my_table /*+ OPTIONS('monitor-interval'='10s') */

テーブル名の変更

my_table テーブルの名前を my_table_new に変更します。

ALTER TABLE my_table RENAME TO my_table_new;

新しい列の追加

  • my_table テーブルの末尾に、INT 型の c1 という名前の列と STRING 型の c2 という名前の列を追加します。

    ALTER TABLE my_table ADD (c1 INT, c2 STRING);

列名の変更

my_table テーブルの c0 列の名前を c1 に変更します。

ALTER TABLE my_table RENAME c0 TO c1;

列の削除

my_table テーブルから c1 列と c2 列を削除します。

ALTER TABLE my_table DROP (c1, c2);

列コメントの変更

my_table テーブルの buy_count 列のコメントを 'this is buy count' に変更します。

ALTER TABLE my_table MODIFY buy_count BIGINT COMMENT 'this is buy count';

列の順序の変更

  • my_table テーブルの DOUBLE 型の col_a 列を先頭に移動します。

    ALTER TABLE my_table MODIFY col_a DOUBLE FIRST;
  • my_table テーブルの DOUBLE 型の col_a 列を col_b 列の後に移動します。

    ALTER TABLE my_table MODIFY col_a DOUBLE AFTER col_b;

テーブルの削除

[データクエリ] ページで、テキストエディターに次のコマンドを入力し、それを選択してから [実行] をクリックします。

-- my-catalog をご利用の Iceberg カタログの名前に置き換えます。
-- my_db を使用したいデータベースの名前に置き換えます。
-- my_tbl をご利用の Iceberg カタログテーブルの名前に置き換えます。
DROP TABLE `my-catalog`.`my_db`.`my_tbl`;

The following statement has been executed successfully! というメッセージが表示された場合、Iceberg テーブルは削除されています。

Iceberg カタログの表示または削除

  1. Realtime Compute for Apache Flink コンソールで、対象のワークスペースの [操作] 列にある [コンソール] をクリックします。

  2. [データ管理] ページで、Iceberg カタログを表示または削除できます。

    • [カタログリスト] ページでは、[カタログ名][タイプ] を表示できます。カタログ内のデータベースとテーブルを表示するには、[表示] をクリックします。

    • 削除: [カタログリスト] ページで、削除したいカタログを見つけ、[操作] 列の [削除] をクリックします。

      説明

      Iceberg カタログを削除すると、Flink プロジェクトのデータ管理からレコードが削除されるだけです。Iceberg テーブルのデータファイルには影響しません。カタログを削除した後、Iceberg カタログを再度作成することで、その中の Iceberg テーブルを再利用できます。

      または、[データクエリ] ページのテキストエディターに DROP CATALOG <catalog name>; と入力し、コードを選択して [実行] をクリックすることもできます。