全部產品
Search
文件中心

Function Compute:使用Function Compute實現訊息資料清洗

更新時間:Jan 16, 2025

訊息資料清洗功能提供常見的訊息處理模板,如訊息分割、動態路由和訊息富化等。您可以直接利用模板處理訊息,也可以根據業務情況在模板基礎上修改代碼。本文介紹雲訊息佇列 RocketMQ 版訊息資料清洗模板的類型和使用方式。

背景資訊

訊息資料清洗任務提供基本的運算元能力,底層邏輯使用Function Compute。雲訊息佇列 RocketMQ 版訊息資料清洗任務建立完成後,您可以登入Function Compute控制台,進行代碼自訂及相應函數配置的修改。

運算元

運算元能力說明

訊息過濾

按照Regex匹配訊息內容,將匹配成功的訊息發送至目標。更多資訊,請參見事件模式

訊息轉換

根據字串匹配,進行訊息內容替換,例如字元大小寫轉換。將轉換後的訊息發送至目標。更多資訊,請參見事件內容轉換

內容分割

根據Regex對訊息內容進行分割,將分割後的訊息逐條發送至目標。

動態路由

根據Regex匹配訊息內容,將匹配成功的訊息路由至對應目標,將匹配不成功的訊息路由至預設目標。

內容富化

根據富化源對訊息內容進行富化。如果訊息原始內容包含AccountID,處理時根據AccountID查詢資料庫,獲得客戶地區後填至源訊息體中,並發送至目標服務。

內容映射

根據Regex對訊息內容進行映射處理。例如,屏蔽訊息中敏感欄位或將訊息大小縮減至最小標準。

內容分割

使用樣本

例如,以下是一份學生名單。

message:
[張三,男,17,4班;李四,女,17,3班;王五,男,17,4班]

需要將訊息拆分為單個學生的資訊,然後分三條訊息推送至各目標服務。如下所示。

message:
    [張三,男,17,4班]
message:
    [李四,女,17,3班]
message:
    [王五,男,17,4班]

操作步驟

  1. 登入雲訊息佇列 RocketMQ 版控制台

  2. 在左側導覽列,選擇訊息整合 > 訊息流程出,然後在頂部功能表列,選擇地區。

  3. 訊息流程出(sink)頁面,單擊建立任務

  4. 訊息流程出建立面板,設定以下配置項,然後單擊確定

    關鍵配置項說明如下,其餘保持預設值即可。

    • 基礎資訊

      配置項

      說明

      任務名稱

      填寫任務名稱。

      流出類型

      本文選擇訊息佇列 RocketMQ。支援的Message Service包括訊息佇列 RocketMQ訊息佇列 RabbitMQ輕量訊息佇列(原 MNS)訊息佇列 Kafka

    • 資源配置

      配置項

      說明

      地區

      本文選擇華東1(杭州)

      版本

      選擇RocketMQ的版本,本文選擇RocketMQ 5.x

      RocketMQ 執行個體

      選擇生產訊息的RocketMQ執行個體。

      Topic

      選擇源執行個體的Topic。

      目標

      版本

      選擇接收訊息的RocketMQ的版本,本文選擇RocketMQ 5.x

      執行個體 ID

      選擇接收訊息的RocketMQ的執行個體 ID。

      Topic

      選擇目標執行個體的Topic。

    • 資料處理

      • 訊息過濾:選擇無需過濾

      • 訊息轉換:選擇自訂配置訊息體(body)選擇資料清洗,並選中建立函數模板函數模板選擇內容分割 transform_split,然後根據業務情況修改函數代碼。

    建立完成後,您可以登入Function Compute控制台查看自動建立的服務函數

動態路由

使用樣本

例如,以下是一份牙膏資訊清單。

message:
[BrandA, toothpaste, $12.98, 100g
 BrandB, toothpaste, $7.99, 80g
 BrandC, toothpaste, $1.99, 100g]

需要按照自訂動態規則,將列表路由至目標Topic。規則描述如下所示。

  • 如果訊息以BrandA開頭,發送至BrandA-item-topic和BrandA-discount-topic這兩個topic。

  • 如果訊息以BrandB開頭,發送至BrandB-item-topic和BrandB-discount-topic這兩個topic。

  • 其餘訊息發送至Unknown-brand-topic。

規則的JSON描述如下。

{
  "defaultTopic": "Unknown-brand-topic",
  "rules": [
    {
      "regex": "^BrandA",
      "targetTopics": [
        "BrandA-item-topic",
        "BrandA-discount-topic"
      ]
    },
    {
      "regex": "^BrandB",
      "targetTopics": [
        "BrandB-item-topic",
        "BrandB-discount-topic"
      ]
    }
  ]
}

操作步驟

具體操作,請參見訊息分割操作步驟。其中,函數模板需選擇為動態路由 dynamic_routing

內容富化

使用樣本

本文以一個IP位址區段處理的情境富化為例。假設某服務的訪問日誌如下所示。

{
  "accountID": "164901546557****",
  "hostIP": "192.168.XX.XX"
}

需要統計IP地址的來源,並且映射關係儲存於MySQL資料庫。

CREATE TABLE `tb_ip` (
    ->      `IP` VARCHAR(256) NOT NULL,
    ->     `Region` VARCHAR(256) NOT NULL,
    ->      `ISP` VARCHAR(256) NOT NULL,
    ->      PRIMARY KEY (`IP`)
    -> );

處理後的訊息結果如下所示。

{
  "accountID": "164901546557****",
  "hostIP": "192.168.XX.XX",
  "region": "beijing"
}

操作步驟

具體操作,請參見訊息分割操作步驟。其中,函數模板需選擇為內容富化 transform_enrichment

內容映射

使用樣本

例如,以下是某公司員工登記資訊,涉及了員工工號、電話號碼等隱私內容。

張三,工號1,131 1111 1111
李四,工號2,132 2222 2222
王五,工號3,133 3333 3333

需要將以上訊息中員工隱私資訊進行屏蔽,然後推送至目標服務。如下所示。

張*,工號*,*** **** ****
李*,工號*,*** **** ****
王*,工號*,*** **** ****

操作步驟

具體操作,請參見訊息分割操作步驟。其中,函數模板需選擇為內容映射 transform_projection