流引擎支援通過ETL SQL進行表層級的即時預計算,提供即時的類物化視圖能力。本文介紹如何使用ETL SQL對寬表資料進行即時同步與即時預計算。
前提條件
已開通流引擎。
已開通寬表引擎。
情境一:即時鏡像表
說明
即時同步源表資料,可用於資料共用、資料備份、讀寫分離、異構索引等多種業務情境。
資料準備
建立源表和鏡像表。
-- 建立源表 CREATE TABLE source(p1 INT, c1 DOUBLE, PRIMARY KEY(p1)); -- 建立鏡像表 CREATE TABLE sink(p1 INT, c1 DOUBLE, PRIMARY KEY(p1));
提交ETL
CREATE ETL sync_etl AS INSERT INTO sink SELECT * FROM source;
資料驗證
向源表中插入資料。
INSERT INTO source(p1, c1) VALUES(0, 0.0); INSERT INTO source(p1, c1) VALUES(1, 1.0); INSERT INTO source(p1, c1) VALUES(2, 2.0); INSERT INTO source(p1, c1) VALUES(3, 3.0); INSERT INTO source(p1, c1) VALUES(4, 4.0);查詢鏡像表中的資料。
SELECT * FROM sink;返回結果:
+------+------+ | p1 | c1 | +------+------+ | 0 | 0.0 | | 1 | 1.0 | | 2 | 2.0 | | 3 | 3.0 | | 4 | 4.0 | +------+------+
情境二:即時預計算
說明
流引擎提供豐富的預計算能力。此處以多表預JOIN(提前對多張表執行資料合併)為例,對多張表的資料進行預打寬處理,避免每次查詢都需要執行JOIN操作,導致回應時間變長、計算開銷過大的問題。
資料準備
建立兩張源表:
user_tbl(使用者資訊)和order_tbl(使用者訂單資訊),並為order_tbl表建立二級索引idx1。-- 建立源表1:user_tbl CREATE TABLE `user_tbl` ( `user_id` varchar NOT NULL, `user_name` varchar, `user_addr` varchar, PRIMARY KEY (`user_id`) ); -- 建立源表2:order_tbl CREATE TABLE `order_tbl` ( `order_id` varchar NOT NULL, `user_id` varchar, `product_name` varchar, `price` decimal(38, 20), PRIMARY KEY (`order_id`) ); -- 為源表order_tbl建立索引idx1 CREATE INDEX idx1 ON `order_tbl`(user_id desc) WITH (COMPRESSION='ZSTD');建立結果表
user_order_tbl,用於儲存資料整合後的結果。CREATE TABLE `user_order_tbl` ( `order_id` varchar NOT NULL, `user_id` varchar, `product_name` varchar, `price` decimal(38, 20), `user_name` varchar, `user_addr` varchar, `user_addr_code` varchar, PRIMARY KEY (`order_id`) ) WITH (MUTABILITY = 'MUTABLE_UDT');
提交ETL
建立ETL,添加資料整合邏輯。
具體如下:
通過
user_id欄位關聯源表order_tbl和user_tbl,將使用者資訊和其訂單資料關聯。使用Regex
REGEXP_EXTRACT提取user_addr中的地址代碼。建立ETL,將整合後的資料插入到目標表
user_order_tbl中,整理為一個同時包含使用者資訊及其訂單資料的表。
CREATE ETL join_etl AS INSERT INTO `lindorm_table`.`default`.`user_order_tbl` ( order_id, user_id, product_name, price, user_name, user_addr, user_addr_code ) SELECT o.order_id, o.user_id, o.product_name, o.price, u.user_name, u.user_addr, REGEXP_EXTRACT(u.user_addr, '#(.*?)$', 1) AS user_addr_code FROM `lindorm_table`.`default`.`order_tbl` o JOIN `lindorm_table`.`default`.`user_tbl` u ON o.user_id = u.user_id;
資料驗證
向源表中插入資料。
INSERT INTO user_tbl (user_id, user_name, user_addr) VALUES ('U001', '張三', '北京市朝陽區#100000'), ('U002', '李四', '上海市浦東新區#200000'), ('U003', '王五', '廣州市天河區#510000'), ('U004', '趙六', '深圳市南山區#518000'); INSERT INTO order_tbl (order_id, user_id, product_name, price) VALUES ('O1001', 'U001', '膝上型電腦', 8999.00), ('O1002', 'U001', '無線滑鼠', 159.00), ('O1003', 'U002', '智能手機', 6999.00), ('O1004', 'U002', '藍芽耳機', 299.00), ('O1005', 'U003', '平板電腦', 3499.00), ('O1006', 'U004', '機械鍵盤', 799.00), ('O1007', 'U004', '顯示器', 1299.00);查詢結果表。ETL會根據資料整合邏輯篩選資料,並將處理結果即時寫入結果表中。
SELECT * FROM user_order_tbl;返回結果:
+----------+---------+-----------------+---------------------------+-----------+------------------------------+----------------+ | order_id | user_id | product_name | price | user_name | user_addr | user_addr_code | +----------+---------+-----------------+---------------------------+-----------+------------------------------+----------------+ | O1001 | U001 | 膝上型電腦 | 8999.00000000000000000000 | 張三 | 北京市朝陽區#100000 | 100000 | | O1002 | U001 | 無線滑鼠 | 159.00000000000000000000 | 張三 | 北京市朝陽區#100000 | 100000 | | O1003 | U002 | 智能手機 | 6999.00000000000000000000 | 李四 | 上海市浦東新區#200000 | 200000 | | O1004 | U002 | 藍芽耳機 | 299.00000000000000000000 | 李四 | 上海市浦東新區#200000 | 200000 | | O1005 | U003 | 平板電腦 | 3499.00000000000000000000 | 王五 | 廣州市天河區#510000 | 510000 | | O1006 | U004 | 機械鍵盤 | 799.00000000000000000000 | 趙六 | 深圳市南山區#518000 | 518000 | | O1007 | U004 | 顯示器 | 1299.00000000000000000000 | 趙六 | 深圳市南山區#518000 | 518000 | +----------+---------+-----------------+---------------------------+-----------+------------------------------+----------------+