全部產品
Search
文件中心

Function Compute:使用Function Compute對RocketMQ的訊息資料進行清洗

更新時間:Jul 10, 2025

您可以使用Function Compute提供的資料清洗模板處理訊息資料,也可以根據業務情況在模板基礎上修改代碼滿足自訂清洗需求。本文以對ApsaraMQ for RocketMQ中的資料進行內容分割為例,介紹訊息處理模板的類型和使用方式。

功能介紹

訊息資料清洗任務提供基本的運算元能力,底層邏輯使用Function Compute。ApsaraMQ for RocketMQ訊息資料清洗任務建立完成後,您可以登入Function Compute,進行代碼自訂及相應函數配置的修改。

運算元

運算元能力說明

訊息過濾

按照Regex匹配訊息內容,將匹配成功的訊息發送至目標。更多資訊,請參見事件模式

訊息轉換

根據字串匹配,進行訊息內容替換,例如字元大小寫轉換。將轉換後的訊息發送至目標。更多資訊,請參見事件內容轉換

內容分割

根據Regex對訊息內容進行分割,將分割後的訊息逐條發送至目標。

動態路由

根據Regex匹配訊息內容,將匹配成功的訊息路由至對應目標,將匹配不成功的訊息路由至預設目標。

內容富化

根據富化源對訊息內容進行富化。例如,訊息原始內容包含AccountID,處理時根據AccountID查詢資料庫,獲得客戶地區後填至源訊息體中,並發送至目標服務。

內容映射

根據Regex對訊息內容進行映射處理。例如,屏蔽訊息中敏感欄位或將訊息大小縮減至最小標準。

情境樣本

內容分割

例如,需要將原始訊息學生名單[張三,男,4班|李四,女,3班|王五,男,4班]拆分為三條獨立訊息,然後分三條訊息推送至各目標服務。在實現上,可以使用內容分割運算元,分割後的訊息如下所示:

message:
    [張三,男,4班]
message:
    [李四,女,3班]
message:
    [王五,男,4班]

實現效果如下圖。

動態路由

例如,以下是一份牙膏資訊清單。

message:
[BrandA, toothpaste, $12.98, 100g
 BrandB, toothpaste, $7.99, 80g
 BrandC, toothpaste, $1.99, 100g]

需要按照自訂動態規則,將列表路由至目標Topic。規則描述如下所示。

  • 如果訊息以BrandA開頭,發送至BrandA-item-topic和BrandA-discount-topic這兩個topic。

  • 如果訊息以BrandB開頭,發送至BrandB-item-topic和BrandB-discount-topic這兩個topic。

  • 其餘訊息發送至Unknown-brand-topic。

規則的JSON描述如下。

{
  "defaultTopic": "Unknown-brand-topic",
  "rules": [
    {
      "regex": "^BrandA",
      "targetTopics": [
        "BrandA-item-topic",
        "BrandA-discount-topic"
      ]
    },
    {
      "regex": "^BrandB",
      "targetTopics": [
        "BrandB-item-topic",
        "BrandB-discount-topic"
      ]
    }
  ]
}

實現效果如下圖。

內容富化

本文以一個IP位址區段處理的情境富化為例。假設某服務的訪問日誌如下所示。

{
  "accountID": "164901546557****",
  "hostIP": "192.168.XX.XX"
}

需要統計IP地址的來源,並且映射關係儲存於資料庫MySQL。

CREATE TABLE `tb_ip` (
    ->      `IP` VARCHAR(256) NOT NULL,
    ->     `Region` VARCHAR(256) NOT NULL,
    ->      `ISP` VARCHAR(256) NOT NULL,
    ->      PRIMARY KEY (`IP`)
    -> );

處理後的訊息結果如下所示。

{
  "accountID": "164901546557****",
  "hostIP": "192.168.XX.XX",
  "region": "beijing"
}

實現效果如下圖。

內容映射

例如,以下是某公司員工登記資訊,涉及了員工工號、電話號碼等隱私內容。

張三,工號1,131 1111 1111
李四,工號2,132 2222 2222
王五,工號3,133 3333 3333

需要將以上訊息中員工隱私資訊進行屏蔽,然後推送至目標服務。如下所示。

張*,工號*,*** **** ****
李*,工號*,*** **** ****
王*,工號*,*** **** ****

實現效果如下圖。

操作步驟

1.建立ApsaraMQ for RocketMQ執行個體Topic

  1. 登入訊息佇列 RocketMQ 版控制台,在左側導覽列,選擇執行個體列表,在上方功能表列,選擇地區,然後單擊建立執行個體

  2. 建立 RocketMQ 執行個體面板,選擇執行個體版本,例如4.0系列執行個體類型選擇標準版執行個體,輸入執行個體名稱如test,輸入執行個體描述,然後單擊確定

  3. 執行個體列表頁面,單擊目標執行個體,在執行個體詳情頁面的左側導覽列,選擇Topic管理,然後單擊建立Topic

  4. 建立Topic面板,設定Topic名稱,例如source-topictarget-topic,輸入描述訊息類型選擇普通訊息,然後單擊確定

    說明

    至少需要建立2個Topic,一個作為事件來源發送原始訊息,另一個作為事件目標接收清洗後的資料。兩個Topic可以屬於同一RocketMQ執行個體,也可以屬於不同的RocketMQ執行個體。

2.建立事件流

  1. 登入事件匯流排 EventBridge控制台,左側導覽列,選擇事件流,在上方功能表列,選擇地區,然後單擊建立事件流

  2. 建立事件流頁面的Source(源)Filtering(過濾)Transform(轉換)Sink(目標)設定精靈分別設定事件來源、過濾規則、資料清洗模板和事件目標,然後單擊儲存

    image

    Source(源)和④Sink(目標)

    • Source(源)選擇ApsaraMQ for RocketMQ執行個體test,Topic選擇source-topic

    • Sink(目標)選擇ApsaraMQ for RocketMQ執行個體test,Topic選擇target-topic

    其餘配置項保持預設值即可。

    Filtering(過濾)

    可選配置,保持預設值,然後單擊下一步

    Transform(轉換)

    • 選擇阿里雲服務:Function Compute。

    • 選擇建立函數模板:建立事件流的同時將建立一個FC函數EventStreaming_Transform_Customized_****

    • 函數模板:本文以選擇內容分割模板為例。包括內容分割內容映射內容富化動態路由。您可以根據需求選擇以上模板,模板中提供了基礎的資料處理邏輯,可以直接使用也可以自行調整。

    image

3.測實驗證

3.1 在源RocketMQ執行個體Topic發送原始訊息

  1. 登入訊息佇列 RocketMQ 版控制台,找到建立事件流時配置的Source(源)執行個體的Topic source-topic,在其右側操作列,單擊快速體驗

  2. 快速體驗的訊息生產和消費面板,輸入原始訊息[張三,男,4班|李四,女,3班|王五,男,4班],單擊確定發送訊息。

3.2 在目標RocketMQ執行個體Topic確認訊息被正確分割

  1. 訊息佇列 RocketMQ 版控制台,找到建立事件流時配置的Sink(目標)執行個體Topic target-topic,單擊該Topic名稱,選擇訊息查詢頁簽。

  2. 查詢方式選擇按 Topic 查詢,然後單擊查詢。查詢結果中,您可以看到上一步發送的訊息發送到目標端時已被分割為3條訊息,依次單擊訊息行的詳情,可以看到3條訊息分別為 "data": "張三,男,4班" "data": "李四,女,3班""data": "王五,男,4班"

    image

4.資源清理

測試完成後,如果短期內不再需要使用該功能,請及時釋放已建立的資源,以免產生不必要的費用。具體操作,請參見刪除Topic刪除RocketMQ執行個體刪除函數