Data Lake Formation(DLF)は、Ververica Platform(VVP)コンピューティングエンジン上に構築された Realtime Compute for Apache Flink サービス、および Flink Change Data Capture(CDC)テクノロジーと連携して、データレイクにデータをインポートできます。ニーズに合わせてデータインポートパラメーターをカスタマイズできます。 DLF は、一元化されたメタデータ管理機能と権限管理機能を提供します。複数の分析エンジンをサポートし、データレイクのデータ価値を探ることができます。このトピックでは、Realtime Compute for Apache Flink と DLF を使用してデータレイクにデータをインポートし、データを分析する方法について説明します。
背景情報
Alibaba Cloud Realtime Compute for Apache Flink は、Apache Flink 上に構築されたリアルタイムビッグデータ分析プラットフォームです。このプラットフォームは、複数の種類のデータソースと結果テーブルをサポートしています。データレイクは、さまざまな種類のデータを保存するための一元化されたリポジトリです。 Flink タスクを作成して、Hudi または Iceberg の結果テーブルからデータレイクにデータを抽出し、一元的に保存および分析できます。データレイクにデータをインポートする際に、DLF カタログを構成してテーブルメタデータを DLF に同期できます。 DLF は、データレイクのメタデータを一元的に管理できるエンタープライズレベルのサービスです。 Realtime Compute for Apache Flink と DLF を連携させることで、データレイク内のテーブルを E-MapReduce(EMR)、MaxCompute、Hologres などの Alibaba Cloud コンピューティングエンジンにシームレスに接続できます。 DLF は、データレイクのライフサイクル管理やレイク形式の最適化など、幅広いデータレイク管理機能も提供します。
前提条件
Realtime Compute for Apache Flink サービスがアクティブ化されており、フルマネージド Flink ワークスペースが作成されている。
DLF サービスがアクティブ化されている。アクティブ化していない場合は、Data Lake Formation にアクセスし、[今すぐアクティブ化] をクリックします。
このトピックでは、MySQL データソースを例として使用します。 RDS MySQL インスタンスを作成する必要があります。詳細については、「手順 1: ApsaraDB RDS for MySQL インスタンスを作成し、データベースを構成する」をご参照ください。他のデータソースを使用する場合は、この前提条件は無視してください。
ApsaraDB RDS for MySQL インスタンスは、Realtime Compute for Apache Flink と同じリージョンおよび VPC に存在する必要があります。 ApsaraDB RDS for MySQL のエンジンバージョンは 5.7 以降である必要があります。
手順
手順 1: MySQL データを準備する
準備した MySQL インスタンスにログオンします。詳細については、「手順 2: ApsaraDB RDS for MySQL インスタンスに接続する」をご参照ください。
次のコマンドを実行して、テーブルを作成し、テストデータをテーブルに挿入します。
CREATE DATABASE testdb; CREATE TABLE testdb.student ( `id` bigint(20) NOT NULL, `name` varchar(256) DEFAULT NULL, `age` bigint(20) DEFAULT NULL, PRIMARY KEY (`id`) ); INSERT INTO testdb.student VALUES (1,'name1',10); INSERT INTO testdb.student VALUES (2,'name2',20); /* テーブルを作成し、テストデータを挿入します */
手順 2: Flink で DLF カタログを作成する
Realtime Compute for Apache Flink コンソール にログオンします。Realtime Compute for Apache Flink コンソール
[カタログの作成] ダイアログボックスに移動します。
[フルマネージド Flink] タブで、対象のワークスペースの [アクション] 列にある [コンソール] をクリックします。
左側のナビゲーションペインで、[カタログ] をクリックします。
[カタログの作成] をクリックします。
DLF カタログを作成します。
[カタログの作成] ページで、[DLF] を選択し、[次へ] をクリックします。
次の構成情報を入力し、[確認] をクリックします。詳細については、「DLF カタログの管理」をご参照ください。

DLF を正常に作成した後、[カタログ] に新しく追加された dlf データカタログが表示されます。リンクは DLF のデフォルトのデータカタログです。

手順 3: Flink データレイクジョブを作成する
Realtime Compute for Apache Flink コンソール にログオンします。
[フルマネージド Flink] タブで、対象のワークスペースの [アクション] 列にある [コンソール] をクリックします。
ソーステーブルとターゲットテーブルを作成します。
左側のナビゲーションペインで、をクリックします。
SQL 編集領域に次のコードを入力し、[実行]をクリックします。
-- ソーステーブルを作成します CREATE TABLE IF NOT EXISTS student_source ( id INT, name VARCHAR (256), age INT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', -- hostname を ApsaraDB RDS for MySQL インスタンスのエンドポイントに置き換えます 'hostname' = 'rm-xxxxxxxx.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = '<RDS ユーザー名>', 'password' = '<RDS パスワード>', 'database-name' = '<RDS データベース>', -- table-name パラメーターをソーステーブルの名前に設定します。この例では、ソーステーブルは手順 2 で作成した student テーブルです。 'table-name' = 'student' ); -- カタログ名は、手順 2 で作成した dlf カタログ名です。この例では、カタログ名は dlf です CREATE DATABASE IF NOT EXISTS dlf.dlf_testdb; -- デスティネーションテーブルを作成します CREATE TABLE IF NOT EXISTS dlf.dlf_testdb.student_hudi ( id BIGINT PRIMARY KEY NOT ENFORCED, name STRING, age BIGINT ) WITH( 'connector' = 'hudi' );テーブルが作成されると、[カタログ] に新しく追加されたソーステーブルとターゲットテーブルが表示されます。

Flink SQL データレイクジョブを作成します。
左側のナビゲーションペインで、 をクリックします。
[新規]をクリックし、[新規ドラフト] ダイアログボックスで [空のストリームドラフト] を選択し、[次へ]をクリックします。
[ドラフト構成]を入力し、[作成]をクリックします。
SQL 編集領域に次のコードを入力して、Flink SQL ジョブを作成します。
-- ストリーム SQL ジョブを作成します INSERT INTO dlf.dlf_testdb.student_hudi SELECT * FROM student_source /*+ OPTIONS('server-id'='123456') */; /* student_source テーブルのデータを student_hudi テーブルに挿入します */説明MySQL ソーステーブルのパラメーター設定と使用条件の詳細については、「MySQL コネクタ」をご参照ください。
Hudi 結果テーブルのパラメーター設定の詳細については、「Hudi コネクタ (廃止予定)」をご参照ください。
SQL 編集領域の右上隅にある [デプロイ]をクリックします。[ドラフトのデプロイ] ダイアログボックスで、必要な情報を入力または選択し、[確認]をクリックします。
ジョブを開始します。
左側のナビゲーションペインで、 をクリックします。
対象のジョブの [アクション] 列にある [開始]をクリックします。
[初期モード]を選択し、[開始]をクリックします。ジョブステータスが [実行中]に変わると、ジョブは正常に実行されています。ジョブ起動パラメーター構成の詳細については、「デプロイメントの開始」をご参照ください。
手順 4: データレイク分析に DLF を使用する
左側のナビゲーションバーで、をクリックします。
SQL 編集領域に次のコードを入力し、[実行]をクリックします。
SELECT * FROM dlf.dlf_testdb.student_hudi; /* student_hudi テーブルのデータを取得します */クエリ結果は次の図に示されています。 Flink によってデータレイクに書き込まれたデータを直接クエリして分析できます。

EMR クラスターを購入し、データレイクの DLF メタデータを有効にしている場合は、EMR クラスターを介して Flink のデータレイク結果を直接分析することもできます。詳細については、「Hudi と Spark SQL を統合する」をご参照ください。
関連情報
EMR DataFlow クラスターで Flink を介して DLF の読み書きを行う場合は、「Dataflow クラスターを DLF に接続して Hudi からデータを読み書きする」をご参照ください。