すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:データベースデータの Apache Paimon へのストリーミング

最終更新日:Jun 23, 2026

Apache Paimon は、ストリーミングとバッチ処理の両方に対応した統合レイクストレージフォーマットであり、高スループットの書き込みと低レイテンシーのクエリをサポートします。このチュートリアルでは、Paimon カタログと MySQL コネクタを使用して、ApsaraDB RDS for MySQL インスタンスから注文データとテーブルスキーマの変更を Paimon テーブルにインポートする方法について説明します。また、Flink を使用して Paimon テーブルで簡単な分析を実行する方法についても説明します。

背景情報

Apache Paimon は、ストリーミングとバッチ処理の両方に対応した統合レイクストレージフォーマットであり、高スループットの書き込みと低レイテンシーのクエリをサポートします。現在、Realtime Compute for Apache Flink (Flink) や E-MapReduce 上の Spark、Hive、Trino などの一般的なコンピュートエンジンは、Paimon との連携が良好です。Apache Paimon を使用すると、HDFS または OSS 上に独自のデータレイクストレージサービスを迅速に構築し、コンピュートエンジンに接続してデータレイク内のデータを分析できます。

前提条件

  • RAM ユーザーまたは RAM ロールを使用して操作を実行する場合、RAM ユーザーまたは RAM ロールが Flink コンソールにアクセスするために必要な権限を持っていることを確認してください。詳細については、「権限管理」をご参照ください。

  • Flink ワークスペースを作成済みであること。詳細については、「Realtime Compute for Apache Flink のアクティベーション」をご参照ください。

ステップ 1:データソースの準備

  1. ApsaraDB RDS for MySQL インスタンスを作成し、データベースを構成します

    説明

    ApsaraDB RDS for MySQL インスタンスは、Flink ワークスペースと同じ VPC 内にある必要があります。インスタンスとワークスペースが異なる VPC にある場合は、「ネットワーク接続」をご参照ください。

    orders という名前のデータベースと、orders データベースへの読み取りおよび書き込み権限を持つアカウントを作成します。

  2. ApsaraDB RDS for MySQL インスタンスに接続し、orders データベースに orders_1 テーブルと orders_2 テーブルを作成します。

    CREATE TABLE `orders_1` (
        orderkey BIGINT NOT NULL,
        custkey BIGINT,
        order_status VARCHAR(100),
        total_price DOUBLE,
        order_date DATE,
        order_priority VARCHAR(100),
        clerk VARCHAR(100),
        ship_priority INT,
        comment VARCHAR(100),
        PRIMARY KEY (orderkey)
    );
    CREATE TABLE `orders_2` (
        orderkey BIGINT NOT NULL,
        custkey BIGINT,
        order_status VARCHAR(100),
        total_price DOUBLE,
        order_date DATE,
        order_priority VARCHAR(100),
        clerk VARCHAR(100),
        ship_priority INT,
        comment VARCHAR(100),
        PRIMARY KEY (orderkey)
    );
  3. 次のテストデータを挿入します。

    INSERT INTO `orders_1` VALUES (1, 1, 'O', 131251.81, '1996-01-02', '5-LOW', 'Clerk#000000951', 0, 'nstructions sleep furiously among ');
    INSERT INTO `orders_1` VALUES (2, 3, 'O', 40183.29, '1996-12-01', '1-URGENT', 'Clerk#000000880', 0, ' foxes. pending accounts at the pending, silent asymptot');
    INSERT INTO `orders_1` VALUES (3, 6, 'F', 160882.76, '1993-10-14', '5-LOW', 'Clerk#000000955', 0, 'sly final accounts boost. carefully regular ideas cajole carefully. depos');
    INSERT INTO `orders_1` VALUES (4, 6, 'O', 31084.79, '1995-10-11', '5-LOW', 'Clerk#000000124', 0, 'sits. slyly regular warthogs cajole. regular, regular theodolites acro');
    INSERT INTO `orders_1` VALUES (5, 2, 'F', 86615.25, '1994-07-30', '5-LOW', 'Clerk#000000925', 0, 'quickly. bold deposits sleep slyly. packages use slyly');
    INSERT INTO `orders_1` VALUES (6, 2, 'F', 36468.55, '1992-02-21', '4-NOT SPECIFIED', 'Clerk#000000058', 0, 'ggle. special, final requests are against the furiously specia');
    INSERT INTO `orders_1` VALUES (7, 2, 'O', 171488.73, '1996-01-10', '2-HIGH', 'Clerk#000000470', 0, 'ly special requests ');
    INSERT INTO `orders_1` VALUES (8, 6, 'O', 116923.00, '1995-07-16', '2-HIGH', 'Clerk#000000616', 0, 'ise blithely bold, regular requests. quickly unusual dep');
    INSERT INTO `orders_1` VALUES (9, 3, 'F', 99798.76, '1993-10-27', '3-MEDIUM', 'Clerk#000000409', 0, 'uriously. furiously final request');
    INSERT INTO `orders_1` VALUES (10, 3, 'O', 41670.02, '1998-07-21', '3-MEDIUM', 'Clerk#000000223', 0, 'ly final packages. fluffily final deposits wake blithely ideas. spe');
    INSERT INTO `orders_2` VALUES (11, 6, 'O', 148789.52, '1995-10-23', '4-NOT SPECIFIED', 'Clerk#000000259', 0, 'zzle. carefully enticing deposits nag furio');
    INSERT INTO `orders_2` VALUES (12, 5, 'O', 38988.98, '1995-11-03', '1-URGENT', 'Clerk#000000358', 0, ' quick packages are blithely. slyly silent accounts wake qu');
    INSERT INTO `orders_2` VALUES (13, 4, 'F', 113701.89, '1992-06-03', '3-MEDIUM', 'Clerk#000000456', 0, 'kly regular pinto beans. carefully unusual waters cajole never');
    INSERT INTO `orders_2` VALUES (14, 6, 'O', 46366.56, '1996-08-21', '4-NOT SPECIFIED', 'Clerk#000000604', 0, 'haggle blithely. furiously express ideas haggle blithely furiously regular re');
    INSERT INTO `orders_2` VALUES (15, 4, 'O', 219707.84, '1996-09-20', '3-MEDIUM', 'Clerk#000000659', 0, 'ole express, ironic requests: ir');
    INSERT INTO `orders_2` VALUES (16, 1, 'F', 20065.73, '1994-07-16', '3-MEDIUM', 'Clerk#000000661', 0, 'wake fluffily. sometimes ironic pinto beans about the dolphin');
    INSERT INTO `orders_2` VALUES (17, 0, 'P', 65883.92, '1995-03-18', '1-URGENT', 'Clerk#000000632', 0, 'ular requests are blithely pending orbits-- even requests against the deposit');
    INSERT INTO `orders_2` VALUES (18, 6, 'F', 79258.24, '1994-01-20', '5-LOW', 'Clerk#000000743', 0, 'y pending requests integrate');
    INSERT INTO `orders_2` VALUES (19, 2, 'O', 116227.05, '1996-12-19', '4-NOT SPECIFIED', 'Clerk#000000547', 0, 'symptotes haggle slyly around the furiously iron');
    INSERT INTO `orders_2` VALUES (20, 1, 'O', 215135.72, '1998-04-18', '3-MEDIUM', 'Clerk#000000440', 0, ' pinto beans sleep carefully. blithely ironic deposits haggle furiously acro');

ステップ 2:カタログの作成

  1. [Data Management] ページに移動します。

    1. Flink コンソールにログインします。

    2. 管理したいワークスペースの [操作] 列で、[コンソール] をクリックします。

    3. [Data Management] をクリックします。

  2. Paimon カタログを作成します。

    1. [カタログの作成] をクリックします。[組み込みカタログ] タブで Apache Paimon を選択し、[次へ] をクリックします。

    2. パラメーターを設定します。

      パラメーター

      説明

      catalog name

      Paimon カタログの名前。

      paimon-catalog

      metastore

      Paimon テーブルのメタデータのストレージタイプ。有効な値は次のとおりです:

      • filesystem:メタデータは OSS にのみ保存されます。

      • dlf:メタデータは OSS に保存され、Data Lake Formation (DLF) に同期されます。

      この例では filesystem を使用します。

      warehouse

      Paimon カタログを保存するためのルートディレクトリ。ディレクトリは OSS ディレクトリである必要があります。Flink のアクティベーションに使用した OSS バケット、または同じ Alibaba Cloud アカウントに属し、ワークスペースと同じリージョンにある別の OSS バケットを選択できます。

      値は oss://<bucket>/<object> のフォーマットである必要があります。このフォーマットでは、

      • bucket:作成した OSS バケットの名前。

      • object:データが保存されているディレクトリへのパス。

      バケットとオブジェクトの名前は OSS コンソールで確認できます。

      fs.oss.endpoint

      OSS サービスのエンドポイント。

      Flink と DLF が同じリージョンにデプロイされている場合は、VPC エンドポイントを使用します。それ以外の場合は、パブリックエンドポイントを使用します。詳細については、「リージョンとエンドポイント」をご参照ください。

      fs.oss.accessKeyId

      OSS の読み取りおよび書き込み権限を持つ Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID。

      AccessKey ID をお持ちでない場合は、「AccessKey ペアの作成」をご参照ください。

      fs.oss.accessKeySecret

      Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey Secret。

      この例では、プレーンテキストの漏洩を防ぐために、AccessKey Secret に名前空間変数を使用しています。詳細については、「名前空間変数」をご参照ください。

    3. [OK] をクリックします。

  3. MySQL カタログを作成します。

    1. [カタログの作成] をクリックします。[組み込みカタログ] タブで MySQL を選択し、[次へ] をクリックします。

    2. パラメーターを設定します。

      パラメーター

      説明

      catalog name

      MySQL カタログの名前。

      mysql-catalog

      hostname

      MySQL データベースの IP アドレスまたはホスト名。

      この例では、ApsaraDB RDS for MySQL インスタンスの内部エンドポイントを入力します。

      port

      MySQL データベースサービスのポート番号。

      デフォルト値は 3306 です。

      default-database

      デフォルトの MySQL データベースの名前。

      この例では、「ステップ 1:データソースの準備」で作成した orders データベースを入力します。

      username

      MySQL データベースのユーザー名。

      ご利用のデータベースのユーザー名を入力します。

      password

      MySQL データベースのパスワード。

      この例では、プレーンテキストの漏洩を防ぐために、パスワードとして名前空間変数を使用しています。詳細については、「名前空間変数」をご参照ください。

    3. [OK] をクリックします。

ステップ 3:Flink ジョブの作成

  1. Flink の開発コンソールに移動し、データインジェストジョブを作成します。

    1. Realtime Compute for Apache Flink コンソールにログインします。

    2. 対象のワークスペースの [操作] 列にある [コンソール] をクリックします。

    3. 左側のナビゲーションウィンドウで、[データ開発] > >[データインジェスト] を選択します。

    4. image アイコンをクリックし、[新しいデータインジェストドラフトの作成] をクリックして、[ファイル名] を指定し、[エンジンバージョン] を選択します。

      パラメーター

      説明

      [ファイル名]

      ジョブの名前。

      説明

      ジョブ名はプロジェクト内で一意である必要があります。

      flink-test

      エンジンバージョン

      ジョブの Flink エンジンバージョン。

      [推奨] または [安定] タグが付いたバージョンを使用することを推奨します。これらのタグが付いたバージョンは、より高い信頼性と優れたパフォーマンスを提供します。エンジンバージョンの詳細については、「リリースノート」および「エンジンバージョン」をご参照ください。

      vvr-11.5-jdk11-flink-1.20

    5. [作成] をクリックします。

  2. 次の文を入力して、orders データベース内の指定されたテーブルへの変更をリアルタイムでキャプチャし、その変更を Paimon テーブルに同期します。

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: 3306
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: orders.orders_\d+
      server-id: 8601-8604
      #(任意) 増分フェーズ中に新しく追加されたテーブルからデータを同期します。
      scan.binlog.newly-added-table.enabled: true
      #(任意) テーブルとフィールドのコメントを同期します。
      include-comments.enabled: true
      #(任意) 無制限のシャードのディストリビューションを優先して、TaskManager での Out Of Memory (OOM) の問題を回避します。
      scan.incremental.snapshot.unbounded-chunk-first.enabled: true
      #(任意) 解析とフィルタリングを有効にして、データ読み取りを高速化します。
      scan.only.deserialize.captured.tables.changelog.enabled: true
    sink:
      type: paimon
      name: Paimon Sink
      catalog.properties.metastore: filesystem
      catalog.properties.warehouse: oss://default/test
      catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
      catalog.properties.fs.oss.accessKeyId: xxxxxxxx
      catalog.properties.fs.oss.accessKeySecret: xxxxxxxx
      #(任意) 削除ベクターを有効にして、データ読み取りパフォーマンスを向上させます。
      table.properties.deletion-vectors.enabled: true
    # DLF カタログを使用する場合は、次の構成を使用します。
    # sink:
    #  type: paimon
    #  catalog.properties.metastore: rest
    #  catalog.properties.token.provider: dlf
    #  catalog.properties.uri: dlf_uri
    #  catalog.properties.warehouse: your_warehouse
    #  table.properties.deletion-vectors.enabled: true   
    # 正規表現 orders_\\d+ に一致する名前の MySQL テーブルから変更をキャプチャし、その変更をデフォルトの Paimon データベースの orders テーブルに同期します。  
    route:
      - source-table: orders.orders_\d+
        sink-table: default.orders

    データインジェストジョブの設定方法の詳細については、「Flink CDC ベースのデータインジェストジョブの開発」をご参照ください。

  3. ページの右上隅にある [デプロイ] をクリックします。表示されるメッセージで [OK] をクリックします。

  4. 左側のナビゲーションウィンドウで、[O&M センター] > >[ジョブ O&M] を選択します。次に、ジョブの名前をクリックして、その [デプロイメント] タブを開きます。

  5. [実行パラメーター] セクションで、[編集] をクリックします。

    ジョブの結果をより早く確認するには、[システムチェックポイント間隔][システムチェックポイント間の最小間隔] を 10 秒に設定し、[保存] をクリックします。

  6. ジョブの [デプロイメント] タブで [開始] をクリックします。[ジョブの開始] ダイアログボックスで [ステートレス開始] を選択し、[開始] をクリックします。

  7. Paimon データをクエリします。

    1. [データ開発] > >[データクエリ] ページの [クエリスクリプト] タブで、次のコードをクエリスクリプトにコピーします。

      select custkey, sum(total_price) from `paimon-catalog`.`default`.`orders` group by custkey;
    2. 実行したいコードを選択し、コードの左側に表示される [実行] をクリックします。

      select custkey, sum(total_price) from paimon-catalog.default.orders group by custkey; SQL 文が実行されると、クエリステータスが「完了」に変わります。結果テーブルには、orders テーブルの total_price の値を custkey でグループ化した合計が表示されます。クエリは、custkey の値が 0 から 6 までの合計 7 行のデータを返します。

ステップ 4:MySQL スキーマの更新

このセクションでは、MySQL テーブルから Paimon テーブルにスキーマの変更を同期する方法について説明します。

  1. ApsaraDB RDS コンソールにログインします。

  2. orders データベースで、次の SQL 文を入力して実行し、両方のデータテーブルに列を追加してデータを更新します。次に、[実行] をクリックします。

    ALTER TABLE `orders_1` ADD COLUMN quantity BIGINT; 
    ALTER TABLE `orders_2` ADD COLUMN quantity BIGINT; 
    UPDATE `orders_1` SET `quantity` = 100 WHERE `orderkey` < 5;
    UPDATE `orders_2` SET `quantity` = 100 WHERE `orderkey` > 15;
  3. Flink コンソール[データ開発] > >[データクエリ] ページの [クエリスクリプト] タブで、次のコードをクエリスクリプトにコピーします。

    select * from `paimon-catalog`.`default`.`orders` where `quantity` is not null;

    実行したいコードを選択し、コードの左側に表示される [実行] をクリックします。

    クエリが成功すると、9 件のレコードが返されます。データは orders_1 テーブルと orders_2 テーブルのもので、各レコードの quantity 列の値は 100 です。

関連ドキュメント