E-MapReduce: Build a unified stream and batch data lake for analytics using Flink, EMR Serverless Spark, and Paimon

Last Updated: Sep 20, 2025

This topic demonstrates how to build a Paimon data lake analytics pipeline using Realtime Compute for Apache Flink and EMR Serverless Spark. The pipeline covers writing data to Object Storage Service (OSS), running interactive queries, and performing offline Compact operations. EMR Serverless Spark is fully compatible with Paimon and uses built-in Data Lake Formation (DLF) metadata to interoperate with other Alibaba Cloud services, such as Realtime Compute for Apache Flink, forming a complete unified stream and batch processing solution. It also supports flexible job execution methods and parameter configurations to meet various needs for real-time analytics and production scheduling.

Background information

Realtime Compute for Apache Flink

Alibaba Cloud Realtime Compute for Apache Flink is a fully managed, serverless Flink cloud service. It is a one-stop, ready-to-use platform for development, operations and maintenance (O&M), and management that offers flexible billing. It provides full lifecycle capabilities, including job development, data debugging, running and monitoring, automatic tuning, and intelligent diagnostics. For more information, see What is Alibaba Cloud Realtime Compute for Apache Flink?.

Apache Paimon

Apache Paimon is a unified data lake storage format. It works with Flink and Spark to build a real-time lakehouse architecture for both stream and batch processing. Paimon innovatively combines the lake format with log-structured merge-tree (LSM) technology, which gives the data lake real-time streaming update capabilities and complete stream processing power. For more information, see Apache Paimon.

Procedure

Step 1: Create a Paimon Catalog using Realtime Compute for Apache Flink

A Paimon Catalog lets you easily manage all Paimon tables in the same warehouse directory and connect with other Alibaba Cloud services. To create and use a Paimon Catalog, see Manage Paimon Catalogs.

  1. Log on to the Realtime Compute for Apache Flink console.

  2. Click Console in the Actions column of the target workspace.

  3. Create a Paimon Catalog.

    1. In the navigation pane on the left, choose Data Development > Data Query.

    2. Create a query script and enter the SQL code.

      The following code provides the complete Catalog configuration.

      CREATE CATALOG `paimon` WITH (
        'type' = 'paimon',
        'metastore' = 'dlf',
        'warehouse' = '<warehouse>',
        'dlf.catalog.id' = '<dlf.catalog.id>',
        'dlf.catalog.accessKeyId' = '<dlf.catalog.accessKeyId>',
        'dlf.catalog.accessKeySecret' = '<dlf.catalog.accessKeySecret>',
        'dlf.catalog.endpoint' = '<dlf.catalog.endpoint>',
        'dlf.catalog.region' = '<dlf.catalog.region>'
      );

      The following list describes the configuration items. All of them are required.

      • paimon: The name of the Paimon Catalog. Enter a custom name in English.

      • type: The type of the Catalog. Set the value to paimon.

      • metastore: The type of the metastore. In this example, the metastore type is set to dlf. DLF provides unified metadata management to ensure seamless integration across multiple engines.

      • warehouse: The actual location of the data warehouse. Modify this value as needed.

      • dlf.catalog.id: The ID of the DLF data catalog. You can view the data catalog ID in the Data Lake Formation console.

      • dlf.catalog.accessKeyId: The AccessKey ID required to access the DLF service. For more information about how to obtain an AccessKey ID, see Create an AccessKey.

      • dlf.catalog.accessKeySecret: The AccessKey secret required to access the DLF service. For more information about how to obtain an AccessKey secret, see Create an AccessKey.

      • dlf.catalog.endpoint: The endpoint of the DLF service. For more information, see Regions and endpoints. If Flink and DLF are in the same region, use the VPC endpoint. Otherwise, use the public endpoint.

      • dlf.catalog.region: The region where DLF is located. For more information, see Regions and endpoints. Make sure this region matches the region of dlf.catalog.endpoint.

    3. Select or create a session cluster.

      In the lower-right corner of the page, click Execution Environment and select a session cluster of the required version. The engine version must be Ververica Runtime (VVR) 8.0.4 or later. If you do not have a session cluster, see Step 1: Create a session cluster.

    4. Select the target code snippet and click Run on the left of the code line.

  4. Create a Paimon table.

    In the Query Script editor, enter the following command. Then, select the code and click Run.

    CREATE TABLE IF NOT EXISTS `paimon`.`test_paimon_db`.`test_append_tbl`
    (
        id       STRING,
        data     STRING,
        category INT,
        ts       STRING,
        dt       STRING,
        hh       STRING
    ) PARTITIONED BY (dt, hh)
    WITH (
        'write-only' = 'true'
    );
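
    The 'write-only' = 'true' option tells the Flink writer to skip compaction so that small files can be merged later by the offline Compact job in Step 3. The statement also assumes that the test_paimon_db database already exists in the catalog. If it does not, you can create it first (a minimal sketch; the database name is taken from the table path used throughout this topic):

    -- Run this in the same query script before creating the table.
    CREATE DATABASE IF NOT EXISTS `paimon`.`test_paimon_db`;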
  5. Create a stream job.

    1. Create a job.

      1. In the navigation pane on the left, choose Data Development > ETL.

      2. Create a stream job. In the Create Job Draft dialog box, configure the job parameters.

        • File Name: The name of the job. The job name must be unique within the current project.

        • Engine Version: The Flink engine version used by the current job. For more information about engine version numbers, version mappings, and important lifecycle dates, see Engine versions.

      3. Click Create.

    2. Write the code.

      In the new job draft, enter the following code to use the datagen connector to continuously generate and write data to the Paimon table.

      CREATE TEMPORARY TABLE datagen
      (
          id        string,
          data      string,
          category  int
      )
      WITH (
          'connector' = 'datagen',
          'rows-per-second' = '100',
          'fields.category.kind' = 'random',
          'fields.category.min' = '1',
          'fields.category.max' = '10'
      );
      INSERT INTO `paimon`.`test_paimon_db`.`test_append_tbl`
      SELECT
          id,
          data,
          category,
          cast(LOCALTIMESTAMP as string) as ts,
          cast(CURRENT_DATE as string) as dt,
          cast(hour(LOCALTIMESTAMP) as string) as hh
      FROM datagen;
    3. Click Deploy to publish the job to the production environment.

    4. Start the job on the Job O&M page. For more information, see Start a job.
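
      After the job has been running for a while, you can verify that data is being written by querying the Paimon table from the same Data Query script used earlier (a minimal sketch; the columns come from the table created above):

      SELECT dt, hh, COUNT(*) AS cnt
      FROM `paimon`.`test_paimon_db`.`test_append_tbl`
      GROUP BY dt, hh;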

Step 2: Create an SQL session using EMR Serverless Spark

Create an SQL session for SQL development and queries. For more information about sessions, see Session Manager.

  1. Go to the Sessions page.

    1. Log on to the EMR console.

    2. In the left-side navigation pane, choose EMR Serverless > Spark.

    3. On the Spark page, click the name of the workspace that you want to manage.

    4. In the left-side navigation pane of the EMR Serverless Spark page, choose Operation Center > Sessions.

  2. Create an SQL session.

    1. On the SQL Sessions tab, click Create SQL Session.

    2. On the Create SQL Session page, configure the following parameters, leave the other parameters with their default settings, and then click Create.

      • Name: The custom name of the SQL session. For example, paimon_compute.

      • Spark Configurations: Enter the following Spark configurations to connect to Paimon.

        spark.sql.extensions                org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
        spark.sql.catalog.paimon            org.apache.paimon.spark.SparkCatalog
        spark.sql.catalog.paimon.metastore  dlf
        spark.sql.catalog.paimon.warehouse  <warehouse>
        spark.sql.catalog.paimon.dlf.catalog.id  <dlf.catalog.id>

        Replace the following placeholders as needed:

        • <warehouse>: The actual location of the data warehouse.

        • <dlf.catalog.id>: The ID of the DLF data catalog.

    3. In the Actions column, click Start.
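
      After the session reaches the Running state, the paimon catalog defined in the configuration above can be referenced directly in the SQL that you run in Step 3, for example (a minimal sketch; it assumes that the Flink job from Step 1 has already created the table):

      SHOW TABLES IN paimon.test_paimon_db;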

Step 3: Run an interactive query or schedule a task using EMR Serverless Spark

EMR Serverless Spark provides two operation modes to meet different needs: interactive query and task scheduling. The interactive query mode is suitable for quick queries and debugging. The task scheduling mode supports the development, publishing, and O&M of tasks for complete lifecycle management.

During the data writing process, you can use EMR Serverless Spark to run interactive queries on Paimon tables at any time. This lets you retrieve the real-time data status and perform quick analysis. In addition, you can publish developed jobs and create a workflow to orchestrate and publish the tasks. You can configure scheduling policies to run tasks periodically, which ensures automated and efficient data processing and analysis.

Interactive query

  1. Create an SQL development job.

    1. On the EMR Serverless Spark page, click Data Development in the navigation pane on the left.

    2. On the Development Folders tab, click Create.

    3. In the dialog box that appears, enter a Name, such as paimon_compact. Set Type to SparkSQL and click OK.

    4. In the upper-right corner, select the data catalog, database, and the SQL session that you started in the previous step.

    5. In the new job editor, enter an SQL statement.

      • Example 1: Query the first 10 rows of data in the test_append_tbl table.

        SELECT * FROM paimon.test_paimon_db.test_append_tbl limit 10;


      • Example 2: Count the number of rows that meet specific conditions in the test_append_tbl table.

        SELECT COUNT(*) FROM paimon.test_paimon_db.test_append_tbl WHERE dt = '2024-06-24' AND hh = '19';


  2. Run and publish the job.

    1. Click Run.

      You can view the results on the Run Result tab below. If an error occurs, you can view the details on the Run Issues tab.

    2. After you confirm that the job runs correctly, click Publish in the upper-right corner.

    3. In the Publish dialog box, you can enter a description for the release and then click OK.

Task scheduling

  1. Query file information before the Compact operation.

    On the Data Development page, create an SQL development job to query the Paimon files system table. This lets you quickly retrieve data about the files before the Compact operation. For more information about how to create an SQL development job, see SparkSQL development.

    SELECT file_path, record_count, file_size_in_bytes FROM paimon.test_paimon_db.test_append_tbl$files WHERE partition='[2024-06-24, 19]';

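    Besides the $files system table, Paimon also exposes other system tables that can help you inspect the table state, such as $snapshots (a minimal sketch that follows the same naming convention as the $files query above):

    SELECT * FROM paimon.test_paimon_db.test_append_tbl$snapshots;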

  2. On the Data Development page, create an SQL development job (for example, paimon_compact) that contains the Paimon Compact SQL statement, and then publish it.

    For more information about how to create an SQL development job, see SparkSQL development.

    CALL paimon.sys.compact (
      table => 'test_paimon_db.test_append_tbl',
      partitions => 'dt=\"2024-06-24\",hh=\"19\"',
      order_strategy => 'zorder',
      order_by => 'category'
    );
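
    The call above clusters the compacted data by the category column using the z-order strategy. If you only want to merge small files without re-sorting, the procedure can also be called with just the table argument (a minimal sketch based on the same procedure; it compacts the whole table):

    CALL paimon.sys.compact (table => 'test_paimon_db.test_append_tbl');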
  3. Create a workflow.

    1. On the EMR Serverless Spark page, click Task Orchestration in the navigation pane on the left.

    2. On the Task Orchestration page, click Create Workflow.

    3. In the Create Workflow panel, enter a Workflow Name, such as paimon_workflow_task, and click Next.

      Configure the parameters in the Other Settings section as needed. For more information about the parameters, see Manage workflows.

    4. On the new node canvas, click Add Node.

    5. From the Source File Path drop-down list, select the published SQL development job (paimon_compact). Enter the Spark Configurations and click Save.

      • Name: The custom name of the SQL session. For example, paimon_compute.

      • Spark Configurations: Enter the following Spark configurations to connect to Paimon.

        spark.sql.extensions                org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
        spark.sql.catalog.paimon            org.apache.paimon.spark.SparkCatalog
        spark.sql.catalog.paimon.metastore  dlf
        spark.sql.catalog.paimon.warehouse  <warehouse>
        spark.sql.catalog.paimon.dlf.catalog.id  <dlf.catalog.id>

        Replace the following placeholders as needed:

        • <warehouse>: The actual location of the data warehouse.

        • <dlf.catalog.id>: The ID of the DLF data catalog.

    6. On the new node canvas, click Publish Workflow and then click OK.

  4. Run the workflow.

    1. On the Task Orchestration page, click the Workflow Name of the new workflow, such as paimon_workflow_task.

    2. On the Workflow Instance List page, click Run Manually.

    3. In the Trigger Run dialog box, click OK.

  5. Verify the result of the Compact operation.

    After the workflow is successfully scheduled and executed, run the same SQL query again and compare the number of files, the record counts, and the file sizes before and after the Compact operation. Typically, the compaction merges many small data files into fewer, larger files, while the total record count remains unchanged.

    SELECT file_path, record_count, file_size_in_bytes FROM paimon.test_paimon_db.test_append_tbl$files WHERE partition='[2024-06-24, 19]';
