すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Paimon カタログの管理

最終更新日:Jan 22, 2026

Data Lake Formation (DLF) でデータカタログを作成した後、Flink がそのメタデータにアクセスできるように Flink に登録する必要があります。このトピックでは、Paimon カタログを Flink 環境にリンクして管理する方法について説明します。

事前準備

  • バージョン要件

    • VVR 8.0.5 以降:Paimon カタログとテーブルの作成に必要です。

    • VVR 11.1 以降:DLF をメタストアとして使用するために必要です。

  • ストレージ要件

    Paimon はデータとメタデータを Object Storage Service (OSS) に保存します。ご利用の OSS バケットを次のように設定します。

    • ストレージクラス:標準ストレージクラスを使用します。詳細については、「ストレージクラス」をご参照ください。

    • バケットの設定:Flink ワークスペースと同じリージョンに専用のバケットを作成します。データの競合を避けるため、デフォルトのシステムバケットを再利用しないでください。

  • アクセス権限
    AccessKey ペアに、ご利用の OSS バケットまたは DLF ディレクトリに対する読み取りおよび書き込み権限があることを確認してください。

  • フォーマットの互換性

    Paimon カタログは Paimon フォーマットのデータのみをサポートします。Lance や Iceberg などの他のデータレイクフォーマットはサポートされていません。

  • コンソールの更新
    カタログ、データベース、またはテーブルを作成または削除した後、更新 image ボタンをクリックしてコンソールビューを更新します。

  • Paimon バージョンのマッピング

    Apache Paimon バージョン

    VVR バージョン

    1.3

    11.4

    1.2

    11.2、11.3

    1.1

    11

    1.0

    8.0.11

    0.9

    8.0.7, 8.0.8, 8.0.9, and 8.0.10

    0.8

    8.0.6

    0.7

    8.0.5

    0.6

    8.0.4

    0.6

    8.0.3

カタログの作成

Paimon カタログは、さまざまなメタストアタイプを使用してメタデータを管理します。シナリオに最も適したタイプを選択してください。

  • ファイルシステムカタログ:メタデータを OSS に保存します。単純なデプロイメントに最適です。

  • DLF カタログ:メタデータを DLF に保存します。統合されたメタデータ管理を必要とするエンタープライズシナリオに最適です。

ファイルシステムカタログの作成

方法 1:コンソールを使用 (推奨)

  1. [カタログ] ページに移動します。

    1. 管理ポータルにログインします。対象のワークスペースを見つけ、[操作] 列の [コンソール] をクリックします。

    2. 左側のナビゲーションウィンドウで、[カタログ] をクリックします。

  2. [カタログの作成] をクリックし、[Apache Paimon] を選択して、[次へ] をクリックします。

  3. metastorefilesystem に設定し、他のカタログ設定を構成します。

方法 2:SQL を使用

[開発] > [スクリプト] の SQL エディターで次のコマンドを実行します。

CREATE CATALOG `my-catalog` WITH (
  'type' = 'paimon',
  'metastore' = 'filesystem',
  'warehouse' = '<warehouse>',
  'fs.oss.endpoint' = '<fs.oss.endpoint>',
  'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>',
  'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>'
);
  • 一般パラメーター

    設定項目

    説明

    必須

    備考

    my-catalog

    Paimon カタログの名前。

    はい

    カスタム名を入力します。

    type

    カタログのタイプ。

    はい

    paimon に設定します。

    metastore

    メタストアのタイプ。

    はい

    有効な値:

    • filesystem:メタデータを OSS に保存します。

    • dlf:メタデータを DLF に保存します。

  • OSS パラメーター

    設定項目

    説明

    必須

    備考

    warehouse

    OSS 内のデータウェアハウスディレクトリ。

    はい

    フォーマットは oss://<bucket>/<object> です。各項目は次のとおりです。

    • bucket:ご利用の OSS バケットの名前。

    • object:データが保存されているパス。

    OSS コンソールでバケットとオブジェクト名を表示します。

    fs.oss.endpoint

    OSS サービスのエンドポイント。

    はい

    • Flink と DLF が同じリージョンにある場合は、VPC エンドポイントを使用します。それ以外の場合は、パブリックネットワークエンドポイントを使用します。

    • warehouse に指定された OSS バケットが Flink ワークスペースと同じリージョンにない場合、または別の Alibaba Cloud アカウントに属する OSS バケットを使用する場合は、このパラメーターが必要です。

    詳細については、「リージョンとエンドポイント」および「AccessKey の作成」をご参照ください。

    fs.oss.accessKeyId

    OSS に対する読み取りおよび書き込み権限を持つ Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID。

    はい

    fs.oss.accessKeySecret

    OSS に対する読み取りおよび書き込み権限を持つ Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey Secret。

    はい

DLF カタログの作成

DLF

  1. DLF コンソールに移動し、Flink ワークスペースと同じリージョンにカタログを作成します。詳細については、「DLF のクイックスタート」をご参照ください。

  2. 開発コンソールで Paimon カタログを登録します。

    説明

    この操作は DLF カタログへのマッピングを確立するだけです。Flink でこのカタログマッピングを作成または削除しても、DLF 内の実際のデータには影響しません。

    1. Realtime Compute for Apache Flink の管理ポータルにログインします。

    2. 対象のワークスペースの名前をクリックします。

    3. UI または SQL コマンドを使用してカタログを登録します。

      UI

      1. 左側のナビゲーションウィンドウで、[カタログ] をクリックします。

      2. [カタログ] ページで、[カタログの作成] をクリックします。

      3. [Apache Paimon] を選択し、[次へ] をクリックします。

      4. [metastore][DLF] に設定し、[カタログ名] フィールドで関連付ける DLF カタログを選択して、[OK] をクリックします。カタログが Flink に登録されます。

      SQL

      [開発] > [スクリプト] の SQL エディターで、次の SQL ステートメントを実行してカタログを登録します。

      CREATE CATALOG `flink_catalog_name` 
      WITH (
        'type' = 'paimon',
        'metastore' = 'rest',
        'token.provider' = 'dlf',
        'uri' = 'http://cn-hangzhou-vpc.dlf.aliyuncs.com',
        'warehouse' = 'dlf_test'
      );

      パラメーター:

      パラメーター

      説明

      必須

      type

      カタログのタイプ。paimon に設定します。

      はい

      paimon

      metastore

      メタストアのタイプ。値を rest に設定します。

      はい

      rest

      token.provider

      トークンプロバイダー。値を dlf に設定します。

      はい

      dlf

      uri

      DLF REST Catalog Server へのアクセスに使用される URI。フォーマットは http://[region-id]-vpc.dlf.aliyuncs.com です。リージョン ID の詳細については、「エンドポイント」をご参照ください。

      はい

      http://cn-hangzhou-vpc.dlf.aliyuncs.com

      warehouse

      DLF カタログの名前。

      はい

      dlf_test

DLF-Legacy

  1. DLF コンソールに移動し、Flink ワークスペースと同じリージョンにカタログを作成します。詳細については、「スタートガイド」をご参照ください。

  2. 開発コンソールで Paimon カタログを登録します。

    方法 1:コンソールを使用 (推奨)

    1. [カタログ] ページに移動します。

      1. 管理ポータルにログインし、対象のワークスペースの [操作] 列にある [コンソール] をクリックします。

      2. [カタログ] をクリックします。

    2. [カタログの作成] をクリックします。[Apache Paimon] を選択し、[次へ] をクリックします。

    3. [metastore][DLF] に設定し、[カタログ名] ドロップダウンリストから関連付ける V1.0 DLF カタログを選択します。その後、他のカタログ設定を構成します。

    方法 2:SQL を使用

    [開発] > [スクリプト] の SQL エディターで次のコマンドを実行します。

    CREATE CATALOG `my-catalog` WITH (
      'type' = 'paimon',
      'metastore' = 'dlf',
      'warehouse' = '<warehouse>',
      'dlf.catalog.id' = '<dlf.catalog.id>',
      'dlf.catalog.accessKeyId' = '<dlf.catalog.accessKeyId>',
      'dlf.catalog.accessKeySecret' = '<dlf.catalog.accessKeySecret>',
      'dlf.catalog.endpoint' = '<dlf.catalog.endpoint>',
      'dlf.catalog.region' = '<dlf.catalog.region>',
      'fs.oss.endpoint' = '<fs.oss.endpoint>',
      'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>',
      'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>'
    );

    次の表にパラメーターを示します。

    • 一般

      設定項目

      説明

      必須

      備考

      my-catalog

      Paimon カタログの名前。

      はい

      カスタム名を入力します。

      type

      カタログのタイプ。

      はい

      値は paimon に固定されます。

      metastore

      メタストアのタイプ。

      はい

      値は dlf に固定されます。

    • OSS

      設定項目

      説明

      必須

      備考

      warehouse

      OSS 内のデータウェアハウスディレクトリ。

      はい

      フォーマットは oss://<bucket>/<object> です。各項目は次のとおりです。

      • bucket:ご利用の OSS バケットの名前。

      • object:データが保存されているパス。

      OSS コンソールでバケットとオブジェクト名を表示します。

      fs.oss.endpoint

      OSS サービスのエンドポイント。

      はい

      • Flink と DLF が同じリージョンにある場合は、VPC エンドポイントを使用します。それ以外の場合は、パブリックネットワークエンドポイントを使用します。

      • Paimon テーブルを OSS-HDFS に保存する場合は、fs.oss.endpoint パラメーターの値を cn-<region>.oss-dls.aliyuncs.com に設定します。例:cn-hangzhou.oss-dls.aliyuncs.com

      fs.oss.accessKeyId

      OSS に対する読み取りおよび書き込み権限を持つ Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID。

      はい

      詳細については、「リージョンとエンドポイント」および「AccessKey ペアの作成」をご参照ください。

      fs.oss.accessKeySecret

      OSS に対する読み取りおよび書き込み権限を持つ Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey Secret。

      はい

    • DLF

      設定項目

      説明

      必須

      備考

      dlf.catalog.id

      DLF データカタログの ID。

      はい

      Data Lake Formation コンソールでデータカタログの ID を表示します。

      dlf.catalog.accessKeyId

      DLF サービスへのアクセスに必要な AccessKey ID。

      はい

      詳細については、「AccessKey ペアの作成」をご参照ください。

      dlf.catalog.accessKeySecret

      DLF サービスへのアクセスに必要な AccessKey Secret。

      はい

      詳細については、「AccessKey ペアの作成」をご参照ください。

      dlf.catalog.endpoint

      DLF サービスのエンドポイント。

      はい

      詳細については、「リージョンとエンドポイント」をご参照ください。

      説明

      Flink と DLF が同じリージョンにある場合は、VPC エンドポイントを使用します。それ以外の場合は、パブリックネットワークエンドポイントを使用します。

      dlf.catalog.region

      DLF が配置されているリージョン。

      はい

      詳細については、「リージョンとエンドポイント」をご参照ください。

      説明

      リージョンが dlf.catalog.endpoint に指定されたものと同じであることを確認してください。

データベースの管理

[開発] > [スクリプト] の SQL エディターで次のコマンドを実行します。

  • データベースの作成

    Paimon カタログを登録すると、default という名前のデータベースがカタログに自動的に作成されます。追加のデータベースを作成するには、次の SQL ステートメントを実行します。

    -- my-catalog をご利用の Paimon カタログの名前に置き換えます。
    USE CATALOG `my-catalog`;
    
    -- my_db を英語のカスタムデータベース名に置き換えます。
    CREATE DATABASE `my_db`;
  • データベースの削除

    重要

    default データベースは DLF カタログからは削除できません。ただし、ファイルシステムカタログからは削除できます。

    -- my-catalog をご利用の Paimon カタログの名前に置き換えます。
    USE CATALOG `my-catalog`;
    
    -- my_db を削除したいデータベースの名前に置き換えます。
    DROP DATABASE `my_db`; -- テーブルが含まれていない場合にのみデータベースを削除します。
    DROP DATABASE `my_db` CASCADE; -- データベースとそれに含まれるすべてのテーブルを削除します。
    

テーブルの管理

テーブルの作成

説明

Paimon カタログを登録した後、DDL 定義を作成することなく、Flink ジョブでそのテーブルを直接参照できます。テーブルを参照するには:

  • 完全修飾名を使用する:${Paimon-catalog-name}.${Paimon-db-name}.${Paimon-table-name}

  • または、USE CATALOG ${Paimon-catalog-name}USE ${Paimon-db-name} を使用して現在のカタログとデータベースを設定します。その後、テーブル名のみを使用してテーブルを参照します:${Paimon-table-name}

  • CREATE TABLE ステートメントを使用したテーブルの作成

    [開発] > [スクリプト] の SQL エディターで、以下のコマンドを実行します。

    次の例では、my-catalogmy_db データベースにパーティションテーブルを作成します。

    -- my-catalog をご利用の Paimon カタログの名前に置き換えます。
    -- my_db を使用したいデータベースの名前に置き換えます。
    -- my_tbl を英語のカスタム名に置き換えることもできます。
    CREATE TABLE `my-catalog`.`my_db`.`my_tbl` (
      dt STRING,
      shop_id BIGINT,
      user_id BIGINT,
      num_orders INT,
      total_amount INT,
      PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
    ) PARTITIONED BY (dt) WITH (
      'bucket' = '4'
    );

    詳細については、「Apache Paimon コネクタ」および「プライマリキーテーブルと追加専用テーブル」をご参照ください。

  • CREATE TABLE AS (CTAS) または CREATE DATABASE AS (CDAS) ステートメントを使用したテーブルの作成

    CTAS および CDAS ステートメントは、データとテーブルスキーマの変更を自動的に同期します。これらのステートメントを使用して、MySQL や Kafka などのソースから Paimon にテーブルを簡単にレプリケーションします。

    説明
    • 特定のテーブルのレプリケーション

      次の例では、mysql.tpcds.web_sales テーブルのスキーマに基づいて Paimon テーブル `my-catalog`.`my_db`.`web_sales` を自動的に作成し、データをレプリケーションします。

      CREATE TABLE IF NOT EXISTS `<catalog name>`.`<db name>`.`<table name>`
      WITH (
        'bucket' = '4',
        'changelog-producer' = 'input'
      ) AS TABLE mysql.tpcds.web_sales;
    • データベース全体のレプリケーション

      次の例では、mysql.tpcds データベースの各テーブルのスキーマに基づいて、`my-catalog`.`my_db` データベースに Paimon テーブルを自動的に作成し、データをレプリケーションします。

      CREATE DATABASE IF NOT EXISTS `<catalog name>`.`<db name>`
      WITH (
        'changelog-producer' = 'input'
      ) AS DATABASE mysql.tpcds INCLUDING ALL TABLES;
    • 列の型の変更のレプリケーション

      CTAS または CDAS ステートメントを使用して作成された Paimon テーブルは、特定の列の型の変更をサポートします。

      • デフォルトの型レプリケーション

        デフォルトでは、Paimon テーブルはソースの正確な列の型を保持します。ただし、明示的な設定なしで特定の型昇格がサポートされています。

        • 整数型:TINYINTSMALLINTINT、および BIGINT は、同等以上の精度の整数型に昇格できます (TINYINT < SMALLINT < INT < BIGINT)。

        • 浮動小数点型:FLOAT および DOUBLE は、同等以上の精度の浮動小数点型に昇格できます (FLOAT < DOUBLE)。

        • 文字列型:CHARVARCHAR、および STRING は、同等以上の精度の文字列型に昇格できます。

      • 型の正規化

        より柔軟な型の変更を有効にし、互換性のあるスキーマ進化によるジョブの失敗を防ぐには、CTAS/CDAS ステートメントの WITH 句で 'enableTypeNormalization' = 'true' を設定します。このモードでは、Flink はスキーマ進化中にデータ型を正規化します。ジョブは、古い型と新しい型が同じ基になる型に正規化できない場合にのみ失敗します。ルール:

        • TINYINTSMALLINTINTBIGINTBIGINT に正規化されます。

        • FLOATDOUBLEDOUBLE に正規化されます。

        • CHARVARCHARSTRINGSTRING に正規化されます。

        • 他のデータ型は正規化されません。

      • 正規化の影響:

        型の正規化がアクティブな場合、Paimon テーブルに保存されるデータ型は正規化された型になります。たとえば、ソース列 c1 (SMALLINT) と c2 (INT) が両方とも BIGINT に正規化される場合、c1c2 の両方が Paimon テーブルに BIGINT として保存されます。

      • 例:

        • 成功:ソースの SMALLINTINT に変更された場合、両方の型は BIGINT に正規化されます。これは互換性のある変更であり、ジョブは正常に続行されます。

        • 失敗:ソースの FLOATBIGINT に変更された場合、それらの正規化された型 (DOUBLEBIGINT) は互換性がないため、例外が発生します。

テーブルスキーマの変更

[開発] > [スクリプト] の SQL エディターで、次のコマンドを実行します。

操作

サンプルコード

テーブルプロパティの追加または変更

ALTER TABLE my_table SET (
  'write-buffer-size' = '256 MB',
  'write-buffer-spillable' = 'true'
);

テーブルプロパティの一時的な変更

一時的に変更されたテーブルプロパティは、現在の SQL ジョブに対してのみ有効です。

  • my_table テーブルに書き込む際に、write-buffer-size を一時的に 256 MB に、write-buffer-spillable を true に設定します。

    INSERT INTO my_table /*+ OPTIONS('write-buffer-size' = '256 MB', 'write-buffer-spillable' = 'true') */
    SELECT ...;
  • my_table テーブルからデータを消費する際に、scan.mode を一時的に latest に、scan.parallelism を 10 に設定します。

    SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest', 'scan.parallelism' = '10') */;

テーブル名の変更

ALTER TABLE my_table RENAME TO my_table_new;
重要

OSS での名前変更操作はアトミックではありません。したがって、OSS を使用して Apache Paimon テーブルファイルを保存する場合、Apache Paimon テーブルの名前を変更する際には注意が必要です。OSS-HDFS を使用してファイル操作の原子性を確保することを推奨します。

新しい列の追加

  • my_table テーブルの末尾に、INT 型の c1 列と STRING 型の c2 列を追加します。

    ALTER TABLE my_table ADD (c1 INT, c2 STRING);
  • my_table テーブルの c1 列の後に STRING 型の c2 列を追加します。

    ALTER TABLE my_table ADD c2 STRING AFTER c1;
  • my_table テーブルの先頭に INT 型の c1 列を追加します。

    ALTER TABLE my_table ADD c1 INT FIRST;

列名の変更

my_table の c0 列を c1 に名前変更します。

ALTER TABLE my_table RENAME c0 TO c1;

列の削除

my_table から c1c2 列を削除します。

ALTER TABLE my_table DROP (c1, c2);

パーティションの削除

my_table から dt=20240108,hh=06dt=20240109,hh=07 パーティションを削除します。

ALTER TABLE my_table DROP PARTITION (`dt` = '20240108', `hh` = '08'), PARTITION (`dt` = '20240109', `hh` = '07');

列コメントの変更

my_tablebuy_count 列のコメントを this is buy count に変更します。

ALTER TABLE my_table MODIFY buy_count BIGINT COMMENT 'this is buy count';

列の順序の変更

  • my_table の先頭に DOUBLE 型の col_a を移動します。

    ALTER TABLE my_table MODIFY col_a DOUBLE FIRST;
  • my_tablecol_b の後に DOUBLE 型の col_a を移動します。

    ALTER TABLE my_table MODIFY col_a DOUBLE AFTER col_b;

列の型の変更

my_tablecol_a の型を DOUBLE に変更します。

ALTER TABLE my_table MODIFY col_a DOUBLE;

次の表に、Paimon テーブルでサポートされている列の型の変更を示します。図では、〇は型変換がサポートされていることを示します。空のセルは、型変換がサポートされていないことを示します。

image

テーブルの削除

[開発] > [スクリプト] の SQL エディターで次のコマンドを実行します。

-- my-catalog をご利用の Paimon カタログの名前に置き換えます。
-- my_db を使用したいデータベースの名前に置き換えます。
-- my_tbl を作成した Paimon カタログテーブルの名前に置き換えます。
DROP TABLE `my-catalog`.`my_db`.`my_tbl`;

The following statement has been executed successfully! というメッセージが返された場合、Paimon テーブルは削除されています。

カタログの表示または削除

  1. 管理ポータルで、対象のワークスペースの [操作] 列にある [コンソール] をクリックします。

  2. [カタログ] ページで、Apache Paimon カタログを表示または削除します。

    • [カタログ] ページで、カタログ名とタイプを表示します。カタログ内のデータベースとテーブルを表示するには、[操作] 列の [表示] をクリックします。

    • [カタログ] ページで、削除するカタログの [操作] 列にある [削除] をクリックします。

      説明

      Flink から Paimon カタログを削除すると、Flink 環境からその定義 (メタデータ) のみが削除されます。基になる Paimon テーブルのデータファイルは影響を受けません。その後、Flink でカタログを再登録して、それらの Paimon テーブルに再度アクセスできます。

      または、[開発] > [スクリプト] の SQL エディターで DROP CATALOG <catalog name>; と入力し、コードを選択して [実行] をクリックします。

関連ドキュメント