全部產品
Search
文件中心

ApsaraMQ for RocketMQ:訊息過濾

更新時間:Dec 26, 2024

消費者訂閱了某個主題後,雲訊息佇列 RocketMQ 版會將該主題中的所有訊息投遞給消費者。若消費者只需要關注部分訊息,可通過設定過濾條件在雲訊息佇列 RocketMQ 版服務端進行過濾,只擷取到需要關注的訊息子集,避免接收到大量無效的訊息。

應用情境

雲訊息佇列 RocketMQ 版作為發布訂閱模型的訊息中介軟體廣泛應用於上下遊業務整合情境。在實際業務情境中,同一個主題下的訊息往往會被多個不同的下遊業務方處理,各下遊的處理邏輯不同,只關注自身邏輯需要的訊息子集。

使用雲訊息佇列 RocketMQ 版的訊息過濾功能,可以協助消費者更高效地過濾自己需要的訊息集合,避免大量無效訊息投遞給消費者,降低下遊系統處理壓力。

雲訊息佇列 RocketMQ 版主要解決的單個業務域即同一個主題內不同訊息子集的過濾問題,一般是基於同一業務下更具體的分類進行過濾匹配。如果是需要對不同業務域的訊息進行拆分,建議使用不同主題處理不同業務域的訊息。

功能概述

訊息過濾定義

過濾的含義指的是將合格訊息投遞給消費者,而不是將匹配到的訊息過濾掉。

雲訊息佇列 RocketMQ 版的訊息過濾功能通過生產者和消費者對訊息的屬性、標籤進行定義,並在雲訊息佇列 RocketMQ 版服務端根據過濾條件進行篩選匹配,將合格訊息投遞給消費者進行消費。

訊息過濾原理

訊息過濾

訊息過濾主要通過以下幾個關鍵流程實現:

  • 生產者:生產者在初始化訊息時預先為訊息設定一些屬性和標籤,用於後續消費時指定過濾目標。

  • 消費者:消費者在初始化及後續消費流程中通過調用訂閱關係註冊介面,向服務端上報需要訂閱指定主題的哪些訊息,即過濾條件。

  • 服務端:消費者擷取訊息時會觸發服務端的動態過濾計算,雲訊息佇列 RocketMQ 版服務端根據消費者上報的過濾條件的運算式進行匹配,並將合格訊息投遞給消費者。

訊息過濾分類

雲訊息佇列 RocketMQ 版支援Tag標籤過濾和SQL屬性過濾,這兩種過濾方式對比如下:

對比項

Tag標籤過濾

SQL屬性過濾

過濾目標

訊息的Tag標籤。

訊息的屬性,包括使用者自訂屬性以及系統屬性(Tag是一種系統屬性)。

過濾能力

精準匹配。

SQL文法匹配。

適用情境

簡單過濾情境、計算邏輯簡單輕量。

複雜過濾情境、計算邏輯較複雜。

具體的使用方式及樣本,請參見下文的Tag標籤過濾SQL屬性過濾

訂閱關係一致性

過濾運算式屬於訂閱關係的一部分,雲訊息佇列 RocketMQ 版的領域模型規定,同一消費者分組內的多個消費者的訂閱關係包括過濾運算式,必須保持一致,否則可能會導致部分訊息消費不到。更多資訊,請參見訂閱關係(Subscription)

Tag標籤過濾

Tag標籤過濾方式是雲訊息佇列 RocketMQ 版提供的基礎訊息過濾能力,基於生產者為訊息設定的Tag標籤進行匹配。生產者在發送訊息時,設定訊息的Tag標籤,消費者需指定已有的Tag標籤來進行匹配訂閱。

情境樣本

以下圖電商交易情境為例,從客戶下單到收到商品這一過程會生產一系列訊息:

  • 訂單訊息

  • 支付訊息

  • 物流訊息

這些訊息會發送到名稱為Trade_Topic的Topic中,被各個不同的下遊系統所訂閱:

  • 支付系統:只需訂閱支付訊息。

  • 物流系統:只需訂閱物流訊息。

  • 交易成功率分析系統:需訂閱訂單和支付訊息。

  • Realtime Compute系統:需要訂閱所有和交易相關的訊息。

過濾效果如下圖所示:Tag過濾

Tag標籤設定

  • Tag由生產者發送訊息時設定,每條訊息允許設定一個Tag標籤。

  • Tag使用可見字元,建議長度不超過128字元。

  • Tag匹配時大小寫敏感,例如,TAG A和tag a屬於兩個標籤。

Tag標籤過濾規則

Tag標籤過濾為精準字串匹配,過濾規則設定格式如下:

  • 單Tag匹配:過濾運算式為目標Tag。表示只有訊息標籤為指定目標Tag的訊息符合匹配條件,會被發送給消費者。

  • 多Tag匹配:多個Tag之間為或的關係,不同Tag間使用兩個豎線(||)隔開。例如,Tag1||Tag2||Tag3,表示標籤為Tag1或Tag2或Tag3的訊息都滿足匹配條件,都會被發送給消費者進行消費。

  • 全部匹配:使用星號(*)作為全匹配運算式。表示主題下的所有訊息都將被發送給消費者進行消費。

使用樣本

  • 發送訊息,設定Tag標籤。

    Message message = messageBuilder.setTopic("topic")
                    //設定訊息索引鍵,可根據關鍵字精確尋找某條訊息。
                    .setKeys("messageKey")
                    //設定訊息Tag,用於消費端根據指定Tag過濾訊息。
                    //該樣本表示訊息的Tag設定為“TagA”。
                    .setTag("TagA")
                    //訊息體。
                    .setBody("messageBody".getBytes())
                    .build();
                        
  • 訂閱訊息,匹配單個Tag標籤。

    String topic = "Your Topic";
    //只訂閱訊息標籤為“TagA”的訊息。
    FilterExpression filterExpression = new FilterExpression("TagA", FilterExpressionType.TAG);
    pushConsumer.subscribe(topic, filterExpression);
  • 訂閱訊息,匹配多個Tag標籤。

    String topic = "Your Topic";
    //只訂閱訊息標籤為“TagA”、“TagB”或“TagC”的訊息。
    FilterExpression filterExpression = new FilterExpression("TagA||TagB||TagC", FilterExpressionType.TAG);
    pushConsumer.subscribe(topic, filterExpression);
  • 訂閱訊息,匹配Topic中的所有訊息,不進行過濾。

    String topic = "Your Topic";
    //使用Tag標籤過濾訊息,訂閱所有訊息。
    FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
    pushConsumer.subscribe(topic, filterExpression);

SQL屬性過濾

SQL屬性過濾是雲訊息佇列 RocketMQ 版提供的進階訊息過濾方式,通過生產者為訊息設定的屬性(Key)及屬性值(Value)進行匹配。生產者在發送訊息時可設定多個屬性,消費者訂閱時可設定SQL文法的過濾運算式過濾多個屬性。

說明

Tag是一種系統屬性,所以SQL過濾方式也相容Tag標籤過濾。在SQL文法中,Tag的屬性名稱為TAGS。

情境樣本

以下圖電商交易情境為例,從客戶下單到收到商品這一過程會生產一系列訊息,按照類型將訊息分為訂單訊息和物流訊息,其中給物流訊息定義地區屬性,按照地區分為杭州和上海:

  • 訂單訊息

  • 物流訊息

    • 物流訊息且地區為杭州

    • 物流訊息且地區為上海

這些訊息會發送到名稱為Trade_Topic的Topic中,被各個不同的系統所訂閱:

  • 物流系統1:只需訂閱物流訊息且訊息地區為杭州。

  • 物流系統2:只需訂閱物流訊息且訊息地區為杭州或上海。

  • 訂單跟蹤系統:只需訂閱訂單訊息。

  • Realtime Compute系統:需要訂閱所有和交易相關的訊息。

過濾效果如下圖所示:sql過濾

訊息屬性設定

  • 生產者發送訊息時可以設定訊息屬性,每個屬性都是一個自訂的索引值對(Key-Value)。

    • 屬性Key可以由字母(a-z, A-Z)、數字(0-9)、底線(_)組成。

    • 屬性Key的第一個字元必須是字母或底線,不能是數字。

  • 每條訊息支援設定多個屬性。

SQL屬性過濾規則

SQL屬性過濾使用SQL92文法作為過濾規則運算式,文法規範如下:

文法

說明

樣本

IS NULL

判斷屬性不存在。

a IS NULL :屬性a不存在。

IS NOT NULL

判斷屬性存在。

a IS NOT NULL:屬性a存在。

  • >

  • >=

  • <

  • <=

用於比較數字,不能用於比較字串,否則消費者用戶端啟動時會報錯。

說明
  • 可轉化為數位字串也被認為是數字。

  • 數位類型為int,取值範圍:-2147483648 ~ 2147483647。

  • a IS NOT NULL AND a > 100:屬性a存在且屬性a的值大於100。

  • a IS NOT NULL AND a > 'abc':錯誤樣本,'abc'為字串,不能用於比較大小。

BETWEEN xxx AND xxx

用於比較數字,不能用於比較字串,否則消費者用戶端啟動時會報錯。等價於>= xxx AND <= xxx。表示屬性值在兩個數字之間。

a IS NOT NULL AND (a BETWEEN 10 AND 100):屬性a存在且屬性a的值大於等於10且小於等於100。

NOT BETWEEN xxx AND xxx

用於比較數字,不能用於比較字串,否則消費者用戶端啟動會報錯。等價於< xxx OR > xxx,表示屬性值在兩個值的區間之外。

a IS NOT NULL AND (a NOT BETWEEN 10 AND 100):屬性a存在且屬性a的值小於10或大於100。

IN (xxx, xxx)

表示屬性的值在某個集合內。集合的元素只能是字串。

a IS NOT NULL AND (a IN ('abc', 'def')):屬性a存在且屬性a的值為abc或def。

  • =

  • <>

等於和不等於。可用於比較數字和字串。

a IS NOT NULL AND (a = 'abc' OR a<>'def'):屬性a存在且屬性a的值為abc或a的值不為def。

  • AND

  • OR

邏輯與、邏輯或。可用於組合任意簡單的邏輯判斷,需要將每個邏輯判斷內容放入括弧內。

a IS NOT NULL AND (a > 100) OR (b IS NULL):屬性a存在且屬性a的值大於100或屬性b不存在。

由於SQL屬性過濾是生產者定義訊息屬性,消費者設定SQL過濾條件,因此過濾條件的計算結果具有不確定性,服務端的處理方式如下:

  • 異常情況處理:如果過濾條件的運算式計算拋出異常,訊息預設被過濾,不會被投遞給消費者。例如比較數字和非數字類型的值。

  • 空值情況處理:如果過濾條件的運算式計算值為null或不是布爾類型(true和false),則訊息預設被過濾,不會被投遞給消費者。例如發送訊息時未定義某個屬性,在訂閱時過濾條件中直接使用該屬性,則過濾條件的運算式計算結果為null。

  • 數實值型別不符處理:如果訊息自訂屬性為浮點型,但過濾條件中使用整數進行判斷,則訊息預設被過濾,不會被投遞給消費者。

使用樣本

  • 發送訊息,同時設定訊息Tag標籤和自訂屬性。

    Message message = messageBuilder.setTopic("topic")
                    //設定訊息索引鍵,可根據關鍵字精確尋找某條訊息。
                    .setKeys("messageKey")
                    //設定訊息Tag,用於消費端根據指定Tag過濾訊息。
                    //該樣本表示訊息的Tag設定為“messageTag”。
                    .setTag("messageTag")
                    //訊息也可以設定自訂的分類屬性,例如環境標籤、地區、邏輯分支。
                    //該樣本表示為訊息自訂一個屬性,該屬性為地區,屬性值為杭州。
                    .addProperty("Region", "Hangzhou")
                    //訊息體。
                    .setBody("messageBody".getBytes())
                    .build();
  • 訂閱訊息,根據單個自訂屬性匹配訊息。

    String topic = "topic";
    //只訂閱地區屬性為杭州的訊息。
    FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL AND Region='Hangzhou'", FilterExpressionType.SQL92);
    simpleConsumer.subscribe(topic, filterExpression);
  • 訂閱訊息,同時根據多個自訂屬性匹配訊息。

    String topic = "topic";
    //只訂閱地區屬性為杭州且價格屬性大於30的訊息。
    FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL AND price IS NOT NULL AND Region = 'Hangzhou' AND price > 30", FilterExpressionType.SQL92);
    simpleConsumer.subscribe(topic, filterExpression);
  • 訂閱訊息,匹配Topic中的所有訊息,不進行過濾。

    String topic = "topic";
    //訂閱所有訊息。
    FilterExpression filterExpression = new FilterExpression("True", FilterExpressionType.SQL92);
    simpleConsumer.subscribe(topic, filterExpression);

使用建議

合理劃分主題和Tag標籤

從訊息的過濾機制和主題的原理機制可以看出,業務訊息的拆分可以基於主題進行篩選,也可以基於主題內訊息的Tag標籤及屬性進行篩選。關於拆分方式的選擇,應遵循以下原則:

  • 訊息類型是否一致:不同類型的訊息,如順序訊息和普通訊息需要使用不同的主題進行拆分,無法通過Tag標籤進行分類。

  • 業務域是否相同:不同業務域和部門的訊息應該拆分不同的主題。例如物流訊息和支付訊息應該使用兩個不同的主題;同樣是一個主題內的物流訊息,普通物流訊息和加急物流訊息則可以通過不同的Tag進行區分。

  • 訊息量級和重要性是否一致:如果訊息的量級規模存在巨大差異,或者說訊息的鏈路重要程度存在差異,則應該使用不同的主題進行隔離拆分。

訊息過濾常見問題

多個消費者訂閱同一個Topic下的不同Tag,出現訊息丟失情況。

可能原因:若多個消費者是通過同一個消費者分組(Group ID)訂閱的指定Topic,則所有消費者的過濾條件即訂閱的Tag要一致,否則會出現訂閱關係不一致,導致部分訊息丟失。

消費者設定了過濾條件,訊息消費數量是按照過濾前還是過濾後計算呢?

按照過濾後的訊息數量計算。

消費者線上無消費訊息,但Group有堆積。

採用SQL/TAG消費過濾的方式,未被過濾條件命中的訊息會計算為訊息堆積,堆積量的計算如下所示。

  • SQL消費方式:堆積量 = 已就緒的訊息量 + 處理中的訊息量 - 未被SQL命中的訊息數量。

  • TAG消費方式:堆積量 =(已就緒的訊息量 + 處理中的訊息量)* TAG標籤訊息百分比。

說明

TAG標籤訊息百分比 = 採樣內TAG標籤訊息量 / 採樣訊息總量

相關文檔

完整的訊息收發範例程式碼,請參見SDK參考概述