All Products
Search
Document Center

E-MapReduce:Data warehouse scenarios: Ad hoc query solution

Last Updated:Jul 29, 2024

This topic describes how to build an ad hoc query solution in data warehouse scenarios by using StarRocks views.

Prerequisites

  • A Dataflow or custom cluster is created. For more information, see Create a cluster.

  • A StarRocks cluster is created. For more information, see Create a StarRocks cluster.

  • An ApsaraDB RDS for MySQL instance is created. For more information, see Create an ApsaraDB RDS for MySQL instance.

    Note

    In this example, the Dataflow cluster is of EMR V3.42.0, the StarRocks cluster is of EMR V5.8.0, and the MySQL version of the ApsaraDB RDS for MySQL instance is 5.7.

Limits

  • The Dataflow cluster, StarRocks cluster, and ApsaraDB RDS for MySQL instance must be deployed in the same virtual private cloud (VPC) and in the same zone.

  • The Dataflow cluster and StarRocks cluster must be accessible over the Internet.

  • The MySQL version of the ApsaraDB RDS for MySQL instance must be 5.7 or later.

Precautions

This topic is provided for testing purposes only. To run Flink jobs in a production environment, use Alibaba Cloud Realtime Compute for Apache Flink to configure the Flink jobs, or use YARN or Kubernetes to submit the jobs.

For more information, see Apache Hadoop YARN and Native Kubernetes.

Overview

With the application of technologies such as vectorization, cost-based optimizer (CBO), and single-node multi-core scheduling, the computing power of StarRocks is improved. When you use StarRocks to apply layered modeling in a data warehouse, most data is written to the data warehouse detail (DWD) or data warehouse summary (DWS) layer. In actual business scenarios, you can use StarRocks to query data at the DWD or DWS layer. You can also use StarRocks to perform ad hoc queries in a flexible and interactive manner.

Architecture

The solution consists of the following steps:

  1. Use Flink to cleanse the logs that are imported to Kafka, or use the Flink change data capture (CDC) connector and Flink StarRocks connector to obtain binary logs from MySQL and import the binary logs to StarRocks. You can select data models based on your business requirements, and write data to the operational data store (ODS) layer. Data models include the detailed model, aggregation model, update model, and primary key model.

  2. You can use StarRocks views to transfer data from the ODS layer to the DWD and DWS layers. You can use StarRocks vectorization and CBO for complex SQL statements, such as multi-table joins and nested subqueries. Metrics are calculated on site during a query to ensure that the roll-up and drill-down depths of metrics are the same as those of the source data.

Features

Computing is performed in StarRocks. The solution is suitable for scenarios in which data is frequently updated in business systems. Entity data is stored only at the ODS or DWD layer.

  • Benefits

    • You can adjust views based on your business logic in a flexible manner.

    • You can modify metrics with ease. The DWD and DWS layers are encapsulated based on the logic of views. You need to only update the source table data.

  • Disadvantages

    The query performance is affected by the logic of views and data volume. If the logic of views is complex and the data volume is large, the query performance is low.

  • Scenarios

    • Data originates from databases and event tracking systems, high flexibility is required, the capability of processing a large number of queries per second (QPS) is not required, and the computing resources are sufficient.

    • High real-time performance is required. Data needs to be available for queries immediately after data is written to StarRocks, and data updates need to be synchronized to StarRocks in real time. Ad hoc queries are required, resources are sufficient, and query complexity is low.

Procedure

Perform the following steps:

  1. Step 1: Create source MySQL tables

  2. Step 2: Create tables in the StarRocks cluster

  3. Step 3: Run a Flink job to start a data stream

  4. Step 4: Test the ad hoc query solution

Step 1: Create source MySQL tables

  1. Create a test database and a test account. For information about how to create an account and a database, see Create accounts and databases.

    After you create the test database and account, grant the read and write permissions on the database to the account.

    Note

    In this example, a database named flink_cdc and an account named emr_test are created.

  2. Use the test account to log on to the ApsaraDB RDS for MySQL instance. For more information, see Use DMS to log on to an ApsaraDB RDS for MySQL instance.

  3. Execute the following statement to create a table named orders:

    CREATE TABLE flink_cdc.orders (
        order_id INT NOT NULL AUTO_INCREMENT,
        order_revenue FLOAT NOT NULL,
        order_region VARCHAR(40) NOT NULL,
        customer_id INT NOT NULL,
        PRIMARY KEY (order_id)
    );
  4. Execute the following statement to create a table named customers:

    CREATE TABLE flink_cdc.customers (
        customer_id INT NOT NULL,
        customer_age INT NOT NULL,
        customer_name VARCHAR(40) NOT NULL,
        PRIMARY KEY (customer_id)
    );

Step 2: Create tables in the StarRocks cluster

  1. Log on to the StarRocks cluster in SSH mode. For more information, see Log on to a cluster.

  2. Run the following command to connect to the StarRocks cluster:

    mysql -h127.0.0.1 -P 9030 -uroot
  3. Execute the following statement to create a database:

    CREATE DATABASE IF NOT EXISTS `flink_cdc`;
  4. Execute the following statement to create a table named customers:

    CREATE TABLE IF NOT EXISTS `flink_cdc`.`customers` (
      `customer_id` INT NOT NULL  COMMENT "",
      `customer_age` FLOAT NOT NULL  COMMENT "",
      `customer_name` STRING NOT NULL  COMMENT ""
    ) ENGINE=olap
    PRIMARY KEY(`customer_id`)
    COMMENT ""
    DISTRIBUTED BY HASH(`customer_id`) BUCKETS 1
    PROPERTIES (
      "replication_num" = "1"
    );
  5. Execute the following statement to create a table named orders:

    CREATE TABLE IF NOT EXISTS `flink_cdc`.`orders` (
      `order_id` INT NOT NULL  COMMENT "",
      `order_revenue` FLOAT NOT NULL  COMMENT "",
      `order_region` STRING NOT NULL  COMMENT "",
      `customer_id` INT NOT NULL  COMMENT ""
    ) ENGINE=olap
    PRIMARY KEY(`order_id`)
    COMMENT ""
    DISTRIBUTED BY HASH(`order_id`) BUCKETS 1
    PROPERTIES (
      "replication_num" = "1"
    );
  6. Execute the following statement to create a DWD view based on the ODS table:

    CREATE VIEW flink_cdc.dwd_order_customer_valid (
      order_id,
      order_revenue,
      order_region,
      customer_id,
      customer_age,
      customer_name
    )
    AS
    SELECT o.order_id, o.order_revenue, o.order_region, c.customer_id, c.customer_age, c.customer_name
    FROM flink_cdc.customers c JOIN flink_cdc.orders o
    ON c.customer_id=o.customer_id
    WHERE c.customer_id != -1;
  7. Execute the following statement to create a DWS view based on the DWD table:

    CREATE VIEW flink_cdc.dws_agg_by_region (
      order_region,
      order_cnt,
      order_total_revenue)
    AS
    SELECT order_region, count(order_region), sum(order_revenue)
    FROM flink_cdc.dwd_order_customer_valid
    GROUP BY order_region;

Step 3: Run a Flink job to start a data stream

  1. Download the packages of the CDC connector and Flink StarRocks connector, and upload the packages to the /opt/apps/FLINK/flink-current/lib directory of the Dataflow cluster.

  2. Log on to the Dataflow cluster in SSH mode. For more information, see Log on to a cluster.

  3. Add a port and modify the number of slots for parallel job execution.

    1. Run the following command to open the flink-conf.yaml file:

      vim /etc/taihao-apps/flink-conf/flink-conf.yaml
    2. Add the following content to the end of the file:

      rest.port: 8083
    3. Change the value of the taskmanager.numberOfTaskSlots parameter in the file to 3.

      Note

      The default value of the taskmanager.numberOfTaskSlots parameter is 1. This indicates that only one job can run on TaskManager. In the subsequent operations, the demo.sql file contains two jobs. To ensure that the jobs can run in parallel on TaskManager, we recommend that you set the taskmanager.numberOfTaskSlots parameter to 2 at least.

  4. Run the following command to start the Dataflow cluster.

    Important

    The example in this topic is provided for testing purposes only. To run Flink jobs in a production environment, use YARN or Kubernetes to submit the jobs. For more information, see Apache Hadoop YARN and Native Kubernetes.

    /opt/apps/FLINK/flink-current/bin/start-cluster.sh
  5. Write code for Flink SQL jobs and save the code as the demo.sql file.

    Run the following command to open the demo.sql file. Edit the file based on your business requirements.

    vim demo.sql

    The following example shows the code in the file:

    CREATE DATABASE IF NOT EXISTS `default_catalog`.`flink_cdc`;
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`customers_src` (
      `customer_id` INT NOT NULL,
      `customer_age` FLOAT NOT NULL,
      `customer_name` STRING NOT NULL,
      PRIMARY KEY(`customer_id`)
     NOT ENFORCED
    ) with (
      'connector' = 'mysql-cdc',
      'hostname' = 'rm-2ze8398257383****.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'emr_test',
      'password' = 'Yz12****',
      'database-name' = 'flink_cdc',
      'table-name' = 'customers'
    );
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`customers_sink` (
      `customer_id` INT NOT NULL,
      `customer_age` FLOAT NOT NULL,
      `customer_name` STRING NOT NULL,
      PRIMARY KEY(`customer_id`)
     NOT ENFORCED
    ) with (
      'load-url' = '10.0.**.**:8030',
      'database-name' = 'flink_cdc',
      'jdbc-url' = 'jdbc:mysql://10.0.**.**:9030',
      'sink.buffer-flush.interval-ms' = '15000',
      'sink.properties.format' = 'json',
      'username' = 'root',
      'table-name' = 'customers',
      'sink.properties.strip_outer_array' = 'true',
      'password' = '',
      'sink.max-retries' = '10',
      'connector' = 'starrocks'
    );
    INSERT INTO `default_catalog`.`flink_cdc`.`customers_sink` SELECT * FROM `default_catalog`.`flink_cdc`.`customers_src`;
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`orders_src` (
      `order_id` INT NOT NULL,
      `order_revenue` FLOAT NOT NULL,
      `order_region` STRING NOT NULL,
      `customer_id` INT NOT NULL,
      PRIMARY KEY(`order_id`)
     NOT ENFORCED
    ) with (
      'database-name' = 'flink_cdc',
      'table-name' = 'orders',
      'connector' = 'mysql-cdc',
      'hostname' = 'rm-2ze8398257383****.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'emr_test',
      'password' = 'Yz12****'
    );
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`orders_sink` (
      `order_id` INT NOT NULL,
      `order_revenue` FLOAT NOT NULL,
      `order_region` STRING NOT NULL,
      `customer_id` INT NOT NULL,
      PRIMARY KEY(`order_id`)
     NOT ENFORCED
    ) with (
      'sink.properties.strip_outer_array' = 'true',
      'password' = '',
      'sink.max-retries' = '10',
      'connector' = 'starrocks',
      'table-name' = 'orders',
      'jdbc-url' = 'jdbc:mysql://10.0.**.**:9030',
      'sink.buffer-flush.interval-ms' = '15000',
      'sink.properties.format' = 'json',
      'username' = 'root',
      'load-url' = '10.0.**.**:8030',
      'database-name' = 'flink_cdc'
    );
    
    INSERT INTO `default_catalog`.`flink_cdc`.`orders_sink` SELECT * FROM `default_catalog`.`flink_cdc`.`orders_src`;

    The following tables describe the parameters involved in the code.

    • Parameters used to create the customers_src table

      Parameter

      Description

      connector

      Set the value to mysql-cdc.

      hostname

      The internal endpoint of the ApsaraDB RDS for MySQL instance.

      You can copy the internal endpoint on the Database Connection page of the ApsaraDB RDS for MySQL instance in the ApsaraDB RDS console. Example: rm-2ze8398257383****.mysql.rds.aliyuncs.com.

      port

      Set the value to 3306.

      username

      The name of the account that is created in Step 1: Create source MySQL tables. In this example, emr_test is used.

      password

      The password of the account that is created in Step 1: Create source MySQL tables. In this example, Yz12**** is used.

      database-name

      The name of the database that is created in Step 1: Create source MySQL tables. In this example, flink_cdc is used.

      table-name

      The name of the table that is created in Step 1: Create source MySQL tables. In this example, customers is used.

    • Parameters used to create the customers_sink and orders_sink tables

      Parameter

      Description

      load-url

      The IP address and HTTP port of the frontend, in the format of Internal IP address of the StarRocks cluster:8030. In this example, port 8030 is used. Select a port based on the version of your cluster.

      • 18030: Select this port for clusters of EMR V5.9.0 or a later minor version and clusters of EMR V3.43.0 or a later minor version.

      • 8030: Select this port for clusters of EMR V5.8.0, EMR V3.42.0, or a minor version earlier than EMR V5.8.0 or EMR V3.42.0.

      Note

      For more information, see Access the UI and ports.

      database-name

      The name of the database that is created in Step 1: Create source MySQL tables. In this example, flink_cdc is used.

      jdbc-url

      The Java Database Connectivity (JDBC) URL that is used to connect to StarRocks and perform queries in StarRocks.

      Example: jdbc:mysql://10.0.**.**:9030. In this example, 10.0.**.** is the internal IP address of the StarRocks cluster.

      username

      The username that is used to connect to the StarRocks cluster. Set the value to root.

      table-name

      The name of the table. In this example, set the value to customers.

      connector

      The type of the connector. Set the value to starrocks.

  6. Run the following command to start the Flink job:

     /opt/apps/FLINK/flink-current/bin/sql-client.sh -f demo.sql

Step 4: Test the ad hoc query solution

  1. Use the test account that you created in Step 1: Create source MySQL tables to log on to the ApsaraDB RDS for MySQL instance. For more information, see Use DMS to log on to an ApsaraDB RDS for MySQL instance.

  2. On the SQL Console tab of the ApsaraDB RDS for MySQL database, execute the following statements to insert data into the orders and customers tables:

    INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(1,10,"beijing",1);
    INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(2,10,"beijing",1);
    INSERT INTO flink_cdc.customers(customer_id,customer_age,customer_name) VALUES(1, 22, "emr_test");
  3. Log on to the StarRocks cluster in SSH mode. For more information, see Log on to a cluster.

  4. Run the following command to connect to the StarRocks cluster:

    mysql -h127.0.0.1 -P 9030 -uroot
  5. Query data at the ODS layer.

    1. Execute the following statement to query data from the orders table:

      SELECT * FROM flink_cdc.orders;

      The following output is returned:

      +----------+---------------+--------------+-------------+
      | order_id | order_revenue | order_region | customer_id |
      +----------+---------------+--------------+-------------+
      |        1 |            10 | beijing      |           1 |
      |        2 |            10 | beijing      |           1 |
      +----------+---------------+--------------+-------------+
    2. Execute the following statement to query data from the customers table:

      SELECT * FROM flink_cdc.customers;

      The following output is returned:

      +-------------+--------------+---------------+
      | customer_id | customer_age | customer_name |
      +-------------+--------------+---------------+
      |           1 |           22 | emr_test      |
      +-------------+--------------+---------------+
  6. Execute the following statement to query data at the DWD layer:

    SELECT * FROM flink_cdc.dwd_order_customer_valid;

    The following output is returned:

    +----------+---------------+--------------+-------------+--------------+---------------+
    | order_id | order_revenue | order_region | customer_id | customer_age | customer_name |
    +----------+---------------+--------------+-------------+--------------+---------------+
    |        1 |            10 | beijing      |           1 |           22 | emr_test      |
    |        2 |            10 | beijing      |           1 |           22 | emr_test      |
    +----------+---------------+--------------+-------------+--------------+---------------+
    2 rows in set (0.00 sec)
  7. Execute the following statement to query data at the DWS layer:

    SELECT * FROM flink_cdc.dws_agg_by_region;

    The following output is returned:

    +--------------+-----------+---------------------+
    | order_region | order_cnt | order_total_revenue |
    +--------------+-----------+---------------------+
    | beijing      |         2 |                  20 |
    +--------------+-----------+---------------------+
    1 row in set (0.01 sec)