OLTP データベースが大規模になると、スループット向上のために複数のデータベースやテーブルにデータをシャーディングすることがよくありますが、これにより統一的な分析が困難になります。Data Lake Formation (DLF) は、Realtime Compute for Apache Flink (Ververica Platform (VVP) 上に構築) および Flink Change Data Capture (CDC) と統合することで、これらのデータをリアルタイムでデータレイクに統合します。その後、DLF は一元的なメタデータ管理を提供するため、E-MapReduce (EMR)、MaxCompute、Hologres などの複数の分析エンジンから、データを再移動することなく同じデータにクエリを実行できます。
このチュートリアルでは、Flink CDC を使用して MySQL データベースから変更をキャプチャし、それらを Apache Hudi の結果テーブルに書き込み、テーブルメタデータを DLF カタログに同期し、Flink SQL を使用してデータレイクにクエリを実行するまでの一連のエンドツーエンドのパイプラインを、Java や Scala のコードを一切記述することなく解説します。
仕組み
Realtime Compute for Apache Flink は、mysql-cdc コネクタを使用して MySQL ソースから変更イベントを読み取ります。Flink はこれらの変更を Apache Hudi の結果テーブルに書き込み、同時にテーブルスキーマを DLF カタログに同期します。メタデータが DLF に登録されると、Flink SQL コンソールや接続されたコンピュートエンジンから直接 Hudi テーブルにクエリを実行できます。また、DLF はデータレイクのライフサイクル管理とレイクフォーマットの最適化も処理し、長期にわたってデータのアクセシビリティとコスト効率を維持します。
前提条件
開始する前に、以下を確認してください。
Realtime Compute for Apache Flink が有効化されており、Flink 完全管理ワークスペースが作成されていること。詳細については、「Realtime Compute for Apache Flink の開始」をご参照ください。
DLF が有効化されていること。まだ有効化されていない場合は、Data Lake Formation の製品ページに移動し、[今すぐ有効化] をクリックしてください。
ApsaraDB RDS for MySQL インスタンスが、Realtime Compute for Apache Flink ワークスペースと同じリージョンおよび VPC にあり、MySQL 5.7 以降を実行していること。設定手順については、「ApsaraDB RDS for MySQL インスタンスの作成」をご参照ください。別のソースデータベースを使用する場合は、この項目をスキップしてください。
ステップ 1: MySQL データの準備
ApsaraDB RDS for MySQL インスタンスにログインします。詳細については、「ApsaraDB RDS for MySQL インスタンスへの接続」をご参照ください。
次の SQL を実行して、テストデータベースとテーブルを作成し、サンプルデータを挿入します。
CREATE DATABASE testdb; CREATE TABLE testdb.student ( `id` bigint(20) NOT NULL, `name` varchar(256) DEFAULT NULL, `age` bigint(20) DEFAULT NULL, PRIMARY KEY (`id`) ); INSERT INTO testdb.student VALUES (1,'name1',10); INSERT INTO testdb.student VALUES (2,'name2',20);
ステップ 2: Flink での DLF カタログの作成
[Flink 完全管理] タブで、対象のワークスペースの [アクション] 列にある [コンソール] をクリックします。
左側のナビゲーションウィンドウで [Catalogs] をクリックし、次に [Create Catalog] をクリックします。
「[カタログの作成]」ページで、[DLF] を選択し、「[次へ]」をクリックします。
カタログ構成を入力し、[確認] をクリックします。パラメーターの詳細については、「DLF カタログの管理」をご参照ください。

カタログが作成されると、[Catalogs] の下に dlf として表示されます。これは DLF のデフォルトのデータカタログです。

ステップ 3: Flink データレイクジョブの作成
ソーステーブルと結果テーブルの作成
左側のナビゲーションウィンドウで、[開発] > [スクリプト] をクリックします。
SQL 編集エリアで、次の SQL を入力し、[実行] をクリックします。
-- MySQL から変更イベントを読み取るソーステーブルを作成します CREATE TABLE IF NOT EXISTS student_source ( id INT, name VARCHAR(256), age INT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', -- ご利用の ApsaraDB RDS for MySQL インスタンスのエンドポイントに置き換えてください 'hostname' = 'rm-xxxxxxxx.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = '<RDS username>', 'password' = '<RDS password>', 'database-name' = '<RDS database>', -- ステップ 1 で作成したソーステーブルの名前に設定してください 'table-name' = 'student' ); -- DLF カタログにターゲットデータベースを作成します -- DLF カタログ名が異なる場合は、'dlf' を実際のカタログ名に置き換えてください CREATE DATABASE IF NOT EXISTS dlf.dlf_testdb; -- DLF カタログに Apache Hudi の結果テーブルを作成します CREATE TABLE IF NOT EXISTS dlf.dlf_testdb.student_hudi ( id BIGINT PRIMARY KEY NOT ENFORCED, name STRING, age BIGINT ) WITH ( 'connector' = 'hudi' );プレースホルダー 説明 例 rm-xxxxxxxx.mysql.rds.aliyuncs.comApsaraDB RDS for MySQL インスタンスのエンドポイント rm-bp1xxxxxxxx.mysql.rds.aliyuncs.com<RDS username>ソーステーブルへの読み取りアクセス権を持つ MySQL ユーザー admin<RDS password>MySQL ユーザーのパスワード — <RDS database>ソースデータベースの名前 testdbテーブルが作成されると、両方とも [Catalogs] の下に表示されます。

mysql-cdcコネクタパラメーターの完全なリストについては、「MySQL ソースコネクタ」をご参照ください。Hudi の結果テーブルのパラメーターについては、「Hudi コネクタ (廃止予定)」をご参照ください。
ストリーミングジョブの作成とデプロイ
左側のナビゲーションウィンドウで、[開発] > [ETL] をクリックします。
[新規] をクリックし、[新規ドラフト] ダイアログボックスで [空白のストリームドラフト] を選択して、[次へ] をクリックします。
ドラフト構成を入力し、[作成] をクリックします。
SQL 編集エリアで、次の INSERT 文を入力します。
-- MySQL ソースから Hudi の結果テーブルに変更をストリーミングします INSERT INTO dlf.dlf_testdb.student_hudi SELECT * FROM student_source /*+ OPTIONS('server-id'='123456') */;SQL 編集エリアの右上隅にある [デプロイ] をクリックします。[ドラフトのデプロイ] ダイアログボックスで、必須フィールドを入力し、[確認] をクリックします。
ジョブの開始
左側のナビゲーションウィンドウで、[運用保守] > [デプロイメント] をクリックします。
対象のジョブの [アクション] 列で、[開始] をクリックします。
[初期モード] を選択し、[開始] をクリックします。
ジョブのステータスが [実行中] に変わると、Flink は MySQL からの変更をアクティブにキャプチャし、Hudi の結果テーブルに書き込みます。起動パラメーターの詳細については、「デプロイメントの開始」をご参照ください。
ステップ 4: データレイクの検証
左側のナビゲーションウィンドウで、[開発] > [スクリプト] をクリックします。
次のクエリを実行して、初期行がデータレイクに書き込まれたことを確認します。
SELECT * FROM dlf.dlf_testdb.student_hudi;結果には、ステップ 1 で挿入された 2 つの行が表示されます。

DLF メタデータが有効になっている EMR クラスターがある場合は、EMR クラスターを介して Hudi テーブルにクエリを実行することもできます。詳細については、「Hudi と Spark SQL の統合」をご参照ください。
次のステップ
EMR DataFlow クラスターで Flink を使用して DLF 内の Hudi テーブルの読み書きを行う方法については、「Dataflow クラスターを使用して DLF ベースの Hudi テーブルのデータを読み書きする」をご参照ください。