This topic describes how to build an ad hoc query solution in data warehouse scenarios by using StarRocks views.
Prerequisites
A Dataflow or custom cluster is created. For more information, see Create a cluster.
A StarRocks cluster is created. For more information, see Create a StarRocks cluster.
An ApsaraDB RDS for MySQL instance is created. For more information, see Create an ApsaraDB RDS for MySQL instance.
Note: In this example, the Dataflow cluster is of EMR V3.42.0, the StarRocks cluster is of EMR V5.8.0, and the MySQL version of the ApsaraDB RDS for MySQL instance is 5.7.
Limits
The Dataflow cluster, StarRocks cluster, and ApsaraDB RDS for MySQL instance must be deployed in the same virtual private cloud (VPC) and in the same zone.
The Dataflow cluster and StarRocks cluster must be accessible over the Internet.
The MySQL version of the ApsaraDB RDS for MySQL instance must be 5.7 or later.
Precautions
This topic is provided for testing purposes only. To run Flink jobs in a production environment, use Alibaba Cloud Realtime Compute for Apache Flink to configure the Flink jobs, or use YARN or Kubernetes to submit the jobs.
For more information, see Apache Hadoop YARN and Native Kubernetes.
Overview
With technologies such as vectorized execution, a cost-based optimizer (CBO), and single-node multi-core scheduling, StarRocks delivers strong computing power. When you use StarRocks to apply layered modeling in a data warehouse, most data is written to the data warehouse detail (DWD) or data warehouse summary (DWS) layer. In actual business scenarios, you can use StarRocks to query data at the DWD or DWS layer. You can also use StarRocks to perform ad hoc queries in a flexible and interactive manner.
Architecture
The solution consists of the following steps:
Use Flink to cleanse the logs that are imported to Kafka, or use the Flink change data capture (CDC) connector and Flink StarRocks connector to obtain binary logs from MySQL and import the binary logs to StarRocks. You can select data models based on your business requirements, and write data to the operational data store (ODS) layer. Data models include the detailed model, aggregation model, update model, and primary key model.
You can use StarRocks views to transfer data from the ODS layer to the DWD and DWS layers. You can use StarRocks vectorization and the CBO for complex SQL statements, such as multi-table joins and nested subqueries. Metrics are calculated on the fly during a query to ensure that the roll-up and drill-down depths of metrics are the same as those of the source data.
Features
Computing is performed in StarRocks. The solution is suitable for scenarios in which data is frequently updated in business systems. Entity data is stored only at the ODS or DWD layer.
Benefits
You can adjust views based on your business logic in a flexible manner.
You can modify metrics with ease. The DWD and DWS layers are encapsulated based on the logic of views, so you only need to update the source table data.
Disadvantages
The query performance is affected by the logic of views and data volume. If the logic of views is complex and the data volume is large, the query performance is low.
Scenarios
Data originates from databases and event tracking systems, high flexibility is required, the capability of processing a large number of queries per second (QPS) is not required, and the computing resources are sufficient.
High real-time performance is required. Data needs to be available for queries immediately after data is written to StarRocks, and data updates need to be synchronized to StarRocks in real time. Ad hoc queries are required, resources are sufficient, and query complexity is low.
Procedure
Perform the following steps:
Step 1: Create source MySQL tables
Create a test database and a test account. For information about how to create an account and a database, see Create accounts and databases.
After you create the test database and account, grant the read and write permissions on the database to the account.
Note: In this example, a database named flink_cdc and an account named emr_test are created.
Use the test account to log on to the ApsaraDB RDS for MySQL instance. For more information, see Use DMS to log on to an ApsaraDB RDS for MySQL instance.
Execute the following statement to create a table named orders:
CREATE TABLE flink_cdc.orders (
    order_id INT NOT NULL AUTO_INCREMENT,
    order_revenue FLOAT NOT NULL,
    order_region VARCHAR(40) NOT NULL,
    customer_id INT NOT NULL,
    PRIMARY KEY (order_id)
);

Execute the following statement to create a table named customers:

CREATE TABLE flink_cdc.customers (
    customer_id INT NOT NULL,
    customer_age INT NOT NULL,
    customer_name VARCHAR(40) NOT NULL,
    PRIMARY KEY (customer_id)
);
Step 2: Create tables in the StarRocks cluster
Log on to the StarRocks cluster in SSH mode. For more information, see Log on to a cluster.
Run the following command to connect to the StarRocks cluster:
mysql -h127.0.0.1 -P 9030 -uroot

Execute the following statement to create a database:

CREATE DATABASE IF NOT EXISTS `flink_cdc`;

Execute the following statement to create a table named customers:

CREATE TABLE IF NOT EXISTS `flink_cdc`.`customers` (
    `customer_id` INT NOT NULL COMMENT "",
    `customer_age` FLOAT NOT NULL COMMENT "",
    `customer_name` STRING NOT NULL COMMENT ""
) ENGINE=olap
PRIMARY KEY(`customer_id`)
COMMENT ""
DISTRIBUTED BY HASH(`customer_id`) BUCKETS 1
PROPERTIES (
    "replication_num" = "1"
);

Execute the following statement to create a table named orders:

CREATE TABLE IF NOT EXISTS `flink_cdc`.`orders` (
    `order_id` INT NOT NULL COMMENT "",
    `order_revenue` FLOAT NOT NULL COMMENT "",
    `order_region` STRING NOT NULL COMMENT "",
    `customer_id` INT NOT NULL COMMENT ""
) ENGINE=olap
PRIMARY KEY(`order_id`)
COMMENT ""
DISTRIBUTED BY HASH(`order_id`) BUCKETS 1
PROPERTIES (
    "replication_num" = "1"
);

Execute the following statement to create a DWD view based on the ODS tables:

CREATE VIEW flink_cdc.dwd_order_customer_valid (
    order_id,
    order_revenue,
    order_region,
    customer_id,
    customer_age,
    customer_name
) AS
SELECT
    o.order_id,
    o.order_revenue,
    o.order_region,
    c.customer_id,
    c.customer_age,
    c.customer_name
FROM flink_cdc.customers c
JOIN flink_cdc.orders o ON c.customer_id = o.customer_id
WHERE c.customer_id != -1;

Execute the following statement to create a DWS view based on the DWD view:

CREATE VIEW flink_cdc.dws_agg_by_region (
    order_region,
    order_cnt,
    order_total_revenue
) AS
SELECT
    order_region,
    count(order_region),
    sum(order_revenue)
FROM flink_cdc.dwd_order_customer_valid
GROUP BY order_region;
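The views compute their results at query time rather than storing data. As a quick illustration of what dwd_order_customer_valid produces, the following Python sketch (not part of the deployment; all rows are hypothetical sample data) emulates the join and the customer_id != -1 filter in memory:

```python
# Illustrative sketch: emulate the dwd_order_customer_valid view logic
# on in-memory sample rows. The data values are hypothetical examples.

orders = [
    {"order_id": 1, "order_revenue": 10.0, "order_region": "beijing", "customer_id": 1},
    {"order_id": 2, "order_revenue": 10.0, "order_region": "beijing", "customer_id": 1},
]
customers = [
    {"customer_id": 1, "customer_age": 22, "customer_name": "emr_test"},
    {"customer_id": -1, "customer_age": 0, "customer_name": "invalid"},
]

def dwd_order_customer_valid(orders, customers):
    """Join orders to customers and drop the placeholder customer_id -1,
    mirroring the WHERE c.customer_id != -1 filter in the view."""
    by_id = {c["customer_id"]: c for c in customers if c["customer_id"] != -1}
    return [
        {**o,
         "customer_age": by_id[o["customer_id"]]["customer_age"],
         "customer_name": by_id[o["customer_id"]]["customer_name"]}
        for o in orders if o["customer_id"] in by_id
    ]

rows = dwd_order_customer_valid(orders, customers)
```

Because the join runs on the current ODS data each time the view is queried, updates to the source tables are reflected in the view results immediately.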
Step 3: Run a Flink job to start a data stream
Download the packages of the CDC connector and Flink StarRocks connector, and upload the packages to the /opt/apps/FLINK/flink-current/lib directory of the Dataflow cluster.
Log on to the Dataflow cluster in SSH mode. For more information, see Log on to a cluster.
Add a port and modify the number of slots for parallel job execution.
Run the following command to open the flink-conf.yaml file:
vim /etc/taihao-apps/flink-conf/flink-conf.yaml

Add the following content to the end of the file:

rest.port: 8083

Change the value of the taskmanager.numberOfTaskSlots parameter in the file to 3.

Note: The default value of the taskmanager.numberOfTaskSlots parameter is 1. This indicates that only one job can run on a TaskManager. In the subsequent operations, the demo.sql file contains two jobs. To ensure that the jobs can run in parallel on the TaskManager, we recommend that you set the taskmanager.numberOfTaskSlots parameter to at least 2.
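If you prefer to script these two configuration changes instead of editing the file in vim, a sketch like the following can apply them to the file content. The values (3 slots, port 8083) mirror the manual steps; the function itself is illustrative, not part of the product:

```python
import re

def update_flink_conf(text, slots=3, rest_port=8083):
    """Return flink-conf.yaml content with the slot count raised
    and rest.port appended if it is not already set."""
    # Raise the slot count so the two jobs in demo.sql can run in parallel.
    text = re.sub(r"(?m)^taskmanager\.numberOfTaskSlots:\s*\d+",
                  f"taskmanager.numberOfTaskSlots: {slots}", text)
    # Append rest.port only if absent.
    if re.search(r"(?m)^rest\.port:", text) is None:
        text = text.rstrip("\n") + f"\nrest.port: {rest_port}\n"
    return text

# Example with the default shipped value:
conf = update_flink_conf("taskmanager.numberOfTaskSlots: 1\n")
```

Reading and writing /etc/taihao-apps/flink-conf/flink-conf.yaml around this function is left to you; back up the file before overwriting it.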
Run the following command to start the Dataflow cluster:

Important: The example in this topic is provided for testing purposes only. To run Flink jobs in a production environment, use YARN or Kubernetes to submit the jobs. For more information, see Apache Hadoop YARN and Native Kubernetes.

/opt/apps/FLINK/flink-current/bin/start-cluster.sh

Write code for Flink SQL jobs and save the code as the demo.sql file.

Run the following command to open the demo.sql file, and edit the file based on your business requirements:

vim demo.sql

The following example shows the code in the file:
CREATE DATABASE IF NOT EXISTS `default_catalog`.`flink_cdc`;

CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`customers_src` (
    `customer_id` INT NOT NULL,
    `customer_age` FLOAT NOT NULL,
    `customer_name` STRING NOT NULL,
    PRIMARY KEY(`customer_id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'rm-2ze8398257383****.mysql.rds.aliyuncs.com',
    'port' = '3306',
    'username' = 'emr_test',
    'password' = 'Yz12****',
    'database-name' = 'flink_cdc',
    'table-name' = 'customers'
);

CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`customers_sink` (
    `customer_id` INT NOT NULL,
    `customer_age` FLOAT NOT NULL,
    `customer_name` STRING NOT NULL,
    PRIMARY KEY(`customer_id`) NOT ENFORCED
) WITH (
    'connector' = 'starrocks',
    'load-url' = '10.0.**.**:8030',
    'jdbc-url' = 'jdbc:mysql://10.0.**.**:9030',
    'database-name' = 'flink_cdc',
    'table-name' = 'customers',
    'username' = 'root',
    'password' = '',
    'sink.buffer-flush.interval-ms' = '15000',
    'sink.properties.format' = 'json',
    'sink.properties.strip_outer_array' = 'true',
    'sink.max-retries' = '10'
);

INSERT INTO `default_catalog`.`flink_cdc`.`customers_sink`
SELECT * FROM `default_catalog`.`flink_cdc`.`customers_src`;

CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`orders_src` (
    `order_id` INT NOT NULL,
    `order_revenue` FLOAT NOT NULL,
    `order_region` STRING NOT NULL,
    `customer_id` INT NOT NULL,
    PRIMARY KEY(`order_id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'rm-2ze8398257383****.mysql.rds.aliyuncs.com',
    'port' = '3306',
    'username' = 'emr_test',
    'password' = 'Yz12****',
    'database-name' = 'flink_cdc',
    'table-name' = 'orders'
);

CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`orders_sink` (
    `order_id` INT NOT NULL,
    `order_revenue` FLOAT NOT NULL,
    `order_region` STRING NOT NULL,
    `customer_id` INT NOT NULL,
    PRIMARY KEY(`order_id`) NOT ENFORCED
) WITH (
    'connector' = 'starrocks',
    'load-url' = '10.0.**.**:8030',
    'jdbc-url' = 'jdbc:mysql://10.0.**.**:9030',
    'database-name' = 'flink_cdc',
    'table-name' = 'orders',
    'username' = 'root',
    'password' = '',
    'sink.buffer-flush.interval-ms' = '15000',
    'sink.properties.format' = 'json',
    'sink.properties.strip_outer_array' = 'true',
    'sink.max-retries' = '10'
);

INSERT INTO `default_catalog`.`flink_cdc`.`orders_sink`
SELECT * FROM `default_catalog`.`flink_cdc`.`orders_src`;

The following tables describe the parameters involved in the code.
Parameters used to create the customers_src table:

connector: Set the value to mysql-cdc.
hostname: The internal endpoint of the ApsaraDB RDS for MySQL instance. You can copy the internal endpoint on the Database Connection page of the ApsaraDB RDS for MySQL instance in the ApsaraDB RDS console. Example: rm-2ze8398257383****.mysql.rds.aliyuncs.com.
port: Set the value to 3306.
username: The name of the account that is created in Step 1: Create source MySQL tables. In this example, emr_test is used.
password: The password of the account that is created in Step 1: Create source MySQL tables. In this example, Yz12**** is used.
database-name: The name of the database that is created in Step 1: Create source MySQL tables. In this example, flink_cdc is used.
table-name: The name of the table that is created in Step 1: Create source MySQL tables. In this example, customers is used.
Parameters used to create the customers_sink and orders_sink tables:

load-url: The IP address and HTTP port of the frontend, in the format of <internal IP address of the StarRocks cluster>:<HTTP port>. In this example, port 8030 is used. Select a port based on the version of your cluster:
    18030: Select this port for clusters of EMR V5.9.0 or a later minor version and clusters of EMR V3.43.0 or a later minor version.
    8030: Select this port for clusters of EMR V5.8.0, EMR V3.42.0, or a minor version earlier than EMR V5.8.0 or EMR V3.42.0.
    Note: For more information, see Access the UI and ports.
database-name: The name of the database that is created in Step 1: Create source MySQL tables. In this example, flink_cdc is used.
jdbc-url: The Java Database Connectivity (JDBC) URL that is used to connect to StarRocks and perform queries in StarRocks. Example: jdbc:mysql://10.0.**.**:9030. In this example, 10.0.**.** is the internal IP address of the StarRocks cluster.
username: The username that is used to connect to the StarRocks cluster. Set the value to root.
table-name: The name of the table. In this example, set the value to customers for the customers_sink table and orders for the orders_sink table.
connector: The type of the connector. Set the value to starrocks.
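Before you paste the options into demo.sql, it can help to assemble them in one place and confirm that no required option is missing. The following sketch is illustrative only; the key names come from the sink definitions above, and the placeholder values (10.0.0.1, empty password) must be replaced with your own:

```python
# Illustrative sketch: collect the StarRocks sink options and check
# required keys before rendering them as a Flink SQL WITH clause.
# The IP addresses and password below are placeholders.

REQUIRED_SINK_KEYS = {
    "connector", "load-url", "jdbc-url", "database-name",
    "table-name", "username", "password",
}

sink_options = {
    "connector": "starrocks",
    "load-url": "10.0.0.1:8030",               # FE internal IP : HTTP port (placeholder)
    "jdbc-url": "jdbc:mysql://10.0.0.1:9030",  # FE internal IP : query port (placeholder)
    "database-name": "flink_cdc",
    "table-name": "customers",
    "username": "root",
    "password": "",
    "sink.buffer-flush.interval-ms": "15000",
    "sink.properties.format": "json",
    "sink.properties.strip_outer_array": "true",
    "sink.max-retries": "10",
}

missing = REQUIRED_SINK_KEYS - sink_options.keys()
assert not missing, f"missing sink options: {missing}"

# Render the options as the body of a Flink SQL WITH clause.
with_clause = ",\n".join(f"  '{k}' = '{v}'" for k, v in sorted(sink_options.items()))
```

A check like this catches a forgotten load-url or jdbc-url before the Flink job fails at submission time.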
Run the following command to start the Flink job:
/opt/apps/FLINK/flink-current/bin/sql-client.sh -f demo.sql
Step 4: Test the ad hoc query solution
Use the test account that you created in Step 1: Create source MySQL tables to log on to the ApsaraDB RDS for MySQL instance. For more information, see Use DMS to log on to an ApsaraDB RDS for MySQL instance.
On the SQL Console tab of the ApsaraDB RDS for MySQL database, execute the following statements to insert data into the orders and customers tables:
INSERT INTO flink_cdc.orders(order_id, order_revenue, order_region, customer_id) VALUES(1, 10, "beijing", 1);
INSERT INTO flink_cdc.orders(order_id, order_revenue, order_region, customer_id) VALUES(2, 10, "beijing", 1);
INSERT INTO flink_cdc.customers(customer_id, customer_age, customer_name) VALUES(1, 22, "emr_test");

Log on to the StarRocks cluster in SSH mode. For more information, see Log on to a cluster.
Run the following command to connect to the StarRocks cluster:
mysql -h127.0.0.1 -P 9030 -uroot

Query data at the ODS layer.
Execute the following statement to query data from the orders table:
SELECT * FROM flink_cdc.orders;

The following output is returned:

+----------+---------------+--------------+-------------+
| order_id | order_revenue | order_region | customer_id |
+----------+---------------+--------------+-------------+
|        1 |            10 | beijing      |           1 |
|        2 |            10 | beijing      |           1 |
+----------+---------------+--------------+-------------+

Execute the following statement to query data from the customers table:
SELECT * FROM flink_cdc.customers;

The following output is returned:

+-------------+--------------+---------------+
| customer_id | customer_age | customer_name |
+-------------+--------------+---------------+
|           1 |           22 | emr_test      |
+-------------+--------------+---------------+
Execute the following statement to query data at the DWD layer:
SELECT * FROM flink_cdc.dwd_order_customer_valid;

The following output is returned:

+----------+---------------+--------------+-------------+--------------+---------------+
| order_id | order_revenue | order_region | customer_id | customer_age | customer_name |
+----------+---------------+--------------+-------------+--------------+---------------+
|        1 |            10 | beijing      |           1 |           22 | emr_test      |
|        2 |            10 | beijing      |           1 |           22 | emr_test      |
+----------+---------------+--------------+-------------+--------------+---------------+
2 rows in set (0.00 sec)

Execute the following statement to query data at the DWS layer:
SELECT * FROM flink_cdc.dws_agg_by_region;

The following output is returned:

+--------------+-----------+---------------------+
| order_region | order_cnt | order_total_revenue |
+--------------+-----------+---------------------+
| beijing      |         2 |                  20 |
+--------------+-----------+---------------------+
1 row in set (0.01 sec)
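As a final sanity check, you can recompute the DWS aggregation from the two test rows that were inserted into the orders table in this step. The following sketch is illustrative only and mirrors the count and sum computed by dws_agg_by_region:

```python
from collections import defaultdict

# The (order_id, order_revenue, order_region, customer_id) rows
# inserted into flink_cdc.orders in this step.
orders = [
    (1, 10.0, "beijing", 1),
    (2, 10.0, "beijing", 1),
]

# region -> [order_cnt, order_total_revenue], mirroring GROUP BY order_region.
agg = defaultdict(lambda: [0, 0.0])
for _order_id, revenue, region, _customer_id in orders:
    agg[region][0] += 1
    agg[region][1] += revenue
```

If the numbers from the query do not match this recomputation, check the Flink job status and the sink configuration before inserting more data.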