すべてのプロダクト
Search
ドキュメントセンター

Data Lake Formation:Realtime Compute for Apache Flink と DLF を使用してデータレイクにデータをインポートし、データを分析する

最終更新日:Apr 15, 2025

Data Lake Formation(DLF)は、Ververica Platform(VVP)コンピューティングエンジン上に構築された Realtime Compute for Apache Flink サービス、および Flink Change Data Capture(CDC)テクノロジーと連携して、データレイクにデータをインポートできます。ニーズに合わせてデータインポートパラメーターをカスタマイズできます。 DLF は、一元化されたメタデータ管理機能と権限管理機能を提供します。複数の分析エンジンをサポートし、データレイクのデータ価値を探ることができます。このトピックでは、Realtime Compute for Apache Flink と DLF を使用してデータレイクにデータをインポートし、データを分析する方法について説明します。

背景情報

Alibaba Cloud Realtime Compute for Apache Flink は、Apache Flink 上に構築されたリアルタイムビッグデータ分析プラットフォームです。このプラットフォームは、複数の種類のデータソースと結果テーブルをサポートしています。データレイクは、さまざまな種類のデータを保存するための一元化されたリポジトリです。 Flink タスクを作成して、Hudi または Iceberg の結果テーブルからデータレイクにデータを抽出し、一元的に保存および分析できます。データレイクにデータをインポートする際に、DLF カタログを構成してテーブルメタデータを DLF に同期できます。 DLF は、データレイクのメタデータを一元的に管理できるエンタープライズレベルのサービスです。 Realtime Compute for Apache Flink と DLF を連携させることで、データレイク内のテーブルを E-MapReduce(EMR)、MaxCompute、Hologres などの Alibaba Cloud コンピューティングエンジンにシームレスに接続できます。 DLF は、データレイクのライフサイクル管理やレイク形式の最適化など、幅広いデータレイク管理機能も提供します。

image

前提条件

  • Realtime Compute for Apache Flink サービスがアクティブ化されており、フルマネージド Flink ワークスペースが作成されている。

  • DLF サービスがアクティブ化されている。アクティブ化していない場合は、Data Lake Formation にアクセスし、[今すぐアクティブ化] をクリックします。

  • このトピックでは、MySQL データソースを例として使用します。 RDS MySQL インスタンスを作成する必要があります。詳細については、「手順 1: ApsaraDB RDS for MySQL インスタンスを作成し、データベースを構成する」をご参照ください。他のデータソースを使用する場合は、この前提条件は無視してください。

重要

ApsaraDB RDS for MySQL インスタンスは、Realtime Compute for Apache Flink と同じリージョンおよび VPC に存在する必要があります。 ApsaraDB RDS for MySQL のエンジンバージョンは 5.7 以降である必要があります。

手順

手順 1: MySQL データを準備する

  1. 準備した MySQL インスタンスにログオンします。詳細については、「手順 2: ApsaraDB RDS for MySQL インスタンスに接続する」をご参照ください。

  2. 次のコマンドを実行して、テーブルを作成し、テストデータをテーブルに挿入します。

    CREATE DATABASE testdb;
    CREATE TABLE testdb.student (
      `id` bigint(20) NOT NULL,
      `name` varchar(256) DEFAULT NULL,
      `age` bigint(20) DEFAULT NULL,
      PRIMARY KEY (`id`)
    );
    
    INSERT INTO testdb.student VALUES (1,'name1',10);
    INSERT INTO testdb.student VALUES (2,'name2',20);
    /* テーブルを作成し、テストデータを挿入します */

手順 2: Flink で DLF カタログを作成する

  1. Realtime Compute for Apache Flink コンソール にログオンします。Realtime Compute for Apache Flink コンソール

  2. [カタログの作成] ダイアログボックスに移動します。

    1. [フルマネージド Flink] タブで、対象のワークスペースの [アクション] 列にある [コンソール] をクリックします。

    2. 左側のナビゲーションペインで、[カタログ] をクリックします。

    3. [カタログの作成] をクリックします。

  3. DLF カタログを作成します。

    1. [カタログの作成] ページで、[DLF] を選択し、[次へ] をクリックします。

    2. 次の構成情報を入力し、[確認] をクリックします。詳細については、「DLF カタログの管理」をご参照ください。

    image

    DLF を正常に作成した後、[カタログ] に新しく追加された dlf データカタログが表示されます。リンクは DLF のデフォルトのデータカタログです。

    image

手順 3: Flink データレイクジョブを作成する

  1. Realtime Compute for Apache Flink コンソール にログオンします。

  2. [フルマネージド Flink] タブで、対象のワークスペースの [アクション] 列にある [コンソール] をクリックします。

  3. ソーステーブルとターゲットテーブルを作成します。

    1. 左側のナビゲーションペインで、[開発] > [スクリプト]をクリックします。

    2. SQL 編集領域に次のコードを入力し、[実行]をクリックします。

      -- ソーステーブルを作成します
      CREATE TABLE IF NOT EXISTS student_source (
        id INT,
        name VARCHAR (256),
        age INT,
        PRIMARY KEY (id) NOT ENFORCED
      )
      WITH (
        'connector' = 'mysql-cdc',
        -- hostname を ApsaraDB RDS for MySQL インスタンスのエンドポイントに置き換えます
        'hostname' = 'rm-xxxxxxxx.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = '<RDS ユーザー名>',
        'password' = '<RDS パスワード>',
        'database-name' = '<RDS データベース>',
        -- table-name パラメーターをソーステーブルの名前に設定します。この例では、ソーステーブルは手順 2 で作成した student テーブルです。
        'table-name' = 'student'
      );
      
      -- カタログ名は、手順 2 で作成した dlf カタログ名です。この例では、カタログ名は dlf です
      CREATE DATABASE IF NOT EXISTS dlf.dlf_testdb;
      
      -- デスティネーションテーブルを作成します
      CREATE TABLE IF NOT EXISTS dlf.dlf_testdb.student_hudi (
        id    BIGINT PRIMARY KEY NOT ENFORCED,
        name  STRING,
        age    BIGINT
      ) WITH(
        'connector' = 'hudi'
      );
      

      テーブルが作成されると、[カタログ] に新しく追加されたソーステーブルとターゲットテーブルが表示されます。

      image

  4. Flink SQL データレイクジョブを作成します。

    1. 左側のナビゲーションペインで、[開発] > [ETL] をクリックします。

    2. [新規]をクリックし、[新規ドラフト] ダイアログボックスで [空のストリームドラフト] を選択し、[次へ]をクリックします。

    3. [ドラフト構成]を入力し、[作成]をクリックします。

    4. SQL 編集領域に次のコードを入力して、Flink SQL ジョブを作成します。

      -- ストリーム SQL ジョブを作成します
      INSERT INTO dlf.dlf_testdb.student_hudi SELECT * FROM student_source  /*+ OPTIONS('server-id'='123456') */;
      /* student_source テーブルのデータを student_hudi テーブルに挿入します */
      
      説明
      • MySQL ソーステーブルのパラメーター設定と使用条件の詳細については、「MySQL コネクタ」をご参照ください。

      • Hudi 結果テーブルのパラメーター設定の詳細については、「Hudi コネクタ (廃止予定)」をご参照ください。

    5. SQL 編集領域の右上隅にある [デプロイ]をクリックします。[ドラフトのデプロイ] ダイアログボックスで、必要な情報を入力または選択し、[確認]をクリックします。

  5. ジョブを開始します。

    1. 左側のナビゲーションペインで、[O&M] > [デプロイメント] をクリックします。

    2. 対象のジョブの [アクション] 列にある [開始]をクリックします。

      [初期モード]を選択し、[開始]をクリックします。ジョブステータスが [実行中]に変わると、ジョブは正常に実行されています。ジョブ起動パラメーター構成の詳細については、「デプロイメントの開始」をご参照ください。

手順 4: データレイク分析に DLF を使用する

  1. 左側のナビゲーションバーで、[開発] > [スクリプト]をクリックします。

  2. SQL 編集領域に次のコードを入力し、[実行]をクリックします。

    SELECT * FROM dlf.dlf_testdb.student_hudi;
    /* student_hudi テーブルのデータを取得します */
    

    クエリ結果は次の図に示されています。 Flink によってデータレイクに書き込まれたデータを直接クエリして分析できます

    image

説明

EMR クラスターを購入し、データレイクの DLF メタデータを有効にしている場合は、EMR クラスターを介して Flink のデータレイク結果を直接分析することもできます。詳細については、「Hudi と Spark SQL を統合する」をご参照ください。

関連情報

EMR DataFlow クラスターで Flink を介して DLF の読み書きを行う場合は、「Dataflow クラスターを DLF に接続して Hudi からデータを読み書きする」をご参照ください。