すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Paimon カタログの管理

最終更新日:Nov 09, 2025

Paimon カタログを設定すると、Realtime Compute for Apache Flink を使用して Alibaba Cloud Data Lake Formation (DLF) の Paimon テーブルに直接アクセスできます。このトピックでは、Paimon カタログの作成、表示、削除方法、および Realtime Compute 開発コンソールで Paimon データベースとテーブルを管理する方法について説明します。

  • Ververica Runtime (VVR) 8.0.5 以降のみが Paimon カタログとテーブルの作成および設定をサポートします。VVR 11.1.0 以降のみがメタストアタイプを DLF に設定することをサポートします。

  • Object Storage Service (OSS) は、データファイルやメタデータファイルなど、Paimon テーブルに関連するファイルを格納します。OSS がアクティブ化されており、OSS バケットのストレージタイプが標準であることを確認してください。詳細については、「コンソールの使用開始」および「ストレージタイプ」をご参照ください。

    重要

    Realtime Compute for Apache Flink をアクティブ化したときに指定した OSS バケットを使用することもできます。ただし、データをより適切に区別し、偶発的なデータ操作を防ぐために、同じリージョンに別の OSS バケットを作成して使用することをお勧めします。

  • Paimon カタログの作成に使用される AccessKey は、OSS バケットと DLF ディレクトリに対する読み取りおよび書き込み権限を持っている必要があります。

  • SQL を使用してカタログ、データベース、またはテーブルを作成または削除した後、更新 image ボタンをクリックして [メタデータ] ページを更新します。

  • 次の表に、Paimon と VVR バージョン間のマッピングを示します。

    Apache Paimon バージョン

    VVR バージョン

    1.1

    11

    1.0

    8.0.11

    0.9

    8.0.7、8.0.8、8.0.9、および 8.0.10

    0.8

    8.0.6

    0.7

    8.0.5

    0.6

    8.0.4

    0.6

    8.0.3

Paimon カタログの作成

Paimon Filesystem カタログの作成

UI ベースのメソッド

  1. Data Management ページに移動します。

    1. Real-time Compute コンソールにログインし、対象のワークスペースの [アクション] 列にある [コンソール] をクリックします。

    2. [Data Management] をクリックします。

  2. [カタログの作成] をクリックし、[Apache Paimon] を選択して、[次へ] をクリックします。

  3. パラメーターを設定します。

SQL ベースのメソッド

[データクエリ] エディターで、次のコマンドを入力します。

CREATE CATALOG `my-catalog` WITH (
  'type' = 'paimon',
  'metastore' = 'filesystem',
  'warehouse' = '<warehouse>',
  'fs.oss.endpoint' = '<fs.oss.endpoint>',
  'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>',
  'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>'
);

次の表にパラメーターを示します。

  • 全般

    パラメーター

    説明

    必須

    my-catalog

    Paimon カタログの名前。

    はい

    カスタムの英語名を入力します。

    type

    カタログのタイプ。

    はい

    値は paimon に固定されています。

    metastore

    メタストアのタイプ。

    はい

    有効な値:

    • filesystem: Paimon Filesystem カタログを設定する場合にこの値を指定します。

    • dlf: Paimon DLF カタログを設定する場合にこの値を指定します。

  • OSS

    パラメーター

    説明

    必須

    warehouse

    OSS のウェアハウスディレクトリ。

    はい

    フォーマットは oss://<bucket>/<object> です。

    • bucket: 作成した OSS バケットの名前。

    • object: データが格納されているパス。

    OSS コンソールでバケットとオブジェクト名を表示します。

    fs.oss.endpoint

    OSS サービスのエンドポイント。

    はい

    • Flink と DLF が同じリージョンにある場合は、VPC エンドポイントを使用します。それ以外の場合は、インターネットエンドポイントを使用します。

    • このパラメーターは、warehouse に指定された OSS バケットと Flink ワークスペースが異なるリージョンにある場合、または別の Alibaba Cloud アカウントの OSS バケットを使用する場合に必須です。

    詳細については、「リージョンとエンドポイント」および「AccessKey の作成」をご参照ください。

    fs.oss.accessKeyId

    OSS に対する読み取りおよび書き込み権限を持つ Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID。

    はい

    fs.oss.accessKeySecret

    OSS に対する読み取りおよび書き込み権限を持つ Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey Secret。

    はい

Paimon DLF カタログの作成

DLF

  1. DLF 上に Apache Paimon カタログを作成します。詳細については、「DLF の使用」をご参照ください。

    1. DLF カタログは Flink ワークスペースと同じリージョンにある必要があります。そうでない場合、関連付けは失敗します。

  2. Realtime Compute 開発コンソールで Paimon カタログを作成します。

    説明

    この操作は DLF カタログへのマッピングを作成します。Flink でカタログを作成または削除しても、DLF の実際のデータには影響しません。

    1. Realtime Compute for Apache Flink 管理コンソールにログインします。

    2. ワークスペース名をクリックして開発コンソールを開きます。

    3. 次のいずれかの方法を使用してカタログを登録します。

      UI

      1. 左側のナビゲーションメニューで、[カタログ] をクリックします。

      2. [カタログリスト] ページで、[カタログの作成] をクリックします。

      3. [カタログの作成] ウィザードで、[Apache Paimon] を選択し、[次へ] をクリックします。

      4. [metastore][DLF] に設定します。[カタログ名] には、接続する DLF カタログを選択します。

      5. [確認] をクリックします。

      SQL コマンド

      [スクリプト] SQL エディターで、次の SQL コードをコピーして実行し、Flink に DLF カタログを登録します。

      CREATE CATALOG `flink_catalog_name` 
      WITH (
        'type' = 'paimon',
        'metastore' = 'rest',
        'token.provider' = 'dlf',
        'uri' = 'http://cn-hangzhou-vpc.dlf.aliyuncs.com',
        'warehouse' = 'dlf_test'
      );

      次の表にコネクタオプションを示します。

      オプション

      説明

      必須

      type

      カタログタイプ。このオプションを paimon に設定します。

      はい

      paimon

      metastore

      カタログメタストア。このオプションを rest に設定します。

      はい

      rest

      token.provider

      トークンプロバイダー。このオプションを dlf に設定します。

      はい

      dlf

      uri

      DLF カタログサービスの Rest URI。フォーマット: http://[region-id]-vpc.dlf.aliyuncs.com。「エンドポイント」のリージョン ID をご参照ください。

      はい

      http://ap-southeast-1-vpc.dlf.aliyuncs.com

      warehouse

      DLF paimon カタログの名前。

      はい

      dlf_test

DLF-Legacy

  1. DLF 上に Apache Paimon カタログを作成します。詳細については、「クイックスタート」をご参照ください。

    1. DLF カタログは Flink ワークスペースと同じリージョンにある必要があります。そうでない場合、関連付けは失敗します。

  2. Realtime Compute 開発コンソールで Paimon カタログを作成します。

    UI ベースのメソッド

    1. Data Management ページに移動します。

      1. Real-time Compute コンソールにログインし、対象のワークスペースの [アクション] 列にある [コンソール] をクリックします。

      2. [Data Management] をクリックします。

    2. [カタログの作成] をクリックします。[Apache Paimon] を選択し、[次へ] をクリックします。

    3. [metastore] には、[DLF] を選択します。[カタログ名] には、関連付ける V1.0 DLF カタログを選択し、SQL メソッドと同じ必須パラメーターを設定します。

    SQL コマンド

    [データクエリ] エディターで、次のコマンドを入力します。

    CREATE CATALOG `my-catalog` WITH (
      'type' = 'paimon',
      'metastore' = 'dlf',
      'warehouse' = '<warehouse>',
      'dlf.catalog.id' = '<dlf.catalog.id>',
      'dlf.catalog.accessKeyId' = '<dlf.catalog.accessKeyId>',
      'dlf.catalog.accessKeySecret' = '<dlf.catalog.accessKeySecret>',
      'dlf.catalog.endpoint' = '<dlf.catalog.endpoint>',
      'dlf.catalog.region' = '<dlf.catalog.region>',
      'fs.oss.endpoint' = '<fs.oss.endpoint>',
      'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>',
      'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>'
    );

    次の表にパラメーターを示します。

    • 全般

      パラメーター

      説明

      必須

      my-catalog

      Paimon カタログの名前。

      はい

      カスタムの英語名を入力します。

      type

      カタログのタイプ。

      はい

      値は paimon に固定されています。

      metastore

      メタストアのタイプ。

      はい

      値は dlf に固定されています。

    • OSS

      パラメーター

      説明

      必須

      warehouse

      OSS のウェアハウスディレクトリ。

      はい

      フォーマットは oss://<bucket>/<object> です。

      • bucket: 作成した OSS バケットの名前。

      • object: データが格納されているパス。

      OSS コンソールでバケットとオブジェクト名を表示します。

      fs.oss.endpoint

      OSS サービスのエンドポイント。

      はい

      • Flink と DLF が同じリージョンにある場合は、VPC エンドポイントを使用します。それ以外の場合は、インターネットエンドポイントを使用します。

      • Paimon テーブルを OSS-HDFS に保存する場合は、fs.oss.endpoint の値を cn-<region>.oss-dls.aliyuncs.com (例: cn-hangzhou.oss-dls.aliyuncs.com) に設定します。

      fs.oss.accessKeyId

      OSS に対する読み取りおよび書き込み権限を持つ Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID。

      はい

      詳細については、「リージョンとエンドポイント」および「AccessKey の作成」をご参照ください。

      fs.oss.accessKeySecret

      OSS に対する読み取りおよび書き込み権限を持つ Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey Secret。

      はい

    • DLF

      パラメーター

      説明

      必須

      dlf.catalog.id

      DLF データカタログの ID。

      はい

      Data Lake Formation コンソールでデータカタログの ID を表示できます。

      dlf.catalog.accessKeyId

      DLF サービスへのアクセスに必要な AccessKey ID。

      はい

      詳細については、「AccessKey の作成」をご参照ください。

      dlf.catalog.accessKeySecret

      DLF サービスへのアクセスに必要な AccessKey Secret。

      はい

      詳細については、「AccessKey の作成」をご参照ください。

      dlf.catalog.endpoint

      DLF サービスのエンドポイント。

      はい

      詳細については、「利用可能なリージョンとエンドポイント」をご参照ください。

      説明

      Flink と DLF が同じリージョンにある場合は、VPC エンドポイントを使用します。それ以外の場合は、インターネットエンドポイントを使用します。

      dlf.catalog.region

      DLF が存在するリージョン。

      はい

      詳細については、「サポートされているリージョンとエンドポイント」をご参照ください。

      説明

      リージョンが dlf.catalog.endpoint に選択されたものと同じであることを確認してください。

Paimon データベースの管理

[データクエリ] エディターで次のコマンドを入力し、コードを選択して [実行] をクリックします。

  • データベースの作成

    Paimon カタログを作成すると、default という名前のデータベースがカタログに自動的に作成されます。

    --my-catalog を作成した Paimon カタログの名前に置き換えます。
    USE CATALOG `my-catalog`;
    
    --my_db をデータベースのカスタム英語名に置き換えます。
    CREATE DATABASE `my_db`;
  • データベースの削除

    重要

    DLF カタログからデフォルトのデータベースを削除することはできません。Filesystem カタログからデフォルトのデータベースを削除できます。

    --my-catalog を作成した Paimon カタログの名前に置き換えます。
    USE CATALOG `my-catalog`;
    
    --my_db を削除したいデータベースの名前に置き換えます。
    DROP DATABASE `my_db`; --テーブルを含まないデータベースのみを削除します。
    DROP DATABASE `my_db` CASCADE; --データベースとそれに含まれるすべてのテーブルを削除します。
    

Paimon テーブルの管理

テーブルの作成

説明

Paimon カタログを設定した後、ジョブで Paimon カタログテーブルを参照できます。ソーステーブル、結果テーブル、またはディメンションテーブルとして使用する場合、テーブルの DDL を宣言する必要はありません。SQL コマンドでは、${Paimon-catalog-name}.${Paimon-db-name}.${Paimon-table-name} 形式で Paimon カタログテーブルのフルネームを使用できます。また、use catalog ${Paimon-catalog-name} および use ${Paimon-db-name} 文を使用してカタログ名とデータベース名を宣言することもできます。その後、後続の SQL 文ではテーブル名 ${Paimon-table-name} のみを使用できます。

  • CREATE TABLE 文を使用してテーブルを作成する

    [データクエリ] テキストエディターで次のコマンドを入力し、コマンドを選択してから [実行] をクリックします。

    次のサンプルコードは、my-catalog カタログの my_db データベースにパーティションテーブルを作成する方法を示しています。パーティションキーは dt、プライマリキーは dt、shop_id、user_id で構成され、バケット数は 4 に固定されています。

    --my-catalog を作成した Paimon カタログの名前に置き換えます。
    --my_db を使用したいデータベースの名前に置き換えます。
    --my_tbl をカスタムの英語名に置き換えることもできます。
    CREATE TABLE `my-catalog`.`my_db`.`my_tbl` (
      dt STRING,
      shop_id BIGINT,
      user_id BIGINT,
      num_orders INT,
      total_amount INT,
      PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
    ) PARTITIONED BY (dt) WITH (
      'bucket' = '4'
    );

    Paimon テーブルのパラメーターと使用方法の詳細については、「ストリーミングデータレイクハウス Paimon」および「Paimon プライマリキーテーブルと追加専用テーブル」をご参照ください。

  • CREATE TABLE AS (CTAS) 文またはCREATE DATABASE AS (CDAS) 文を使用してテーブルを作成する

    CTAS および CDAS 文は、データを自動的に同期するだけでなく、テーブルスキーマの変更も同期します。CTAS および CDAS 文を使用して、MySQL や Kafka などのデータソースから Paimon カタログにテーブルを簡単に同期できます。

    CTAS および CDAS 文はデータ同期を必要とするため、ジョブを[デプロイ]して[開始]する必要があります。詳細については、「ジョブ開発マップ」および「ジョブの開始」をご参照ください。

    説明
    • 指定したテーブルを作成し、そのデータを同期する

      たとえば、mysql.tpcds.web_sales テーブルのスキーマに基づいて、`my-catalog`.`my_db`.`web_sales` という名前の Paimon テーブルが自動的に作成され、テーブル内のデータが同期されます。Paimon テーブルのバケット数は 4 に設定され、入力チェンジログプロデューサーが使用されます。

      CREATE TABLE IF NOT EXISTS `<catalog name>`.`<db name>`.`<table name>`
      WITH (
        'bucket' = '4',
        'changelog-producer' = 'input'
      ) AS TABLE mysql.tpcds.web_sales;
    • データベース全体のテーブルを作成する

      たとえば、mysql.tpcds データベースの各テーブルのスキーマに基づいて、対応する Paimon テーブルが `my-catalog`.`my_db` に自動的に作成され、テーブル内のデータが同期されます。Paimon テーブルには入力チェンジログプロデューサーが使用されます。

      CREATE DATABASE IF NOT EXISTS `<catalog name>`.`<db name>`
      WITH (
        'changelog-producer' = 'input'
      ) AS DATABASE mysql.tpcds INCLUDING ALL TABLES;
    • 列タイプの変更を同期する

      CTAS または CDAS 文を使用して作成された Paimon テーブルは、列の追加だけでなく、特定の列タイプの変更もサポートします。必要に応じて、緩やかなフィールドタイプモードを使用するかどうかを選択できます。

      • デフォルトの動作

        デフォルトでは、CTAS または CDAS 文を使用して作成された Paimon テーブルの列タイプは、ソーステーブルの列タイプと同じです。次の列タイプの変更がサポートされています。

        • 整数型 TINYINT、SMALLINT、INT、および BIGINT は、同じかそれ以上の精度の整数型に変更できます。TINYINT は最も精度が低く、BIGINT は最も精度が高いです。

        • 浮動小数点型 FLOAT および DOUBLE は、同じかそれ以上の精度の浮動小数点型に変更できます。FLOAT は精度が低く、DOUBLE は精度が高いです。

        • 文字列型 CHAR、VARCHAR、および STRING は、同じかそれ以上の精度の文字列型に変更できます。

      • 緩やかなフィールドタイプモードを使用する

        CTAS および CDAS シナリオでは、Paimon カタログは緩やかなフィールドタイプモードをサポートします。このモードを使用するには、WITH 句で 'enableTypeNormalization' = 'true' を設定します。データ型変更イベントがアップストリームで発生した場合、変更された型の正規化された型が元の型の正規化された型と同じであれば、変更は成功したと見なされます。現在の型正規化ルールは次のとおりです。

        • TINYINT、SMALLINT、INT、および BIGINT は BIGINT に正規化されます。

        • FLOAT および DOUBLE は DOUBLE に正規化されます。

        • CHAR、VARCHAR、および STRING は STRING に正規化されます。

        • 他のデータ型は正規化されません。

        例:

        • SMALLINT を INT に変更した場合、両方の正規化された型は BIGINT です。変更は成功したと見なされ、ジョブは通常どおり実行されます。

        • FLOAT を BIGINT に変更した場合、それらの正規化された型はそれぞれ DOUBLE と BIGINT です。これは互換性のない変更であり、例外がスローされます。

        Paimon テーブルに格納されるデータ型は、正規化された型に統一されます。たとえば、MySQL の 2 つの列が SMALLINT 型と INT 型である場合、両方とも Paimon テーブルでは BIGINT 型として格納されます。

テーブルスキーマの変更

[データクエリ] テキストエディターで、次のコマンドを入力して選択し、[実行] をクリックします。

操作

サンプルコード

テーブルパラメーターの追加または変更

テーブルの write-buffer-size パラメーターの値を 256 MB に、write-buffer-spillable パラメーターの値を true に変更します。

ALTER TABLE my_table SET (
  'write-buffer-size' = '256 MB',
  'write-buffer-spillable' = 'true'
);

テーブルパラメーターの一時的な変更

テーブル名の後に SQL ヒントを追加して、テーブルへの書き込み時にテーブルパラメーターを一時的に変更できます。一時的に変更されたパラメーターは、現在の SQL ジョブに対してのみ有効です。

  • my_table テーブルへの書き込み時に、write-buffer-size を一時的に 256 MB に、write-buffer-spillable を true に設定します。

    INSERT INTO my_table /*+ OPTIONS('write-buffer-size' = '256 MB', 'write-buffer-spillable' = 'true') */
    SELECT ...;
  • my_table テーブルからデータを消費するときに、scan.mode を一時的に latest に、scan.parallelism を 10 に設定します。

    SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest', 'scan.parallelism' = '10') */;

テーブル名の変更

my_table テーブルを my_table_new に名前変更します。

ALTER TABLE my_table RENAME TO my_table_new;
重要

OSS での名前変更操作はアトミックではありません。したがって、OSS を使用して Apache Paimon テーブルファイルを格納する場合、Apache Paimon テーブルの名前を変更する際には注意が必要です。OSS-HDFS サービスを使用して、ファイル操作の原子性を確保することをお勧めします。

新しい列の追加

  • my_table テーブルの末尾に INT 型の c1 列と STRING 型の c2 列を追加します。

    ALTER TABLE my_table ADD (c1 INT, c2 STRING);
  • my_table テーブルの c1 列の後に STRING 型の c2 列を追加します。

    ALTER TABLE my_table ADD c2 STRING AFTER c1;
  • my_table テーブルの先頭に INT 型の c1 列を追加します。

    ALTER TABLE my_table ADD c1 INT FIRST;

列名の変更

my_table テーブルの c0 列を c1 に名前変更します。

ALTER TABLE my_table RENAME c0 TO c1;

列の削除

my_table テーブルから c1 列と c2 列を削除します。

ALTER TABLE my_table DROP (c1, c2);

パーティションの削除

my_table テーブルから dt=20240108,hh=06dt=20240109,hh=07 パーティションを削除します。

ALTER TABLE my_table DROP PARTITION (`dt` = '20240108', `hh` = '08'), PARTITION (`dt` = '20240109', `hh` = '07');

列コメントの変更

my_table テーブルの buy_count 列のコメントを 'this is buy count' に変更します。

ALTER TABLE my_table MODIFY buy_count BIGINT COMMENT 'this is buy count';

列の順序の変更

  • my_table テーブルの先頭に DOUBLE 型の col_a 列を移動します。

    ALTER TABLE my_table MODIFY col_a DOUBLE FIRST;
  • my_table テーブルの col_b 列の後に DOUBLE 型の col_a 列を移動します。

    ALTER TABLE my_table MODIFY col_a DOUBLE AFTER col_b;

列タイプの変更

my_table テーブルの col_a 列の型を DOUBLE に変更します。

ALTER TABLE my_table MODIFY col_a DOUBLE;

次の図は、Paimon テーブルでサポートされている列タイプの変更を示しています。丸 (〇) は、型変換がサポートされていることを示します。空のセルは、型変換がサポートされていないことを示します。

image

テーブルの削除

[データクエリ] テキストエディターで次のコマンドを入力し、コマンドを選択して [実行] をクリックします。

--my-catalog を作成した Paimon カタログの名前に置き換えます。
--my_db を使用したいデータベースの名前に置き換えます。
--my_tbl を作成した Paimon カタログテーブルの名前に置き換えます。
DROP TABLE `my-catalog`.`my_db`.`my_tbl`;

メッセージ The following statement has been executed successfully! が返された場合、Paimon テーブルは削除されます。

Paimon カタログの表示または削除

  1. Real-time Compute コンソールで、対象のワークスペースの [アクション] 列にある [コンソール] をクリックします。

  2. [Data Management] ページで、Apache Paimon カタログを表示またはドロップできます。

    • [カタログリスト] ページでは、[カタログ名][タイプ] を表示できます。カタログ内のデータベースとテーブルを表示するには、[表示] をクリックします。

    • [カタログリスト] ページで、削除するカタログを見つけ、[アクション] 列の [削除] をクリックします。

      説明

      Paimon カタログを削除すると、Flink ワークスペースの Data Management ページからそのレコードのみが削除されます。この操作は Paimon テーブルのデータファイルには影響しません。Paimon カタログを再作成することで、カタログ内の Paimon テーブルを再利用できます。

      また、[データクエリ] エディターで DROP CATALOG <catalog name>; を入力し、コードを選択して [実行] をクリックすることもできます。

リファレンス

  • Paimon テーブルを作成した後、テーブルからデータを消費したり、テーブルにデータを書き込んだりできます。詳細については、「Paimon テーブルへのデータの書き込みと消費」をご参照ください。

  • 組み込みカタログがビジネス要件を満たさない場合は、カスタムカタログを使用できます。詳細については、「カスタムカタログの管理」をご参照ください。

  • さまざまなシナリオにおける Paimon プライマリキーテーブルと追加専用テーブルの一般的な最適化については、「Paimon パフォーマンスの最適化」をご参照ください。