All Products
Search
Document Center

E-MapReduce:Data warehouse solution: Real-time computing of incremental data

Last Updated:Jul 04, 2024

This topic describes how to build a data warehouse based on StarRocks to implement real-time computing of incremental data.

Prerequisites

  • A Dataflow cluster or a custom cluster that contains the Flink and Kafka services is created. For more information, see Create a cluster.

  • A StarRocks cluster is created. For more information, see Create a StarRocks cluster.

  • An ApsaraDB RDS for MySQL instance is created. For more information, see Create an ApsaraDB RDS for MySQL instance.

    Note

    In this example, the Dataflow cluster is of EMR V3.40.0, the StarRocks cluster is of EMR V5.6.0, and the MySQL version of the ApsaraDB RDS for MySQL instance is 5.7.

Limits

  • The Dataflow cluster, StarRocks cluster, and ApsaraDB RDS for MySQL instance must be deployed in the same virtual private cloud (VPC) and in the same zone.

  • The Dataflow cluster and StarRocks cluster must be accessible over the Internet.

  • The MySQL version of the ApsaraDB RDS for MySQL instance must be 5.7 or later.

Overview

In latency-sensitive business scenarios, data must be immediately processed after it is generated. Real-time computing of incremental data can help you reduce the latency. You can use Flink to aggregate data from layers such as the data warehouse detail (DWD) and data warehouse summary (DWS) layers in advance and persist the aggregation results for further use.

Architecture

The following figure shows the architecture of this solution.

Flink-StarRocks

The solution consists of the following steps:

  1. Use Flink to build a real-time data warehouse, cleanse, process, convert, and aggregate data, and then write the aggregation results to Kafka topics.

  2. Use a StarRocks cluster to subscribe to layers of data from the Kafka topics and persist the data for subsequent queries and analysis.

Features

The solution provides the following features:

  • Incremental data is cleansed, processed, converted, and aggregated by Flink and persisted to a StarRocks cluster through subscription.

  • The aggregation results can be double-written or single-written. If you select the double-writing mode, the results in each layer are written to the Kafka topic in the next layer and sink to the StarRocks cluster in the same layer. If you select the single-writing mode, the results are written only to the Kafka topics, and the StarRocks cluster persists the results in real time by subscribing to layers of data from the Kafka topics. The single-writing mode allows you to query the status of data and update data if required.

  • The StarRocks cluster provides tables for upper-layer applications to perform real-time queries.

  • Advantages

    • Data computing is performed in real time. This meets the requirements of latency-sensitive business scenarios.

    • The metrics are easy to modify. Different from traditional incremental data computing solutions, this solution persists the intermediate status to a StarRocks cluster. This helps improve the flexibility of subsequent analysis operations. If the quality of intermediate data cannot meet your business requirements, you can modify tables to update data.

  • Disadvantages

    • To implement real-time computing of incremental data, you must master the necessary skills in Flink.

    • The solution is not suitable for scenarios in which data is frequently updated and cannot be accumulated in calculations.

    • The solution is not suitable for scenarios in which resource-consuming and complex operations such as multi-stream JOIN are performed.

  • Scenarios

    The solution is suitable for scenarios in which you want to perform simple computing operations on a small amount of data in real time. The most common scenario is the processing of event tracking data.

Procedure

  1. Step 1: Create source MySQL tables

  2. Step 2: Create Kafka topics

  3. Step 3: Create tables in the StarRocks cluster and data import jobs

  4. Step 4: Run a Flink job to start a data stream

  5. Step 5: View the database and table information

  6. Step 6: Insert data and query the inserted data

Step 1: Create source MySQL tables

  1. Create a test database and a test account. For more information, see Create accounts and databases.

    After you create a test database and a test account, grant the read and write permissions to the account.

    Note

    In this example, a database named flink_cdc and an account named emr_test are created.

  2. Use the test account to log on to the ApsaraDB RDS for MySQL instance. For more information, see Use DMS to log on to an ApsaraDB RDS for MySQL instance.

  3. Execute the following statements to create tables named orders and customers.

    • Create a table named orders.

      CREATE TABLE flink_cdc.orders (
         order_id INT NOT NULL AUTO_INCREMENT,
         order_revenue FLOAT NOT NULL,
         order_region VARCHAR(40) NOT NULL,
         customer_id INT NOT NULL,
         PRIMARY KEY ( order_id )
      );
    • Create a table named customers.

      CREATE TABLE flink_cdc.customers (
         customer_id INT NOT NULL,
         customer_age INT NOT NULL,
         customer_name VARCHAR(40) NOT NULL,
         PRIMARY KEY ( customer_id )
      );

Step 2: Create Kafka topics

  1. Log on to the Dataflow cluster in SSH mode. For more information, see Log on to a cluster.

  2. Run the following command to go to the /bin directory of Kafka:

    cd /opt/apps/FLINK/flink-current/bin
  3. Run the following commands to create Kafka topics:

    kafka-topics.sh --create --topic ods_order --replication-factor 1 --partitions 1 --bootstrap-server core-1-1:9092 
    kafka-topics.sh --create --topic ods_customers --replication-factor 1 --partitions 1 --bootstrap-server core-1-1:9092 
    kafka-topics.sh --create --topic dwd_order_customer_valid --replication-factor 1 --partitions 1 --bootstrap-server core-1-1:9092 
    kafka-topics.sh --create --topic dws_agg_by_region --replication-factor 1 --partitions 1 --bootstrap-server core-1-1:9092 

Step 3: Create tables in the StarRocks cluster and data import jobs

  1. Log on to the StarRocks cluster in SSH mode. For more information, see Log on to a cluster.

  2. Run the following command to connect to the StarRocks cluster:

    mysql -h127.0.0.1 -P 9030 -uroot
  3. Execute the following statement to create a database:

    CREATE DATABASE IF NOT EXISTS `flink_cdc`;
  4. Execute the following statements to create tables named customers and orders.

    • Create a table named customers.

      CREATE TABLE IF NOT EXISTS `flink_cdc`.`customers` (
        `customer_id` INT NOT NULL  COMMENT "",
        `customer_age` FLOAT NOT NULL  COMMENT "",
        `customer_name` STRING NOT NULL  COMMENT ""
      ) ENGINE=olap
      PRIMARY KEY(`customer_id`)
      COMMENT ""
      DISTRIBUTED BY HASH(`customer_id`) BUCKETS 1
      PROPERTIES (
        "replication_num" = "1"
      );
    • Create a table named orders.

      CREATE TABLE IF NOT EXISTS `flink_cdc`.`orders` (
        `order_id` INT NOT NULL  COMMENT "",
        `order_revenue` FLOAT NOT NULL  COMMENT "",
        `order_region` STRING NOT NULL  COMMENT "",
        `customer_id` INT NOT NULL  COMMENT ""
      ) ENGINE=olap
      PRIMARY KEY(`order_id`)
      COMMENT ""
      DISTRIBUTED BY HASH(`order_id`) BUCKETS 1
      PROPERTIES (
        "replication_num" = "1"
      );
  5. Execute the following statement to create a table at the DWD layer:

    CREATE TABLE IF NOT EXISTS `flink_cdc`.`dwd_order_customer_valid`(
      `order_id` INT NOT NULL  COMMENT "",
      `order_revenue` FLOAT NOT NULL  COMMENT "",
      `order_region` STRING NOT NULL  COMMENT "",
      `customer_id` INT NOT NULL  COMMENT "",
      `customer_age` FLOAT NOT NULL  COMMENT "",
      `customer_name` STRING NOT NULL  COMMENT ""
    ) ENGINE=olap
    PRIMARY KEY(`order_id`)
    COMMENT ""
    DISTRIBUTED BY HASH(`order_id`) BUCKETS 1
    PROPERTIES (
      "replication_num" = "1"
    );
  6. Execute the following statement to create a table at the DWS layer:

    CREATE TABLE IF NOT EXISTS `flink_cdc`.`dws_agg_by_region` (
      `order_region` STRING NOT NULL  COMMENT "",
      `order_cnt` INT NOT NULL  COMMENT "",
      `order_total_revenue` INT NOT NULL  COMMENT ""
    ) ENGINE=olap
    PRIMARY KEY(`order_region`)
    COMMENT ""
    DISTRIBUTED BY HASH(`order_region`) BUCKETS 1
    PROPERTIES (
      "replication_num" = "1"
    );
  7. Execute the following statements to create data import jobs in Routine Load mode and subscribe to data in Kafka topics:

    CREATE ROUTINE LOAD flink_cdc.routine_load_orders ON orders
    COLUMNS (order_id, order_revenue, order_region, customer_id)
    PROPERTIES
    (
      "format" = "json",
      "jsonpaths" = "[\"$.order_id\",\"$.order_revenue\",\"$.order_region\",\"$.customer_id\"]"
    )
    FROM KAFKA
    (
      "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092",
      "kafka_topic" = "ods_order"
    );
    
    CREATE ROUTINE LOAD flink_cdc.routine_load_customers ON customers
    COLUMNS (customer_id, customer_age, customer_name)
    PROPERTIES
    (
        "format" = "json",
        "jsonpaths" = "[\"$.customer_id\",\"$.customer_age\",\"$.customer_name\"]"
    )
    FROM KAFKA
    (
      "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092",
      "kafka_topic" = "ods_customers"
    );
    
    CREATE ROUTINE LOAD flink_cdc.routine_load_dwd_order_customer_valid ON dwd_order_customer_valid
    COLUMNS (order_id, order_revenue, order_region, customer_id, customer_age, customer_name)
    PROPERTIES
    (
        "format" = "json",
        "jsonpaths" = "[\"$.order_id\",\"$.order_revenue\",\"$.order_region\",\"$.customer_id\",\"$.customer_age\",\"$.customer_name\"]"
    )
    FROM KAFKA
    (
      "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092",
      "kafka_topic" = "dwd_order_customer_valid"
    );
    
    CREATE ROUTINE LOAD flink_cdc.routine_load_dws_agg_by_region ON dws_agg_by_region
    COLUMNS (order_region, order_cnt, order_total_revenue)
    PROPERTIES
    (
        "format" = "json",
        "jsonpaths" = "[\"$.order_region\",\"$.order_cnt\",\"$.order_total_revenue\"]"
    )
    FROM KAFKA
    (
      "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092",
      "kafka_topic" = "dws_agg_by_region"
    );

Step 4: Run a Flink job to start a data stream

  1. Download the packages of the Flink change data capture (CDC) and Flink StarRocks connectors, and upload the packages to the /opt/apps/FLINK/flink-current/lib directory of the Dataflow cluster.

  2. Copy the JAR packages in the /opt/apps/FLINK/flink-current/opt/connectors/kafka directory of the Dataflow cluster to the /opt/apps/FLINK/flink-current/lib directory.

  3. Log on to the Dataflow cluster in SSH mode. For more information, see Log on to a cluster.

  4. Run the following command to start the cluster.

    Important

    The example in this topic is only for testing purposes. To run Flink jobs in a production environment, use YARN or Kubernetes to submit the jobs. For more information, see Apache Hadoop YARN and Native Kubernetes.

    /opt/apps/FLINK/flink-current/bin/start-cluster.sh
  5. Write code for a Flink SQL job and save the code as the demo.sql file.

    Run the following command to open the demo.sql file. Edit the file based on your business requirements.

    vim demo.sql

    The following example shows the code in the file:

    CREATE DATABASE IF NOT EXISTS `default_catalog`.`flink_cdc`;
    
    -- Create order source tables. 
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`orders_src`(
      `order_id` INT NOT NULL,
      `order_revenue` FLOAT NOT NULL,
      `order_region` STRING NOT NULL,
      `customer_id` INT NOT NULL,
      PRIMARY KEY(`order_id`) NOT ENFORCED
    ) with (
      'connector' = 'mysql-cdc',
      'hostname' = 'rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'emr_test',
      'password' = 'Yz12****',
      'database-name' = 'flink_cdc',
      'table-name' = 'orders'
    );
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`customers_src` (
      `customer_id` INT NOT NULL,
      `customer_age` FLOAT NOT NULL,
      `customer_name` STRING NOT NULL,
      PRIMARY KEY(`customer_id`) NOT ENFORCED
    ) with (
      'connector' = 'mysql-cdc',
      'hostname' = 'rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'emr_test',
      'password' = 'Yz12****',
      'database-name' = 'flink_cdc',
      'table-name' = 'customers'
    );
    
    -- create ods dwd and dws tables
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`ods_order_table` (
      `order_id` INT,
      `order_revenue` FLOAT,
      `order_region` VARCHAR(40),
      `customer_id` INT,
      PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'ods_order',
      'properties.bootstrap.servers' = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092',
      'key.format' = 'json',
      'value.format' = 'json'
    );
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`ods_customers_table` (
      `customer_id` INT,
      `customer_age` FLOAT,
      `customer_name` STRING,
      PRIMARY KEY (customer_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'ods_customers',
      'properties.bootstrap.servers' = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092',
      'key.format' = 'json',
      'value.format' = 'json'
    );
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`dwd_order_customer_valid` (
      `order_id` INT,
      `order_revenue` FLOAT,
      `order_region` STRING,
      `customer_id` INT,
      `customer_age` FLOAT,
      `customer_name` STRING,
      PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'dwd_order_customer_valid',
      'properties.bootstrap.servers' = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092',
      'key.format' = 'json',
      'value.format' = 'json'
    );
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`dws_agg_by_region` (
      `order_region` VARCHAR(40),
      `order_cnt` BIGINT,
      `order_total_revenue` FLOAT,
      PRIMARY KEY (order_region) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'dws_agg_by_region',
      'properties.bootstrap.servers' = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092',
      'key.format' = 'json',
      'value.format' = 'json'
    );
    
    
    USE flink_cdc;
    
    BEGIN STATEMENT SET;
    
    
    INSERT INTO ods_order_table SELECT * FROM orders_src;
    
    INSERT INTO ods_customers_table SELECT * FROM customers_src;
    
    INSERT INTO dwd_order_customer_valid
    SELECT
      o.order_id,
      o.order_revenue,
      o.order_region,
      c.customer_id,
      c.customer_age,
      c.customer_name
    FROM customers_src c JOIN orders_src o ON c.customer_id=o.customer_id
    WHERE c.customer_id <> -1;
    
    INSERT INTO dws_agg_by_region
    SELECT
      order_region,
      count(*) as order_cnt,
      sum(order_revenue) as order_total_revenue
    FROM dwd_order_customer_valid
    GROUP BY order_region;
    
    END;

    The following tables describe the parameters involved in the code:

    • Parameters used to create the orders_src and customers_src tables

      Parameter

      Description

      connector

      Set the value to mysql-cdc.

      hostname

      The internal endpoint of the ApsaraDB RDS for MySQL instance.

      You can copy the internal endpoint on the Database Connection page of the ApsaraDB RDS for MySQL instance in the ApsaraDB RDS console. Example: rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com.

      port

      Set the value to 3306.

      username

      The name of the account that is created in Step 1: Create source MySQL tables. In this example, emr_test is used.

      password

      The password of the account that is created in Step 1: Create source MySQL tables. In this example, Yz12**** is used.

      database-name

      The name of the database that is created in Step 1: Create source MySQL tables. In this example, flink_cdc is used.

      table-name

      The name of the table that is created in Step 1: Create source MySQL tables.

      • orders_src: In this example, orders is used.

      • customers_src: In this example, customers is used.

    • Parameters to create the ods_order_table, ods_customers_table, dwd_order_customer_valid, and dws_agg_by_region tables

      Parameter

      Description

      connector

      Set the value to upsert-kafka.

      topic

      The name of the topic that you created in Step 2: Create Kafka topics.

      • ods_order_table: In this example, ods_order is used.

      • ods_customers_table: In this example, ods_customers is used.

      • dwd_order_customer_valid: In this example, dwd_order_customer_valid is used.

      • dws_agg_by_region: In this example, dws_agg_by_region is used.

      properties.bootstrap.servers

      The value is in the 192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092 format.

  6. Run the following command to start the Flink job:

     /opt/apps/FLINK/flink-current/bin/sql-client.sh -f demo.sql

Step 5: Query the database and table information

  1. Log on to the StarRocks cluster in SSH mode. For more information, see Log on to a cluster.

  2. Run the following command to connect to the StarRocks cluster:

    mysql -h127.0.0.1 -P 9030 -uroot
  3. Query the database information.

    1. Run the following command to use the database:

      use flink_cdc;
    2. Run the following command to query information about tables:

      show tables;

      The following output is returned:

      +--------------------------+
      | Tables_in_flink_cdc      |
      +--------------------------+
      | customers                |
      | dwd_order_customer_valid |
      | dws_agg_by_region        |
      | orders                   |
      +--------------------------+
      4 rows in set (0.01 sec)

Step 6: Insert data and query the inserted data

  1. Use the test account that you created in Step 1: Create source MySQL tables to log on to the ApsaraDB RDS for MySQL instance. For more information, see Use DMS to log on to an ApsaraDB RDS for MySQL instance.

  2. On the SQL Console tab of the ApsaraDB RDS for MySQL database, execute the following statements to insert data into the orders and customers tables:

    INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(1,10,"beijing",1);
    INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(2,10,"beijing",1);
    INSERT INTO flink_cdc.customers(customer_id,customer_age,customer_name) VALUES(1, 22, "emr_test");
  3. Log on to the StarRocks cluster in SSH mode. For more information, see Log on to a cluster.

  4. Run the following command to connect to the StarRocks cluster:

    mysql -h127.0.0.1 -P 9030 -uroot
  5. Query data at the operational data store (ODS) layer.

    1. Run the following command to use the database:

      use flink_cdc;
    2. Execute the following statement to query data from the orders table:

      select * from orders;

      The following output is returned:

      +----------+---------------+--------------+-------------+
      | order_id | order_revenue | order_region | customer_id |
      +----------+---------------+--------------+-------------+
      |        1 |            10 | beijing      |           1 |
      |        2 |            10 | beijing      |           1 |
      +----------+---------------+--------------+-------------+
    3. Execute the following statement to query data from the customers table:

      select * from customers;

      The following output is returned:

      +-------------+--------------+---------------+
      | customer_id | customer_age | customer_name |
      +-------------+--------------+---------------+
      |           1 |           22 | emr_test      |
      +-------------+--------------+---------------+
  6. Query data at the DWD layer.

    1. Run the following command to use the database:

      use flink_cdc;
    2. Execute the following statement to query data from the orders table:

      select * from dwd_order_customer_valid;

      The following output is returned:

      +----------+---------------+--------------+-------------+--------------+---------------+
      | order_id | order_revenue | order_region | customer_id | customer_age | customer_name |
      +----------+---------------+--------------+-------------+--------------+---------------+
      |        1 |            10 | beijing      |           1 |           22 | emr_test      |
      |        2 |            10 | beijing      |           1 |           22 | emr_test      |
      +----------+---------------+--------------+-------------+--------------+---------------+
      2 rows in set (0.00 sec)
  7. Query data at the DWS layer.

    1. Run the following command to use the database:

      use flink_cdc;
    2. Execute the following statement to query data from the orders table:

      select * from dws_agg_by_region;

      The following output is returned:

      +--------------+-----------+---------------------+
      | order_region | order_cnt | order_total_revenue |
      +--------------+-----------+---------------------+
      | beijing      |         2 |                  20 |
      +--------------+-----------+---------------------+
      1 row in set (0.01 sec)