This topic describes how to build a data warehouse based on StarRocks to implement real-time computing of incremental data.
Prerequisites
A Dataflow cluster or a custom cluster that contains the Flink and Kafka services is created. For more information, see Create a cluster.
A StarRocks cluster is created. For more information, see Create a StarRocks cluster.
An ApsaraDB RDS for MySQL instance is created. For more information, see Create an ApsaraDB RDS for MySQL instance.
Note: In this example, the Dataflow cluster runs EMR V3.40.0, the StarRocks cluster runs EMR V5.6.0, and the ApsaraDB RDS for MySQL instance runs MySQL 5.7.
Limits
The Dataflow cluster, StarRocks cluster, and ApsaraDB RDS for MySQL instance must be deployed in the same virtual private cloud (VPC) and in the same zone.
The Dataflow cluster and StarRocks cluster must be accessible over the Internet.
The MySQL version of the ApsaraDB RDS for MySQL instance must be 5.7 or later.
Overview
In latency-sensitive business scenarios, data must be processed immediately after it is generated. Real-time computing of incremental data helps reduce this latency. You can use Flink to aggregate data at layers such as the data warehouse detail (DWD) and data warehouse summary (DWS) layers in advance and persist the aggregation results for further use.
Architecture
The following figure shows the architecture of this solution.
The solution consists of the following steps:
Use Flink to build a real-time data warehouse, cleanse, process, convert, and aggregate data, and then write the aggregation results to Kafka topics.
Use a StarRocks cluster to subscribe to layers of data from the Kafka topics and persist the data for subsequent queries and analysis.
Features
The solution provides the following features:
Incremental data is cleansed, processed, converted, and aggregated by Flink and persisted to a StarRocks cluster through subscription.
The aggregation results can be double-written or single-written. In double-writing mode, the results of each layer are written both to the Kafka topic of the next layer and to the StarRocks table of the same layer. In single-writing mode, the results are written only to the Kafka topics, and the StarRocks cluster persists each layer in real time by subscribing to the corresponding Kafka topic. The single-writing mode allows you to query the status of the data in each layer and update the data if required.
The StarRocks cluster provides tables for upper-layer applications to perform real-time queries.
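As a schematic illustration of the two write modes above, the following Python sketch shows where one layer's result is delivered in each mode. This is illustrative only: the destinations function and the URI-style target strings are hypothetical and are not part of any Flink or StarRocks API.

```python
def destinations(layer_topic: str, mode: str) -> list[str]:
    """Return the delivery targets for one layer's aggregation result."""
    # Flink always writes the layer's result to its Kafka topic, which the
    # next layer's computation reads from.
    targets = [f"kafka://{layer_topic}"]
    if mode == "double":
        # Double-writing: Flink also sinks the result directly into the
        # same-layer StarRocks table.
        targets.append(f"starrocks://flink_cdc.{layer_topic} (direct Flink sink)")
    elif mode == "single":
        # Single-writing: StarRocks persists the layer by subscribing to the
        # same topic with a Routine Load job (the approach used in this topic).
        targets.append(f"starrocks://flink_cdc.{layer_topic} (Routine Load subscription)")
    return targets

print(destinations("dwd_order_customer_valid", "single"))
```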
Advantages
Data computing is performed in real time. This meets the requirements of latency-sensitive business scenarios.
The metrics are easy to modify. Unlike traditional incremental computing solutions, this solution persists the intermediate state to a StarRocks cluster, which makes subsequent analysis more flexible. If the quality of the intermediate data does not meet your business requirements, you can modify the tables to update the data.
Disadvantages
To implement real-time computing of incremental data, you must master the necessary skills in Flink.
The solution is not suitable for scenarios in which data is frequently updated and cannot be accumulated in calculations.
The solution is not suitable for scenarios in which resource-consuming and complex operations such as multi-stream JOIN are performed.
Scenarios
The solution is suitable for scenarios in which you want to perform simple computing operations on a small amount of data in real time. The most common scenario is the processing of event tracking data.
Procedure
Step 1: Create source MySQL tables
Create a test database and a test account. For more information, see Create accounts and databases.
After you create the test database and test account, grant read and write permissions to the account.
Note: In this example, a database named flink_cdc and an account named emr_test are created.
Use the test account to log on to the ApsaraDB RDS for MySQL instance. For more information, see Use DMS to log on to an ApsaraDB RDS for MySQL instance.
Execute the following statements to create tables named orders and customers.
Create a table named orders.
CREATE TABLE flink_cdc.orders (
    order_id INT NOT NULL AUTO_INCREMENT,
    order_revenue FLOAT NOT NULL,
    order_region VARCHAR(40) NOT NULL,
    customer_id INT NOT NULL,
    PRIMARY KEY (order_id)
);
Create a table named customers.
CREATE TABLE flink_cdc.customers (
    customer_id INT NOT NULL,
    customer_age INT NOT NULL,
    customer_name VARCHAR(40) NOT NULL,
    PRIMARY KEY (customer_id)
);
Step 2: Create Kafka topics
Log on to the Dataflow cluster in SSH mode. For more information, see Log on to a cluster.
Run the following command to go to the bin directory of Kafka:
cd /opt/apps/KAFKA/kafka-current/bin
Run the following commands to create Kafka topics:
kafka-topics.sh --create --topic ods_order --replication-factor 1 --partitions 1 --bootstrap-server core-1-1:9092
kafka-topics.sh --create --topic ods_customers --replication-factor 1 --partitions 1 --bootstrap-server core-1-1:9092
kafka-topics.sh --create --topic dwd_order_customer_valid --replication-factor 1 --partitions 1 --bootstrap-server core-1-1:9092
kafka-topics.sh --create --topic dws_agg_by_region --replication-factor 1 --partitions 1 --bootstrap-server core-1-1:9092
Step 3: Create tables in the StarRocks cluster and data import jobs
Log on to the StarRocks cluster in SSH mode. For more information, see Log on to a cluster.
Run the following command to connect to the StarRocks cluster:
mysql -h127.0.0.1 -P 9030 -uroot
Execute the following statement to create a database:
CREATE DATABASE IF NOT EXISTS `flink_cdc`;
Execute the following statements to create tables named customers and orders.
Create a table named customers.
CREATE TABLE IF NOT EXISTS `flink_cdc`.`customers` (
    `customer_id` INT NOT NULL COMMENT "",
    `customer_age` FLOAT NOT NULL COMMENT "",
    `customer_name` STRING NOT NULL COMMENT ""
) ENGINE=olap
PRIMARY KEY(`customer_id`)
COMMENT ""
DISTRIBUTED BY HASH(`customer_id`) BUCKETS 1
PROPERTIES (
    "replication_num" = "1"
);
Create a table named orders.
CREATE TABLE IF NOT EXISTS `flink_cdc`.`orders` (
    `order_id` INT NOT NULL COMMENT "",
    `order_revenue` FLOAT NOT NULL COMMENT "",
    `order_region` STRING NOT NULL COMMENT "",
    `customer_id` INT NOT NULL COMMENT ""
) ENGINE=olap
PRIMARY KEY(`order_id`)
COMMENT ""
DISTRIBUTED BY HASH(`order_id`) BUCKETS 1
PROPERTIES (
    "replication_num" = "1"
);
Execute the following statement to create a table at the DWD layer:
CREATE TABLE IF NOT EXISTS `flink_cdc`.`dwd_order_customer_valid` (
    `order_id` INT NOT NULL COMMENT "",
    `order_revenue` FLOAT NOT NULL COMMENT "",
    `order_region` STRING NOT NULL COMMENT "",
    `customer_id` INT NOT NULL COMMENT "",
    `customer_age` FLOAT NOT NULL COMMENT "",
    `customer_name` STRING NOT NULL COMMENT ""
) ENGINE=olap
PRIMARY KEY(`order_id`)
COMMENT ""
DISTRIBUTED BY HASH(`order_id`) BUCKETS 1
PROPERTIES (
    "replication_num" = "1"
);
Execute the following statement to create a table at the DWS layer:
CREATE TABLE IF NOT EXISTS `flink_cdc`.`dws_agg_by_region` (
    `order_region` STRING NOT NULL COMMENT "",
    `order_cnt` INT NOT NULL COMMENT "",
    `order_total_revenue` INT NOT NULL COMMENT ""
) ENGINE=olap
PRIMARY KEY(`order_region`)
COMMENT ""
DISTRIBUTED BY HASH(`order_region`) BUCKETS 1
PROPERTIES (
    "replication_num" = "1"
);
Execute the following statements to create data import jobs in Routine Load mode and subscribe to data in Kafka topics:
CREATE ROUTINE LOAD flink_cdc.routine_load_orders ON orders
COLUMNS (order_id, order_revenue, order_region, customer_id)
PROPERTIES (
    "format" = "json",
    "jsonpaths" = "[\"$.order_id\",\"$.order_revenue\",\"$.order_region\",\"$.customer_id\"]"
)
FROM KAFKA (
    "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092",
    "kafka_topic" = "ods_order"
);

CREATE ROUTINE LOAD flink_cdc.routine_load_customers ON customers
COLUMNS (customer_id, customer_age, customer_name)
PROPERTIES (
    "format" = "json",
    "jsonpaths" = "[\"$.customer_id\",\"$.customer_age\",\"$.customer_name\"]"
)
FROM KAFKA (
    "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092",
    "kafka_topic" = "ods_customers"
);

CREATE ROUTINE LOAD flink_cdc.routine_load_dwd_order_customer_valid ON dwd_order_customer_valid
COLUMNS (order_id, order_revenue, order_region, customer_id, customer_age, customer_name)
PROPERTIES (
    "format" = "json",
    "jsonpaths" = "[\"$.order_id\",\"$.order_revenue\",\"$.order_region\",\"$.customer_id\",\"$.customer_age\",\"$.customer_name\"]"
)
FROM KAFKA (
    "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092",
    "kafka_topic" = "dwd_order_customer_valid"
);

CREATE ROUTINE LOAD flink_cdc.routine_load_dws_agg_by_region ON dws_agg_by_region
COLUMNS (order_region, order_cnt, order_total_revenue)
PROPERTIES (
    "format" = "json",
    "jsonpaths" = "[\"$.order_region\",\"$.order_cnt\",\"$.order_total_revenue\"]"
)
FROM KAFKA (
    "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092",
    "kafka_topic" = "dws_agg_by_region"
);
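In each Routine Load job, the jsonpaths property pairs a JSONPath expression with the column at the same position in the COLUMNS clause. The following Python sketch is illustrative only: the sample message is hypothetical, but it uses the field names that Flink writes to the ods_order topic in this example.

```python
import json

# A sample JSON message on the ods_order topic (field names match this example).
message = b'{"order_id": 1, "order_revenue": 10.0, "order_region": "beijing", "customer_id": 1}'

# Routine Load evaluates each JSONPath against the message and maps the result
# to the column at the same position in the COLUMNS clause.
jsonpaths = ["$.order_id", "$.order_revenue", "$.order_region", "$.customer_id"]
columns = ["order_id", "order_revenue", "order_region", "customer_id"]

record = json.loads(message)
row = {col: record[path.removeprefix("$.")] for col, path in zip(columns, jsonpaths)}
print(row)
```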
Step 4: Run a Flink job to start a data stream
Download the packages of the Flink change data capture (CDC) and Flink StarRocks connectors, and upload the packages to the /opt/apps/FLINK/flink-current/lib directory of the Dataflow cluster.
Copy the JAR packages in the /opt/apps/FLINK/flink-current/opt/connectors/kafka directory of the Dataflow cluster to the /opt/apps/FLINK/flink-current/lib directory.
Log on to the Dataflow cluster in SSH mode. For more information, see Log on to a cluster.
Run the following command to start the cluster.
Important: The example in this topic is only for testing purposes. To run Flink jobs in a production environment, use YARN or Kubernetes to submit the jobs. For more information, see Apache Hadoop YARN and Native Kubernetes.
/opt/apps/FLINK/flink-current/bin/start-cluster.sh
Write code for a Flink SQL job and save the code as the demo.sql file.
Run the following command to open the demo.sql file. Edit the file based on your business requirements.
vim demo.sql
The following example shows the code in the file:
CREATE DATABASE IF NOT EXISTS `default_catalog`.`flink_cdc`;

-- Create order source tables.
CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`orders_src` (
    `order_id` INT NOT NULL,
    `order_revenue` FLOAT NOT NULL,
    `order_region` STRING NOT NULL,
    `customer_id` INT NOT NULL,
    PRIMARY KEY(`order_id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com',
    'port' = '3306',
    'username' = 'emr_test',
    'password' = 'Yz12****',
    'database-name' = 'flink_cdc',
    'table-name' = 'orders'
);

CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`customers_src` (
    `customer_id` INT NOT NULL,
    `customer_age` FLOAT NOT NULL,
    `customer_name` STRING NOT NULL,
    PRIMARY KEY(`customer_id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com',
    'port' = '3306',
    'username' = 'emr_test',
    'password' = 'Yz12****',
    'database-name' = 'flink_cdc',
    'table-name' = 'customers'
);

-- Create ODS, DWD, and DWS tables.
CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`ods_order_table` (
    `order_id` INT,
    `order_revenue` FLOAT,
    `order_region` VARCHAR(40),
    `customer_id` INT,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'ods_order',
    'properties.bootstrap.servers' = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`ods_customers_table` (
    `customer_id` INT,
    `customer_age` FLOAT,
    `customer_name` STRING,
    PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'ods_customers',
    'properties.bootstrap.servers' = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`dwd_order_customer_valid` (
    `order_id` INT,
    `order_revenue` FLOAT,
    `order_region` STRING,
    `customer_id` INT,
    `customer_age` FLOAT,
    `customer_name` STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'dwd_order_customer_valid',
    'properties.bootstrap.servers' = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`dws_agg_by_region` (
    `order_region` VARCHAR(40),
    `order_cnt` BIGINT,
    `order_total_revenue` FLOAT,
    PRIMARY KEY (order_region) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'dws_agg_by_region',
    'properties.bootstrap.servers' = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

USE flink_cdc;

BEGIN STATEMENT SET;

INSERT INTO ods_order_table SELECT * FROM orders_src;

INSERT INTO ods_customers_table SELECT * FROM customers_src;

INSERT INTO dwd_order_customer_valid
SELECT
    o.order_id,
    o.order_revenue,
    o.order_region,
    c.customer_id,
    c.customer_age,
    c.customer_name
FROM customers_src c
JOIN orders_src o ON c.customer_id = o.customer_id
WHERE c.customer_id <> -1;

INSERT INTO dws_agg_by_region
SELECT
    order_region,
    count(*) AS order_cnt,
    sum(order_revenue) AS order_total_revenue
FROM dwd_order_customer_valid
GROUP BY order_region;

END;
The following tables describe the parameters involved in the code:
Parameters used to create the orders_src and customers_src tables
Parameter
Description
connector
Set the value to mysql-cdc.
hostname
The internal endpoint of the ApsaraDB RDS for MySQL instance.
You can copy the internal endpoint on the Database Connection page of the ApsaraDB RDS for MySQL instance in the ApsaraDB RDS console. Example: rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com.
port
Set the value to 3306.
username
The name of the account that is created in Step 1: Create source MySQL tables. In this example, emr_test is used.
password
The password of the account that is created in Step 1: Create source MySQL tables. In this example, Yz12**** is used.
database-name
The name of the database that is created in Step 1: Create source MySQL tables. In this example, flink_cdc is used.
table-name
The name of the table that is created in Step 1: Create source MySQL tables.
orders_src: In this example, orders is used.
customers_src: In this example, customers is used.
Parameters used to create the ods_order_table, ods_customers_table, dwd_order_customer_valid, and dws_agg_by_region tables
Parameter
Description
connector
Set the value to upsert-kafka.
topic
The name of the topic that you created in Step 2: Create Kafka topics.
ods_order_table: In this example, ods_order is used.
ods_customers_table: In this example, ods_customers is used.
dwd_order_customer_valid: In this example, dwd_order_customer_valid is used.
dws_agg_by_region: In this example, dws_agg_by_region is used.
properties.bootstrap.servers
The value is in the 192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092 format.
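Before starting the job, it may help to trace what the two INSERT ... SELECT statements compute: the DWD layer joins orders with customers and filters out the placeholder customer ID -1, and the DWS layer counts orders and sums revenue per region. The following Python sketch is illustrative only; it uses the sample rows that Step 6 later inserts into MySQL.

```python
from collections import defaultdict

# Sample source rows (the values that Step 6 inserts into MySQL).
orders = [
    {"order_id": 1, "order_revenue": 10.0, "order_region": "beijing", "customer_id": 1},
    {"order_id": 2, "order_revenue": 10.0, "order_region": "beijing", "customer_id": 1},
]
customers = {1: {"customer_age": 22.0, "customer_name": "emr_test"}}

# DWD layer: join orders with customers and drop the placeholder customer -1.
dwd = [
    {**o, **customers[o["customer_id"]]}
    for o in orders
    if o["customer_id"] != -1 and o["customer_id"] in customers
]

# DWS layer: count orders and sum revenue per region (GROUP BY order_region).
dws = defaultdict(lambda: {"order_cnt": 0, "order_total_revenue": 0.0})
for row in dwd:
    agg = dws[row["order_region"]]
    agg["order_cnt"] += 1
    agg["order_total_revenue"] += row["order_revenue"]

print(dict(dws))  # {'beijing': {'order_cnt': 2, 'order_total_revenue': 20.0}}
```

The printed result matches what the dws_agg_by_region table contains at the end of Step 6.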
Run the following command to start the Flink job:
/opt/apps/FLINK/flink-current/bin/sql-client.sh -f demo.sql
Step 5: Query the database and table information
Log on to the StarRocks cluster in SSH mode. For more information, see Log on to a cluster.
Run the following command to connect to the StarRocks cluster:
mysql -h127.0.0.1 -P 9030 -uroot
Query the database information.
Run the following command to use the database:
use flink_cdc;
Run the following command to query information about tables:
show tables;
The following output is returned:
+--------------------------+
| Tables_in_flink_cdc      |
+--------------------------+
| customers                |
| dwd_order_customer_valid |
| dws_agg_by_region        |
| orders                   |
+--------------------------+
4 rows in set (0.01 sec)
Step 6: Insert data and query the inserted data
Use the test account that you created in Step 1: Create source MySQL tables to log on to the ApsaraDB RDS for MySQL instance. For more information, see Use DMS to log on to an ApsaraDB RDS for MySQL instance.
On the SQL Console tab of the ApsaraDB RDS for MySQL database, execute the following statements to insert data into the orders and customers tables:
INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(1,10,"beijing",1);
INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(2,10,"beijing",1);
INSERT INTO flink_cdc.customers(customer_id,customer_age,customer_name) VALUES(1, 22, "emr_test");
Log on to the StarRocks cluster in SSH mode. For more information, see Log on to a cluster.
Run the following command to connect to the StarRocks cluster:
mysql -h127.0.0.1 -P 9030 -uroot
Query data at the operational data store (ODS) layer.
Run the following command to use the database:
use flink_cdc;
Execute the following statement to query data from the orders table:
select * from orders;
The following output is returned:
+----------+---------------+--------------+-------------+
| order_id | order_revenue | order_region | customer_id |
+----------+---------------+--------------+-------------+
|        1 |            10 | beijing      |           1 |
|        2 |            10 | beijing      |           1 |
+----------+---------------+--------------+-------------+
Execute the following statement to query data from the customers table:
select * from customers;
The following output is returned:
+-------------+--------------+---------------+
| customer_id | customer_age | customer_name |
+-------------+--------------+---------------+
|           1 |           22 | emr_test      |
+-------------+--------------+---------------+
Query data at the DWD layer.
Run the following command to use the database:
use flink_cdc;
Execute the following statement to query data from the dwd_order_customer_valid table:
select * from dwd_order_customer_valid;
The following output is returned:
+----------+---------------+--------------+-------------+--------------+---------------+
| order_id | order_revenue | order_region | customer_id | customer_age | customer_name |
+----------+---------------+--------------+-------------+--------------+---------------+
|        1 |            10 | beijing      |           1 |           22 | emr_test      |
|        2 |            10 | beijing      |           1 |           22 | emr_test      |
+----------+---------------+--------------+-------------+--------------+---------------+
2 rows in set (0.00 sec)
Query data at the DWS layer.
Run the following command to use the database:
use flink_cdc;
Execute the following statement to query data from the dws_agg_by_region table:
select * from dws_agg_by_region;
The following output is returned:
+--------------+-----------+---------------------+
| order_region | order_cnt | order_total_revenue |
+--------------+-----------+---------------------+
| beijing      |         2 |                  20 |
+--------------+-----------+---------------------+
1 row in set (0.01 sec)