This topic describes how to use Realtime Compute for Apache Flink and E-MapReduce (EMR) Serverless Spark to build a Paimon data lake analytics process in which data is written to Object Storage Service (OSS), queried interactively, and compacted offline. EMR Serverless Spark is fully compatible with Paimon and integrates Data Lake Formation (DLF), which enables metadata sharing with other cloud services, such as Realtime Compute for Apache Flink, and supports a unified batch and stream computing solution. EMR Serverless Spark also lets you run jobs and configure job parameters in a flexible manner to meet various requirements for real-time analysis and job scheduling.
Background information
Realtime Compute for Apache Flink
Alibaba Cloud Realtime Compute for Apache Flink is a fully managed, out-of-the-box serverless Flink service that supports end-to-end development, O&M, and management. Realtime Compute for Apache Flink supports multiple billing methods. Realtime Compute for Apache Flink also delivers powerful capabilities for entire project lifecycles, including draft development, data debugging, operation and monitoring, automatic tuning, and intelligent diagnostics. For more information, see What is Alibaba Cloud Realtime Compute for Apache Flink?
Apache Paimon
Apache Paimon is a unified data lake format. Apache Paimon can work together with Apache Flink and Apache Spark to build a real-time lakehouse architecture that supports unified batch and stream computing. Apache Paimon innovatively combines the lake format with the log-structured merge-tree (LSM) technology to support real-time stream updates and stream computing. For more information, see Apache Paimon.
Procedure
Step 1: Create a Paimon catalog in Realtime Compute for Apache Flink
Apache Paimon catalogs can be used to manage all Apache Paimon tables in the same warehouse directory in an efficient manner. Apache Paimon catalogs can also be used by other Alibaba Cloud services. For information about how to create and use an Apache Paimon catalog, see Manage Apache Paimon catalogs.
Log on to the management console of Realtime Compute for Apache Flink.
Find the desired workspace and click Console in the Actions column.
Create an Apache Paimon catalog.
In the left-side navigation pane, choose Development > Scripts.
On the Scripts tab, click the icon to create a script.
Enter the SQL code in the script editor.
Sample code:
CREATE CATALOG `paimon` WITH (
  'type' = 'paimon',
  'metastore' = 'dlf',
  'warehouse' = '<warehouse>',
  'dlf.catalog.id' = '<dlf.catalog.id>',
  'dlf.catalog.accessKeyId' = '<dlf.catalog.accessKeyId>',
  'dlf.catalog.accessKeySecret' = '<dlf.catalog.accessKeySecret>',
  'dlf.catalog.endpoint' = '<dlf.catalog.endpoint>',
  'dlf.catalog.region' = '<dlf.catalog.region>'
);
| Parameter | Description | Required | Remarks |
| --- | --- | --- | --- |
| paimon | The name of the Apache Paimon catalog. | Yes | Enter a custom name. |
| type | The type of the catalog. | Yes | Set the value to paimon. |
| metastore | The metadata storage type. | Yes | Set the value to dlf. You can use DLF to manage metadata in a centralized manner and implement seamless integration among engines. |
| warehouse | The data warehouse directory. | Yes | Configure this parameter based on your business requirements. |
| dlf.catalog.id | The ID of the DLF data catalog. | Yes | You can view the ID of the data catalog in the DLF console. |
| dlf.catalog.accessKeyId | The AccessKey ID that is used to access DLF. | Yes | For information about how to obtain the AccessKey pair, see Create an AccessKey pair. |
| dlf.catalog.accessKeySecret | The AccessKey secret that is used to access DLF. | Yes | For information about how to obtain the AccessKey pair, see Create an AccessKey pair. |
| dlf.catalog.endpoint | The endpoint of DLF. | Yes | For more information, see Supported regions and endpoints. Note: If DLF resides in the same region as Realtime Compute for Apache Flink, use the VPC endpoint. Otherwise, use the public endpoint. |
| dlf.catalog.region | The region in which DLF resides. | Yes | For more information, see Supported regions and endpoints. Note: Make sure that the value of this parameter matches the endpoint that is specified by the dlf.catalog.endpoint parameter. |
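For reference, a filled-in catalog statement might look like the following sketch. The bucket name, catalog ID, endpoint, and region below are illustrative assumptions only; substitute your own values and AccessKey pair.

-- Example only. Assumed values: an OSS bucket named example-bucket, a DLF catalog ID of my_dlf_catalog,
-- and the China (Hangzhou) region with the VPC endpoint. Replace them with your own values.
CREATE CATALOG `paimon` WITH (
  'type' = 'paimon',
  'metastore' = 'dlf',
  'warehouse' = 'oss://example-bucket/warehouse',
  'dlf.catalog.id' = 'my_dlf_catalog',
  'dlf.catalog.accessKeyId' = '<yourAccessKeyId>',
  'dlf.catalog.accessKeySecret' = '<yourAccessKeySecret>',
  'dlf.catalog.endpoint' = 'dlf-vpc.cn-hangzhou.aliyuncs.com',
  'dlf.catalog.region' = 'cn-hangzhou'
);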
Select or create a session cluster.
In the lower-right corner of the script editing page, select a session cluster that uses Ververica Runtime (VVR) 8.0.4 or later from the Environment drop-down list. If no session cluster is available, create one. For information about how to create a session cluster, see the Step 1: Create a session cluster section of the "Debug a deployment" topic.
Select the code that you want to run and click Run to the left of the code.
Create an Apache Paimon table.
In the script editor, enter and select the following code. Then, click Run in the upper-left corner of the script editor.
CREATE TABLE IF NOT EXISTS `paimon`.`test_paimon_db`.`test_append_tbl` (
  id STRING,
  data STRING,
  category INT,
  ts STRING,
  dt STRING,
  hh STRING
) PARTITIONED BY (dt, hh) WITH (
  'write-only' = 'true'
);
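If the CREATE TABLE statement fails because the test_paimon_db database does not exist in the catalog yet, you can create the database first and then rerun the statement. A minimal sketch, run in the same script:

-- Create the database in the Paimon catalog if it does not exist.
CREATE DATABASE IF NOT EXISTS `paimon`.`test_paimon_db`;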
Create a streaming draft.
In the left-side navigation pane, choose Development > ETL.
On the Drafts tab, click the New (Create Draft) icon.
On the SQL Scripts tab of the New Draft dialog box, click Blank Stream Draft.
Click Next.
In the New Draft dialog box, configure the parameters of the draft. The following table describes the parameters.
| Parameter | Description |
| --- | --- |
| Name | The name of the draft that you want to create. Note: The draft name must be unique in the current project. |
| Location | The folder in which the code file of the draft is saved. You can also click the icon to the right of an existing folder to create a subfolder. |
| Engine Version | The engine version of Flink that is used by the draft. For more information about engine versions, version mappings, and important time points in the lifecycle of each version, see Engine version. |
Click Create.
Write code.
In the draft editor, enter the following code to continuously generate data by using Datagen and write data to the Paimon table. Sample code:
CREATE TEMPORARY TABLE datagen (
  id STRING,
  data STRING,
  category INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '100',
  'fields.category.kind' = 'random',
  'fields.category.min' = '1',
  'fields.category.max' = '10'
);

INSERT INTO `paimon`.`test_paimon_db`.`test_append_tbl`
SELECT
  id,
  data,
  category,
  CAST(LOCALTIMESTAMP AS STRING) AS ts,
  CAST(CURRENT_DATE AS STRING) AS dt,
  CAST(HOUR(LOCALTIMESTAMP) AS STRING) AS hh
FROM datagen;
In the upper-right corner of the toolbar, click Deploy to deploy the draft to the production environment.
Start the deployment on the Deployments page. For more information, see Start a deployment.
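Optionally, before moving on, you can spot-check from the Flink side that rows are being written to the Paimon table. The following is a minimal sketch that you can run in the Scripts editor on the session cluster; switching the script to batch execution with a SET statement is an assumption about your environment, and the counts depend on how long the deployment has been running.

-- Optional spot check: count the rows written per partition so far.
SET 'execution.runtime-mode' = 'batch';

SELECT dt, hh, COUNT(*) AS row_cnt
FROM `paimon`.`test_paimon_db`.`test_append_tbl`
GROUP BY dt, hh;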
Step 2: Create an SQL session in EMR Serverless Spark
You must create an SQL session for SQL development and queries. For more information about sessions, see Manage sessions.
Go to the Compute page.
Log on to the EMR console.
In the left-side navigation pane, choose EMR Serverless > Spark.
On the Spark page, click the name of the workspace that you want to manage.
In the left-side navigation pane of the EMR Serverless Spark page, click Compute.
Create an SQL session.
On the SQL Sessions tab, click Create SQL Session.
On the Create SQL Session page, configure parameters and click Create. The following table describes the parameters.
Name: The name of the SQL session. Example: paimon_compute.

Spark Configuration: Enter the following configuration to connect to Paimon:

spark.sql.extensions                     org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
spark.sql.catalog.paimon                 org.apache.paimon.spark.SparkCatalog
spark.sql.catalog.paimon.metastore       dlf
spark.sql.catalog.paimon.warehouse       <warehouse>
spark.sql.catalog.paimon.dlf.catalog.id  <dlf.catalog.id>

Replace the following placeholders based on your business requirements:
<warehouse>: the data warehouse directory.
<dlf.catalog.id>: the ID of the DLF data catalog.
Click Start in the Actions column.
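After the session starts, you can verify that Spark can reach the Paimon catalog by running a quick metadata query in an SQL job (SQL job development is described in the next step). A minimal sketch; the databases and tables that are returned depend on your environment.

-- List the databases and tables that are visible through the Paimon catalog.
SHOW NAMESPACES IN paimon;
SHOW TABLES IN paimon.test_paimon_db;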
Step 3: Perform interactive queries and schedule jobs in EMR Serverless Spark
EMR Serverless Spark allows you to perform interactive queries and schedule jobs in a workflow to meet different requirements. You can perform interactive queries to implement fast queries and debugging. You can also create a workflow to develop, publish, and maintain jobs. This helps you implement complete lifecycle management of jobs.
While data is being written, you can use EMR Serverless Spark to run interactive queries against the Paimon table at any time. This helps you check the latest data and analyze it quickly. You can also publish the developed jobs, orchestrate them in a workflow, publish the workflow, and configure scheduling policies to run the jobs on a regular basis. This way, data can be processed and analyzed in an automated and efficient manner.
Interactive query
Create an SQL job.
In the left-side navigation pane of the EMR Serverless Spark page, click Data Development.
On the Development tab, click Create.
In the Create dialog box, configure the Name and Type parameters and click OK. In this example, the Name parameter is set to paimon_query and the Type parameter is set to SparkSQL.
In the upper-right corner of the toolbar, select a data catalog from the Default Catalog drop-down list, a database from the Default Database drop-down list, and the SQL session that you started from the SQL Sessions drop-down list.
Enter SQL statements in the editor of the created job.
Example 1: Query the first 10 rows of data from the test_append_tbl table.

SELECT * FROM paimon.test_paimon_db.test_append_tbl LIMIT 10;
Example 2: Count the number of rows that meet specific conditions in the test_append_tbl table.

SELECT COUNT(*) FROM paimon.test_paimon_db.test_append_tbl WHERE dt = '2024-06-24' AND hh = '19';
Run and publish the job.
Click Run.
You can view the results on the Execution Results tab in the lower part of the page. If an exception occurs, you can view the exception on the Execution Issues tab in the lower part of the page.
Confirm that the job runs as expected. Then, in the upper-right corner of the page, click Publish to publish the job.
In the Publish dialog box, configure the Remarks parameter and click OK.
Job scheduling
Query information about the files before they are compacted.
On the Development tab, create an SQL job to query the system table files of Paimon. This way, you can obtain information about the files before they are compacted. For information about how to create an SQL job, see Develop a SQL job.
SELECT file_path, record_count, file_size_in_bytes
FROM paimon.test_paimon_db.test_append_tbl$files
WHERE partition = '[2024-06-24, 19]';
On the Development tab, create another SQL job named paimon_compact. Enter SQL statements for the compaction operation in the editor of the created job.
For information about how to create an SQL job, see Develop a SQL job.
CALL paimon.sys.compact (
  table => 'test_paimon_db.test_append_tbl',
  partitions => 'dt=\"2024-06-24\",hh=\"19\"',
  order_strategy => 'zorder',
  order_by => 'category'
);
Create a workflow.
In the left-side navigation pane of the EMR Serverless Spark page, choose Operation Center > Workflows.
On the Workflows tab, click Create Workflow.
In the Create Workflow panel, configure the Name parameter and click Next. In this example, the Name parameter is set to paimon_workflow_task.
You can configure the parameters in the Other Settings section based on your business requirements. For more information about parameter settings, see Manage workflows.
On the canvas that appears, click Add Node.
In the Add Node panel, select the job paimon_compact from the Source File Path drop-down list, configure the Spark Configuration parameter, and then click Save.
Name: The name of the SQL session. Example: paimon_compute.

Spark Configuration: Enter the following configuration to connect to Paimon:

spark.sql.extensions                     org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
spark.sql.catalog.paimon                 org.apache.paimon.spark.SparkCatalog
spark.sql.catalog.paimon.metastore       dlf
spark.sql.catalog.paimon.warehouse       <warehouse>
spark.sql.catalog.paimon.dlf.catalog.id  <dlf.catalog.id>

Replace the following placeholders based on your business requirements:
<warehouse>: the data warehouse directory.
<dlf.catalog.id>: the ID of the DLF data catalog.
In the upper-right corner of the canvas, click Publish Workflow. In the Publish dialog box, click OK.
Run the workflow.
On the Workflows tab, find the workflow paimon_workflow_task and click the name in the Name column.
In the upper-right corner of the Workflow Runs tab, click Manually Run.
In the Run message, click OK.
Verify the compaction effect.
After the workflow is successfully run, execute the first SQL job again to compare the number of files, the number of records, and the file size before and after the compaction operation.
SELECT file_path, record_count, file_size_in_bytes
FROM paimon.test_paimon_db.test_append_tbl$files
WHERE partition = '[2024-06-24, 19]';
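In addition to comparing the $files output, you can inspect the table's snapshot history to confirm that the compaction committed a new snapshot. The following is a minimal sketch against Paimon's $snapshots system table; the exact column set can vary across Paimon versions.

-- The most recent snapshots; a compaction typically appears with commit_kind = 'COMPACT'.
SELECT snapshot_id, commit_kind, commit_time
FROM paimon.test_paimon_db.test_append_tbl$snapshots
ORDER BY snapshot_id DESC
LIMIT 5;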