資料清洗功能提供常見的訊息處理模板,包括內容分割、動態路由、內容富化和內容映射等。您可以直接利用模板處理訊息,也可以根據業務情況在模板基礎上修改代碼。
背景資訊
訊息資料清洗任務提供基本的運算元能力,底層邏輯使用Function Compute。支援進行資料清洗的產品包含ApsaraMQ for RocketMQ、ApsaraMQ for Kafka、ApsaraMQ for MQTT、ApsaraMQ for RabbitMQSimple Message Queue (formerly MNS)。資料清洗任務建立完成後,您可以登入Function Compute控制台,進行代碼自訂及相應函數配置的修改。
運算元 | 運算元能力說明 |
內容分割 | 根據Regex對訊息內容進行分割,將分割後的訊息逐條發送至目標。 |
動態路由 | 根據Regex匹配訊息內容,將匹配成功的訊息路由至對應目標,將匹配不成功的訊息路由至預設目標。 |
內容富化 | 根據富化源對訊息內容進行富化。如果訊息原始內容包含AccountID,處理時根據AccountID查詢資料庫,獲得客戶地區後填至源訊息體中,並發送至目標服務。 |
內容映射 | 根據Regex對訊息內容進行映射處理。例如,屏蔽訊息中敏感欄位或將訊息大小縮減至最小標準。 |
本文以雲訊息佇列 Kafka 版為例介紹如何使用資料清洗。
情境樣本
內容分割
例如,以下是一份學生名單。
message:
[張三,男,4班|李四,女,3班|王五,男,4班]需要將訊息拆分為單個學生的資訊,然後分三條訊息推送至各目標服務。如下所示。
message:
[張三,男,4班]
message:
[李四,女,3班]
message:
[王五,男,4班]實現效果如下圖。
動態路由
例如,以下是一份牙膏資訊清單。
message:
[BrandA, toothpaste, $12.98, 100g
BrandB, toothpaste, $7.99, 80g
BrandC, toothpaste, $1.99, 100g]需要按照自訂動態規則,將列表路由至目標Topic。規則描述如下所示。
如果訊息以BrandA開頭,發送至BrandA-item-topic和BrandA-discount-topic這兩個topic。
如果訊息以BrandB開頭,發送至BrandB-item-topic和BrandB-discount-topic這兩個topic。
其餘訊息發送至Unknown-brand-topic。
規則的JSON描述如下。
{
"defaultTopic": "Unknown-brand-topic",
"rules": [
{
"regex": "^BrandA",
"targetTopics": [
"BrandA-item-topic",
"BrandA-discount-topic"
]
},
{
"regex": "^BrandB",
"targetTopics": [
"BrandB-item-topic",
"BrandB-discount-topic"
]
}
]
}實現效果如下圖。
內容富化
本文以一個IP位址區段處理的情境富化為例。假設某服務的訪問日誌如下所示。
{
"accountID": "164901546557****",
"hostIP": "192.168.XX.XX"
}需要統計IP地址的來源,並且映射關係儲存於資料庫MySQL。
CREATE TABLE `tb_ip` (
-> `IP` VARCHAR(256) NOT NULL,
-> `Region` VARCHAR(256) NOT NULL,
-> `ISP` VARCHAR(256) NOT NULL,
-> PRIMARY KEY (`IP`)
-> );處理後的訊息結果如下所示。
{
"accountID": "164901546557****",
"hostIP": "192.168.XX.XX",
"region": "beijing"
}實現效果如下圖。
內容映射
例如,以下是某公司員工登記資訊,涉及了員工工號、電話號碼等隱私內容。
張三,工號1,131 1111 1111
李四,工號2,132 2222 2222
王五,工號3,133 3333 3333需要將以上訊息中員工隱私資訊進行屏蔽,然後推送至目標服務。如下所示。
張*,工號*,*** **** ****
李*,工號*,*** **** ****
王*,工號*,*** **** ****實現效果如下圖。
操作步驟
登入ApsaraMQ for Kafka控制台,左側導覽列選擇,然後單擊创建任务。
在建立任務頁面的Source(源)、Filtering(過濾)、Transform(轉換)和Sink(目標)設定精靈分別設定事件來源、過濾規則、資料清洗模板和事件目標。

①Source(源)和④Sink(目標)
選擇ApsaraMQ for Kafka的不同執行個體。
②Filtering(過濾)
可選配置,如果不設定表示匹配全部事件,更多匹配規則,請參見訊息過濾。
③Transform(轉換)
選擇一個Function Compute提供的模板,包括內容分割、內容映射、內容富化和動態路由。您可以根據需求選擇以上模板,模板中提供了基礎的資料處理邏輯,可以直接使用也可以自行調整。
本文以選擇內容分割模板為例。
