本文通過樣本為您介紹如何基於StarRocks構建數倉情境-增量資料即時統計。
前提條件
- 已建立DataFlow或自訂叢集,具體操作請參見建立叢集。
- 已建立EMR Serverless StarRocks執行個體,具體操作請參見建立執行個體。
- 已建立RDS MySQL,具體操作請參見快速建立RDS MySQL執行個體。說明 本文樣本中DataFlow叢集為EMR-3.40.0版本、MySQL為5.7版本。
使用限制
- DataFlow叢集、StarRocks叢集和RDS MySQL執行個體需要在同一個VPC下,並且在同一個可用性區域下。
- DataFlow叢集和StarRocks叢集均須開啟公網訪問。
- RDS MySQL為5.7及以上版本。
情境介紹
因為部分情境對資料延遲非常敏感,資料產生的時候必須完成加工,所以此時您可以通過增量資料即時統計的方式,提前使用Flink將明細層、匯總層等層資料進行匯聚,匯聚之後把結果集儲存下來再對外提供服務。
方案架構
增量資料即時統計的基本架構如下圖所示。
整體資料流如下:
直接使用Flink構建即時數倉,由Flink進行清洗加工轉換和摘要彙總,將各層結果集寫入Kafka中。
StarRocks從Kafka分別訂閱各層資料,將各層資料持久化到StarRocks中,用於之後的查詢分析。
方案特點
該方案主要特點如下:
增量計算的資料由Flink進行清洗加工轉換和摘要彙總,各層應用資料通過Kafka分別持久化到StarRocks中。
Flink加工的結果集可以採取雙寫的方式,一方面繼續投遞給下一層訊息流程Topic,一方面Sink到同層的StarRocks中;也可以採用單寫Kafka再通過StarRocks即時消費Kafka對應Topic上的資料,方便後續歷史資料的狀態檢查與重新整理。
StarRocks通過表的形式直接對接上層應用,實現應用即時查詢。
方案優勢
即時性強,能滿足業務對即時性敏感的情境。
指標修正簡單,與傳統增量計算方式不一樣的是,該方案將中間的狀態也持久儲存在StarRocks中,提升了後續分析的靈活性,當中間資料品質有問題時,直接對錶修正,重刷資料即可。
方案缺點
大部分即時增量計算依賴於Flink,需要使用者有一定的Flink技能。
不適合資料頻繁更新,無法進行累加計算的情境。
不適合多流Join等計算複雜資源開銷大的情境。
適用情境
即時需求簡單,資料量不大,以埋點資料統計為主的資料,即時性最強。
操作流程
樣本操作如下:
步驟一:建立MySQL來源資料表
- 建立測試的資料庫和帳號,具體操作請參見建立資料庫和帳號。建立完資料庫和帳號後,需要授權測試帳號的讀寫權限。說明 本文樣本中建立的資料庫名稱為flink_cdc,帳號為emr_test。
- 使用建立的測試帳號串連MySQL執行個體,具體操作請參見通過DMS登入RDS MySQL。
- 執行以下命令,建立資料表。
CREATE DATABASE IF NOT EXISTS flink_cdc; CREATE TABLE flink_cdc.orders ( order_id INT NOT NULL AUTO_INCREMENT, order_revenue FLOAT NOT NULL, order_region VARCHAR(40) NOT NULL, customer_id INT NOT NULL, PRIMARY KEY ( order_id ) ); CREATE TABLE flink_cdc.customers ( customer_id INT NOT NULL, customer_age INT NOT NULL, customer_name VARCHAR(40) NOT NULL, PRIMARY KEY ( customer_id ) );
步驟二:建立Kafka的Topic
- 使用SSH方式登入DataFlow叢集,具體操作請參見登入叢集。
- 執行以下命令,建立對應的Topic。
cd $KAFKA_HOME kafka-topics.sh --create --topic ods_order --replication-factor 1 --partitions 1 --bootstrap-server "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092" kafka-topics.sh --create --topic ods_customers --replication-factor 1 --partitions 1 --bootstrap-server "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092" kafka-topics.sh --create --topic dwd_order_customer_valid --replication-factor 1 --partitions 1 --bootstrap-server "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092" kafka-topics.sh --create --topic dws_agg_by_region --replication-factor 1 --partitions 1 --bootstrap-server "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092"說明 本文程式碼範例中的192.168.**.**為DataFlow叢集的內網IP地址,您可以在E-MapReduce控制台DataFlow叢集的節點管理頁簽查看。
步驟三:建立StarRocks表和匯入任務
- 串連EMR Serverless StarRocks執行個體,詳情請參見通過用戶端方式串連StarRocks執行個體。
- 執行以下命令,建立ODS表。
CREATE DATABASE IF NOT EXISTS `flink_cdc`; CREATE TABLE IF NOT EXISTS `flink_cdc`.`customers` ( `customer_id` INT NOT NULL COMMENT "", `customer_age` FLOAT NOT NULL COMMENT "", `customer_name` STRING NOT NULL COMMENT "" ) ENGINE=olap PRIMARY KEY(`customer_id`) COMMENT "" DISTRIBUTED BY HASH(`customer_id`) BUCKETS 1 PROPERTIES ( "replication_num" = "1" ); CREATE TABLE IF NOT EXISTS `flink_cdc`.`orders` ( `order_id` INT NOT NULL COMMENT "", `order_revenue` FLOAT NOT NULL COMMENT "", `order_region` STRING NOT NULL COMMENT "", `customer_id` INT NOT NULL COMMENT "" ) ENGINE=olap PRIMARY KEY(`order_id`) COMMENT "" DISTRIBUTED BY HASH(`order_id`) BUCKETS 1 PROPERTIES ( "replication_num" = "1" ); - 執行以下命令,建立DWD表。
CREATE TABLE IF NOT EXISTS `flink_cdc`.`dwd_order_customer_valid`( `order_id` INT NOT NULL COMMENT "", `order_revenue` FLOAT NOT NULL COMMENT "", `order_region` STRING NOT NULL COMMENT "", `customer_id` INT NOT NULL COMMENT "", `customer_age` FLOAT NOT NULL COMMENT "", `customer_name` STRING NOT NULL COMMENT "" ) ENGINE=olap PRIMARY KEY(`order_id`) COMMENT "" DISTRIBUTED BY HASH(`order_id`) BUCKETS 1 PROPERTIES ( "replication_num" = "1" ); - 執行以下命令,建立DWS表。
CREATE TABLE IF NOT EXISTS `flink_cdc`.`dws_agg_by_region` ( `order_region` STRING NOT NULL COMMENT "", `order_cnt` INT NOT NULL COMMENT "", `order_total_revenue` INT NOT NULL COMMENT "" ) ENGINE=olap PRIMARY KEY(`order_region`) COMMENT "" DISTRIBUTED BY HASH(`order_region`) BUCKETS 1 PROPERTIES ( "replication_num" = "1" ); - 執行以下命令,建立Routine Load匯入任務,訂閱Kafka資料來源的資料。
CREATE ROUTINE LOAD flink_cdc.routine_load_orders ON orders COLUMNS (order_id, order_revenue, order_region, customer_id) PROPERTIES ( "format" = "json", "jsonpaths" = "[\"$.order_id\",\"$.order_revenue\",\"$.order_region\",\"$.customer_id\"]" ) FROM KAFKA ( "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092", "kafka_topic" = "ods_order" ); CREATE ROUTINE LOAD flink_cdc.routine_load_customers ON customers COLUMNS (customer_id, customer_age, customer_name) PROPERTIES ( "format" = "json", "jsonpaths" = "[\"$.customer_id\",\"$.customer_age\",\"$.customer_name\"]" ) FROM KAFKA ( "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092", "kafka_topic" = "ods_customers" ); CREATE ROUTINE LOAD flink_cdc.routine_load_dwd_order_customer_valid ON dwd_order_customer_valid COLUMNS (order_id, order_revenue, order_region, customer_id, customer_age, customer_name) PROPERTIES ( "format" = "json", "jsonpaths" = "[\"$.order_id\",\"$.order_revenue\",\"$.order_region\",\"$.customer_id\",\"$.customer_age\",\"$.customer_name\"]" ) FROM KAFKA ( "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092", "kafka_topic" = "dwd_order_customer_valid" ); CREATE ROUTINE LOAD flink_cdc.routine_load_dws_agg_by_region ON dws_agg_by_region COLUMNS (order_region, order_cnt, order_total_revenue) PROPERTIES ( "format" = "json", "jsonpaths" = "[\"$.order_region\",\"$.order_cnt\",\"$.order_total_revenue\"]" ) FROM KAFKA ( "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092", "kafka_topic" = "dws_agg_by_region" );
步驟四:執行Flink任務,啟動資料流
- 下載Flink CDC connector和Flink StarRocks Connector,並上傳至DataFlow叢集的/opt/apps/FLINK/flink-current/lib目錄下。
- 拷貝DataFlow叢集的/opt/apps/FLINK/flink-current/opt/connectors/kafka目錄下的JAR包至/opt/apps/FLINK/flink-current/lib目錄下。
- 使用SSH方式登入DataFlow叢集,具體操作請參見登入叢集。
- 執行以下命令,啟動叢集。重要 本文樣本僅供測試,如果是生產層級的Flink作業請使用YARN或Kubernetes方式提交,詳情請參見Apache Hadoop YARN和Native Kubernetes。
/opt/apps/FLINK/flink-current/bin/start-cluster.sh - 編寫Flink SQL作業,並儲存為demo.sql。執行以下命令,編輯demo.sql檔案。
vim demo.sql檔案內容如下所示。CREATE DATABASE IF NOT EXISTS `default_catalog`.`flink_cdc`; -- create source tables CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`orders_src`( `order_id` INT NOT NULL, `order_revenue` FLOAT NOT NULL, `order_region` STRING NOT NULL, `customer_id` INT NOT NULL, PRIMARY KEY(`order_id`) NOT ENFORCED ) with ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'emr_test', 'password' = 'Yz12****', 'database-name' = 'flink_cdc', 'table-name' = 'orders' ); CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`customers_src` ( `customer_id` INT NOT NULL, `customer_age` FLOAT NOT NULL, `customer_name` STRING NOT NULL, PRIMARY KEY(`customer_id`) NOT ENFORCED ) with ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'emr_test', 'password' = 'Yz12****', 'database-name' = 'flink_cdc', 'table-name' = 'customers' ); -- create ods dwd and dws tables CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`ods_order_table` ( `order_id` INT, `order_revenue` FLOAT, `order_region` VARCHAR(40), `customer_id` INT, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'ods_order', 'properties.bootstrap.servers' = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092', 'key.format' = 'json', 'value.format' = 'json' ); CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`ods_customers_table` ( `customer_id` INT, `customer_age` FLOAT, `customer_name` STRING, PRIMARY KEY (customer_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'ods_customers', 'properties.bootstrap.servers' = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092', 'key.format' = 'json', 'value.format' = 'json' ); CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`dwd_order_customer_valid` ( `order_id` INT, `order_revenue` FLOAT, `order_region` STRING, `customer_id` INT, `customer_age` FLOAT, `customer_name` STRING, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'dwd_order_customer_valid', 'properties.bootstrap.servers' = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092', 'key.format' = 'json', 'value.format' = 'json' ); CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`dws_agg_by_region` ( `order_region` VARCHAR(40), `order_cnt` BIGINT, `order_total_revenue` FLOAT, PRIMARY KEY (order_region) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'dws_agg_by_region', 'properties.bootstrap.servers' = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092', 'key.format' = 'json', 'value.format' = 'json' ); USE flink_cdc; BEGIN STATEMENT SET; INSERT INTO ods_order_table SELECT * FROM orders_src; INSERT INTO ods_customers_table SELECT * FROM customers_src; INSERT INTO dwd_order_customer_valid SELECT o.order_id, o.order_revenue, o.order_region, c.customer_id, c.customer_age, c.customer_name FROM customers_src c JOIN orders_src o ON c.customer_id=o.customer_id WHERE c.customer_id <> -1; INSERT INTO dws_agg_by_region SELECT order_region, count(*) as order_cnt, sum(order_revenue) as order_total_revenue FROM dwd_order_customer_valid GROUP BY order_region; END;涉及參數如下所示:- 建立資料表orders_src和customers_src。
參數 描述 connector 固定值為mysql-cdc。 hostname RDS的內網地址。 您可以在RDS的資料庫連接頁面,單擊內網地址進行複製。例如,rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com。
port 固定值為3306。 username 步驟一:建立MySQL來源資料表中建立的帳號名。本樣本為emr_test。 password 步驟一:建立MySQL來源資料表中建立的帳號的密碼。本樣本為Yz12****。 database-name 步驟一:建立MySQL來源資料表中建立的資料庫名。本樣本為flink_cdc。 table-name 步驟一:建立MySQL來源資料表中建立的資料表。 - orders_src:本樣本為orders。
- customers_src:本樣本為customers。
- 建立資料表ods_order_table、ods_customers_table、dwd_order_customer_valid和dws_agg_by_region。
參數 描述 connector 固定值為upsert-kafka。 topic 步驟二:建立Kafka的Topic中建立的Topic名稱。 - ods_order_table:本樣本為ods_order。
- ods_customers_table:本樣本為ods_customers。
- dwd_order_customer_valid:本樣本為dwd_order_customer_valid。
- dws_agg_by_region:本樣本為dws_agg_by_region。
properties.bootstrap.servers 固定格式為 192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092。
- 建立資料表orders_src和customers_src。
- 執行以下命令,啟動Flink任務。
/opt/apps/FLINK/flink-current/bin/sql-client.sh -f demo.sql
步驟五:查看資料庫和表資訊
- 串連EMR Serverless StarRocks執行個體,詳情請參見通過用戶端方式串連StarRocks執行個體。
- 執行以下命令,查詢資料庫資訊。
show databases;返回資訊如下所示。+--------------------+ | Database | +--------------------+ | _statistics_ | | information_schema | | flink_cdc | +--------------------+ 3 rows in set (0.00 sec) - 查詢資料表資訊。
- 執行以下命令,使用資料庫。
use flink_cdc; - 執行以下命令,查看錶資訊。
show tables;返回資訊如下所示。+--------------------------+ | Tables_in_flink_cdc | +--------------------------+ | customers | | dwd_order_customer_valid | | dws_agg_by_region | | orders | +--------------------------+ 4 rows in set (0.01 sec)
- 執行以下命令,使用資料庫。
步驟六:驗證插入後的資料
- 使用步驟一:建立MySQL來源資料表中建立的測試帳號串連MySQL執行個體,具體操作請參見通過DMS登入RDS MySQL。
- 在RDS資料庫視窗執行以下命令,向表orders和customers中插入資料。
INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(1,10,"beijing",1); INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(2,10,"beijing",1); INSERT INTO flink_cdc.customers(customer_id,customer_age,customer_name) VALUES(1, 22, "emr_test"); - 串連EMR Serverless StarRocks執行個體,詳情請參見通過用戶端方式串連StarRocks執行個體。
- 執行以下命令,查詢ODS層資料。
- 執行以下命令,使用資料庫。
use flink_cdc; - 執行以下命令,查看orders表資訊。
select * from orders;返回資訊如下所示。+----------+---------------+--------------+-------------+ | order_id | order_revenue | order_region | customer_id | +----------+---------------+--------------+-------------+ | 1 | 10 | beijing | 1 | | 2 | 10 | beijing | 1 | +----------+---------------+--------------+-------------+ - 執行以下命令,查看customers表資訊。
select * from customers;返回資訊如下所示。+-------------+--------------+---------------+ | customer_id | customer_age | customer_name | +-------------+--------------+---------------+ | 1 | 22 | emr_test | +-------------+--------------+---------------+
- 執行以下命令,使用資料庫。
- 執行以下命令,查詢DWD層資料。
- 執行以下命令,使用資料庫。
use flink_cdc; - 執行以下命令,查看orders表資訊。
select * from dwd_order_customer_valid;返回資訊如下所示。+----------+---------------+--------------+-------------+--------------+---------------+ | order_id | order_revenue | order_region | customer_id | customer_age | customer_name | +----------+---------------+--------------+-------------+--------------+---------------+ | 1 | 10 | beijing | 1 | 22 | emr_test | | 2 | 10 | beijing | 1 | 22 | emr_test | +----------+---------------+--------------+-------------+--------------+---------------+ 2 rows in set (0.00 sec)
- 執行以下命令,使用資料庫。