このトピックでは、Flink を使用して Paimon Data Lake Formation (DLF) カタログを作成し、MySQL のデータ変更キャプチャ (CDC) データを読み取り、データを Object Storage Service (OSS) に書き込み、メタデータを DLF に同期する方法について説明します。その後、MaxCompute の外部スキーマを使用して、データレイク内のデータに対してフェデレーテッドクエリを実行できます。
リージョン
サポート対象リージョン
リージョン名
リージョン ID
中国 (杭州)
cn-hangzhou
中国 (上海)
cn-shanghai
中国 (北京)
cn-beijing
中国 (張家口)
cn-zhangjiakou
中国 (深セン)
cn-shenzhen
中国 (香港)
cn-hongkong
シンガポール
ap-southeast-1
ドイツ (フランクフルト)
eu-central-1
MaxCompute、OSS、DLF、Flink は同じリージョンにデプロイする必要があります。
操作手順
前提条件
OSS サービスを有効化済みであること。
DLF サービスを有効化済みであること。
Flink サービスを有効化済みであること。
MaxCompute プロジェクトを作成し、プロジェクトレベルのメタデータに対するスキーマサポートを有効化済みであること。
ApsaraDB RDS for MySQL インスタンスを作成済みであること。
ステップ 1:DLF と OSS にアクセスするための MaxCompute 権限の付与
アカウントは、権限付与なしでは MaxCompute プロジェクトから DLF および OSS サービスにアクセスできません。次の 2 つの方法のいずれかで必要な権限を付与できます:
ワンクリック権限付与:同じアカウントを使用して MaxCompute プロジェクトを作成し、DLF をデプロイする場合、DLF の権限付与 をクリックしてワンクリックで権限を付与できます。
カスタム権限付与:DLF のカスタム権限付与を使用できます。この方法は、異なるアカウントを使用して MaxCompute プロジェクトを作成し、DLF をデプロイする場合にも適用されます。
ステップ 2:MySQL テストデータの準備
すでに MySQL のテストデータがある場合は、このステップをスキップできます。
RDS コンソールにログインします。
左側のナビゲーションウィンドウで、インスタンス をクリックします。次に、左上隅でリージョンを選択します。
インスタンスページで、目的のインスタンスの [インスタンス ID/名前] をクリックして、その詳細ページを開きます。
左側のナビゲーションウィンドウで、[データベース] をクリックします。
データベースの作成 をクリックします。次のパラメーターを設定します:
パラメーター
必須
説明
例
データベース名
必須
名前は 2~64 文字である必要があります。
先頭は英字、末尾は英字または数字である必要があります。
小文字の英字、数字、アンダースコア (_)、ハイフン (-) を使用できます。
データベース名はインスタンス内で一意である必要があります。
データベース名に
-が含まれている場合、作成されたデータベースのフォルダー名にある-は@002dに変換されます。
mysql_paimonサポートされる文字セット
必須
必要に応じて文字セットを選択します。
utf8承認者
任意
このデータベースにアクセスする必要があるアカウントを選択します。このパラメーターを空のままにして、データベース作成後にアカウントをアタッチすることもできます。
ここには標準アカウントのみが表示されます。特権アカウントはすべてのデータベースに対するすべての権限を持っているため、権限付与は不要です。
Default説明
任意
管理を容易にするためのデータベースの説明。説明は最大 256 文字です。
Flink テストデータベースを作成します。データベースにログイン をクリックします。左側のナビゲーションウィンドウで、データベースインスタンス を選択します。作成したデータベースをダブルクリックします。SQLConsole ページで、次のステートメントを実行してテストテーブルを作成し、テストデータを書き込みます。
インスタンスは存在するが、インスタンスを展開しても対象のデータベースが表示されない場合、次のいずれかの理由が考えられます:
ログインアカウントが対象のデータベースにアクセスできない:RDS インスタンスの アカウント ページに移動して、アカウントの権限を変更するか、ログインデータベースアカウントを変更できます。
メタデータが同期されておらず、ディレクトリが表示されない:対象のデータベースを含むインスタンスにマウスポインターを合わせます。インスタンス名の右側にある
ボタンをクリックして、データベースリストをリフレッシュします。
-- Create a table CREATE TABLE sales ( id INT NOT NULL AUTO_INCREMENT, year INT NOT NULL, amount DECIMAL(10,2) NOT NULL, product_name VARCHAR(100) NOT NULL, customer_name VARCHAR(100) NOT NULL, order_date DATE NOT NULL, region VARCHAR(50) NOT NULL, status VARCHAR(20) NOT NULL, PRIMARY KEY (id,year) ) PARTITION BY RANGE (year) ( PARTITION p2020 VALUES LESS THAN (2021), PARTITION p2021 VALUES LESS THAN (2022), PARTITION p2022 VALUES LESS THAN (2023), PARTITION p2023 VALUES LESS THAN (2024) ); -- Write data INSERT INTO sales (year, amount, product_name, customer_name, order_date, region, status) VALUES (2020, 100.00, 'Product A', 'Customer 1', '2020-01-01', 'Region 1', 'Completed'), (2020, 200.00, 'Product B', 'Customer 2', '2020-02-01', 'Region 2', 'Pending'), (2021, 150.00, 'Product C', 'Customer 3', '2021-03-01', 'Region 3', 'Completed'), (2021, 300.00, 'Product D', 'Customer 4', '2021-04-01', 'Region 4', 'Pending'), (2022, 250.00, 'Product E', 'Customer 5', '2022-05-01', 'Region 5', 'Completed'), (2022, 400.00, 'Product F', 'Customer 6', '2022-06-01', 'Region 6', 'Pending'), (2023, 350.00, 'Product G', 'Customer 7', '2023-07-01', 'Region 7', 'Completed'), (2023, 500.00, 'Product H', 'Customer 8', '2023-08-01', 'Region 8', 'Pending'), (2020, 450.00, 'Product I', 'Customer 9', '2020-09-01', 'Region 1', 'Completed'), (2021, 600.00, 'Product J', 'Customer 10', '2021-10-01', 'Region 2', 'Pending');テストテーブルのデータをクエリします。
SELECT * FROM sales;次の結果が返されます:

ステップ 3:DLF メタデータベースの準備
OSS コンソールにログインし、バケットを作成します。この例では、バケット名は
mc-lakehouse-dlf-ossです。詳細については、「バケットの作成」をご参照ください。バケット内に
flink_paimonという名前のフォルダーを作成します。Data Lake Formation (DLF) コンソールにログインし、左上隅でリージョンを選択します。
左側のナビゲーションウィンドウで、 を選択します。
Metadata ページで、Database タブをクリックします。
[デフォルト] の Catalog List で、Create Database をクリックします。次のパラメーターを設定します:
パラメーター
必須
説明
Catalog
必須
この例では、データカタログは [デフォルト] です。
Database Name:
必須
カスタムのデータベース名。名前は 1~128 文字で、先頭は英字、使用できるのは英字、数字、アンダースコア (_) のみです。例:
db_dlf_oss。Database Description:
任意
カスタムの説明。
Select Path:
必須
データベースのストレージ場所。例:
oss://mc-lakehouse-dlf-oss/flink_paimon/。
ステップ 4:Flink を使用した Paimon および MySQL カタログの作成
Paimon カタログの作成:
Flink コンソールにログインし、左上隅でリージョンを選択します。
対象のワークスペースの名前をクリックします。左側のナビゲーションウィンドウで、カタログ を選択します。
カタログリスト ページで、右側の カタログの作成 をクリックします。カタログの作成 ダイアログボックスで、[Apache Paimon] を選択し、次へ をクリックして、次のパラメーターを設定します:
パラメーター
必須
説明
metastore
必須
メタストアのタイプ。この例では、
dlfを選択します。catalog name
必須
関連付ける DLF カタログのバージョンを選択します。この例では、
v1.0を選択します。warehouse
必須
OSS で指定されたデータウェアハウスのディレクトリ。この例では、ディレクトリは
oss://mc-lakehouse-dlf-oss/flink_paimon/です。fs.oss.endpoint
必須
OSS サービスのエンドポイント。たとえば、中国 (杭州) リージョンのエンドポイントは
oss-cn-hangzhou-internal.aliyuncs.comです。fs.oss.accessKeyId
必須
OSS サービスへのアクセスに使用される AccessKey ID。
fs.oss.accessKeySecret
必須
OSS サービスへのアクセスに使用される AccessKey Secret。
dlf.catalog.accessKeyId
必須
DLF サービスへのアクセスに使用される AccessKey ID。
dlf.catalog.accessKeySecret
必須
DLF サービスへのアクセスに使用される AccessKey Secret。
MySQL カタログの作成:
Flink コンソールにログインし、左上隅でリージョンを選択します。
ホワイトリストに IP アドレスを追加します。
対象ワークスペースの 操作 列で、詳細 をクリックします。
ワークスペース詳細 パネルで、vSwitch の CIDR ブロック をコピーします。
RDS コンソールにログインします。
左側のナビゲーションウィンドウで、インスタンス をクリックします。次に、左上隅でリージョンを選択します。
[インスタンス] ページで、対象インスタンスの インスタンス ID/名前 をクリックして、その詳細ページを開きます。
左側のナビゲーションウィンドウで、ホワイトリストとセキュリティグループ をクリックします。
ホワイトリスト設定 タブで、変更 をクリックします。
ホワイトリストの編集 ダイアログボックスで、コピーした CIDR ブロックを IP アドレス フィールドに追加し、OK をクリックします。
Flink コンソールにログインし、左上隅でリージョンを選択します。
対象のワークスペースの名前をクリックします。左側のナビゲーションウィンドウで、カタログ を選択します。
右側の カタログリスト で、カタログの作成 をクリックします。カタログの作成 ダイアログボックスで、[MySQL] を選択し、次へ をクリックして、次のパラメーターを設定します:
パラメーター
必須
説明
catalog name
必須
MySQL カタログのカスタム名。例:
mysql-catalog。hostname
必須
MySQL データベースの IP アドレスまたはホスト名。
RDS MySQL コンソールにログインできます。データベースインスタンスの詳細ページで、データベース接続 をクリックして、データベースの 内部エンドポイント、パブリックエンドポイント、および 内部ポート を表示します。
VPC をまたいで、またはパブリックネットワーク経由でデータベースにアクセスする場合は、ネットワーク接続を確立する必要があります。詳細については、「ネットワーク接続」をご参照ください。
port
デフォルト
サーバーへの接続に使用されるポート。デフォルト値は 3306 です。
default database
必須
デフォルトのデータベース名。例:
mysql_paimon。username
必須
MySQL データベースサーバーへの接続に使用されるユーザー名。ApsaraDB RDS for MySQL コンソールにログインできます。データベースインスタンスの詳細ページで、アカウント をクリックしてユーザー名を表示します。
password
必須
MySQL データベースサーバーへの接続に使用されるパスワード。ApsaraDB RDS for MySQL コンソールにログインできます。データベースインスタンスの詳細ページで、アカウント をクリックしてパスワードを表示します。
ステップ 5:Flink を使用した MySQL からのデータ読み取り、Paimon へのデータ書き込み、および DLF へのメタデータ同期
Flink コンソールにログインし、左上隅でリージョンを選択します。
対象のワークスペース名をクリックし、左側のナビゲーションウィンドウで を選択します。
ドラフト タブで、
をクリックして新しいフォルダーを作成します。フォルダーを右クリックし、新しい空のストリームドラフト を選択します。新しいドラフト ダイアログボックスで、名前 を入力し、エンジンバージョン を選択します。
ファイルに、次の CREATE TABLE AS (CTAS) SQL ステートメントを入力します。実際の構成に基づいて、コード内の名前を必ず変更してください。
CREATE TABLE IF NOT EXISTS `<dlf_meta_db_name>`.`<OSS_bucket_name>`.`sales` AS TABLE `<mysql_catalog_name>`.`<RDS_mysql_name>`.`sales`; -- このトピックの名前を使用する場合は、次のコードをコピーできます。 CREATE TABLE IF NOT EXISTS `db_dlf_oss`.`flink_paimon`.`sales` AS TABLE `mysql-catalog`.`mysql_paimon`.`sales`;(任意) 右上隅にある 検証 をクリックして、ジョブの Flink SQL ステートメントの構文を検証します。
右上隅にある デプロイ をクリックします。ドラフトのデプロイ ダイアログボックスで、コメント、ラベル、デプロイメントターゲット を指定し、確認 をクリックします。
対象のワークスペース名をクリックします。左側のナビゲーションウィンドウで、 を選択します。
デプロイメント ページで、対象のジョブの名前をクリックして、その 設定 ページを開きます。
対象ジョブのデプロイメント詳細ページの右上隅にある 開始 をクリックし、初期モード を選択してから、開始 をクリックします。
Paimon データをクエリします。
左側のナビゲーションウィンドウで、 を選択します。
新しいスクリプト タブで、
をクリックして新しいクエリスクリプトを作成できます。次のコードを実行します:
SELECT * FROM `<paimon_catalog_name>`.`flink_paimon`.`sales`;次の結果が返されます:

OSS コンソールに移動し、
mc-lakehouse-dlf-oss/flink_paimon/ディレクトリを確認します。sales/フォルダーが生成されます。生成されたファイルは次の図に示されています:
Data Lake Formation (DLF) コンソールにログインし、左上隅でリージョンを選択します。
左側のナビゲーションウィンドウで、 を選択します。
データベース名
flink_paimonをクリックします。次の図に示すように、生成されたテーブルを表示できます:
ステップ 6:MaxCompute での DLF+OSS 外部データソースの作成
MaxCompute コンソールにログインし、左上隅でリージョンを選択します。
左側のナビゲーションウィンドウで、 を選択します。
外部データソース ページで、外部データソースの作成 をクリックします。
外部データソースの追加 ダイアログボックスで、パラメーターを設定します。次の表にパラメーターを説明します。
パラメーター
必須
説明
外部データソースタイプ
必須
[DLF+OSS] を選択します。
外部データソース名
必須
カスタム名。命名規則は次のとおりです:
名前は英字で始まり、小文字の英字、アンダースコア (_)、数字のみを含めることができます。
名前は 128 文字を超えることはできません。
例:
mysql_paimon_dlf。外部データソースの説明
任意
必要に応じて説明を入力します。
地理
必須
デフォルトで現在のリージョンが使用されます。
DLF エンドポイント
必須
デフォルトで現在のリージョンの DLF エンドポイントが使用されます。
OSS エンドポイント
必須
デフォルトで現在のリージョンの OSS エンドポイントが使用されます。
RoleARN
必須
RAM ロールの Alibaba Cloud リソースネーム (ARN)。このロールは、DLF と OSS の両方のサービスにアクセスする権限を持っている必要があります。
左側のナビゲーションウィンドウで、 を選択します。
基本情報 セクションで、ARN を確認できます。
例:
acs:ram::124****:role/aliyunodpsdefaultrole。外部データソースの追加プロパティ
任意
外部データソースの追加プロパティ。これらのプロパティを指定すると、この外部データソースを使用するタスクは、定義された動作に基づいてソースシステムにアクセスできます。
説明サポートされているパラメーターについては、公式ドキュメントをご参照ください。製品の進化に伴い、より多くのパラメーターがサポートされる予定です。
OK をクリックして外部データソースを作成します。
外部データソース ページで、対象のデータソースを見つけ、操作 列の 詳細 をクリックします。
ステップ 7:外部スキーマの作成
MaxCompute に接続し、次のコマンドを入力します:
SET odps.namespace.schema=true;
CREATE EXTERNAL SCHEMA IF NOT EXISTS <external_schema>
WITH <external_data_source>
ON '<dlf_data_catalogue>.dlf_database';次の表にパラメーターを説明します。
external_schema:外部スキーマの名前。例:
es_mc_dlf_oss_paimon。external_data_source:作成した外部データソースの名前。外部スキーマが属するプロジェクトは、外部データソースと同じリージョンにある必要があります。例:
mysql_paimon_dlf。dlf_data_catalogue:DLF データカタログの ID。詳細については、「データカタログの作成」をご参照ください。例:
db_dlf_oss。dlf_database:指定された DLF データカタログ内のデータベースの名前。詳細については、「データベース、テーブル、および関数」をご参照ください。例:
flink_paimon。
ステップ 8:SQL を使用した OSS データへのアクセス
MaxCompute クライアントにログインし、外部スキーマ内のテーブルをクエリします。
SET odps.namespace.schema=true;
use schema es_mc_dlf_oss_paimon;
SHOW tables IN es_mc_dlf_oss_paimon;
-- The following result is returned:
ALIYUN$xxx:sales
OK外部スキーマのテーブル内のデータをクエリします。
SET odps.namespace.schema=true;
SELECT * FROM <maxcompute_project_name>.es_mc_dlf_oss_paimon.sales;
-- The following result is returned:
+------------+------------+------------+--------------+---------------+------------+------------+------------+
| id | year | amount | product_name | customer_name | order_date | region | status |
+------------+------------+------------+--------------+---------------+------------+------------+------------+
| 1 | 2020 | 100 | Product A | Customer 1 | 2020-01-01 | Region 1 | Completed |
| 2 | 2020 | 200 | Product B | Customer 2 | 2020-02-01 | Region 2 | Pending |
| 3 | 2021 | 150 | Product C | Customer 3 | 2021-03-01 | Region 3 | Completed |
| 4 | 2021 | 300 | Product D | Customer 4 | 2021-04-01 | Region 4 | Pending |
| 5 | 2022 | 250 | Product E | Customer 5 | 2022-05-01 | Region 5 | Completed |
| 6 | 2022 | 400 | Product F | Customer 6 | 2022-06-01 | Region 6 | Pending |
| 7 | 2023 | 350 | Product G | Customer 7 | 2023-07-01 | Region 7 | Completed |
| 8 | 2023 | 500 | Product H | Customer 8 | 2023-08-01 | Region 8 | Pending |
| 9 | 2020 | 450 | Product I | Customer 9 | 2020-09-01 | Region 1 | Completed |
| 10 | 2021 | 600 | Product J | Customer 10 | 2021-10-01 | Region 2 | Pending |
+------------+------------+------------+--------------+---------------+------------+------------+------------+