すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Apache Paimon を使用したリアルタイムデータレイクの概要

最終更新日:Nov 09, 2025

Apache Paimon は、ストリーミングモードとバッチモードでデータを処理できるデータレイクストレージです。 Apache Paimon は、高スループットのデータ書き込みと低レイテンシのデータクエリをサポートしています。 このトピックでは、Apache Paimon カタログと MySQL コネクタを使用して、ApsaraDB RDS から Apache Paimon テーブルに注文データとテーブルスキーマの変更をインポートし、Realtime Compute for Apache Flink コンソールで Apache Paimon テーブルのデータに対して簡単な分析を実行する方法について説明します。

背景情報

Apache Paimon は、ストリーミングモードとバッチモードでデータを処理できるデータレイクストレージです。 Apache Paimon は、高スループットのデータ書き込みと低レイテンシのデータクエリをサポートしています。 Alibaba Cloud Realtime Compute for Apache Flink と、オープンソースのビッグデータプラットフォームである E-MapReduce (EMR) の Spark、Hive、Trino などの一般的に使用されるコンピューティングエンジンは、Apache Paimon と統合できます。 Apache Paimon を使用して、Hadoop 分散ファイルシステム (HDFS) または Object Storage Service (OSS) 上に独自のデータレイクストレージサービスを迅速に構築し、Apache Paimon をコンピューティングエンジンと統合して、コンピューティングエンジンが Apache Paimon のデータにアクセスしてデータレイク分析を実行できるようにすることができます。

前提条件

  • RAM ユーザーまたは RAM ロールに必要な権限が付与されていること。 Realtime Compute for Apache Flink の開発コンソールに RAM ユーザーまたは RAM ロールを使用してアクセスする場合、この前提条件を満たす必要があります。 詳細については、「権限管理」をご参照ください。

  • Realtime Compute for Apache Flink ワークスペースが作成されていること。 詳細については、「Realtime Compute for Apache Flink の有効化」をご参照ください。

手順 1:データソースを準備する

  1. ApsaraDB RDS for MySQL インスタンスを作成し、データベースを構成する

    説明

    Realtime Compute for Apache Flink ワークスペースと同じ VPC に ApsaraDB RDS for MySQL インスタンスを作成することをお勧めします。 ApsaraDB RDS for MySQL インスタンスと Realtime Compute for Apache Flink ワークスペースが同じ VPC 内にない場合は、それらの間に接続を確立する必要があります。 詳細については、「ネットワーク接続」をご参照ください。

    orders という名前のデータベースを作成し、orders データベースに対する読み取りおよび書き込み権限を持つ特権アカウントまたは標準アカウントを作成します。

  2. 目的の ApsaraDB RDS for MySQL インスタンスにログオンする。 orders データベースに orders_1 テーブルと orders_2 テーブルを作成します。

    CREATE TABLE `orders_1` (
        orderkey BIGINT NOT NULL,
        custkey BIGINT,
        order_status VARCHAR(100),
        total_price DOUBLE,
        order_date DATE,
        order_priority VARCHAR(100),
        clerk VARCHAR(100),
        ship_priority INT,
        comment VARCHAR(100),
        PRIMARY KEY (orderkey)
    );
    
    CREATE TABLE `orders_2` (
        orderkey BIGINT NOT NULL,
        custkey BIGINT,
        order_status VARCHAR(100),
        total_price DOUBLE,
        order_date DATE,
        order_priority VARCHAR(100),
        clerk VARCHAR(100),
        ship_priority INT,
        comment VARCHAR(100),
        PRIMARY KEY (orderkey)
    );
  3. 次のテストデータを挿入します。

    INSERT INTO `orders_1` VALUES (1, 1, 'O', 131251.81, '1996-01-02', '5-LOW', 'Clerk#000000951', 0, 'nstructions sleep furiously among ');
    INSERT INTO `orders_1` VALUES (2, 3, 'O', 40183.29, '1996-12-01', '1-URGENT', 'Clerk#000000880', 0, ' foxes. pending accounts at the pending, silent asymptot');
    INSERT INTO `orders_1` VALUES (3, 6, 'F', 160882.76, '1993-10-14', '5-LOW', 'Clerk#000000955', 0, 'sly final accounts boost. carefully regular ideas cajole carefully. depos');
    INSERT INTO `orders_1` VALUES (4, 6, 'O', 31084.79, '1995-10-11', '5-LOW', 'Clerk#000000124', 0, 'sits. slyly regular warthogs cajole. regular, regular theodolites acro');
    INSERT INTO `orders_1` VALUES (5, 2, 'F', 86615.25, '1994-07-30', '5-LOW', 'Clerk#000000925', 0, 'quickly. bold deposits sleep slyly. packages use slyly');
    INSERT INTO `orders_1` VALUES (6, 2, 'F', 36468.55, '1992-02-21', '4-NOT SPECIFIED', 'Clerk#000000058', 0, 'ggle. special, final requests are against the furiously specia');
    INSERT INTO `orders_1` VALUES (7, 2, 'O', 171488.73, '1996-01-10', '2-HIGH', 'Clerk#000000470', 0, 'ly special requests ');
    INSERT INTO `orders_1` VALUES (8, 6, 'O', 116923.00, '1995-07-16', '2-HIGH', 'Clerk#000000616', 0, 'ise blithely bold, regular requests. quickly unusual dep');
    INSERT INTO `orders_1` VALUES (9, 3, 'F', 99798.76, '1993-10-27', '3-MEDIUM', 'Clerk#000000409', 0, 'uriously. furiously final request');
    INSERT INTO `orders_1` VALUES (10, 3, 'O', 41670.02, '1998-07-21', '3-MEDIUM', 'Clerk#000000223', 0, 'ly final packages. fluffily final deposits wake blithely ideas. spe');
    INSERT INTO `orders_2` VALUES (11, 6, 'O', 148789.52, '1995-10-23', '4-NOT SPECIFIED', 'Clerk#000000259', 0, 'zzle. carefully enticing deposits nag furio');
    INSERT INTO `orders_2` VALUES (12, 5, 'O', 38988.98, '1995-11-03', '1-URGENT', 'Clerk#000000358', 0, ' quick packages are blithely. slyly silent accounts wake qu');
    INSERT INTO `orders_2` VALUES (13, 4, 'F', 113701.89, '1992-06-03', '3-MEDIUM', 'Clerk#000000456', 0, 'kly regular pinto beans. carefully unusual waters cajole never');
    INSERT INTO `orders_2` VALUES (14, 6, 'O', 46366.56, '1996-08-21', '4-NOT SPECIFIED', 'Clerk#000000604', 0, 'haggle blithely. furiously express ideas haggle blithely furiously regular re');
    INSERT INTO `orders_2` VALUES (15, 4, 'O', 219707.84, '1996-09-20', '3-MEDIUM', 'Clerk#000000659', 0, 'ole express, ironic requests: ir');
    INSERT INTO `orders_2` VALUES (16, 1, 'F', 20065.73, '1994-07-16', '3-MEDIUM', 'Clerk#000000661', 0, 'wake fluffily. sometimes ironic pinto beans about the dolphin');
    INSERT INTO `orders_2` VALUES (17, 0, 'P', 65883.92, '1995-03-18', '1-URGENT', 'Clerk#000000632', 0, 'ular requests are blithely pending orbits-- even requests against the deposit');
    INSERT INTO `orders_2` VALUES (18, 6, 'F', 79258.24, '1994-01-20', '5-LOW', 'Clerk#000000743', 0, 'y pending requests integrate');
    INSERT INTO `orders_2` VALUES (19, 2, 'O', 116227.05, '1996-12-19', '4-NOT SPECIFIED', 'Clerk#000000547', 0, 'symptotes haggle slyly around the furiously iron');
    INSERT INTO `orders_2` VALUES (20, 1, 'O', 215135.72, '1998-04-18', '3-MEDIUM', 'Clerk#000000440', 0, ' pinto beans sleep carefully. blithely ironic deposits haggle furiously acro');

手順 2:カタログを作成する

  1. カタログページに移動します。

    1. Realtime Compute for Apache Flink コンソール にログオンします。

    2. 管理するワークスペースを見つけ、操作 列の コンソール をクリックします。

    3. [カタログ] をクリックします。

  2. Apache Paimon カタログを作成します。

    1. 表示されたページで、[カタログの作成] をクリックします。 [組み込みカタログ] タブの [カタログの作成] ダイアログボックスで、[apache Paimon] をクリックし、[次へ] をクリックします。

    2. [カタログの構成] 手順でパラメーターを構成します。

      image.png

      パラメーター

      説明

      備考

      カタログ名

      Apache Paimon カタログの名前。

      この例では、paimon-catalog と入力します。

      メタストア

      Apache Paimon テーブルのメタデータストレージタイプ。有効な値:

      • filesystem:メタデータは OSS バケットにのみ保存されます。

      • dlf:メタデータは OSS バケットに保存され、Alibaba Cloud Data Lake Formation (DLF) にも同期されます。

      この例では、filesystem が選択されています。

      ウェアハウス

      Apache Paimon カタログのストレージルートディレクトリ。このディレクトリは OSS ディレクトリです。 Realtime Compute for Apache Flink を有効にしたときに使用される OSS バケットを選択できます。 Alibaba Cloud アカウントと同じリージョン内の別の OSS バケットを使用することもできます。

      このパラメーターの値は、oss://<bucket>/<object> 形式です。ディレクトリ内のパラメーター:

      • bucket:作成した OSS バケットの名前。

      • object:データが保存されているパス。

      OSS コンソール でバケット名とオブジェクト名を表示できます。

      fs.oss.endpoint

      OSS のエンドポイント。

      DLF が Realtime Compute for Apache Flink と同じリージョンにある場合は、OSS の VPC エンドポイントが使用されます。それ以外の場合は、パブリックエンドポイントが使用されます。必要な情報の取得方法については、「リージョンとエンドポイント」をご参照ください。

      fs.oss.accessKeyId

      OSS に対する読み取りおよび書き込み権限を持つ Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID。

      AccessKey ID をお持ちでない場合は、AccessKey ペアを作成します。詳細については、「AccessKey ペアの作成」をご参照ください。

      fs.oss.accessKeySecret

      OSS に対する読み取りおよび書き込み権限を持つ Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey シークレット。

      この例では、プレーンテキストストレージによるシークレット漏洩のリスクを防ぐため、AccessKey シークレットが変数として導入されています。詳細については、「名前空間変数」をご参照ください。

    3. [確認] をクリックします。

  3. MySQL カタログを作成します。

    1. [カタログリスト] ページで、[カタログの作成] をクリックします。 [組み込みカタログ] タブの [カタログの作成] ダイアログボックスで、[mysql] をクリックし、[次へ] をクリックします。

    2. [カタログの構成] 手順でパラメーターを構成します。

      mysql-catalog.png

      パラメーター

      説明

      備考

      カタログ名

      MySQL カタログの名前。

      この例では、mysql-catalog と入力します。

      ホスト名

      MySQL データベースへのアクセスに使用する IP アドレスまたはホスト名。

      この例では、ApsaraDB RDS for MySQL インスタンスの内部エンドポイントが使用されます。

      ポート

      MySQL データベースへのアクセスに使用するポート。

      デフォルト値:3306。

      デフォルトデータベース

      デフォルトの MySQL データベースの名前。

      この例では、手順 1:データソースを準備する で作成したデータベースの名前が使用されます。

      ユーザー名

      MySQL データベースへのアクセスに使用するユーザー名。

      データベースへのアクセスに使用するユーザー名を入力します。

      パスワード

      MySQL データベースへのアクセスに使用するパスワード。

      この例では、プレーンテキストストレージによるシークレット漏洩のリスクを防ぐため、AccessKey シークレットが変数として導入されています。詳細については、「名前空間変数」をご参照ください。

    3. [確認] をクリックします。

手順 3:Realtime Compute for Apache Flink ドラフトを作成する

  1. 左側のナビゲーションウィンドウで、[開発] > ETL を選択します。 SQL エディターページの左上隅にある [新規] をクリックします。

  2. [新規ドラフト] ダイアログボックスの [SQL スクリプト] タブで、[空のストリームドラフト] を選択し、[次へ] をクリックします。

  3. [新規ドラフト] ダイアログボックスで、ドラフトのパラメーターを構成します。次の表にパラメーターを示します。

    パラメーター

    説明

    [名前]

    作成するドラフトの名前。

    説明

    ドラフト名は現在のプロジェクト内で一意である必要があります。

    [場所]

    ドラフトのコードファイルが保存されるフォルダー。

    既存のフォルダーの右側にある 新建文件夹 アイコンをクリックして、サブフォルダーを作成することもできます。

    [エンジンバージョン]

    デプロイメントで使用される Flink のエンジンバージョンを表示できます。

    [推奨] ラベルが付いたエンジンバージョンを使用することをお勧めします。ラベル付きのバージョンは、より高い信頼性とパフォーマンスを提供します。エンジンバージョンの詳細については、「リリースノート」および「エンジンバージョン」をご参照ください。

  4. [作成] をクリックします。

  5. 次のステートメントを入力して、orders データベース内の関連テーブルの変更をリアルタイムでキャプチャし、変更を Apache Paimon テーブルに同期します。

    -- 正規表現 orders_\d+ に一致する MySQL テーブルをキャプチャし、MySQL テーブルの変更を Apache Paimon のデフォルトデータベースの orders テーブルに同期します。
    CREATE TABLE IF NOT EXISTS `paimon-catalog`.`default`.`orders` AS TABLE `mysql-catalog`.`orders`.`orders_\d+`;

    CREATE TABLE AS ステートメントの使用方法の詳細については、「CREATE TABLE AS ステートメント」をご参照ください。

  6. オプション。 SQL エディターページの右上隅にある [検証] をクリックして、Realtime Compute for Apache Flink ドラフトの SQL ステートメントに構文エラーが含まれているかどうかを確認します。

  7. SQL エディターページの右上隅にある [デプロイ] をクリックします。次に、[OK] をクリックします。

  8. 左側のナビゲーションウィンドウで、[O&M] > デプロイメント を選択します。 [デプロイメント] ページで、管理するデプロイメントの名前をクリックして、デプロイメントの詳細ページに移動します。

  9. [デプロイメント] ページの [構成] タブで、[パラメーター] セクションの右上隅にある [編集] をクリックします。

    この例では、デプロイメントの結果をすばやく取得するために、[チェックポイント間隔] パラメーターと [チェックポイント間の最小間隔] パラメーターの値を 10 秒に変更します。次に、[保存] をクリックします。

    image

  10. [構成] タブの右上隅にある [開始] をクリックします。 [ジョブの開始] パネルで、[初期モード] を選択し、[開始] をクリックします。

    image.png

  11. Apache Paimon テーブルのデータをクエリします。

    1. 左側のナビゲーションウィンドウで、[開発] > スクリプト を選択します。 [スクリプト] タブのドラフトのスクリプトエディターに次のコードをコピーします。

      select custkey, sum(total_price) from `paimon-catalog`.`default`.`orders` group by custkey;
    2. コードを選択し、コードの左側に表示される [実行] をクリックします。

      image.png

手順 4:MySQL テーブルのスキーマを更新する

このセクションでは、MySQL テーブルスキーマの変更を Apache Paimon テーブルに同期する方法を示します。

  1. ApsaraDB RDS コンソール にログオンします。

  2. orders データベースにログオンし、次の SQL ステートメントを入力して、[実行] をクリックして、2 つのデータテーブルに列を追加し、データテーブルにデータを入力します。実行

    ALTER TABLE `orders_1` ADD COLUMN quantity BIGINT; 
    ALTER TABLE `orders_2` ADD COLUMN quantity BIGINT; 
    UPDATE `orders_1` SET `quantity` = 100 WHERE `orderkey` < 5;
    UPDATE `orders_2` SET `quantity` = 100 WHERE `orderkey` > 15;
  3. Realtime Compute for Apache Flink の左側のナビゲーションウィンドウで、[開発] > スクリプト を選択します。 [スクリプト] タブのスクリプトエディターに次のコードをコピーします。

    select * from `test`.`default`.`orders` where `quantity` is not null;

    コードを選択し、コードの左側に表示される [実行] をクリックします。

    Image 32

関連情報