
Realtime Compute for Apache Flink: Build a streaming data lakehouse by using Realtime Compute for Apache Flink and Apache Paimon

Last Updated: Apr 09, 2024

This topic describes how to build a streaming data lakehouse by using Realtime Compute for Apache Flink and Apache Paimon.

Background information

As digitalization advances, enterprises have an increasingly strong demand for data timeliness. In a traditional offline data warehouse, offline deployments are periodically scheduled to apply the changes that are generated within the previous period of time to the layers of the data warehouse: operational data store (ODS), data warehouse detail (DWD), data warehouse service (DWS), and application data service (ADS). However, this method causes long latency and high costs. In most cases, the scheduling interval of offline deployments is 1 hour or even 1 day, so data consumers can view only the data of the previous scheduling cycle. In addition, data is updated by overwriting entire partitions: the original data in a partition must be re-read and merged with the new changes to generate the new result data.

To resolve the preceding issues of traditional offline data warehouses, you can use Realtime Compute for Apache Flink and Apache Paimon to build a streaming data lakehouse. The real-time computing capabilities of Realtime Compute for Apache Flink allow data to flow between data warehouse layers in real time, and the efficient update capability of Apache Paimon allows data changes to be delivered to downstream consumers with minute-level latency. Therefore, the streaming data lakehouse has advantages in terms of both latency and cost.

For more information about the features of Apache Paimon, see Features and visit the Apache Paimon official website.

Architecture and benefits

Architecture

Realtime Compute for Apache Flink is a powerful streaming compute engine that supports efficient processing of large amounts of real-time data. Apache Paimon is a data lake storage platform that allows you to process data in streaming and batch modes. Apache Paimon supports data updates with high throughput and data queries with low latency. Apache Paimon is deeply integrated with Realtime Compute for Apache Flink to provide an integrated streaming data lakehouse solution. The following figure shows the architecture of the streaming data lakehouse that is built by using Realtime Compute for Apache Flink and Apache Paimon.

  1. Realtime Compute for Apache Flink writes data from a data source to Apache Paimon to form the ODS layer.

  2. Realtime Compute for Apache Flink subscribes to changelogs at the ODS layer for processing and then rewrites the data to Apache Paimon to form the DWD layer.

  3. Realtime Compute for Apache Flink subscribes to changelogs at the DWD layer for processing and then rewrites the data to Apache Paimon to form the DWS layer.

  4. MaxCompute or E-MapReduce reads data from an Apache Paimon external table and provides external data queries.

[Figure: Architecture of the streaming data lakehouse that is built by using Realtime Compute for Apache Flink and Apache Paimon]

Benefits

This solution provides the following benefits:

  • Each layer of Apache Paimon can deliver changelogs to the downstream storage with minute-level latency. This reduces the latency of traditional offline data warehouses from hours or even days to minutes.

  • Each layer of Apache Paimon can directly receive changelogs, and data in partitions is not overwritten. This greatly reduces the cost of updating and revising data compared with traditional offline data warehouses, and resolves the issue that data at the intermediate layers is difficult to query, update, or correct.

  • This solution provides a unified model and uses a simplified architecture. The logic of extract, transform, and load (ETL) operations is implemented based on Flink SQL. All data at the ODS layer, DWD layer, and DWS layer is stored in Apache Paimon. This reduces the architecture complexity and improves data processing efficiency.

This solution relies on the following three core capabilities of Apache Paimon:

  • Update of data in a primary key table: Apache Paimon uses a log-structured merge-tree (LSM-tree) at the underlying layer to implement efficient data updates. For more information about Apache Paimon primary key tables, see Primary Key Table. For more information about the data structure of the underlying layer of Apache Paimon, see File Layouts.

  • Incremental data generation mechanism: This mechanism is specified by the changelog-producer parameter. Apache Paimon can generate complete incremental data for any input data stream: each UPDATE_AFTER data record corresponds to an UPDATE_BEFORE data record. This ensures that data changes are completely passed to the downstream. For more information, see Incremental data generation mechanism.

  • Data merging mechanism: This mechanism is specified by the merge-engine parameter. When an Apache Paimon primary key table receives multiple data records that have the same primary key, the Apache Paimon result table merges the data records into one record to ensure the uniqueness of the primary key. Apache Paimon supports various data merging mechanisms, such as deduplication, partial-update, and aggregation. For more information, see Data merging mechanism.
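
For reference, the following minimal Flink SQL sketch shows how the data merging mechanism and the incremental data generation mechanism are typically declared together in the DDL of an Apache Paimon primary key table. The catalog, database, table, and column names in this sketch are hypothetical placeholders; the concrete DDL statements used in this topic appear in the Best practices section.

    -- Minimal sketch with hypothetical names (my_paimon_catalog.my_db.example_metrics).
    CREATE TABLE my_paimon_catalog.my_db.example_metrics (
        user_id STRING,
        total_fee BIGINT,
        PRIMARY KEY (user_id) NOT ENFORCED -- Primary key table: records that share a key are merged.
    ) WITH (
        'merge-engine' = 'aggregation', -- Data merging mechanism: merge same-key records by aggregation.
        'fields.total_fee.aggregate-function' = 'sum', -- How total_fee values of same-key records are merged.
        'changelog-producer' = 'lookup' -- Incremental data generation mechanism: emit complete changelogs.
    );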

Best practices

This example shows how to build a streaming data lakehouse for an e-commerce platform that processes and cleanses data and serves queries from upper-layer applications. This way, the data is layered and reused to support multiple business scenarios, such as report queries for transaction dashboards, behavioral data analysis, user profile tagging, and personalized recommendations.


  1. Build the ODS layer: Real-time ingestion of data in a business database into the data warehouse. A MySQL database contains three business tables: orders, orders_pay, and product_catalog. Realtime Compute for Apache Flink writes the data of these tables to Object Storage Service (OSS) in real time and stores the data in the Apache Paimon format to form the ODS layer.

  2. Build the DWD layer: Wide table. Realtime Compute for Apache Flink uses the partial-update data merging mechanism of Apache Paimon to combine data in the orders, orders_pay, and product_catalog tables into a wide table at the DWD layer and generate changelogs with minute-level latency.

  3. Build the DWS layer: Metric computing. Realtime Compute for Apache Flink consumes the changelogs of the wide table in real time, uses the aggregation data merging mechanism of Apache Paimon to generate the user-shop aggregate intermediate table dwm_users_shops at the DWM layer, and finally generates the user aggregate metric table dws_users and the shop aggregate metric table dws_shops at the DWS layer.

Prerequisites

Note

The MaxCompute project and OSS bucket must be in the same region as the workspace of fully managed Flink.

Limits

Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.1 or later supports the streaming data lakehouse solution.

Build a streaming data lakehouse

Prepare a MySQL CDC data source

In this example, three business tables are created in a database named order_dw in an ApsaraDB RDS for MySQL instance, and data is inserted into the tables.

  1. Create an ApsaraDB RDS for MySQL instance.

    Important

    The ApsaraDB RDS for MySQL instance and the workspace of fully managed Flink must reside in the same virtual private cloud (VPC). If the ApsaraDB RDS for MySQL instance and the workspace of fully managed Flink do not reside in the same VPC, handle the issue by following the instructions provided in How does Realtime Compute for Apache Flink access a service across VPCs?

  2. Create a database and an account.

    Create a database named order_dw and create a privileged account or a standard account that has read and write permissions on the order_dw database. Then, execute the following statements in the order_dw database to create three business tables and insert test data:

    CREATE TABLE `orders` (
      order_id bigint not null primary key,
      user_id varchar(50) not null,
      shop_id bigint not null,
      product_id bigint not null,
      buy_fee bigint not null,   
      create_time timestamp not null,
      update_time timestamp not null default now(),
      state int not null
    );
    
    
    CREATE TABLE `orders_pay` (
      pay_id bigint not null primary key,
      order_id bigint not null,
      pay_platform int not null, 
      create_time timestamp not null
    );
    
    
    CREATE TABLE `product_catalog` (
      product_id bigint not null primary key,
      catalog_name varchar(50) not null
    );
    
    -- Prepare data.
    INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
    
    INSERT INTO orders VALUES
    (100001, 'user_001', 12345, 1, 5000, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
    (100002, 'user_002', 12346, 2, 4000, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
    (100003, 'user_003', 12347, 3, 3000, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
    (100004, 'user_001', 12347, 4, 2000, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
    (100005, 'user_002', 12348, 5, 1000, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
    (100006, 'user_001', 12348, 1, 1000, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
    (100007, 'user_003', 12347, 4, 2000, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
    
    INSERT INTO orders_pay VALUES
    (2001, 100001, 1, '2023-02-15 17:40:56'),
    (2002, 100002, 1, '2023-02-15 17:40:56'),
    (2003, 100003, 0, '2023-02-15 17:40:56'),
    (2004, 100004, 0, '2023-02-15 17:40:56'),
    (2005, 100005, 0, '2023-02-15 18:40:56'),
    (2006, 100006, 0, '2023-02-15 18:40:56'),
    (2007, 100007, 0, '2023-02-15 18:40:56');

Manage catalogs

  1. Create an Apache Paimon catalog.

    1. Go to the Catalogs page.

      1. Log on to the Realtime Compute for Apache Flink console.

      2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.

      3. In the left-side navigation pane, click Catalogs.

    2. Create a custom catalog type.

      1. On the Catalog List page, click Create Catalog. In the Create Catalog dialog box, click the Custom Catalog tab and click Add Catalog Type.

      2. In the Create catalog type dialog box, click Click to Select next to Upload File and select the Apache Paimon catalog plug-in package paimon-ali-vvr-8.0-vvp-0.6-ali-1-SNAPSHOT.jar.

        If the message File(s) size too large appears, click Continue.

      3. Click Next. After a period of time, click Confirm.

    3. On the Custom Catalog tab, click paimon-06-1 and click Next.

    4. In the Create Catalog dialog box, enter the following SQL statement, configure the parameters in the statement, and then click Confirm to create an Apache Paimon catalog named dw.

      CREATE CATALOG `dw` WITH (
          'type' = 'paimon-06-1',
          'metastore' = 'maxcompute',
          'warehouse' = '<warehouse>',
          'fs.oss.endpoint' = '<oss endpoint>',
          'fs.oss.accessKeyId' = '<oss access key id>',
          'fs.oss.accessKeySecret' = '${secret_values.aksecret}',
          'maxcompute.endpoint' = '<maxcompute endpoint>',
          'maxcompute.accessid' = '<maxcompute access id>',
          'maxcompute.accesskey' = '${secret_values.aksecret}',
          'maxcompute.project' = '<maxcompute project>',
          'maxcompute.oss.endpoint' = '<maxcompute oss endpoint>'
      );

      Parameters in the WITH clause:

      • type (required): The type of the catalog. Set the value to paimon-06-1.

      • metastore (required): The metadata storage type. To ensure that data in Apache Paimon tables can be analyzed in MaxCompute, enter maxcompute.

      • warehouse (required): The data warehouse directory that is specified in OSS, in the format oss://<bucket>/<object>. bucket is the name of the OSS bucket that you created, and object is the path in which your data is stored. You can view the bucket name and object name in the OSS console. Important: The OSS bucket must be in the same region as the MaxCompute project, and the storage class of the OSS bucket must be Standard. For more information, see Overview.

      • fs.oss.endpoint (required): The endpoint that is used to access OSS from fully managed Flink. For more information, see Regions and endpoints.

      • fs.oss.accessKeyId (required): The AccessKey ID of the Alibaba Cloud account that has read and write permissions on OSS. For more information about how to obtain the AccessKey ID, see View the information about AccessKey pairs of a RAM user.

      • fs.oss.accessKeySecret (required): The AccessKey secret of the Alibaba Cloud account that has read and write permissions on OSS. For more information about how to obtain the AccessKey secret, see View the information about AccessKey pairs of a RAM user. In this example, a key named aksecret is used to protect your AccessKey pair. For more information, see Manage keys.

      • maxcompute.endpoint (required): The endpoint of the MaxCompute service. For more information, see Endpoints.

      • maxcompute.accessid (required): The AccessKey ID of the Alibaba Cloud account that has permissions on MaxCompute. For more information about how to obtain the AccessKey ID, see View the information about AccessKey pairs of a RAM user.

      • maxcompute.accesskey (required): The AccessKey secret of the Alibaba Cloud account that has permissions on MaxCompute. For more information about how to obtain the AccessKey secret, see View the information about AccessKey pairs of a RAM user. In this example, the key named aksecret is used to protect your AccessKey pair. For more information, see Manage keys.

      • maxcompute.project (required): The name of the MaxCompute project. A MaxCompute project for which the schema feature is enabled is not supported.

      • maxcompute.oss.endpoint (optional): The endpoint that is used to access OSS from MaxCompute. If you do not specify this parameter, the value of the fs.oss.endpoint parameter is used by default. For more information, see Regions and endpoints. Important: Because the OSS bucket and the MaxCompute project reside in the same region, we recommend that you enter the internal endpoint of the OSS bucket.
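
      For reference, a filled-in version of the statement might look like the following sketch. The bucket name, project name, and region in this sketch are hypothetical placeholders that only illustrate the expected formats; replace them with your own values.

      CREATE CATALOG `dw` WITH (
          'type' = 'paimon-06-1',
          'metastore' = 'maxcompute',
          'warehouse' = 'oss://my-paimon-bucket/warehouse', -- Hypothetical bucket and path.
          'fs.oss.endpoint' = 'oss-cn-hangzhou-internal.aliyuncs.com', -- Hypothetical region cn-hangzhou.
          'fs.oss.accessKeyId' = '<oss access key id>',
          'fs.oss.accessKeySecret' = '${secret_values.aksecret}',
          'maxcompute.endpoint' = 'http://service.cn-hangzhou.maxcompute.aliyun.com/api', -- Hypothetical region cn-hangzhou.
          'maxcompute.accessid' = '<maxcompute access id>',
          'maxcompute.accesskey' = '${secret_values.aksecret}',
          'maxcompute.project' = 'my_paimon_project', -- Hypothetical project name.
          'maxcompute.oss.endpoint' = 'oss-cn-hangzhou-internal.aliyuncs.com' -- Internal OSS endpoint of the same region.
      );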

  2. Create a MySQL catalog.

    1. On the Catalog List page, click Create Catalog.

    2. On the Built-in Catalog tab of the Create Catalog dialog box, click MySQL and click Next.

    3. Configure the parameters and click Confirm to create a MySQL catalog named mysqlcatalog. The following list describes the parameters:

      • catalog name (required): The name of the catalog. Enter a custom name. In this example, the catalog is named mysqlcatalog.

      • hostname (required): The IP address or hostname that is used to access the MySQL database. For more information, see View and manage instance endpoints and ports. Because the ApsaraDB RDS for MySQL instance and the workspace of fully managed Flink reside in the same VPC, we recommend that you enter the value of Internal Endpoint that is displayed on the Database Connection page of the ApsaraDB RDS console.

      • port (optional): The port number of the MySQL database. Default value: 3306. For more information, see View and manage instance endpoints and ports.

      • default-database (required): The name of the default MySQL database. In this example, the name of the database from which data is synchronized is order_dw.

      • username (required): The username that is used to access the MySQL database. In this example, the account that is created in Prepare a MySQL CDC data source is used.

      • password (required): The password that is used to access the MySQL database. In this example, the password that is created in Prepare a MySQL CDC data source is used.
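
      After the catalog is created, tables in the order_dw database can be referenced in SQL drafts by their fully qualified names, without separate CREATE TABLE statements. For example, the following query (for illustration only) reads the orders table through the catalog:

      SELECT order_id, user_id, shop_id FROM mysqlcatalog.order_dw.orders;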

Build the ODS layer: Real-time ingestion of data in a business database into the data warehouse

You can execute the CREATE DATABASE AS statement to build the ODS layer. The parameters that are configured in the SET statements in an SQL deployment can also be configured on the Configuration tab of the Deployments page. For more information, see How do I configure parameters for deployment running? For more information about the write performance optimization of Apache Paimon, see the Apache Paimon official documentation.

  1. Create a synchronization deployment that uses the CREATE DATABASE AS statement.

    1. In the Realtime Compute for Apache Flink console, create an SQL streaming draft named ods and copy the following code to the SQL editor:

      SET 'execution.checkpointing.max-concurrent-checkpoints' = '3'; -- Reduce the impact of the long tail of checkpoints. 
      SET 'table.exec.sink.upsert-materialize' = 'NONE'; -- Eliminate useless SinkMaterialize operators. 
      
      -- The Apache Paimon result table commits data only after each checkpoint is complete. 
      -- In this example, the checkpointing interval is reduced to 10s to help you quickly obtain the results. 
      -- In the production environment, set the checkpointing interval and the minimum pause between checkpoints based on your business requirements for latency. In most cases, they are set to 1 to 10 minutes. 
      SET 'execution.checkpointing.interval' = '10s';
      SET 'execution.checkpointing.min-pause' = '10s';
      
      CREATE DATABASE IF NOT EXISTS dw.order_dw
      WITH (
        'changelog-producer' = 'input' -- The input data is the binary log data that is generated by MySQL and contains all changelogs. Therefore, the input data can be directly used as changelogs. 
      ) AS DATABASE mysqlcatalog.order_dw INCLUDING ALL TABLES; -- You can also select only the tables in the upstream database whose data needs to be imported into the data warehouse, as shown in the sketch at the end of this section.

    2. In the upper-right corner of the SQL Editor page, click Deploy to deploy the draft.

    3. On the Deployments page, find the deployment named ods and click Start in the Action column. In the Start Job panel, select Initial Mode and click Start to start the deployment.

  2. View the data of the three tables that are synchronized from MySQL to Apache Paimon.

    In the Realtime Compute for Apache Flink console, create an SQL batch draft named test, copy the following code to the SQL editor, and then click Debug. For more information, see Debug a deployment.

    SELECT * FROM dw.order_dw.orders ORDER BY order_id;

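    If you want to synchronize only some of the tables, the CREATE DATABASE AS statement also supports an INCLUDING TABLE clause. The following sketch is for illustration only and assumes that the clause accepts a table name pattern; confirm the exact syntax in the documentation of the CREATE DATABASE AS (CDAS) statement before you use it.

    CREATE DATABASE IF NOT EXISTS dw.order_dw
    WITH (
      'changelog-producer' = 'input'
    ) AS DATABASE mysqlcatalog.order_dw INCLUDING TABLE 'orders|orders_pay'; -- Assumed pattern syntax: synchronize only the orders and orders_pay tables.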

Build the DWD layer: Wide table

  1. Create a wide table named dwd_orders at the DWD layer in Apache Paimon. In the Realtime Compute for Apache Flink console, copy the following code to the SQL editor of the draft named test, select the code that you want to run, and then click Run that appears on the left side of the code.

    CREATE TABLE dw.order_dw.dwd_orders (
        order_id BIGINT,
        order_user_id STRING,
        order_shop_id BIGINT,
        order_product_id BIGINT,
        order_product_catalog_name STRING,
        order_fee BIGINT,
        order_create_time TIMESTAMP,
        order_update_time TIMESTAMP,
        order_state INT,
        pay_id BIGINT,
        pay_platform INT COMMENT 'platform 0: phone, 1: pc',
        pay_create_time TIMESTAMP,
        PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'partial-update', -- Use the partial-update data merging mechanism to generate a wide table.
        'changelog-producer' = 'lookup' -- Use the lookup incremental data generation mechanism to generate changelogs with low latency.
    );

    If the Query has been executed message is returned, the table is created.

  2. Consume the changelogs of the orders and orders_pay tables at the ODS layer in real time.

    In the Realtime Compute for Apache Flink console, create an SQL streaming draft named dwd, copy the following code to the SQL editor, deploy the draft, and then start the deployment for the draft without initial states.

    The deployment joins the orders table with the dimension table product_catalog and writes the join result and data in the orders_pay table to the wide table dwd_orders. In this process, the partial-update data merging mechanism of Apache Paimon is used to merge the data that has the same order_id value in the orders and orders_pay tables.

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';
    
    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';
    
    -- Apache Paimon does not allow you to use multiple INSERT statements to write data to the same table in the same deployment. Therefore, UNION ALL is used in this example. 
    INSERT INTO dw.order_dw.dwd_orders 
    SELECT 
        o.order_id,
        o.user_id,
        o.shop_id,
        o.product_id,
        dim.catalog_name,
        o.buy_fee,
        o.create_time,
        o.update_time,
        o.state,
        NULL,
        NULL,
        NULL
    FROM
        dw.order_dw.orders o 
        LEFT JOIN dw.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim
        ON o.product_id = dim.product_id
    UNION ALL
    SELECT
        order_id,
        NULL,
        NULL,
        NULL,
        NULL,
        NULL,
        NULL,
        NULL,
        NULL,
        pay_id,
        pay_platform,
        create_time
    FROM
        dw.order_dw.orders_pay;
  3. View data of the wide table dwd_orders. In the Realtime Compute for Apache Flink console, copy the following code to the SQL editor of the draft named test, select the code that you want to run, and then click Debug in the upper-right corner to view the data of the wide table dwd_orders.

    SELECT * FROM dw.order_dw.dwd_orders ORDER BY order_id;


Build the DWS layer: Metric computing

  1. Create aggregate metric tables named dws_users and dws_shops at the DWS layer. In the Realtime Compute for Apache Flink console, copy the following code to the SQL editor of the draft named test, select the code, and then click Run that appears on the left side of the code.

    -- Create a user-dimension aggregate metric table. 
    CREATE TABLE dw.order_dw.dws_users (
        user_id STRING,
        ds STRING,
        payed_buy_fee_sum BIGINT COMMENT 'Total amount of payment that is complete on the current day',
        PRIMARY KEY (user_id, ds) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'aggregation', -- Use the aggregation data merging mechanism to generate an aggregate table.
        'fields.payed_buy_fee_sum.aggregate-function' = 'sum' -- Sum the payed_buy_fee_sum data to generate the aggregate result.
        -- The dws_users table is not consumed by the downstream storage in streaming mode. Therefore, you do not need to specify the incremental data generation mechanism.
    );
    
    -- Create a shop-dimension aggregate metric table. 
    CREATE TABLE dw.order_dw.dws_shops (
        shop_id BIGINT,
        ds STRING,
        payed_buy_fee_sum BIGINT COMMENT 'Total amount of payment that is complete on the current day',
        uv BIGINT COMMENT 'Total number of users that purchase commodities on the current day',
        pv BIGINT COMMENT 'Total number of purchases made by all users on the current day',
        PRIMARY KEY (shop_id, ds) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'aggregation', -- Use the aggregation data merging mechanism to generate an aggregate table.
        'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- Sum the payed_buy_fee_sum data to generate the aggregate result.
        'fields.uv.aggregate-function' = 'sum', -- Sum the UV data to generate the aggregate result.
        'fields.pv.aggregate-function' = 'sum' -- Sum the PV data to generate the aggregate result.
        -- The dws_shops table is not consumed by the downstream storage in streaming mode. Therefore, you do not need to specify the incremental data generation mechanism.
    );
    
    -- To calculate the aggregate table from the user perspective and the aggregate table from the shop perspective at the same time, create an intermediate table that uses the user_id and shop_id fields as the primary key. 
    CREATE TABLE dw.order_dw.dwm_users_shops (
        user_id STRING,
        shop_id BIGINT,
        ds STRING,
        payed_buy_fee_sum BIGINT COMMENT 'Total amount paid by the user in the shop on the current day',
        pv BIGINT COMMENT 'Number of purchases made by the user in the shop on the current day',
        PRIMARY KEY (user_id, shop_id, ds) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'aggregation', -- Use the aggregation data merging mechanism to generate an aggregate table.
        'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- Sum the payed_buy_fee_sum data to generate the aggregate result.
        'fields.pv.aggregate-function' = 'sum', -- Sum the PV data to generate the aggregate result.
        'changelog-producer' = 'lookup', -- Use the lookup incremental data generation mechanism to generate changelogs with low latency.
        -- In most cases, the intermediate table at the DWM layer does not provide queries for upper-layer applications. Therefore, the write performance can be optimized. 
        'file.format' = 'avro', -- Use the Avro row-oriented storage format to provide more efficient write performance. 
        'metadata.stats-mode' = 'none' -- Discard the statistical information. After the statistical information is discarded, the cost of OLAP queries increases but the write performance increases. This has no impact on continuous stream processing. 
    );

    If the Query has been executed message is returned, the tables are created.

  2. Consume the changelogs of the dwd_orders table at the DWD layer in real time. In the Realtime Compute for Apache Flink console, create an SQL streaming draft named dwm, copy the following code to the SQL editor, deploy the draft, and then start the deployment for the draft without initial states.

    When the SQL deployment runs, the data in the dwd_orders table is written to the dwm_users_shops table. The aggregation data merging mechanism of the Apache Paimon table sums the values of order_fee to obtain the total amount that each user spends in each shop, and sums a constant value of 1 for each record to obtain the number of purchases that the user makes in the shop.

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';
    
    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';
    
    INSERT INTO dw.order_dw.dwm_users_shops
    SELECT
        order_user_id,
        order_shop_id,
        DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
        order_fee,
        1 -- One input record represents one purchase.
    FROM dw.order_dw.dwd_orders
    WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL;
  3. Consume the changelogs of the dwm_users_shops table at the DWM layer in real time. In the Realtime Compute for Apache Flink console, create an SQL streaming draft named dws, copy the following code to the SQL editor, deploy the draft, and then start the deployment for the draft without initial states.

    When the SQL deployment runs, the data in the dwm_users_shops table is written to the dws_users table, and the aggregation data merging mechanism of the Apache Paimon table sums the values of payed_buy_fee_sum to obtain the total amount that each user spends across all shops. The deployment also writes the data of the dwm_users_shops table to the dws_shops table: it sums the values of payed_buy_fee_sum to obtain the total transaction amount of each shop, sums a constant value of 1 for each record to obtain the number of users who made purchases in the shop, and sums the pv values to obtain the total number of purchases made by all users in the shop.

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';
    
    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';
    
    -- Different from the DWD layer, multiple INSERT statements that write data to different Apache Paimon tables can be placed in the same deployment at the DWM layer. 
    BEGIN STATEMENT SET;
    
    INSERT INTO dw.order_dw.dws_users
    SELECT 
        user_id,
        ds,
        payed_buy_fee_sum
    FROM dw.order_dw.dwm_users_shops;
    
    -- The shop_id column is used as the primary key. The amount of data of specific popular shops may be much higher than the amount of data of other shops. 
    -- Therefore, local merge is used to aggregate data in the memory before data is written to Apache Paimon. This helps alleviate data skew issues. 
    INSERT INTO dw.order_dw.dws_shops /*+ OPTIONS('local-merge-buffer-size' = '64mb') */
    SELECT
        shop_id,
        ds,
        payed_buy_fee_sum,
        1, -- One input record represents all the consumption of a user in the shop.
        pv
    FROM dw.order_dw.dwm_users_shops;
    
    END;
  4. View the data of the dws_users and dws_shops tables. In the Realtime Compute for Apache Flink console, copy the following code to the SQL editor of the draft named test, select the code that you want to run, and then click Debug in the upper-right corner to view the data in the table.

    -- View data of the dws_users table.
    SELECT * FROM dw.order_dw.dws_users ORDER BY user_id;
    
    -- View data of the dws_shops table.
    SELECT * FROM dw.order_dw.dws_shops ORDER BY shop_id;
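
    For reference, with the initial sample data you would expect the dws_users table to show payed_buy_fee_sum = 8000 for user_001 (orders 100001, 100004, and 100006) and 5000 each for user_002 and user_003, and the dws_shops table to show, for example, payed_buy_fee_sum = 7000, uv = 2, and pv = 3 for shop 12347. The values that you see depend on how much of the source data the upstream deployments have already processed.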


Capture changes in the business database

The streaming data lakehouse is created. This section tests the capability of the streaming data lakehouse to capture changes in the business database.

  1. Insert the following data into the order_dw database of MySQL:

    INSERT INTO orders VALUES
    (100008, 'user_001', 12345, 3, 3000, '2023-02-15 17:40:56', '2023-02-15 18:42:56', 1),
    (100009, 'user_002', 12348, 4, 1000, '2023-02-15 18:40:56', '2023-02-15 19:42:56', 1),
    (100010, 'user_003', 12348, 2, 2000, '2023-02-15 19:40:56', '2023-02-15 20:42:56', 1);
    
    INSERT INTO orders_pay VALUES
    (2008, 100008, 1, '2023-02-15 18:40:56'),
    (2009, 100009, 1, '2023-02-15 19:40:56'),
    (2010, 100010, 0, '2023-02-15 20:40:56');
  2. View the data of the dws_users and dws_shops tables. In the Realtime Compute for Apache Flink console, copy the following code to the SQL editor of the draft named test, select the code that you want to run, and then click Debug in the upper-right corner to view the latest data.

    • dws_users table

      SELECT * FROM dw.order_dw.dws_users ORDER BY user_id;


    • dws_shops table

      SELECT * FROM dw.order_dw.dws_shops ORDER BY shop_id;

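    For reference, after all upstream deployments have processed the newly inserted data, you would expect the payed_buy_fee_sum values in the dws_users table to increase to 11000 for user_001, 6000 for user_002, and 7000 for user_003, because each user paid for one additional order.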

Use the streaming data lakehouse

The previous section describes how to create an Apache Paimon catalog and write data to an Apache Paimon table in the Realtime Compute for Apache Flink console. Because MaxCompute is specified as the metadata storage in the catalog, an Apache Paimon external table is automatically created in MaxCompute when you create a table in the catalog. This section describes several simple scenarios in which MaxCompute is used to analyze data after the streaming data lakehouse is created. For more information about how to query Apache Paimon external tables in MaxCompute, see Apache Paimon external tables.

Perform ranking queries

Analyze the aggregate table at the DWS layer. In this example, DataWorks is used to query data of the top three shops with the highest transaction volume on February 15, 2023.

SET odps.sql.common.table.planner.ext.hive.bridge = true;
SET odps.sql.hive.compatible = true;

SELECT ROW_NUMBER() OVER (ORDER BY payed_buy_fee_sum DESC) AS rn, shop_id, payed_buy_fee_sum FROM dws_shops
WHERE ds = '20230215'
ORDER BY rn LIMIT 3;


Perform detail queries

Analyze the wide table at the DWD layer. The following sample code shows how to use DataWorks to query the details of orders that were paid by a specific user on a specific payment platform in February 2023.

SET odps.sql.common.table.planner.ext.hive.bridge = true;
SET odps.sql.hive.compatible = true;

SELECT * FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00' AND order_create_time < '2023-03-01 00:00:00'
AND order_user_id = 'user_001'
AND pay_platform = 0
ORDER BY order_create_time;


Query data reports

Analyze the wide table at the DWD layer. The following sample code shows how to use DataWorks to query the total number of orders and the total amount of orders for each category in February 2023.

SET odps.sql.common.table.planner.ext.hive.bridge = true;
SET odps.sql.hive.compatible = true;

SELECT
  TO_CHAR(order_create_time, 'YYYYMMDD') AS order_create_date,
  order_product_catalog_name,
  COUNT(*),
  SUM(order_fee)
FROM
  dwd_orders
WHERE
  order_create_time >= '2023-02-01 00:00:00'  and order_create_time < '2023-03-01 00:00:00'
GROUP BY
  order_create_date, order_product_catalog_name
ORDER BY
  order_create_date, order_product_catalog_name;


References