Apache Paimon は、ストリーミングおよびバッチ処理のための統一されたレイクストレージフォーマットです。高スループットの書き込みと低レイテンシーのクエリをサポートします。このトピックでは、Paimon カタログと MySQL コネクタを使用して、ApsaraDB RDS for MySQL インスタンスから注文データとテーブルスキーマの変更を Paimon テーブルにインポートする方法について説明します。また、Flink を使用して Paimon テーブルの簡単な分析も行います。
背景情報
Apache Paimon は、ストリーミングおよびバッチ処理のための統一されたレイクストレージフォーマットであり、高スループットの書き込みと低レイテンシーのクエリをサポートします。Realtime Compute for Apache Flink および、オープンソースのビッグデータプラットフォームである E-MapReduce (EMR) 上の Spark、Hive、Trino などの一般的なコンピュートエンジンは、Paimon と緊密に統合されています。Apache Paimon を使用すると、Hadoop 分散ファイルシステム (HDFS) または Object Storage Service (OSS) 上にデータレイクストレージサービスを迅速に構築できます。その後、コンピュートエンジンを接続してデータレイク内のデータを分析できます。
前提条件
Resource Access Management (RAM) ユーザーまたは RAM ロールを使用する場合、そのユーザーまたはロールに Realtime Compute for Apache Flink コンソールにアクセスするために必要な権限があることを確認してください。詳細については、「権限管理」をご参照ください。
Realtime Compute for Apache Flink ワークスペースが作成されていること。詳細については、「Realtime Compute for Apache Flink の有効化」をご参照ください。
ステップ 1:データソースの準備
ApsaraDB RDS for MySQL インスタンスを作成し、データベースを構成します。
説明ApsaraDB RDS for MySQL インスタンスは、Flink ワークスペースと同じ VPC にある必要があります。インスタンスとワークスペースが同じ VPC にない場合は、「ネットワーク接続」をご参照ください。
orders という名前のデータベースと、orders データベースに対する読み取りおよび書き込み権限を持つ特権アカウントまたは標準アカウントを作成します。
ApsaraDB RDS for MySQL インスタンスに接続し、orders データベースに orders_1 と orders_2 というテーブルを作成します。
CREATE TABLE `orders_1` ( orderkey BIGINT NOT NULL, custkey BIGINT, order_status VARCHAR(100), total_price DOUBLE, order_date DATE, order_priority VARCHAR(100), clerk VARCHAR(100), ship_priority INT, comment VARCHAR(100), PRIMARY KEY (orderkey) ); CREATE TABLE `orders_2` ( orderkey BIGINT NOT NULL, custkey BIGINT, order_status VARCHAR(100), total_price DOUBLE, order_date DATE, order_priority VARCHAR(100), clerk VARCHAR(100), ship_priority INT, comment VARCHAR(100), PRIMARY KEY (orderkey) );次のテストデータを挿入します。
INSERT INTO `orders_1` VALUES (1, 1, 'O', 131251.81, '1996-01-02', '5-LOW', 'Clerk#000000951', 0, 'nstructions sleep furiously among '); INSERT INTO `orders_1` VALUES (2, 3, 'O', 40183.29, '1996-12-01', '1-URGENT', 'Clerk#000000880', 0, ' foxes. pending accounts at the pending, silent asymptot'); INSERT INTO `orders_1` VALUES (3, 6, 'F', 160882.76, '1993-10-14', '5-LOW', 'Clerk#000000955', 0, 'sly final accounts boost. carefully regular ideas cajole carefully. depos'); INSERT INTO `orders_1` VALUES (4, 6, 'O', 31084.79, '1995-10-11', '5-LOW', 'Clerk#000000124', 0, 'sits. slyly regular warthogs cajole. regular, regular theodolites acro'); INSERT INTO `orders_1` VALUES (5, 2, 'F', 86615.25, '1994-07-30', '5-LOW', 'Clerk#000000925', 0, 'quickly. bold deposits sleep slyly. packages use slyly'); INSERT INTO `orders_1` VALUES (6, 2, 'F', 36468.55, '1992-02-21', '4-NOT SPECIFIED', 'Clerk#000000058', 0, 'ggle. special, final requests are against the furiously specia'); INSERT INTO `orders_1` VALUES (7, 2, 'O', 171488.73, '1996-01-10', '2-HIGH', 'Clerk#000000470', 0, 'ly special requests '); INSERT INTO `orders_1` VALUES (8, 6, 'O', 116923.00, '1995-07-16', '2-HIGH', 'Clerk#000000616', 0, 'ise blithely bold, regular requests. quickly unusual dep'); INSERT INTO `orders_1` VALUES (9, 3, 'F', 99798.76, '1993-10-27', '3-MEDIUM', 'Clerk#000000409', 0, 'uriously. furiously final request'); INSERT INTO `orders_1` VALUES (10, 3, 'O', 41670.02, '1998-07-21', '3-MEDIUM', 'Clerk#000000223', 0, 'ly final packages. fluffily final deposits wake blithely ideas. spe'); INSERT INTO `orders_2` VALUES (11, 6, 'O', 148789.52, '1995-10-23', '4-NOT SPECIFIED', 'Clerk#000000259', 0, 'zzle. carefully enticing deposits nag furio'); INSERT INTO `orders_2` VALUES (12, 5, 'O', 38988.98, '1995-11-03', '1-URGENT', 'Clerk#000000358', 0, ' quick packages are blithely. slyly silent accounts wake qu'); INSERT INTO `orders_2` VALUES (13, 4, 'F', 113701.89, '1992-06-03', '3-MEDIUM', 'Clerk#000000456', 0, 'kly regular pinto beans. carefully unusual waters cajole never'); INSERT INTO `orders_2` VALUES (14, 6, 'O', 46366.56, '1996-08-21', '4-NOT SPECIFIED', 'Clerk#000000604', 0, 'haggle blithely. furiously express ideas haggle blithely furiously regular re'); INSERT INTO `orders_2` VALUES (15, 4, 'O', 219707.84, '1996-09-20', '3-MEDIUM', 'Clerk#000000659', 0, 'ole express, ironic requests: ir'); INSERT INTO `orders_2` VALUES (16, 1, 'F', 20065.73, '1994-07-16', '3-MEDIUM', 'Clerk#000000661', 0, 'wake fluffily. sometimes ironic pinto beans about the dolphin'); INSERT INTO `orders_2` VALUES (17, 0, 'P', 65883.92, '1995-03-18', '1-URGENT', 'Clerk#000000632', 0, 'ular requests are blithely pending orbits-- even requests against the deposit'); INSERT INTO `orders_2` VALUES (18, 6, 'F', 79258.24, '1994-01-20', '5-LOW', 'Clerk#000000743', 0, 'y pending requests integrate'); INSERT INTO `orders_2` VALUES (19, 2, 'O', 116227.05, '1996-12-19', '4-NOT SPECIFIED', 'Clerk#000000547', 0, 'symptotes haggle slyly around the furiously iron'); INSERT INTO `orders_2` VALUES (20, 1, 'O', 215135.72, '1998-04-18', '3-MEDIUM', 'Clerk#000000440', 0, ' pinto beans sleep carefully. blithely ironic deposits haggle furiously acro');
ステップ 2:カタログの作成
[Data Management] ページに移動します。
Realtime Compute for Apache Flink コンソール にログオンします。
対象のワークスペースの [操作] 列で、[コンソール] をクリックします。
[データ管理] をクリックします。
Paimon カタログを作成します。
[カタログの作成] をクリックします。[組み込みカタログ] タブで、[Apache Paimon] を選択して、Next をクリックします。
構成情報を入力します。

パラメーター
説明
注意:
カタログ名
カスタムの Paimon カタログ名を入力できます。
このトピックでは paimon-catalog を使用します。
メタストア
Paimon テーブルのメタストアのタイプ。
filesystem:メタデータを OSS にのみ保存します。
dlf:メタデータを OSS に保存し、Data Lake Formation (DLF) に同期します。
このトピックでは filesystem を使用します。
ウェアハウス
Paimon カタログのルートディレクトリ。これは OSS ディレクトリである必要があります。Realtime Compute for Apache Flink を有効化したときに作成された OSS バケット、またはご利用の Alibaba Cloud アカウント配下の同じリージョンにある別の OSS バケットを使用できます。
フォーマットは oss://<bucket>/<object> です。ここで、
bucket:ご利用の OSS バケットの名前。
object:データが保存されているディレクトリへのパス。
OSS コンソールでバケット名とオブジェクト名を確認できます。
fs.oss.endpoint
OSS サービスへの接続エンドポイント。
Flink と DLF が同じリージョンにある場合は、VPC エンドポイントを使用します。それ以外の場合は、パブリックエンドポイントを使用します。エンドポイントの取得方法の詳細については、「リージョンとエンドポイント」をご参照ください。
fs.oss.accessKeyId
OSS への読み取りおよび書き込み権限を持つ Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID。
AccessKey ID をお持ちでない場合は、「AccessKey ペアの作成」をご参照ください。
fs.oss.accessKeySecret
Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey Secret。
このトピックでは、プレーンテキストでの公開を防ぐために AccessKey Secret に変数を使用します。詳細については、「プロジェクト変数」をご参照ください。
[OK] をクリックします。
MySQL カタログを作成します。
[カタログの作成] をクリックします。[組み込みカタログ]タブで、 [MySQL] を選択し、[次へ] をクリックします。
構成情報を入力します。

パラメーター
説明
注意:
カタログ名
MySQL カタログの名前。
このトピックでは mysql-catalog を使用します。
ホスト名
MySQL データベースの IP アドレスまたはホスト名。
このトピックでは、RDS インスタンスのプライベートエンドポイントを使用します。
ポート
MySQL データベースサービスのポート。
デフォルト値は 3306 です。
デフォルトデータベース
デフォルトデータベースの名前。
ステップ 1:データソースの準備で作成した orders データベースを入力します。
ユーザー名
MySQL データベースへの接続に使用するユーザー名。
ご利用のデータベースのユーザー名を入力します。
パスワード
MySQL データベースサービスのパスワードを指定します。
このトピックでは、プレーンテキストでの公開を防ぐためにパスワードに変数を使用します。詳細については、「プロジェクト変数」をご参照ください。
[OK] をクリックします。
ステップ 3:Flink ジョブの作成
リアルタイムコンピューティング開発コンソールにログインし、データインジェストジョブを作成します。
対象のワークスペースの [操作] 列で、[コンソール] をクリックします。
左側のナビゲーションウィンドウで、 をクリックします。
をクリックし、次に [データインジェストドラフトの作成] をクリックします。[ファイル名] を入力し、[データベースエンジンバージョン] を選択します。ジョブパラメーター
説明
例
[ファイル名]
ジョブの名前です。
説明ジョブ名は、現在のプロジェクト内で一意である必要があります。
flink-test
[エンジンバージョン]
現在のジョブの Flink エンジンバージョンです。
[推奨] または [安定] のタグが付いたバージョンのご使用を推奨します。これらのバージョンは、より高い信頼性と優れたパフォーマンスを提供します。エンジンバージョンの詳細については、「機能リリースノート」および「エンジンバージョンの概要」をご参照ください。
vvr-11.5-jdk11-flink-1.20
[作成] をクリックします。
次の文を入力して、orders データベースの関連テーブルからリアルタイムの変更をキャプチャし、Paimon テーブルに同期します。
source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: 3306 username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: orders.orders_\d+ server-id: 8601-8604 # オプション。増分フェーズで新しく作成されたテーブルからデータを同期します。 scan.binlog.newly-added-table.enabled: true # オプション。テーブルとフィールドのコメントを同期します。 include-comments.enabled: true # オプション。無制限のシャードを優先的に分散させ、TaskManager の OutOfMemory 問題の可能性を防ぎます。 scan.incremental.snapshot.unbounded-chunk-first.enabled: true # オプション。解析フィルターを有効にして、読み取りを高速化します。 scan.only.deserialize.captured.tables.changelog.enabled: true sink: type: paimon name: Paimon Sink catalog.properties.metastore: filesystem catalog.properties.warehouse: oss://default/test catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com catalog.properties.fs.oss.accessKeyId: xxxxxxxx catalog.properties.fs.oss.accessKeySecret: xxxxxxxx # オプション。削除ベクターを有効にして、読み取りパフォーマンスを向上させます。 table.properties.deletion-vectors.enabled: true # DLF カタログを使用する場合は、次の構成を使用します # sink: # type: paimon # catalog.properties.metastore: rest # catalog.properties.token.provider: dlf # catalog.properties.uri: dlf_uri # catalog.properties.warehouse: your_warehouse # table.properties.deletion-vectors.enabled: true # 正規表現 orders_\d+ に一致する MySQL テーブルをキャプチャし、変更を Paimon のデフォルトデータベースの orders テーブルに同期します。 route: - source-table: orders.orders_\d+ sink-table: default.ordersデータインジェストジョブの構成方法の詳細については、「Flink CDC データインジェストジョブ開発リファレンス」をご参照ください。
右上隅で、[デプロイ] をクリックします。次に、[OK] をクリックします。
左側のナビゲーションウィンドウで、 をクリックします。[デプロイメント] ページで、対象のジョブの名前をクリックして、ジョブのデプロイメント詳細ページに移動します。
[パラメーター] セクションの右上隅で、[編集] をクリックします。
ジョブの実行結果をより迅速に確認するには、[チェックポイント間隔] と [チェックポイント間の最小間隔] パラメーターの値を 10 秒に変更し、[保存] をクリックします。

対象のジョブのデプロイメント詳細ページの上部で、[開始] をクリックし、[ステートレス開始] を選択し、次に [開始] をクリックします。

Paimon データをクエリします。
ページの [クエリスクリプト] タブで、次のコードをクエリスクリプトエディターにコピーします。
select custkey, sum(total_price) from `paimon-catalog`.`default`.`orders` group by custkey;ターゲットセグメントを選択し、コード行の左側にある[実行]をクリックします。

ステップ 4:MySQL テーブルスキーマの更新
このセクションでは、MySQL テーブルスキーマの変更を Paimon テーブルに同期する方法を説明します。
ApsaraDB RDS コンソール にログオンします。
orders データベースで、次の SQL 文を入力し、[実行] をクリックして、2 つのテーブルに列を追加し、その列にデータを入力します。
ALTER TABLE `orders_1` ADD COLUMN quantity BIGINT; ALTER TABLE `orders_2` ADD COLUMN quantity BIGINT; UPDATE `orders_1` SET `quantity` = 100 WHERE `orderkey` < 5; UPDATE `orders_2` SET `quantity` = 100 WHERE `orderkey` > 15;Realtime Compute for Apache Flink コンソールの ページの [スクリプト] タブで、次のコードをスクリプトエディターにコピーします。
select * from `paimon-catalog`.`default`.`orders` where `quantity` is not null;ターゲットセグメントを選択し、コード行の左側にある[実行]をクリックします。

参考
ストリーミングデータレイクハウス用の Paimon コネクタは、Paimon カタログと共に使用されます。その使用方法と機能の詳細については、「ストリーミングデータレイクハウス用 Paimon コネクタ」をご参照ください。
Paimon カタログの使用方法の詳細については、「Paimon カタログの管理」をご参照ください。
複雑なシナリオでのデータインジェストジョブの構成に関するベストプラクティスについては、「Flink CDC データインジェストのベストプラクティス」をご参照ください。