This topic describes how to build a data warehouse based on StarRocks to collect incremental data in real time.

Prerequisites

  • A Dataflow cluster and a StarRocks cluster are created.
  • An ApsaraDB RDS for MySQL instance is created.

Limits

  • The Dataflow cluster, StarRocks cluster, and ApsaraDB RDS for MySQL instance must be deployed in the same virtual private cloud (VPC) and in the same zone.
  • The Dataflow cluster and StarRocks cluster must be accessible over the Internet.
  • The engine version of the ApsaraDB RDS for MySQL instance must be 5.7 or later.

Scenarios

In latency-sensitive business scenarios, data must be processed immediately after it is generated. To reduce latency, you can use Flink to cleanse, convert, and aggregate data layer by layer, such as at the detail layer and the summary layer, and persist the results of each layer for subsequent processing. This way, incremental data is processed in real time.

Architecture

The following figure shows the architecture of this solution.
(Figure: Flink-StarRocks architecture)
The solution consists of the following steps:
  1. Use Flink to build a real-time data warehouse that cleanses, processes, converts, and aggregates data, and writes the aggregation results to Kafka topics.
  2. Use a StarRocks cluster to subscribe to layers of data from the Kafka topics and persistently store the data for subsequent queries and analysis.

Features, advantages, and disadvantages

The solution has the following features:
  • The data to be calculated is cleansed, processed, converted, and aggregated by Flink and persistently stored in a StarRocks cluster through subscription.
  • The aggregation results can be double-written or single-written. If you select double-writing, the results at each layer are written both to the Kafka topic of the next layer and to the StarRocks table of the same layer. If you select single-writing, the results are written only to the Kafka topics, and the StarRocks cluster persistently stores the results by subscribing to the layered data in the Kafka topics. The single-writing method allows you to query the state of the data and update the data if required.
  • The StarRocks cluster provides tables for upper-layer applications to perform real-time queries.
  • Advantages
    • The calculation is performed in real time. This meets the requirements of latency-sensitive business scenarios.
    • The metrics are easy to correct. Unlike traditional incremental computing, this solution persistently stores the intermediate state of each layer in a StarRocks cluster, which improves the flexibility of subsequent analysis. If the quality of the intermediate data does not meet your business requirements, you can modify the tables to correct the data.
  • Disadvantages
    • You must master the necessary Flink skills before you perform real-time incremental computing.
    • The solution is not suitable for scenarios in which data is frequently updated and cannot be accumulated in calculations.
    • The solution is not suitable for scenarios in which resource-consuming and complex operations such as multi-stream JOIN are performed.
  • Scenarios

    The solution is suitable for scenarios in which the processing requirements are not demanding, the operations are simple, and the data volumes are small, such as data instrumentation (event tracking) analysis.

Procedure

  1. Step 1: Create source MySQL tables
  2. Step 2: Create topics in Kafka
  3. Step 3: Create tables in the StarRocks cluster and data import jobs
  4. Step 4: Run a Flink job to start a data flow
  5. Step 5: View the database and table information
  6. Step 6: Insert data and query the inserted data

Step 1: Create source MySQL tables

  1. Create a database and an account for testing. For more information, see Create databases and accounts for an ApsaraDB RDS for MySQL instance.
    After you create the database and account, grant the read and write permissions to the test account.
    Note In this example, the database named flink_cdc and the account named emr_test are created.
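    If you prefer to grant permissions with SQL statements instead of the ApsaraDB RDS console, a grant similar to the following sketch works. The '%' host pattern is an assumption and may need to be narrowed in your environment:
    GRANT SELECT, INSERT, UPDATE, DELETE ON flink_cdc.* TO 'emr_test'@'%';
    FLUSH PRIVILEGES;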
  2. Use the test account that you created to log on to the ApsaraDB RDS for MySQL instance. For more information, see Use DMS to log on to an ApsaraDB RDS for MySQL instance.
  3. Execute the following statements to create tables named orders and customers:
    • Create a table named orders.
      CREATE TABLE flink_cdc.orders (
         order_id INT NOT NULL AUTO_INCREMENT,
         order_revenue FLOAT NOT NULL,
         order_region VARCHAR(40) NOT NULL,
         customer_id INT NOT NULL,
         PRIMARY KEY ( order_id )
      );
    • Create a table named customers.
      CREATE TABLE flink_cdc.customers (
         customer_id INT NOT NULL,
         customer_age INT NOT NULL,
         customer_name VARCHAR(40) NOT NULL,
         PRIMARY KEY ( customer_id )
      );
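    Optionally, confirm that both tables are created and that their structures match the preceding statements:
    SHOW TABLES FROM flink_cdc;
    DESCRIBE flink_cdc.orders;
    DESCRIBE flink_cdc.customers;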

Step 2: Create topics in Kafka

  1. Log on to the Dataflow cluster in SSH mode. For more information, see Log on to a cluster.
  2. Run the following command to go to the directory that contains the Kafka command-line scripts:
    cd /opt/apps/FLINK/flink-current/bin
  3. Run the following commands to create topics:
    kafka-topics.sh --create --topic ods_order --replication-factor 1 --partitions 1 --bootstrap-server "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092"
    kafka-topics.sh --create --topic ods_customers --replication-factor 1 --partitions 1 --bootstrap-server "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092"
    kafka-topics.sh --create --topic dwd_order_customer_valid --replication-factor 1 --partitions 1 --bootstrap-server "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092"
    kafka-topics.sh --create --topic dws_agg_by_region --replication-factor 1 --partitions 1 --bootstrap-server "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092"
    Note In the preceding sample code, 192.168.**.** is an internal IP address of the Dataflow cluster. You can go to the Nodes tab of the cluster to view the IP address.
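    To confirm that the four topics are created, you can list the topics on the same brokers:
    kafka-topics.sh --list --bootstrap-server "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092"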

Step 3: Create tables in the StarRocks cluster and data import jobs

  1. Log on to the StarRocks cluster in SSH mode. For more information, see Log on to a cluster.
  2. Run the following command to connect to the StarRocks cluster:
    mysql -h127.0.0.1 -P 9030 -uroot
  3. Execute the following statement to create a database:
    CREATE DATABASE IF NOT EXISTS `flink_cdc`;
  4. Execute the following statements to create tables named customers and orders:
    • Create a table named customers.
      CREATE TABLE IF NOT EXISTS `flink_cdc`.`customers` (
        `customer_id` INT NOT NULL  COMMENT "",
        `customer_age` FLOAT NOT NULL  COMMENT "",
        `customer_name` STRING NOT NULL  COMMENT ""
      ) ENGINE=olap
      PRIMARY KEY(`customer_id`)
      COMMENT ""
      DISTRIBUTED BY HASH(`customer_id`) BUCKETS 1
      PROPERTIES (
        "replication_num" = "1"
      );
    • Create a table named orders.
      CREATE TABLE IF NOT EXISTS `flink_cdc`.`orders` (
        `order_id` INT NOT NULL  COMMENT "",
        `order_revenue` FLOAT NOT NULL  COMMENT "",
        `order_region` STRING NOT NULL  COMMENT "",
        `customer_id` INT NOT NULL  COMMENT ""
      ) ENGINE=olap
      PRIMARY KEY(`order_id`)
      COMMENT ""
      DISTRIBUTED BY HASH(`order_id`) BUCKETS 1
      PROPERTIES (
        "replication_num" = "1"
      );
  5. Execute the following statement to create a table at the data warehouse detail (DWD) layer:
    CREATE TABLE IF NOT EXISTS `flink_cdc`.`dwd_order_customer_valid`(
      `order_id` INT NOT NULL  COMMENT "",
      `order_revenue` FLOAT NOT NULL  COMMENT "",
      `order_region` STRING NOT NULL  COMMENT "",
      `customer_id` INT NOT NULL  COMMENT "",
      `customer_age` FLOAT NOT NULL  COMMENT "",
      `customer_name` STRING NOT NULL  COMMENT ""
    ) ENGINE=olap
    PRIMARY KEY(`order_id`)
    COMMENT ""
    DISTRIBUTED BY HASH(`order_id`) BUCKETS 1
    PROPERTIES (
      "replication_num" = "1"
    );
  6. Execute the following statement to create a table at the data warehouse service (DWS) layer:
    CREATE TABLE IF NOT EXISTS `flink_cdc`.`dws_agg_by_region` (
      `order_region` STRING NOT NULL  COMMENT "",
      `order_cnt` INT NOT NULL  COMMENT "",
      `order_total_revenue` INT NOT NULL  COMMENT ""
    ) ENGINE=olap
    PRIMARY KEY(`order_region`)
    COMMENT ""
    DISTRIBUTED BY HASH(`order_region`) BUCKETS 1
    PROPERTIES (
      "replication_num" = "1"
    );
  7. Execute the following statements to create data import jobs in Routine Load mode and subscribe to data in Kafka topics:
    CREATE ROUTINE LOAD flink_cdc.routine_load_orders ON orders
    COLUMNS (order_id, order_revenue, order_region, customer_id)
    PROPERTIES
    (
      "format" = "json",
      "jsonpaths" = "[\"$.order_id\",\"$.order_revenue\",\"$.order_region\",\"$.customer_id\"]"
    )
    FROM KAFKA
    (
      "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092",
      "kafka_topic" = "ods_order"
    );
    
    CREATE ROUTINE LOAD flink_cdc.routine_load_customers ON customers
    COLUMNS (customer_id, customer_age, customer_name)
    PROPERTIES
    (
        "format" = "json",
        "jsonpaths" = "[\"$.customer_id\",\"$.customer_age\",\"$.customer_name\"]"
    )
    FROM KAFKA
    (
      "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092",
      "kafka_topic" = "ods_customers"
    );
    
    CREATE ROUTINE LOAD flink_cdc.routine_load_dwd_order_customer_valid ON dwd_order_customer_valid
    COLUMNS (order_id, order_revenue, order_region, customer_id, customer_age, customer_name)
    PROPERTIES
    (
        "format" = "json",
        "jsonpaths" = "[\"$.order_id\",\"$.order_revenue\",\"$.order_region\",\"$.customer_id\",\"$.customer_age\",\"$.customer_name\"]"
    )
    FROM KAFKA
    (
      "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092",
      "kafka_topic" = "dwd_order_customer_valid"
    );
    
    CREATE ROUTINE LOAD flink_cdc.routine_load_dws_agg_by_region ON dws_agg_by_region
    COLUMNS (order_region, order_cnt, order_total_revenue)
    PROPERTIES
    (
        "format" = "json",
        "jsonpaths" = "[\"$.order_region\",\"$.order_cnt\",\"$.order_total_revenue\"]"
    )
    FROM KAFKA
    (
      "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092",
      "kafka_topic" = "dws_agg_by_region"
    );
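    After the jobs are created, you can confirm that each import job is in the RUNNING state. The output columns vary slightly by StarRocks version:
    USE flink_cdc;
    SHOW ROUTINE LOAD;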

Step 4: Run a Flink job to start a data flow

  1. Download the Flink change data capture (CDC) connector and Flink StarRocks connector packages, and upload them to the /opt/apps/FLINK/flink-current/lib directory of the Dataflow cluster.
  2. Copy the JAR packages in the /opt/apps/FLINK/flink-current/opt/connectors/kafka directory of the Dataflow cluster to the /opt/apps/FLINK/flink-current/lib directory.
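    A command similar to the following sketch performs the copy. The exact JAR file names in the directory depend on the cluster version:
    cp /opt/apps/FLINK/flink-current/opt/connectors/kafka/*.jar /opt/apps/FLINK/flink-current/lib/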
  3. Log on to the Dataflow cluster in SSH mode. For more information, see Log on to a cluster.
  4. Run the following command to start the cluster:
    Important The example in this topic is only for testing purposes. If you want to run a Flink job in the production environment, submit the job based on YARN or Kubernetes. For more information, see Apache Hadoop YARN and Native Kubernetes.
    /opt/apps/FLINK/flink-current/bin/start-cluster.sh
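    After the command succeeds, you can check that the standalone cluster processes are running. In a typical Flink standalone setup, jps lists a StandaloneSessionClusterEntrypoint process and at least one TaskManagerRunner process:
    jps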
  5. Write code for a Flink SQL job and save the code as the demo.sql file.
    Run the following command to edit the demo.sql file:
    vim demo.sql
    The following example shows the code in the file:
    CREATE DATABASE IF NOT EXISTS `default_catalog`.`flink_cdc`;
    
    -- Create source tables. 
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`orders_src`(
      `order_id` INT NOT NULL,
      `order_revenue` FLOAT NOT NULL,
      `order_region` STRING NOT NULL,
      `customer_id` INT NOT NULL,
      PRIMARY KEY(`order_id`) NOT ENFORCED
    ) with (
      'connector' = 'mysql-cdc',
      'hostname' = 'rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'emr_test',
      'password' = 'Yz12****',
      'database-name' = 'flink_cdc',
      'table-name' = 'orders'
    );
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`customers_src` (
      `customer_id` INT NOT NULL,
      `customer_age` FLOAT NOT NULL,
      `customer_name` STRING NOT NULL,
      PRIMARY KEY(`customer_id`) NOT ENFORCED
    ) with (
      'connector' = 'mysql-cdc',
      'hostname' = 'rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'emr_test',
      'password' = 'Yz12****',
      'database-name' = 'flink_cdc',
      'table-name' = 'customers'
    );
    
    -- Create tables at the operational data store (ODS), DWD, and DWS layers.
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`ods_order_table` (
      `order_id` INT,
      `order_revenue` FLOAT,
      `order_region` VARCHAR(40),
      `customer_id` INT,
      PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'ods_order',
      'properties.bootstrap.servers' = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092',
      'key.format' = 'json',
      'value.format' = 'json'
    );
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`ods_customers_table` (
      `customer_id` INT,
      `customer_age` FLOAT,
      `customer_name` STRING,
      PRIMARY KEY (customer_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'ods_customers',
      'properties.bootstrap.servers' = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092',
      'key.format' = 'json',
      'value.format' = 'json'
    );
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`dwd_order_customer_valid` (
      `order_id` INT,
      `order_revenue` FLOAT,
      `order_region` STRING,
      `customer_id` INT,
      `customer_age` FLOAT,
      `customer_name` STRING,
      PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'dwd_order_customer_valid',
      'properties.bootstrap.servers' = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092',
      'key.format' = 'json',
      'value.format' = 'json'
    );
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`dws_agg_by_region` (
      `order_region` VARCHAR(40),
      `order_cnt` BIGINT,
      `order_total_revenue` FLOAT,
      PRIMARY KEY (order_region) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'dws_agg_by_region',
      'properties.bootstrap.servers' = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092',
      'key.format' = 'json',
      'value.format' = 'json'
    );
    
    
    USE flink_cdc;
    
    BEGIN STATEMENT SET;
    
    
    INSERT INTO ods_order_table SELECT * FROM orders_src;
    
    INSERT INTO ods_customers_table SELECT * FROM customers_src;
    
    INSERT INTO dwd_order_customer_valid
    SELECT
      o.order_id,
      o.order_revenue,
      o.order_region,
      c.customer_id,
      c.customer_age,
      c.customer_name
    FROM customers_src c JOIN orders_src o ON c.customer_id=o.customer_id
    WHERE c.customer_id <> -1;
    
    INSERT INTO dws_agg_by_region
    SELECT
      order_region,
      count(*) as order_cnt,
      sum(order_revenue) as order_total_revenue
    FROM dwd_order_customer_valid
    GROUP BY order_region;
    
    END;
    The following lists describe the parameters involved in the code:
    • Parameters of the orders_src and customers_src tables:
      • connector: Set the value to mysql-cdc.
      • hostname: The internal endpoint of the ApsaraDB RDS for MySQL instance. You can copy the internal endpoint on the Database Connection page of the ApsaraDB RDS for MySQL instance in the ApsaraDB RDS console. Example: rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com.
      • port: Set the value to 3306.
      • username: The name of the account that you created in Step 1: Create source MySQL tables. In this example, emr_test is used.
      • password: The password of the account that you created in Step 1: Create source MySQL tables. In this example, Yz12**** is used.
      • database-name: The name of the database that you created in Step 1: Create source MySQL tables. In this example, flink_cdc is used.
      • table-name: The name of the table that you created in Step 1: Create source MySQL tables.
        • orders_src: In this example, orders is used.
        • customers_src: In this example, customers is used.
    • Parameters of the ods_order_table, ods_customers_table, dwd_order_customer_valid, and dws_agg_by_region tables:
      • connector: Set the value to upsert-kafka.
      • topic: The name of the topic that you created in Step 2: Create topics in Kafka.
        • ods_order_table: In this example, ods_order is used.
        • ods_customers_table: In this example, ods_customers is used.
        • dwd_order_customer_valid: In this example, dwd_order_customer_valid is used.
        • dws_agg_by_region: In this example, dws_agg_by_region is used.
      • properties.bootstrap.servers: The internal IP addresses and Kafka port of the Dataflow cluster, in the format 192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092.
  6. Run the following command to start the Flink job:
     /opt/apps/FLINK/flink-current/bin/sql-client.sh -f demo.sql
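    To confirm that the job is submitted, you can list the running Flink jobs. This assumes that the standalone cluster started in the preceding steps is still running:
     /opt/apps/FLINK/flink-current/bin/flink list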

Step 5: View the database and table information

  1. Log on to the StarRocks cluster in SSH mode. For more information, see Log on to a cluster.
  2. Run the following command to connect to the StarRocks cluster:
    mysql -h127.0.0.1 -P 9030 -uroot
  3. Query the database information.
    1. Run the following command to use the flink_cdc database:
      use flink_cdc;
    2. Run the following command to query the information about tables:
      show tables;
      The following output is returned:
      +--------------------------+
      | Tables_in_flink_cdc      |
      +--------------------------+
      | customers                |
      | dwd_order_customer_valid |
      | dws_agg_by_region        |
      | orders                   |
      +--------------------------+
      4 rows in set (0.01 sec)

Step 6: Insert data and query the inserted data

  1. Use the test account that you created in Step 1: Create source MySQL tables to log on to the ApsaraDB RDS for MySQL instance. For more information, see Use DMS to log on to an ApsaraDB RDS for MySQL instance.
  2. In the SQLConsole of the ApsaraDB RDS for MySQL database, execute the following statements to insert data into the orders and customers tables:
    INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(1,10,"beijing",1);
    INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(2,10,"beijing",1);
    INSERT INTO flink_cdc.customers(customer_id,customer_age,customer_name) VALUES(1, 22, "emr_test");
  3. Log on to the StarRocks cluster in SSH mode. For more information, see Log on to a cluster.
  4. Run the following command to connect to the StarRocks cluster:
    mysql -h127.0.0.1 -P 9030 -uroot
  5. Query data at the ODS layer.
    1. Run the following command to use the flink_cdc database:
      use flink_cdc;
    2. Execute the following statement to query the information about the orders table:
      select * from orders;
      The following output is returned:
      +----------+---------------+--------------+-------------+
      | order_id | order_revenue | order_region | customer_id |
      +----------+---------------+--------------+-------------+
      |        1 |            10 | beijing      |           1 |
      |        2 |            10 | beijing      |           1 |
      +----------+---------------+--------------+-------------+
    3. Execute the following statement to query the information about the customers table:
      select * from customers;
      The following output is returned:
      +-------------+--------------+---------------+
      | customer_id | customer_age | customer_name |
      +-------------+--------------+---------------+
      |           1 |           22 | emr_test      |
      +-------------+--------------+---------------+
  6. Query data at the DWD layer.
    1. Run the following command to use the flink_cdc database:
      use flink_cdc;
    2. Execute the following statement to query the dwd_order_customer_valid table:
      select * from dwd_order_customer_valid;
      The following output is returned:
      +----------+---------------+--------------+-------------+--------------+---------------+
      | order_id | order_revenue | order_region | customer_id | customer_age | customer_name |
      +----------+---------------+--------------+-------------+--------------+---------------+
      |        1 |            10 | beijing      |           1 |           22 | emr_test      |
      |        2 |            10 | beijing      |           1 |           22 | emr_test      |
      +----------+---------------+--------------+-------------+--------------+---------------+
      2 rows in set (0.00 sec)
  7. Query data at the DWS layer.
    1. Run the following command to use the flink_cdc database:
      use flink_cdc;
    2. Execute the following statement to query the dws_agg_by_region table:
      select * from dws_agg_by_region;
      The following output is returned:
      +--------------+-----------+---------------------+
      | order_region | order_cnt | order_total_revenue |
      +--------------+-----------+---------------------+
      | beijing      |         2 |                  20 |
      +--------------+-----------+---------------------+
      1 row in set (0.01 sec)
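
To verify that the pipeline is updated incrementally in real time, you can insert one more order into the source table in ApsaraDB RDS for MySQL and run the DWS query again. With the test data above, the following statement should raise order_cnt to 3 and order_total_revenue to 30 for the beijing region after the change propagates through Flink, Kafka, and the Routine Load jobs:

INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(3,10,"beijing",1);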