すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:DLF 1.0 と OSS を使用したデータレイクからの Paimon データの読み取り

最終更新日:Mar 01, 2026

このトピックでは、Flink を使用して Paimon Data Lake Formation (DLF) カタログを作成し、MySQL のデータ変更キャプチャ (CDC) データを読み取り、データを Object Storage Service (OSS) に書き込み、メタデータを DLF に同期する方法について説明します。その後、MaxCompute の外部スキーマを使用して、データレイク内のデータに対してフェデレーテッドクエリを実行できます。

リージョン

  • サポート対象リージョン

    リージョン名

    リージョン ID

    中国 (杭州)

    cn-hangzhou

    中国 (上海)

    cn-shanghai

    中国 (北京)

    cn-beijing

    中国 (張家口)

    cn-zhangjiakou

    中国 (深セン)

    cn-shenzhen

    中国 (香港)

    cn-hongkong

    シンガポール

    ap-southeast-1

    ドイツ (フランクフルト)

    eu-central-1

  • MaxCompute、OSS、DLF、Flink は同じリージョンにデプロイする必要があります。

操作手順

前提条件

  1. OSS サービスを有効化済みであること。

  2. DLF サービスを有効化済みであること。

  3. Flink サービスを有効化済みであること。

  4. MaxCompute プロジェクトを作成し、プロジェクトレベルのメタデータに対するスキーマサポートを有効化済みであること。

  5. ApsaraDB RDS for MySQL インスタンスを作成済みであること。

ステップ 1:DLF と OSS にアクセスするための MaxCompute 権限の付与

アカウントは、権限付与なしでは MaxCompute プロジェクトから DLF および OSS サービスにアクセスできません。次の 2 つの方法のいずれかで必要な権限を付与できます:

  • ワンクリック権限付与:同じアカウントを使用して MaxCompute プロジェクトを作成し、DLF をデプロイする場合、DLF の権限付与 をクリックしてワンクリックで権限を付与できます。

  • カスタム権限付与:DLF のカスタム権限付与を使用できます。この方法は、異なるアカウントを使用して MaxCompute プロジェクトを作成し、DLF をデプロイする場合にも適用されます。

ステップ 2:MySQL テストデータの準備

すでに MySQL のテストデータがある場合は、このステップをスキップできます。

  1. RDS コンソールにログインします。

  2. 左側のナビゲーションウィンドウで、インスタンス をクリックします。次に、左上隅でリージョンを選択します。

  3. インスタンスページで、目的のインスタンスの [インスタンス ID/名前] をクリックして、その詳細ページを開きます。

  4. 左側のナビゲーションウィンドウで、[データベース] をクリックします。

  5. データベースの作成 をクリックします。次のパラメーターを設定します:

    パラメーター

    必須

    説明

    データベース名

    必須

    • 名前は 2~64 文字である必要があります。

    • 先頭は英字、末尾は英字または数字である必要があります。

    • 小文字の英字、数字、アンダースコア (_)、ハイフン (-) を使用できます。

    • データベース名はインスタンス内で一意である必要があります。

    • データベース名に - が含まれている場合、作成されたデータベースのフォルダー名にある -@002d に変換されます。

    mysql_paimon

    サポートされる文字セット

    必須

    必要に応じて文字セットを選択します。

    utf8

    承認者

    任意

    • このデータベースにアクセスする必要があるアカウントを選択します。このパラメーターを空のままにして、データベース作成後にアカウントをアタッチすることもできます。

    • ここには標準アカウントのみが表示されます。特権アカウントはすべてのデータベースに対するすべての権限を持っているため、権限付与は不要です。

    Default

    説明

    任意

    管理を容易にするためのデータベースの説明。説明は最大 256 文字です。

    Flink テストデータベースを作成します。

  6. データベースにログイン をクリックします。左側のナビゲーションウィンドウで、データベースインスタンス を選択します。作成したデータベースをダブルクリックします。SQLConsole ページで、次のステートメントを実行してテストテーブルを作成し、テストデータを書き込みます。

    インスタンスは存在するが、インスタンスを展開しても対象のデータベースが表示されない場合、次のいずれかの理由が考えられます:

    • ログインアカウントが対象のデータベースにアクセスできない:RDS インスタンスの アカウント ページに移動して、アカウントの権限を変更するか、ログインデータベースアカウントを変更できます。

    • メタデータが同期されておらず、ディレクトリが表示されない:対象のデータベースを含むインスタンスにマウスポインターを合わせます。インスタンス名の右側にある image ボタンをクリックして、データベースリストをリフレッシュします。

    -- Create a table
    CREATE TABLE sales (
        id INT NOT NULL AUTO_INCREMENT,
        year INT NOT NULL,
        amount DECIMAL(10,2) NOT NULL,
        product_name VARCHAR(100) NOT NULL,
        customer_name VARCHAR(100) NOT NULL,
        order_date DATE NOT NULL,
        region VARCHAR(50) NOT NULL,
        status VARCHAR(20) NOT NULL,
        PRIMARY KEY (id,year)
    ) PARTITION BY RANGE (year) (
        PARTITION p2020 VALUES LESS THAN (2021),
        PARTITION p2021 VALUES LESS THAN (2022),
        PARTITION p2022 VALUES LESS THAN (2023),
        PARTITION p2023 VALUES LESS THAN (2024)
    );
    
    -- Write data
    INSERT INTO sales (year, amount, product_name, customer_name, order_date, region, status) VALUES
    (2020, 100.00, 'Product A', 'Customer 1', '2020-01-01', 'Region 1', 'Completed'),
    (2020, 200.00, 'Product B', 'Customer 2', '2020-02-01', 'Region 2', 'Pending'),
    (2021, 150.00, 'Product C', 'Customer 3', '2021-03-01', 'Region 3', 'Completed'),
    (2021, 300.00, 'Product D', 'Customer 4', '2021-04-01', 'Region 4', 'Pending'),
    (2022, 250.00, 'Product E', 'Customer 5', '2022-05-01', 'Region 5', 'Completed'),
    (2022, 400.00, 'Product F', 'Customer 6', '2022-06-01', 'Region 6', 'Pending'),
    (2023, 350.00, 'Product G', 'Customer 7', '2023-07-01', 'Region 7', 'Completed'),
    (2023, 500.00, 'Product H', 'Customer 8', '2023-08-01', 'Region 8', 'Pending'),
    (2020, 450.00, 'Product I', 'Customer 9', '2020-09-01', 'Region 1', 'Completed'),
    (2021, 600.00, 'Product J', 'Customer 10', '2021-10-01', 'Region 2', 'Pending');
  7. テストテーブルのデータをクエリします。

    SELECT * FROM sales;

    次の結果が返されます:

    image

ステップ 3:DLF メタデータベースの準備

  1. OSS コンソールにログインし、バケットを作成します。この例では、バケット名は mc-lakehouse-dlf-oss です。詳細については、「バケットの作成」をご参照ください。

  2. バケット内に flink_paimon という名前のフォルダーを作成します。

  3. Data Lake Formation (DLF) コンソールにログインし、左上隅でリージョンを選択します。

  4. 左側のナビゲーションウィンドウで、Metadata > Metadata を選択します。

  5. Metadata ページで、Database タブをクリックします。

  6. [デフォルト]Catalog List で、Create Database をクリックします。次のパラメーターを設定します:

    パラメーター

    必須

    説明

    Catalog

    必須

    この例では、データカタログは [デフォルト] です。

    Database Name:

    必須

    カスタムのデータベース名。名前は 1~128 文字で、先頭は英字、使用できるのは英字、数字、アンダースコア (_) のみです。例:db_dlf_oss

    Database Description:

    任意

    カスタムの説明。

    Select Path:

    必須

    データベースのストレージ場所。例:oss://mc-lakehouse-dlf-oss/flink_paimon/

ステップ 4:Flink を使用した Paimon および MySQL カタログの作成

  1. Paimon カタログの作成

    1. Flink コンソールにログインし、左上隅でリージョンを選択します。

    2. 対象のワークスペースの名前をクリックします。左側のナビゲーションウィンドウで、カタログ を選択します。

    3. カタログリスト ページで、右側の カタログの作成 をクリックします。カタログの作成 ダイアログボックスで、[Apache Paimon] を選択し、次へ をクリックして、次のパラメーターを設定します:

      パラメーター

      必須

      説明

      metastore

      必須

      メタストアのタイプ。この例では、dlf を選択します。

      catalog name

      必須

      関連付ける DLF カタログのバージョンを選択します。この例では、v1.0 を選択します。

      warehouse

      必須

      OSS で指定されたデータウェアハウスのディレクトリ。この例では、ディレクトリは oss://mc-lakehouse-dlf-oss/flink_paimon/ です。

      fs.oss.endpoint

      必須

      OSS サービスのエンドポイント。たとえば、中国 (杭州) リージョンのエンドポイントは oss-cn-hangzhou-internal.aliyuncs.com です。

      fs.oss.accessKeyId

      必須

      OSS サービスへのアクセスに使用される AccessKey ID。

      fs.oss.accessKeySecret

      必須

      OSS サービスへのアクセスに使用される AccessKey Secret。

      dlf.catalog.accessKeyId

      必須

      DLF サービスへのアクセスに使用される AccessKey ID。

      dlf.catalog.accessKeySecret

      必須

      DLF サービスへのアクセスに使用される AccessKey Secret。

  2. MySQL カタログの作成

    1. Flink コンソールにログインし、左上隅でリージョンを選択します。

    2. ホワイトリストに IP アドレスを追加します。

      1. 対象ワークスペースの 操作 列で、詳細 をクリックします。

        ワークスペース詳細 パネルで、vSwitch の CIDR ブロック をコピーします。

      2. RDS コンソールにログインします。

        左側のナビゲーションウィンドウで、インスタンス をクリックします。次に、左上隅でリージョンを選択します。

        [インスタンス] ページで、対象インスタンスの インスタンス ID/名前 をクリックして、その詳細ページを開きます。

      3. 左側のナビゲーションウィンドウで、ホワイトリストとセキュリティグループ をクリックします。

        ホワイトリスト設定 タブで、変更 をクリックします。

      4. ホワイトリストの編集 ダイアログボックスで、コピーした CIDR ブロックを IP アドレス フィールドに追加し、OK をクリックします。

    3. Flink コンソールにログインし、左上隅でリージョンを選択します。

      対象のワークスペースの名前をクリックします。左側のナビゲーションウィンドウで、カタログ を選択します。

    4. 右側の カタログリスト で、カタログの作成 をクリックします。カタログの作成 ダイアログボックスで、[MySQL] を選択し、次へ をクリックして、次のパラメーターを設定します:

      パラメーター

      必須

      説明

      catalog name

      必須

      MySQL カタログのカスタム名。例:mysql-catalog

      hostname

      必須

      • MySQL データベースの IP アドレスまたはホスト名。

      • RDS MySQL コンソールにログインできます。データベースインスタンスの詳細ページで、データベース接続 をクリックして、データベースの 内部エンドポイントパブリックエンドポイント、および 内部ポート を表示します。

      • VPC をまたいで、またはパブリックネットワーク経由でデータベースにアクセスする場合は、ネットワーク接続を確立する必要があります。詳細については、「ネットワーク接続」をご参照ください。

      port

      デフォルト

      サーバーへの接続に使用されるポート。デフォルト値は 3306 です。

      default database

      必須

      デフォルトのデータベース名。例:mysql_paimon

      username

      必須

      MySQL データベースサーバーへの接続に使用されるユーザー名。ApsaraDB RDS for MySQL コンソールにログインできます。データベースインスタンスの詳細ページで、アカウント をクリックしてユーザー名を表示します。

      password

      必須

      MySQL データベースサーバーへの接続に使用されるパスワード。ApsaraDB RDS for MySQL コンソールにログインできます。データベースインスタンスの詳細ページで、アカウント をクリックしてパスワードを表示します。

ステップ 5:Flink を使用した MySQL からのデータ読み取り、Paimon へのデータ書き込み、および DLF へのメタデータ同期

  1. Flink コンソールにログインし、左上隅でリージョンを選択します。

  2. 対象のワークスペース名をクリックし、左側のナビゲーションウィンドウで 開発 > ETL を選択します。

  3. ドラフト タブで、image をクリックして新しいフォルダーを作成します。

  4. フォルダーを右クリックし、新しい空のストリームドラフト を選択します。新しいドラフト ダイアログボックスで、名前 を入力し、エンジンバージョン を選択します。

  5. ファイルに、次の CREATE TABLE AS (CTAS) SQL ステートメントを入力します。実際の構成に基づいて、コード内の名前を必ず変更してください。

    CREATE TABLE IF NOT EXISTS `<dlf_meta_db_name>`.`<OSS_bucket_name>`.`sales` 
    AS TABLE `<mysql_catalog_name>`.`<RDS_mysql_name>`.`sales`;
    
    -- このトピックの名前を使用する場合は、次のコードをコピーできます。
    CREATE TABLE IF NOT EXISTS `db_dlf_oss`.`flink_paimon`.`sales` 
    AS TABLE `mysql-catalog`.`mysql_paimon`.`sales`;
    1. (任意) 右上隅にある 検証 をクリックして、ジョブの Flink SQL ステートメントの構文を検証します。

    2. 右上隅にある デプロイ をクリックします。ドラフトのデプロイ ダイアログボックスで、コメントラベルデプロイメントターゲット を指定し、確認 をクリックします。

  6. 対象のワークスペース名をクリックします。左側のナビゲーションウィンドウで、O&M > デプロイメント を選択します。

  7. デプロイメント ページで、対象のジョブの名前をクリックして、その 設定 ページを開きます。

  8. 対象ジョブのデプロイメント詳細ページの右上隅にある 開始 をクリックし、初期モード を選択してから、開始 をクリックします。

  9. Paimon データをクエリします。

    左側のナビゲーションウィンドウで、開発 > スクリプト を選択します。

    新しいスクリプト タブで、image をクリックして新しいクエリスクリプトを作成できます。

    次のコードを実行します:

    SELECT * FROM `<paimon_catalog_name>`.`flink_paimon`.`sales`;

    次の結果が返されます:

    image

  10. OSS コンソールに移動し、mc-lakehouse-dlf-oss/flink_paimon/ ディレクトリを確認します。sales/ フォルダーが生成されます。生成されたファイルは次の図に示されています:

    image

  11. Data Lake Formation (DLF) コンソールにログインし、左上隅でリージョンを選択します。

    左側のナビゲーションウィンドウで、Metadata > Metadata を選択します。

    データベース名 flink_paimon をクリックします。次の図に示すように、生成されたテーブルを表示できます:

    image

ステップ 6:MaxCompute での DLF+OSS 外部データソースの作成

  1. MaxCompute コンソールにログインし、左上隅でリージョンを選択します。

  2. 左側のナビゲーションウィンドウで、構成の管理 > 外部データソース を選択します。

  3. 外部データソース ページで、外部データソースの作成 をクリックします。

  4. 外部データソースの追加 ダイアログボックスで、パラメーターを設定します。次の表にパラメーターを説明します。

    パラメーター

    必須

    説明

    外部データソースタイプ

    必須

    [DLF+OSS] を選択します。

    外部データソース名

    必須

    カスタム名。命名規則は次のとおりです:

    • 名前は英字で始まり、小文字の英字、アンダースコア (_)、数字のみを含めることができます。

    • 名前は 128 文字を超えることはできません。

    例:mysql_paimon_dlf

    外部データソースの説明

    任意

    必要に応じて説明を入力します。

    地理

    必須

    デフォルトで現在のリージョンが使用されます。

    DLF エンドポイント

    必須

    デフォルトで現在のリージョンの DLF エンドポイントが使用されます。

    OSS エンドポイント

    必須

    デフォルトで現在のリージョンの OSS エンドポイントが使用されます。

    RoleARN

    必須

    RAM ロールの Alibaba Cloud リソースネーム (ARN)。このロールは、DLF と OSS の両方のサービスにアクセスする権限を持っている必要があります。

    1. Resource Access Management (RAM) コンソールにログインします。

    2. 左側のナビゲーションウィンドウで、ID > ロール を選択します。

    3. 基本情報 セクションで、ARN を確認できます。

    例:acs:ram::124****:role/aliyunodpsdefaultrole

    外部データソースの追加プロパティ

    任意

    外部データソースの追加プロパティ。これらのプロパティを指定すると、この外部データソースを使用するタスクは、定義された動作に基づいてソースシステムにアクセスできます。

    説明

    サポートされているパラメーターについては、公式ドキュメントをご参照ください。製品の進化に伴い、より多くのパラメーターがサポートされる予定です。

  5. OK をクリックして外部データソースを作成します。

  6. 外部データソース ページで、対象のデータソースを見つけ、操作 列の 詳細 をクリックします。

ステップ 7:外部スキーマの作成

MaxCompute に接続し、次のコマンドを入力します:

SET odps.namespace.schema=true;

CREATE EXTERNAL SCHEMA IF NOT EXISTS <external_schema>
WITH <external_data_source>
ON '<dlf_data_catalogue>.dlf_database';

次の表にパラメーターを説明します。

  • external_schema外部スキーマの名前。例:es_mc_dlf_oss_paimon

  • external_data_source:作成した外部データソースの名前。外部スキーマが属するプロジェクトは、外部データソースと同じリージョンにある必要があります。例:mysql_paimon_dlf

  • dlf_data_catalogueDLF データカタログの ID。詳細については、「データカタログの作成」をご参照ください。例:db_dlf_oss

  • dlf_database:指定された DLF データカタログ内のデータベースの名前。詳細については、「データベース、テーブル、および関数」をご参照ください。例:flink_paimon

ステップ 8:SQL を使用した OSS データへのアクセス

  1. MaxCompute クライアントにログインし、外部スキーマ内のテーブルをクエリします。

SET odps.namespace.schema=true;
use schema es_mc_dlf_oss_paimon;
SHOW tables IN es_mc_dlf_oss_paimon;

-- The following result is returned:
ALIYUN$xxx:sales

OK
  1. 外部スキーマのテーブル内のデータをクエリします。

SET odps.namespace.schema=true;
SELECT * FROM <maxcompute_project_name>.es_mc_dlf_oss_paimon.sales;

-- The following result is returned:
+------------+------------+------------+--------------+---------------+------------+------------+------------+
| id         | year       | amount     | product_name | customer_name | order_date | region     | status     | 
+------------+------------+------------+--------------+---------------+------------+------------+------------+
| 1          | 2020       | 100        | Product A    | Customer 1    | 2020-01-01 | Region 1   | Completed  | 
| 2          | 2020       | 200        | Product B    | Customer 2    | 2020-02-01 | Region 2   | Pending    | 
| 3          | 2021       | 150        | Product C    | Customer 3    | 2021-03-01 | Region 3   | Completed  | 
| 4          | 2021       | 300        | Product D    | Customer 4    | 2021-04-01 | Region 4   | Pending    | 
| 5          | 2022       | 250        | Product E    | Customer 5    | 2022-05-01 | Region 5   | Completed  | 
| 6          | 2022       | 400        | Product F    | Customer 6    | 2022-06-01 | Region 6   | Pending    | 
| 7          | 2023       | 350        | Product G    | Customer 7    | 2023-07-01 | Region 7   | Completed  | 
| 8          | 2023       | 500        | Product H    | Customer 8    | 2023-08-01 | Region 8   | Pending    | 
| 9          | 2020       | 450        | Product I    | Customer 9    | 2020-09-01 | Region 1   | Completed  | 
| 10         | 2021       | 600        | Product J    | Customer 10   | 2021-10-01 | Region 2   | Pending    | 
+------------+------------+------------+--------------+---------------+------------+------------+------------+