本記事では、Flink に基づいて Paimon Catalog を作成してデータを生成し、MaxCompute が Filesystem Catalog に基づいて外部プロジェクトを作成することで、Paimon テーブルのデータを直接読み取る方法について説明します。
制限事項
-
Paimon フォーマットのテーブルのみがサポートされます。
-
動的バケットテーブルへの書き込みはサポートされていません。
-
クロスパーティションテーブルへの書き込みはサポートされていません。
-
詳細については、「データ型マッピング」をご参照ください。
操作手順
ステップ 1:ソースデータの準備
Object Storage Service (OSS) に Paimon テーブルデータが既にある場合は、このステップをスキップできます。
-
OSS コンソールにログインし、バケットを作成します。この例では、バケット名は
paimon-fsです。詳細については、「バケットの作成」をご参照ください。バケット配下にpaimon-testディレクトリを新規作成します。 -
Flink コンソールにログインし、左上の隅でリージョンを選択します。
-
ターゲットワークスペース名をクリックし、左側のナビゲーションウィンドウで[カタログ]を選択します。
-
右側の[カタログリスト] 画面で、[カタログの作成] をクリックします。表示された[カタログの作成] ダイアログボックスで、 Apache Paimonを選択し、[次へ] をクリックして以下のパラメーターを設定します。
パラメーター
必須かどうか
説明
metastore
必須
メタストアのタイプ。この例では
filesystemを選択します。catalog name
必須
カタログ名をカスタマイズします。例:
paimon-fs-catalog。warehouse
必須
OSS で指定されたデータウェアハウスのディレクトリ。この例では
oss://paimon-fs/paimon-test/です。fs.oss.endpoint
必須
OSS のエンドポイント。例えば、杭州リージョンの場合は
oss-cn-hangzhou-internal.aliyuncs.comです。fs.oss.accessKeyId
必須
OSS へのアクセスに必要な Access Key ID。
fs.oss.accessKeySecret
必須
OSS へのアクセスに必要な Access Key Secret。
-
Paimon Catalog に基づいて Paimon テーブルを作成し、データを書き込みます。
-
左側のナビゲーションペインで、を選択します。
-
[クエリスクリプト] タブで、
をクリックし、新しいクエリスクリプトを作成します。次のコードを実行します。実際の命名規則に従って、コード内の関連する名前を変更してください。
説明Flink は Paimon Catalog を使用するため、デフォルトでファイルシステム上の Paimon Catalog の編成形式、つまり
paimon_catalog_name/database_name.db/xxxxに従います。他のエンジンを使用してこの OSS ディレクトリ内の Paimon データを読み書きする場合も、Paimon Catalog のファイルシステム上の編成形式に従う必要があります。また、Paimon フォーマットのデータのみを保存してください。そうしないと、異常データとして認識され、エラーが報告されます。CREATE TABLE `paimon-fs-catalog`.`default`.test_tbl ( id BIGINT, data STRING, dt STRING, PRIMARY KEY (dt, id) NOT ENFORCED ) PARTITIONED BY (dt) WITH ( 'bucket' = '3' ); INSERT INTO `paimon-fs-catalog`.`default`.test_tbl VALUES (1,'CCC','2024-07-18'), (2,'DDD','2024-07-18'),(3,'EEE','2025-06-18');
-
-
生成されたファイルを確認します。OSS コンソールにログインし、Paimon Catalog にバインドされている OSS ディレクトリで生成された Paimon テーブルを確認します。
OSS ディレクトリ
/paimon-test/default.db/test_tbl/には、パーティションディレクトリdt=2024-07-18/、dt=2025-06-18/、およびメタデータディレクトリmanifest/、schema/、snapshot/があります。
ステップ 2:MaxCompute で外部データソースを作成
-
MaxCompute コンソールにログインし、左上の隅でリージョンを選択します。
-
左側のナビゲーションウィンドウで、 を選択します。
-
外部データソース ページで、外部データソースの作成 をクリックします。
-
外部データソースの追加 ダイアログボックスで、パラメーターを設定します。次の表でパラメーターを説明します。
パラメーター
必須かどうか
説明
外部データソースタイプ
必須
Filesystem Catalogを選択します。外部データソース名
必須
名前をカスタマイズできます。命名規則は次のとおりです。
-
アルファベットで始まり、小文字のアルファベット、アンダースコア、数字のみを含めることができます。
-
128 文字を超えることはできません。
例:
external_fs。外部データソースの説明
任意
必要に応じて入力します。
地理
必須
デフォルトで現在のリージョンが設定されます。
Authentication and Authorization
必須
デフォルトで Alibaba Cloud Resource Access Management (RAM) ロールが設定されます。
RoleARN
必須
RAM ロールの ARN 情報。このロールには、DLF と OSS の両方のサービスにアクセスできる権限が必要です。
-
RAM コンソールにログインします。
-
左側のナビゲーションバーで、を選択します。
-
基本情報 セクションで、ARN を取得できます。
例:
acs:ram::124****:role/aliyunodpsdefaultrole。Storage Type
-
OSS
-
OSS-HDFS
エンドポイント
自動的に生成されます。杭州リージョンの場合は
oss-cn-hangzhou-internal.aliyuncs.comです。外部データソースの追加プロパティ
任意
特別に宣言された外部データソースの補足プロパティ。指定すると、この外部データソースを使用するタスクは、パラメーターで定義された動作に従ってソースシステムにアクセスできます。
説明サポートされる具体的なパラメーターについては、今後の公式サイトのドキュメント更新にご注意ください。具体的なパラメーターは、製品機能の進化に伴い段階的に公開されます。
-
-
外部データソースを作成するには、確認 をクリックします。
-
外部データソース ページで、データソースを見つけ、操作 列の 詳細 をクリックしてその詳細を表示します。
ステップ 3:MaxCompute で外部プロジェクトを作成
-
MaxCompute コンソールにログインし、左上の隅でリージョンを選択します。
-
左側のナビゲーションウィンドウで、を選択します。
-
External Project タブで、新しいプロジェクト をクリックします。
-
新しいアイテム ダイアログボックスで、設定を行い、確認 をクリックします。
パラメーター
必須かどうか
説明
プロジェクトタイプ
必須
デフォルトで外部プロジェクトが設定されます。
地理
必須
デフォルトで現在のリージョンが設定されます。ここでは変更できません。
プロジェクト名 (ネットワーク全体で一意)
必須
アルファベットで始まり、アルファベット、数字、アンダースコア (_) を含み、長さは 3~28 文字です。
MaxCompute外部データソース型
任意
Filesystem Catalog を選択します。
MaxCompute外部データソース
任意
-
既存の選択:作成済みの外部データソースが表示されます。
-
新しい外部データソース:新しい外部データソースを新規作成して使用できます。
MaxCompute外部データソース名
必須
-
既存のものを選択:ドロップダウンリストから作成済みの外部データソース名を選択します。
-
新しい外部データソースを作成:新規作成した外部データソース名が使用されます。
Authentication and Authorization
必須
タスク実行者の ID。サービス関連ロールが作成されていない場合は、このモードを使用する前に作成する必要があります。
RoleARN
必須
RAM ロールの ARN 情報。このロールには、DLF と OSS の両方のサービスにアクセスできる権限が必要です。
-
RAM コンソールにログインします。
-
左側のナビゲーションバーで、 を選択します。
-
基本情報 セクションで、ARN を取得できます。
例:
acs:ram::124****:role/aliyunodpsdefaultrole。Storage Type
-
OSS
-
OSS-HDFS
エンドポイント
必須
デフォルトで生成されます。
Bucket Catalog
必須
完全な OSS バケットと Catalog レベルまでのファイルシステムディレクトリを選択します。この例では
oss://paimon-fs/paimon-test/です。テーブル形式
必須
デフォルトで Paimon が設定されます。
コンピューティングリソース支払いタイプ
必須
年と月またはボリュームで支払う。
デフォルトクォータ
必須
既存のクォータを選択します。
説明
任意
プロジェクトの説明をカスタマイズします。
-
ステップ 4:Paimon テーブルの読み書き
-
接続ツールを選択して外部プロジェクトにログインします。
-
新しく作成した外部プロジェクトに入り、既存の Paimon スキーマを確認します。
-- セッションレベルでスキーマ構文のサポートを有効にします。 SET odps.namespace.schema=true; SHOW schemas; -- 次の結果が返されます。 ID = 20250922********wbh2u7 default OK -
default スキーマ配下のテーブルを読み取ります。
SET odps.sql.allow.fullscan=true; SELECT * FROM <external_project_name>.default.test_tbl; -- 次の結果が返されます。 +------------+------------+------------+ | id | data | dt | +------------+------------+------------+ | 1 | CCC | 2024-07-18 | | 2 | DDD | 2024-07-18 | | 3 | EEE | 2025-06-18 | +------------+------------+------------+ -
既存の Paimon テーブルにデータを書き込みます。
INSERT INTO test_tbl PARTITION(dt='2025-08-26') VALUES(4,'FFF'); SELECT * FROM test_tbl; -- 次の結果が返されます。 +------------+------------+------------+ | id | data | dt | +------------+------------+------------+ | 1 | CCC | 2024-07-18 | | 2 | DDD | 2024-07-18 | | 3 | EEE | 2025-06-18 | | 4 | FFF | 2025-08-26 | +------------+------------+------------+ -
新しいスキーマにテーブルを作成し、データを書き込みます。
新しいテーブルを作成してデータを書き込むと、MaxCompute は Paimon Catalog のファイルシステム上の編成形式に従って新しいデータを書き込みます。
-- スキーマを作成します。 CREATE schema testschema; -- 新しいスキーマにテーブルを作成します。 use schema testschema; CREATE TABLE table_test(id INT, name STRING); -- 新しいテーブルにデータを挿入して読み取ります。 INSERT INTO table_test VALUES (101,'张三'),(102,'李四'); SELECT * FROM table_test; -- 結果は次のとおりです。 +------------+------------+ | id | name | +------------+------------+ | 101 | 张三 | | 102 | 李四 | +------------+------------+ -
OSS コンソールにログインし、OSS で外部プロジェクトのバケットディレクトリを見つけると、新しく作成されたスキーマとテーブルが表示されます。