すべてのプロダクト
Search
ドキュメントセンター

Data Lake Formation:Flink と DLF を使用したデータレイクへのデータインポートとデータ分析

最終更新日:Mar 27, 2026

OLTP データベースが大規模になると、スループット向上のために複数のデータベースやテーブルにデータをシャーディングすることがよくありますが、これにより統一的な分析が困難になります。Data Lake Formation (DLF) は、Realtime Compute for Apache Flink (Ververica Platform (VVP) 上に構築) および Flink Change Data Capture (CDC) と統合することで、これらのデータをリアルタイムでデータレイクに統合します。その後、DLF は一元的なメタデータ管理を提供するため、E-MapReduce (EMR)、MaxCompute、Hologres などの複数の分析エンジンから、データを再移動することなく同じデータにクエリを実行できます。

このチュートリアルでは、Flink CDC を使用して MySQL データベースから変更をキャプチャし、それらを Apache Hudi の結果テーブルに書き込み、テーブルメタデータを DLF カタログに同期し、Flink SQL を使用してデータレイクにクエリを実行するまでの一連のエンドツーエンドのパイプラインを、Java や Scala のコードを一切記述することなく解説します。

仕組み

Architecture diagram showing MySQL → Flink CDC → Apache Hudi result table → DLF catalog → analytics engines

Realtime Compute for Apache Flink は、mysql-cdc コネクタを使用して MySQL ソースから変更イベントを読み取ります。Flink はこれらの変更を Apache Hudi の結果テーブルに書き込み、同時にテーブルスキーマを DLF カタログに同期します。メタデータが DLF に登録されると、Flink SQL コンソールや接続されたコンピュートエンジンから直接 Hudi テーブルにクエリを実行できます。また、DLF はデータレイクのライフサイクル管理とレイクフォーマットの最適化も処理し、長期にわたってデータのアクセシビリティとコスト効率を維持します。

前提条件

開始する前に、以下を確認してください。

  • Realtime Compute for Apache Flink が有効化されており、Flink 完全管理ワークスペースが作成されていること。詳細については、「Realtime Compute for Apache Flink の開始」をご参照ください。

  • DLF が有効化されていること。まだ有効化されていない場合は、Data Lake Formation の製品ページに移動し、[今すぐ有効化] をクリックしてください。

  • ApsaraDB RDS for MySQL インスタンスが、Realtime Compute for Apache Flink ワークスペースと同じリージョンおよび VPC にあり、MySQL 5.7 以降を実行していること。設定手順については、「ApsaraDB RDS for MySQL インスタンスの作成」をご参照ください。別のソースデータベースを使用する場合は、この項目をスキップしてください。

ステップ 1: MySQL データの準備

  1. ApsaraDB RDS for MySQL インスタンスにログインします。詳細については、「ApsaraDB RDS for MySQL インスタンスへの接続」をご参照ください。

  2. 次の SQL を実行して、テストデータベースとテーブルを作成し、サンプルデータを挿入します。

    CREATE DATABASE testdb;
    CREATE TABLE testdb.student (
      `id` bigint(20) NOT NULL,
      `name` varchar(256) DEFAULT NULL,
      `age` bigint(20) DEFAULT NULL,
      PRIMARY KEY (`id`)
    );
    
    INSERT INTO testdb.student VALUES (1,'name1',10);
    INSERT INTO testdb.student VALUES (2,'name2',20);

ステップ 2: Flink での DLF カタログの作成

  1. Realtime Compute for Apache Flink コンソールにログインします。

  2. [Flink 完全管理] タブで、対象のワークスペースの [アクション] 列にある [コンソール] をクリックします。

  3. 左側のナビゲーションウィンドウで [Catalogs] をクリックし、次に [Create Catalog] をクリックします。

  4. [カタログの作成]」ページで、[DLF] を選択し、「[次へ]」をクリックします。

  5. カタログ構成を入力し、[確認] をクリックします。パラメーターの詳細については、「DLF カタログの管理」をご参照ください。

    Create Catalog configuration page

カタログが作成されると、[Catalogs] の下に dlf として表示されます。これは DLF のデフォルトのデータカタログです。

Catalogs pane showing the dlf catalog

ステップ 3: Flink データレイクジョブの作成

ソーステーブルと結果テーブルの作成

  1. 左側のナビゲーションウィンドウで、[開発] > [スクリプト] をクリックします。

  2. SQL 編集エリアで、次の SQL を入力し、[実行] をクリックします。

    -- MySQL から変更イベントを読み取るソーステーブルを作成します
    CREATE TABLE IF NOT EXISTS student_source (
      id INT,
      name VARCHAR(256),
      age INT,
      PRIMARY KEY (id) NOT ENFORCED
    )
    WITH (
      'connector' = 'mysql-cdc',
      -- ご利用の ApsaraDB RDS for MySQL インスタンスのエンドポイントに置き換えてください
      'hostname' = 'rm-xxxxxxxx.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = '<RDS username>',
      'password' = '<RDS password>',
      'database-name' = '<RDS database>',
      -- ステップ 1 で作成したソーステーブルの名前に設定してください
      'table-name' = 'student'
    );
    
    -- DLF カタログにターゲットデータベースを作成します
    -- DLF カタログ名が異なる場合は、'dlf' を実際のカタログ名に置き換えてください
    CREATE DATABASE IF NOT EXISTS dlf.dlf_testdb;
    
    -- DLF カタログに Apache Hudi の結果テーブルを作成します
    CREATE TABLE IF NOT EXISTS dlf.dlf_testdb.student_hudi (
      id    BIGINT PRIMARY KEY NOT ENFORCED,
      name  STRING,
      age   BIGINT
    ) WITH (
      'connector' = 'hudi'
    );
    プレースホルダー説明
    rm-xxxxxxxx.mysql.rds.aliyuncs.comApsaraDB RDS for MySQL インスタンスのエンドポイントrm-bp1xxxxxxxx.mysql.rds.aliyuncs.com
    <RDS username>ソーステーブルへの読み取りアクセス権を持つ MySQL ユーザーadmin
    <RDS password>MySQL ユーザーのパスワード
    <RDS database>ソースデータベースの名前testdb

    テーブルが作成されると、両方とも [Catalogs] の下に表示されます。

    Catalogs pane showing the newly created source and result tables

    mysql-cdc コネクタパラメーターの完全なリストについては、「MySQL ソースコネクタ」をご参照ください。Hudi の結果テーブルのパラメーターについては、「Hudi コネクタ (廃止予定)」をご参照ください。

ストリーミングジョブの作成とデプロイ

  1. 左側のナビゲーションウィンドウで、[開発] > [ETL] をクリックします。

  2. [新規] をクリックし、[新規ドラフト] ダイアログボックスで [空白のストリームドラフト] を選択して、[次へ] をクリックします。

  3. ドラフト構成を入力し、[作成] をクリックします。

  4. SQL 編集エリアで、次の INSERT 文を入力します。

    -- MySQL ソースから Hudi の結果テーブルに変更をストリーミングします
    INSERT INTO dlf.dlf_testdb.student_hudi
    SELECT * FROM student_source /*+ OPTIONS('server-id'='123456') */;
  5. SQL 編集エリアの右上隅にある [デプロイ] をクリックします。[ドラフトのデプロイ] ダイアログボックスで、必須フィールドを入力し、[確認] をクリックします。

ジョブの開始

  1. 左側のナビゲーションウィンドウで、[運用保守] > [デプロイメント] をクリックします。

  2. 対象のジョブの [アクション] 列で、[開始] をクリックします。

  3. [初期モード] を選択し、[開始] をクリックします。

ジョブのステータスが [実行中] に変わると、Flink は MySQL からの変更をアクティブにキャプチャし、Hudi の結果テーブルに書き込みます。起動パラメーターの詳細については、「デプロイメントの開始」をご参照ください。

ステップ 4: データレイクの検証

  1. 左側のナビゲーションウィンドウで、[開発] > [スクリプト] をクリックします。

  2. 次のクエリを実行して、初期行がデータレイクに書き込まれたことを確認します。

    SELECT * FROM dlf.dlf_testdb.student_hudi;

    結果には、ステップ 1 で挿入された 2 つの行が表示されます。

    Query results showing the two initial rows

DLF メタデータが有効になっている EMR クラスターがある場合は、EMR クラスターを介して Hudi テーブルにクエリを実行することもできます。詳細については、「Hudi と Spark SQL の統合」をご参照ください。

次のステップ

EMR DataFlow クラスターで Flink を使用して DLF 内の Hudi テーブルの読み書きを行う方法については、「Dataflow クラスターを使用して DLF ベースの Hudi テーブルのデータを読み書きする」をご参照ください。