Data Lake Formation (DLF) can work with the Realtime Compute for Apache Flink service, which is built on the Ververica Platform (VVP) compute engine, and Flink Change Data Capture (CDC) technology to import data into data lakes. You can customize the data import parameters based on your needs. DLF provides centralized metadata management and permission management capabilities and supports multiple analytics engines that you can use to explore the value of the data in your data lakes. This topic describes how to use Realtime Compute for Apache Flink and DLF to import data into a data lake and analyze the data.
Background information
Alibaba Cloud Realtime Compute for Apache Flink is a real-time big data analytics platform that is built on Apache Flink. The platform supports multiple types of data sources and result tables. A data lake is a centralized repository for storing various types of data. You can create Flink jobs that write data from your data sources to Hudi or Iceberg result tables in a data lake for centralized storage and analysis. When you import data into a data lake, you can configure a DLF catalog to synchronize the table metadata to DLF. DLF is an enterprise-level service that allows you to manage the metadata of data lakes in a centralized manner. Realtime Compute for Apache Flink and DLF work together to ensure that tables in data lakes seamlessly connect to Alibaba Cloud compute engines such as E-MapReduce (EMR), MaxCompute, and Hologres. DLF also provides a wide range of data lake management capabilities, such as data lake lifecycle management and lake format optimization.
Prerequisites
The Realtime Compute for Apache Flink service is activated, and a fully managed Flink workspace is created.
The DLF service is activated. If you have not activated it, go to the Data Lake Formation console and click Activate Now.
This topic uses a MySQL data source as an example, so you must create an ApsaraDB RDS for MySQL instance. For more information, see Step 1: Create an ApsaraDB RDS for MySQL instance and configure databases. If you use another data source, ignore this prerequisite.
The ApsaraDB RDS for MySQL instance must reside in the same region and VPC as the Realtime Compute for Apache Flink workspace, and the engine version of the instance must be 5.7 or later. You can check the version by running the statement shown after this list.
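To confirm that the instance meets the version requirement, connect to the instance and run the following standard MySQL statement:

SELECT VERSION();  -- Returns the MySQL server version; it must be 5.7 or later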
Procedure
Step 1: Prepare MySQL data
Log on to the prepared MySQL instance. For more information, see Step 2: Connect to an ApsaraDB RDS for MySQL instance.
Run the following statements to create a database and a table, and to insert test data into the table.
CREATE DATABASE testdb;

CREATE TABLE testdb.student (
  `id`   bigint(20) NOT NULL,
  `name` varchar(256) DEFAULT NULL,
  `age`  bigint(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
);

INSERT INTO testdb.student VALUES (1,'name1',10);
INSERT INTO testdb.student VALUES (2,'name2',20);
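Optionally, before you move on, you can confirm that the test data is in place. The following query is a minimal check against the table that you just created and should return the two inserted rows:

SELECT * FROM testdb.student;
-- Expected output: (1,'name1',10) and (2,'name2',20)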
Step 2: Create a DLF catalog in Flink
Log on to the Realtime Compute for Apache Flink console.
Go to the Create Catalog dialog box.
On the Fully Managed Flink tab, click Console in the Actions column of the target workspace.
In the left-side navigation pane, click Catalogs.
Click Create Catalog.
Create a DLF catalog.
On the Create Catalog page, select DLF and click Next.
Configure the parameters as required and click Confirm. For more information about the parameters, see Manage DLF catalogs. An SQL-based alternative is sketched at the end of this step.

After the DLF catalog is created, you can see the newly added dlf catalog in Catalogs. The catalog links to the default data catalog of DLF.

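If you prefer to manage catalogs in SQL rather than in the console, a DLF catalog can usually also be registered with a CREATE CATALOG statement in the SQL editor. The following is only a sketch: the option keys (access.key.id, access.key.secret, warehouse, oss.endpoint, dlf.endpoint, and dlf.region-id) are assumptions based on Manage DLF catalogs and should be verified against that topic before use.

CREATE CATALOG dlf WITH (
  'type' = 'dlf',
  -- Assumption: credential and endpoint option names; verify them in Manage DLF catalogs
  'access.key.id' = '<yourAccessKeyId>',
  'access.key.secret' = '<yourAccessKeySecret>',
  'warehouse' = 'oss://<yourBucket>/<yourPath>',  -- OSS location that stores the lake data
  'oss.endpoint' = '<yourOSSEndpoint>',
  'dlf.endpoint' = '<yourDLFEndpoint>',
  'dlf.region-id' = '<yourRegionId>'
);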
Step 3: Create a Flink data lake job
Log on to the Realtime Compute for Apache Flink console.
On the Fully Managed Flink tab, click Console in the Actions column of the target workspace.
Create a source table and a target table.
In the left-side navigation pane, click .
In the SQL editing area, enter the following code and click Run.
-- Create a source table
CREATE TABLE IF NOT EXISTS student_source (
  id BIGINT,
  name VARCHAR(256),
  age BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  -- Replace hostname with the endpoint of the ApsaraDB RDS for MySQL instance
  'hostname' = 'rm-xxxxxxxx.mysql.rds.aliyuncs.com',
  'port' = '3306',
  'username' = '<RDS user name>',
  'password' = '<RDS password>',
  'database-name' = '<RDS database>',
  -- Set the table-name parameter to the name of the source table. In this example, the source table is the student table created in Step 1
  'table-name' = 'student'
);

-- The catalog name is the name of the DLF catalog created in Step 2. In this example, the catalog name is dlf
CREATE DATABASE IF NOT EXISTS dlf.dlf_testdb;

-- Create a destination table
CREATE TABLE IF NOT EXISTS dlf.dlf_testdb.student_hudi (
  id   BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  age  BIGINT
) WITH (
  'connector' = 'hudi'
);

After the tables are created, you can see the newly added source table and destination table in Catalogs.

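Optionally, you can confirm that the metadata has been registered before you create the job. The following standard Flink SQL statements, run in the same SQL editing area, list the databases and tables in the dlf catalog and show the schema of the destination table:

-- Switch to the DLF catalog and list its contents
USE CATALOG dlf;
SHOW DATABASES;
USE dlf_testdb;
SHOW TABLES;

-- Show the schema of the destination table
DESCRIBE dlf.dlf_testdb.student_hudi;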
Create a Flink SQL data lake job.
In the left-side navigation pane, click .
Click New, then in the New Draft dialog box, select Blank Stream Draft, and click Next.
Configure the draft settings, and click Create.
In the SQL editing area, enter the following code to create a Flink SQL job.
-- Create a stream SQL job
INSERT INTO dlf.dlf_testdb.student_hudi
SELECT * FROM student_source /*+ OPTIONS('server-id'='123456') */;

Note: For more information about the parameter settings and usage conditions of the MySQL source table, see MySQL.
For more information about the parameter settings of the Hudi result table, see Hudi connector (to be retired).
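If you need more control over how data is written to the Hudi result table, you can pass additional options in the WITH clause of the destination table. The following variant is only a sketch based on the open-source Hudi Flink connector: the option names table.type and precombine.field are assumptions here, and the exact options supported by the Hudi connector of Realtime Compute for Apache Flink are listed in the topic referenced above.

-- Hypothetical variant of the destination table with explicit Hudi options
CREATE TABLE IF NOT EXISTS dlf.dlf_testdb.student_hudi_mor (
  id   BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  age  BIGINT
) WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',   -- Assumption: open-source option name; COPY_ON_WRITE is typically the default
  'precombine.field' = 'id'         -- Assumption: field used to deduplicate records that share a primary key
);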
In the upper-right corner of the SQL editing area, click Deploy. In the Deploy draft dialog box, enter or select the required information, and click Confirm.
Start the job.
In the left-side navigation pane, click .
Click Start in the Actions column of the target job.
Select Initial Mode, and click Start. When the job status changes to RUNNING, the job is running properly. For more information about job startup parameter configuration, see Start a deployment.
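After the deployment is running, you can optionally verify that change data capture works end to end. Run the following statement on the MySQL instance to insert one more row; the Flink job should capture the change and write it to the Hudi table, which you can confirm with the query in Step 4.

INSERT INTO testdb.student VALUES (3,'name3',30);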
Step 4: Use DLF for data lake analysis
In the left-side navigation pane, click .
In the SQL editing area, enter the following code and click Run.
SELECT * FROM dlf.dlf_testdb.student_hudi;

The query returns the test data, which shows that you can directly query and analyze the data that Flink writes to the data lake.

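Because the Hudi table is registered in the DLF catalog, you can also run further analytical SQL against it. The following aggregation is a simple illustrative example based on the test data:

-- Count students in each age group in the lake table
SELECT age, COUNT(*) AS student_count
FROM dlf.dlf_testdb.student_hudi
GROUP BY age;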
If you have purchased an EMR cluster and enabled DLF metadata for the data lake, you can also analyze the data that Flink writes to the data lake directly from the EMR cluster. For more information, see Integrate Hudi with Spark SQL.
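For example, because the table metadata is stored in DLF, a Spark SQL session on the EMR cluster can typically read the same table without re-declaring it. The following query is a sketch and assumes that the EMR cluster is configured to use DLF as its metastore and that the dlf_testdb database is visible to Spark SQL:

-- Run in spark-sql on the EMR cluster (assumes DLF is the cluster metastore)
USE dlf_testdb;
SELECT * FROM student_hudi;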
References
If you want to use Flink in an EMR DataFlow cluster to read data from and write data to DLF, see Use a Dataflow cluster to read data from and write data to Hudi tables based on DLF.