
Realtime Compute for Apache Flink:Build a streaming data lakehouse using Paimon and StarRocks

Last Updated: Sep 19, 2025

This topic describes how to build a streaming data lakehouse using Realtime Compute for Apache Flink, Apache Paimon, and StarRocks.

Background information

As society becomes more digital, businesses require faster access to data. Traditional offline data warehouses use scheduled jobs to merge new changes from the previous period into hierarchical data warehouse layers, such as the Operational Data Store (ODS), Data Warehouse Detail (DWD), Data Warehouse Summary (DWS), and Application Data Store (ADS). However, this approach has two major drawbacks: high latency and high cost. Offline jobs typically run hourly or daily, which means data consumers can only view data from the previous hour or day. Additionally, data updates often overwrite entire partitions. This process requires rereading the original data in the partition to merge it with new changes and generate new results.

Building a streaming data lakehouse with Realtime Compute for Apache Flink and Apache Paimon solves these issues. The real-time computing capabilities of Flink allow data to flow between data warehouse layers in real time. The efficient update capabilities of Paimon deliver data changes to downstream consumers with minute-level latency. Therefore, a streaming data lakehouse offers advantages in terms of both latency and cost.

For more information about the features of Apache Paimon, see Features and visit the Apache Paimon official website.

Architecture and benefits

Architecture

Realtime Compute for Apache Flink is a powerful stream computing engine that efficiently processes large amounts of real-time data. Apache Paimon is a unified streaming and batch data lake storage format that supports high-throughput updates and low-latency queries. Paimon is deeply integrated with Flink to provide an all-in-one streaming data lakehouse solution. The architecture of the streaming data lakehouse built with Flink and Paimon is as follows:

  1. Flink writes data from a data source to Paimon to create the ODS layer.

  2. Flink subscribes to changelogs at the ODS layer for processing and then rewrites the data to Paimon to create the DWD layer.

  3. Flink subscribes to changelogs at the DWD layer for processing and then rewrites the data to Paimon to create the DWS layer.

  4. Finally, StarRocks on the open source big data platform EMR reads the Paimon tables as external tables to serve application queries.


Benefits

This solution provides the following benefits:

  • Each Paimon layer can deliver changes to the downstream with minute-level latency. This reduces the latency of traditional offline data warehouses from hours or days to minutes.

  • Each Paimon layer can directly accept change data without overwriting partitions. This significantly reduces the cost of data updates and corrections in traditional offline data warehouses. It also solves the issue of data at intermediate layers being difficult to query, update, or correct.

  • The model is unified and the architecture is simplified. The logic of the extract, transform, and load (ETL) pipeline is implemented using Flink SQL. Data at the ODS, DWD, and DWS layers is stored uniformly in Paimon. This reduces architectural complexity and improves data processing efficiency.

This solution relies on the following three core capabilities of Paimon:

  • Primary key table updates: Paimon uses a Log-Structured Merge-tree (LSM tree) data structure at the underlying layer to achieve efficient data updates. For more information about Paimon primary key tables and underlying data structures, see Primary Key Table and File Layouts.

  • Changelog producer: Paimon can produce a complete changelog for any input data stream, in which every `update_after` record has a corresponding `update_before` record. This ensures that data changes are passed to the downstream in full. For more information, see Changelog production mechanism.

  • Merge engine: When a Paimon primary key table receives multiple records with the same primary key, the sink table merges them into a single record to maintain primary key uniqueness. Paimon supports various merge behaviors, such as deduplication, partial updates, and pre-aggregation. For more information, see Data merging mechanism.
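
The following is a minimal Flink SQL sketch that shows how these capabilities are expressed as table options. The catalog, database, table, and column names here are placeholders for illustration only; the tables used in this tutorial are created in the later sections.

-- Illustrative sketch only: a Paimon primary key table that keeps the latest record per key
-- (deduplicate merge engine) and emits a complete changelog for downstream streaming jobs.
CREATE TABLE my_catalog.my_db.example_pk_table (
    id BIGINT,
    name STRING,
    amount BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'merge-engine' = 'deduplicate',   -- merge records with the same primary key by keeping the latest one
    'changelog-producer' = 'lookup'   -- produce update_before/update_after pairs with low latency
);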

Scenarios

This topic uses an e-commerce platform as an example to demonstrate how to build a streaming data lakehouse to process and cleanse data and to support data queries from upper-layer applications. The streaming data lakehouse implements data layering and reuse. It supports various business scenarios, such as report queries for transaction dashboards, behavioral data analytics, user persona tagging, and personalized recommendations.


  1. Build the ODS layer: Ingest data from a business database into the data warehouse in real time.
    A MySQL database contains three business tables: `orders`, `orders_pay`, and `product_catalog`. Flink writes the data from these tables to OSS in real time and stores it in the Paimon format to create the ODS layer.

  2. Build the DWD layer: Create a topic-based wide table.
    Use the partial-update merge mechanism of Paimon to widen the `orders`, `product_catalog`, and `orders_pay` tables. This generates a DWD layer wide table and produces a changelog with minute-level latency.

  3. Build the DWS layer: Calculate metrics.
    Flink consumes the changelog of the wide table in real time. It uses the aggregation merge mechanism of Paimon to produce the `dwm_users_shops` intermediate table (user-merchant aggregation) at the DWM layer. Finally, it produces the `dws_users` (user aggregation metrics) and `dws_shops` (merchant aggregation metrics) tables at the DWS layer.

Prerequisites

Note

The StarRocks instance and DLF must be in the same region as the Flink workspace.

Limits

Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 11.1.0 or later supports this streaming data lakehouse solution.

Build a streaming data lakehouse

Prepare a MySQL CDC data source

This example uses an ApsaraDB RDS for MySQL instance. Create a database named `order_dw` and three business tables with data.

  1. Create an ApsaraDB RDS for MySQL instance.

    Important

    The ApsaraDB RDS for MySQL instance must be in the same VPC as the Flink workspace. If they are not in the same VPC, see How do I access other services across VPCs?

  2. Create a database and an account.

    Create a database named `order_dw`. Create a privileged account or a standard account with read and write permissions on the `order_dw` database.

  3. Create three tables and insert data.

    CREATE TABLE `orders` (
      order_id bigint not null primary key,
      user_id varchar(50) not null,
      shop_id bigint not null,
      product_id bigint not null,
      buy_fee bigint not null,   
      create_time timestamp not null,
      update_time timestamp not null default now(),
      state int not null
    );
    
    CREATE TABLE `orders_pay` (
      pay_id bigint not null primary key,
      order_id bigint not null,
      pay_platform int not null, 
      create_time timestamp not null
    );
    
    CREATE TABLE `product_catalog` (
      product_id bigint not null primary key,
      catalog_name varchar(50) not null
    );
    
    -- Prepare data.
    INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
    
    INSERT INTO orders VALUES
    (100001, 'user_001', 12345, 1, 5000, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
    (100002, 'user_002', 12346, 2, 4000, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
    (100003, 'user_003', 12347, 3, 3000, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
    (100004, 'user_001', 12347, 4, 2000, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
    (100005, 'user_002', 12348, 5, 1000, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
    (100006, 'user_001', 12348, 1, 1000, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
    (100007, 'user_003', 12347, 4, 2000, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
    
    INSERT INTO orders_pay VALUES
    (2001, 100001, 1, '2023-02-15 17:40:56'),
    (2002, 100002, 1, '2023-02-15 17:40:56'),
    (2003, 100003, 0, '2023-02-15 17:40:56'),
    (2004, 100004, 0, '2023-02-15 17:40:56'),
    (2005, 100005, 0, '2023-02-15 18:40:56'),
    (2006, 100006, 0, '2023-02-15 18:40:56'),
    (2007, 100007, 0, '2023-02-15 18:40:56');
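
    To confirm that the sample data is in place before you start synchronization, you can optionally run a quick check in MySQL. The expected counts simply reflect the INSERT statements above.

    -- Optional sanity check: row counts should match the sample data inserted above.
    SELECT COUNT(*) FROM orders;           -- expected: 7
    SELECT COUNT(*) FROM orders_pay;       -- expected: 7
    SELECT COUNT(*) FROM product_catalog;  -- expected: 5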

Manage metadata

Create a Paimon catalog

  1. Log on to the Realtime Compute for Apache Flink console.

  2. In the navigation pane on the left, click Catalogs, and then click Create Catalog.

  3. On the Built-in Catalog tab, click Apache Paimon, then click Next.

  4. Configure the following parameters, select DLF as the storage type, and click OK.

    Parameter descriptions:

    • metastore (required): The metastore type. In this example, select dlf.

    • catalog name (required): The DLF data catalog name. Use DLF 2.5; you do not need to enter an AccessKey pair. You can select an existing DLF data catalog. For more information about how to create a data catalog, see Data Catalog. In this example, select the data catalog named paimoncatalog.

      Important: When you use a Resource Access Management (RAM) user or role, make sure that you have read and write permissions on DLF. For more information, see Authorization management.

  5. Create the `order_dw` database in the data catalog to synchronize all table data from the `order_dw` database in MySQL.

    In the left navigation pane, choose Scripts > Query Scripts and click New to create a temporary query.

    -- Use the paimoncatalog data source.
    USE CATALOG paimoncatalog;
    -- Create the order_dw database.
    CREATE DATABASE order_dw;

    The message The following statement has been executed successfully! indicates that the database is created.

For more information about how to use Paimon catalogs, see Manage Paimon catalogs.
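
To confirm that the new database is visible in the catalog, you can optionally run the following statements in the same query script. SHOW statements are standard Flink SQL, and order_dw should appear in the result.

USE CATALOG paimoncatalog;
SHOW DATABASES;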

Create a MySQL catalog

  1. On the Catalogs page, click Create Catalog.

  2. On the Built-in Catalog tab, click MySQL, then click Next.

  3. To create a MySQL catalog named mysqlcatalog, configure the following parameters and click OK.

    Parameter descriptions:

    • catalog name (required): The name of the catalog. Enter a custom name. This example uses mysqlcatalog.

    • hostname (required): The IP address or hostname of the MySQL database. For more information, see View and manage instance endpoints and ports. Because the ApsaraDB RDS for MySQL instance and the fully managed Flink workspace are in the same VPC, enter the internal endpoint.

    • port (optional): The port number of the MySQL database service. The default value is 3306. For more information, see View and manage instance endpoints and ports.

    • default-database (required): The name of the default MySQL database. Enter the name of the database to synchronize, `order_dw`.

    • username (required): The username for the MySQL database service. This is the account created in the Prepare a MySQL CDC data source section.

    • password (required): The password for the MySQL database service. This is the password created in the Prepare a MySQL CDC data source section.
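
After the catalog is created, you can optionally verify connectivity by running a quick query against one of the source tables through the MySQL catalog, for example:

SELECT * FROM mysqlcatalog.order_dw.product_catalog;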

Build the ODS layer: Ingest data from a business database into the data warehouse in real time

Use Flink CDC to synchronize data from MySQL to Paimon through a YAML data ingestion job. This builds the ODS layer in one step.

  1. Create and start the YAML data ingestion job.

    1. In the Realtime Compute for Apache Flink console, navigate to the Development > Data Ingestion page and create a blank YAML draft named ods.

    2. Copy the following code to the editor. Make sure to modify parameters such as the username and password.

      source:
        type: mysql
        name: MySQL Source
        hostname: rm-bp1e********566g.mysql.rds.aliyuncs.com
        port: 3306
        username: ${secret_values.username}
        password: ${secret_values.password}
        tables: order_dw.\.*  # Use a regular expression to read all tables in the order_dw database.
      
      sink:
        type: paimon
        name: Paimon Sink
        catalog.properties.metastore: rest
        catalog.properties.uri: http://cn-beijing-vpc.dlf.aliyuncs.com
        catalog.properties.warehouse: paimoncatalog
        catalog.properties.token.provider: dlf
        
      pipeline:
        name: MySQL to Paimon Pipeline

      Parameter descriptions:

      • catalog.properties.metastore (required): The metastore type. Set this to `rest`. Example: rest.

      • catalog.properties.token.provider (required): The token provider. Set this to `dlf`. Example: dlf.

      • catalog.properties.uri (required): The URI to access the DLF REST catalog server. The format is http://[region-id]-vpc.dlf.aliyuncs.com. For more information, see the Region ID in Service endpoints. Example: http://cn-beijing-vpc.dlf.aliyuncs.com.

      • catalog.properties.warehouse (required): The DLF catalog name. Example: paimoncatalog.

      For more information about how to optimize Paimon write performance, see Paimon performance optimization.

    3. In the upper-right corner, click Deploy.

    4. Go to O&M > Deployments. Find the `ods` job that you just deployed. In the Actions column, click Start and select Start Without Initial State. For more information about job startup configurations, see Start a job.

  2. View the data of the three tables that are synchronized from MySQL to Paimon.

    In the Realtime Compute for Apache Flink console, navigate to the Development > Scripts page. On the Query Scripts tab, copy the following code into the query script. Select the code snippet and click Run in the upper-right corner.

    SELECT * FROM paimoncatalog.order_dw.orders ORDER BY order_id;

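    The orders data that you inserted in MySQL should appear in the result. You can check the other two tables in the same way, for example:

    SELECT * FROM paimoncatalog.order_dw.orders_pay ORDER BY pay_id;
    SELECT * FROM paimoncatalog.order_dw.product_catalog ORDER BY product_id;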

Build the DWD layer: Create a topic-based wide table

  1. Create the DWD layer Paimon wide table dwd_orders

    In the Realtime Compute for Apache Flink console, navigate to the Development > Scripts page. On the Query Scripts tab, copy the following code into the query script. Select the code snippet and click Run in the upper-right corner.

    CREATE TABLE paimoncatalog.order_dw.dwd_orders (
        order_id BIGINT,
        order_user_id STRING,
        order_shop_id BIGINT,
        order_product_id BIGINT,
        order_product_catalog_name STRING,
        order_fee BIGINT,
        order_create_time TIMESTAMP,
        order_update_time TIMESTAMP,
        order_state INT,
        pay_id BIGINT,
        pay_platform INT COMMENT 'platform 0: phone, 1: pc',
        pay_create_time TIMESTAMP,
        PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'partial-update', -- Use the partial-update merge engine to generate a wide table.
        'changelog-producer' = 'lookup' -- Use the lookup changelog producer to generate a changelog with low latency.
    );

    The message Query has been executed indicates that the table is created.

  2. Consume the changelog of the ODS layer orders and orders_pay tables in real time

    In the Realtime Compute for Apache Flink console, go to the Development > ETL page. Create a new SQL streaming job named `dwd`, and copy the following code into the SQL editor. Then, Deploy the job and Start it without an initial state.

    This SQL job joins the `orders` table with the `product_catalog` table. The joined result and the `orders_pay` table are written to the `dwd_orders` table. The partial-update merge engine of Paimon widens the data from the `orders` and `orders_pay` tables that have the same `order_id`.

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';
    
    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';
    
    -- Paimon does not currently support multiple INSERT statements into the same table in a single job. Therefore, use UNION ALL.
    INSERT INTO paimoncatalog.order_dw.dwd_orders 
    SELECT 
        o.order_id,
        o.user_id,
        o.shop_id,
        o.product_id,
        dim.catalog_name,
        o.buy_fee,
        o.create_time,
        o.update_time,
        o.state,
        NULL,
        NULL,
        NULL
    FROM
        paimoncatalog.order_dw.orders o 
        LEFT JOIN paimoncatalog.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim
        ON o.product_id = dim.product_id
    UNION ALL
    SELECT
        order_id,
        NULL,
        NULL,
        NULL,
        NULL,
        NULL,
        NULL,
        NULL,
        NULL,
        pay_id,
        pay_platform,
        create_time
    FROM
        paimoncatalog.order_dw.orders_pay;
  3. View the data of the dwd_orders wide table

    In the Realtime Compute for Apache Flink console, go to the Development > Scripts page. On the Query Scripts tab, copy the following code into the query script. Select the code snippet and click Run in the upper-right corner.

    SELECT * FROM paimoncatalog.order_dw.dwd_orders ORDER BY order_id;

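    Each order_id appears as a single widened row. For example, the following query should return one row for order 100001 in which both the order fields and the payment fields (pay_id 2001, pay_platform 1, from the sample data) are populated, because the partial-update merge engine combined the records from the orders and orders_pay branches:

    SELECT order_id, order_fee, order_state, pay_id, pay_platform, pay_create_time
    FROM paimoncatalog.order_dw.dwd_orders
    WHERE order_id = 100001;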

Build the DWS layer: Calculate metrics

  1. Create the DWS layer aggregation tables dws_users and dws_shops

    In the Realtime Compute for Apache Flink console, go to the Development > Scripts page. On the Query Scripts tab, copy the following code into the query script, select the code snippet, and click Run in the upper-right corner.

    -- User dimension aggregation metric table.
    CREATE TABLE paimoncatalog.order_dw.dws_users (
        user_id STRING,
        ds STRING,
        payed_buy_fee_sum BIGINT COMMENT 'Total amount of payments completed on the day',
        PRIMARY KEY (user_id, ds) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'aggregation', -- Use the aggregation merge engine to generate an aggregation table.
        'fields.payed_buy_fee_sum.aggregate-function' = 'sum' -- Sum the payed_buy_fee_sum data to produce the aggregation result.
        -- Because the dws_users table is no longer consumed downstream in a streaming fashion, you do not need to specify a changelog producer.
    );
    
    -- Merchant dimension aggregation metric table.
    CREATE TABLE paimoncatalog.order_dw.dws_shops (
        shop_id BIGINT,
        ds STRING,
        payed_buy_fee_sum BIGINT COMMENT 'Total amount of payments completed on the day',
        uv BIGINT COMMENT 'Total number of distinct purchasing users on the day',
        pv BIGINT COMMENT 'Total number of purchases by users on the day',
        PRIMARY KEY (shop_id, ds) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'aggregation', -- Use the aggregation merge engine to generate an aggregation table.
        'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- Sum the payed_buy_fee_sum data to produce the aggregation result.
        'fields.uv.aggregate-function' = 'sum', -- Sum the uv data to produce the aggregation result.
        'fields.pv.aggregate-function' = 'sum' -- Sum the pv data to produce the aggregation result.
        -- Because the dws_shops table is no longer consumed downstream in a streaming fashion, you do not need to specify a changelog producer.
    );
    
    -- To calculate both the user-perspective and merchant-perspective aggregation tables, create an intermediate table with user + merchant as the primary key.
    CREATE TABLE paimoncatalog.order_dw.dwm_users_shops (
        user_id STRING,
        shop_id BIGINT,
        ds STRING,
        payed_buy_fee_sum BIGINT COMMENT 'Total amount paid by the user at the merchant on the day',
        pv BIGINT COMMENT 'Number of purchases made by the user at the merchant on the day',
        PRIMARY KEY (user_id, shop_id, ds) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'aggregation', -- Use the aggregation merge engine to generate an aggregation table.
        'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- Sum the payed_buy_fee_sum data to produce the aggregation result.
        'fields.pv.aggregate-function' = 'sum', -- Sum the pv data to produce the aggregation result.
        'changelog-producer' = 'lookup', -- Use the lookup changelog producer to generate a changelog with low latency.
        -- The intermediate table at the DWM layer is generally not queried directly by upper-layer applications, so you can optimize for write performance.
        'file.format' = 'avro', -- The avro row store format provides more efficient write performance.
        'metadata.stats-mode' = 'none' -- Discarding statistics information increases the cost of OLAP queries (with no effect on continuous stream processing), but makes write performance more efficient.
    );

    The message Query has been executed indicates that the table is created.

  2. Consume the changelog of the DWD layer dwd_orders table

    In the Realtime Compute for Apache Flink console, navigate to the Development > ETL tab. Create a SQL streaming job named `dwm`. Copy the following code into the SQL editor. Then, Deploy and Start the job without an initial state.

    This SQL job writes data from the `dwd_orders` table to the `dwm_users_shops` table. It uses the pre-aggregation merge engine of Paimon to automatically sum the `order_fee` to calculate the user's total spending at the merchant. It also sums `1` to calculate the number of times the user has purchased from the merchant.

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';
    
    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';
    
    INSERT INTO paimoncatalog.order_dw.dwm_users_shops
    SELECT
        order_user_id,
        order_shop_id,
        DATE_FORMAT(pay_create_time, 'yyyyMMdd') AS ds,
        order_fee,
        1 -- One input record represents one purchase.
    FROM paimoncatalog.order_dw.dwd_orders
    WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL;
  3. Consume the changelog of the DWM layer dwm_users_shops table in real time

    In the Realtime Compute for Apache Flink console, go to the Development > ETL page. Create a new SQL streaming job named `dws`. Copy the following code to the SQL editor. Then, Deploy and Start the job without an initial state.

    This SQL job writes data from the `dwm_users_shops` table to the `dws_users` and `dws_shops` tables. It uses the pre-aggregation merge engine of Paimon to calculate each user's total spending (`payed_buy_fee_sum`) in the `dws_users` table. In the `dws_shops` table, it calculates the merchant's total revenue (`payed_buy_fee_sum`), the number of purchasing users by summing `1`, and the total number of purchases (`pv`).

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';
    
    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';
    
    -- Unlike DWD, each INSERT statement here writes to a different Paimon table, so they can be in the same job.
    BEGIN STATEMENT SET;
    
    INSERT INTO paimoncatalog.order_dw.dws_users
    SELECT 
        user_id,
        ds,
        payed_buy_fee_sum
    FROM paimoncatalog.order_dw.dwm_users_shops;
    
    -- With merchant as the primary key, some popular merchants may have much more data than others.
    -- Therefore, use local merge to pre-aggregate in memory before writing to Paimon to alleviate data skew.
    INSERT INTO paimoncatalog.order_dw.dws_shops /*+ OPTIONS('local-merge-buffer-size' = '64mb') */
    SELECT
        shop_id,
        ds,
        payed_buy_fee_sum,
        1, -- One input record represents all of a user's purchases at this merchant.
        pv
    FROM paimoncatalog.order_dw.dwm_users_shops;
    
    END;
  4. View the data in the dws_users and dws_shops tables

    In the Realtime Compute for Apache Flink console, navigate to Development > Scripts. On the Query Scripts tab, copy the following code into the editor. Select the code snippet and click Run in the upper-right corner.

    -- View dws_users table data.
    SELECT * FROM paimoncatalog.order_dw.dws_users ORDER BY user_id;


    -- View dws_shops table data.
    SELECT * FROM paimoncatalog.order_dw.dws_shops ORDER BY shop_id;

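    As an optional cross-check, you can recompute the merchant metrics in a batch query directly from the DWD wide table and compare the output with dws_shops. The following query is illustrative and mirrors the aggregation logic of the streaming jobs above:

    SELECT
        order_shop_id AS shop_id,
        DATE_FORMAT(pay_create_time, 'yyyyMMdd') AS ds,
        SUM(order_fee) AS payed_buy_fee_sum,
        COUNT(DISTINCT order_user_id) AS uv,
        COUNT(*) AS pv
    FROM paimoncatalog.order_dw.dwd_orders
    WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL
    GROUP BY order_shop_id, DATE_FORMAT(pay_create_time, 'yyyyMMdd');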

Capture changes in the business database

Now that you have built the streaming data lakehouse, the following steps test its ability to capture changes from the business database.

  1. Insert the following data into the `order_dw` database in MySQL.

    INSERT INTO orders VALUES
    (100008, 'user_001', 12345, 3, 3000, '2023-02-15 17:40:56', '2023-02-15 18:42:56', 1),
    (100009, 'user_002', 12348, 4, 1000, '2023-02-15 18:40:56', '2023-02-15 19:42:56', 1),
    (100010, 'user_003', 12348, 2, 2000, '2023-02-15 19:40:56', '2023-02-15 20:42:56', 1);
    
    INSERT INTO orders_pay VALUES
    (2008, 100008, 1, '2023-02-15 18:40:56'),
    (2009, 100009, 1, '2023-02-15 19:40:56'),
    (2010, 100010, 0, '2023-02-15 20:40:56');
  2. View the data in the dws_users and dws_shops tables. In the Realtime Compute for Apache Flink console, navigate to the Development > Scripts page. On the Query Scripts tab, copy the following code into the query script. Select the code snippet and click Run in the upper-right corner.

    • dws_users table

      SELECT * FROM paimoncatalog.order_dw.dws_users ORDER BY user_id;


    • dws_shops table

      SELECT * FROM paimoncatalog.order_dw.dws_shops ORDER BY shop_id;


Use the streaming data lakehouse

The previous section showed how to create a Paimon catalog and write to Paimon tables in Flink. This section describes simple data analysis scenarios using StarRocks after the streaming data lakehouse is built.

First, log on to the StarRocks instance and create a Paimon catalog that uses OSS as the warehouse. For more information, see Paimon Catalog.

CREATE EXTERNAL CATALOG paimon_catalog
PROPERTIES
(
    'type' = 'paimon',
    'paimon.catalog.type' = 'filesystem',
    'aliyun.oss.endpoint' = 'oss-cn-beijing-internal.aliyuncs.com',
    'paimon.catalog.warehouse' = 'oss://<bucket>/<object>'
);

Property descriptions:

  • type (required): The data source type. Set this to `paimon`.

  • paimon.catalog.type (required): The metastore type used by Paimon. This example uses `filesystem` as the metastore type.

  • aliyun.oss.endpoint (required): If you use OSS or OSS-HDFS as the warehouse, you must specify the corresponding endpoint.

  • paimon.catalog.warehouse (required): The warehouse path, in the format oss://<bucket>/<object>, where bucket is the name of your OSS bucket and object is the path where your data is stored. You can view your bucket and object names in the OSS console.
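
Before running the queries below, point your StarRocks session at the new catalog so that the Paimon tables can be referenced by unqualified names. This sketch assumes a StarRocks version that supports the SET CATALOG statement; alternatively, you can fully qualify the table names as paimon_catalog.order_dw.<table>.

-- Switch the current session to the Paimon catalog and the order_dw database.
SET CATALOG paimon_catalog;
USE order_dw;
SHOW TABLES;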

Ranking query

To analyze the DWS layer aggregation table, the following example code shows how to use StarRocks to query the top three merchants with the highest transaction amounts on February 15, 2023.

SELECT ROW_NUMBER() OVER (ORDER BY payed_buy_fee_sum DESC) AS rn, shop_id, payed_buy_fee_sum 
FROM dws_shops
WHERE ds = '20230215'
ORDER BY rn LIMIT 3;


Details query

To analyze the wide table at the DWD layer, the following sample code shows how to use StarRocks to query the order details paid by a customer on a specific payment platform in February 2023:

SELECT * FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00' AND order_create_time < '2023-03-01 00:00:00'
AND order_user_id = 'user_001'
AND pay_platform = 0
ORDER BY order_create_time;


Data reports

To analyze the wide table at the DWD layer, the following sample code shows how to use StarRocks to query the total number of orders and the total order amount for each category on each day in February 2023:

SELECT
  to_date(order_create_time) AS order_create_date,
  order_product_catalog_name,
  COUNT(*) AS order_count,
  SUM(order_fee) AS order_fee_sum
FROM
  dwd_orders
WHERE
  order_create_time >= '2023-02-01 00:00:00' AND order_create_time < '2023-03-01 00:00:00'
GROUP BY
  order_create_date, order_product_catalog_name
ORDER BY
  order_create_date, order_product_catalog_name;


References