全部產品
Search
文件中心

ApsaraMQ for RocketMQ:輕量主題模型

更新時間:Dec 19, 2025

本文介紹雲訊息佇列 RocketMQ 版中輕量主題(LiteTopic)的定義、模型關係、內部屬性、行為約束、版本相容性及使用建議。

前提條件

  • 輕量主題目前只有非Serverless系列(訂用帳戶、隨用隨付)和Serverless系列的獨享執行個體支援。

  • 如何購買支援輕量主題模型的版本執行個體:

    • 新購執行個體在購買頁面添加產品能力標籤,標籤鍵:version_capability;標籤值:lite-topic。如下圖所示:

      image

    • 存量執行個體可提交工單升級到支援輕量主題模型的版本。提工單時需要提供執行個體 ID 和執行個體所屬地區。

  • 使用者可提交工單免費諮詢LiteTopic在相關情境的解決方案。

定義

輕量主題是雲訊息佇列 RocketMQ 版中訊息傳輸和儲存的二級容器,用於標識同一類商務邏輯下不同子類(例如不同會話、任務等粒度)的訊息。

輕量主題的作用主要如下:

  • 實現排他性消費,定義資料的二級隔離

建議將不同子類的資料拆分到不同的輕量主題中管理,通過輕量主題實現更細緻的儲存隔離性和訂閱隔離性。

  • 定義資料的身份和許可權

在基於主題的身份識別和許可權管理基礎上,可以通過輕量主題,進一步細分使用者的身份與許可權。

模型關係

在整個雲訊息佇列 RocketMQ 版的領域模型中,輕量主題所處的流程和位置如下:

image.png

  • 主題(Topic)是雲訊息佇列 RocketMQ 版中訊息傳輸和儲存的頂層容器。當類型為Lite類型時,Topic下可建立輕量主題(LiteTopic),由Topic和LiteTopic共同唯一確認訊息的儲存容器。

  • 當類型為Lite類型時,每個儲存容器預設由一個隊列組成。

內部屬性

輕量主題名稱

  • 定義:輕量主題的名稱,用於標識輕量主題,輕量主題名稱在所屬主題內全域唯一。

  • 取值:當主題類型為Lite時,使用者對message進行了setLiteTopic,如對應的輕量主題不存在,系統會自動建立。

  • 約束:請參見參數限制

到期時間

  • 定義:輕量主題的到期時間,當輕量主題距離最近一次訊息寫入時間超過到期時間後,該輕量主題會被自動刪除。刪除是指釋放輕量主題佔用的個數,即佔用總數-1。

  • 取值:當建立主題類型為Lite時,可以設定到期時間 expiration 值。

  • 約束:請參見參數限制

版本相容性

  • 服務端版本:5.0-rmq-20251024-1 版本及以上

  • 用戶端版本:RocketMQ gRPC 5.1.0 版本及以上

輕量類型和普通類型主題的差異

情境

對比項

輕量類型主題

普通類型主題

訊息儲存

一級主題

相同。都需要預先建立佈景主題資源。

二級主題

可以在Topic下建立百萬量級的二級佈景主題資源LiteTopic,該二級資源有許多新特性。

無二級佈景主題資源。

自動化生命週期管理

二級主題LiteTopic的生命週期可以自動化管理:

  • 自動建立:發送或訂閱的時候,如果二級主題LiteTopic不存在,則自動建立該資源。

  • 自動刪除:設定expiration時間,持續無新訊息發送後會自動刪除。

順序性

每個LiteTopic只建立一個隊列,同一個隊列中的訊息儲存是順序的。

  • 每個LiteTopic

會建立多個隊列,只有分區順序Topic

收發並發TPS上限

由於只有一個隊列,每個LiteTopic 的TPS上限是有限的。

但Topic下可以建立百萬量級的LiteTopic,總TPS的上限是可以根據LiteTopic的增加而增加。

Topic的TPS可以根據隊列數量和叢集機器節點數量橫向擴容。

訊息消費

訂閱關係一致性

可以不一致。

同一Group下,每個消費者可訂閱不同的LiteTopic集合,Group的限制弱化。

需要一致。

同一Group下,每個消費者的訂閱關係需要保持一致,共用目標主題下的訊息。

順序性

順序消費,一個LiteTopic下的訊息只能被一個消費者線程處理。

可選擇並發消費或順序消費。

動態訂閱

每個消費者可以動態增加或刪除指定某個LiteTopic的訂閱

單個消費者可以訂閱的LiteTopic的數量

每個消費者可以訂閱千量級的LiteTopic

可觀測

Metrics指標資料

有訊息堆積量指標

無訊息處理延隔時間指標

有訊息堆積量指標

有訊息處理之後時間指標

訊息軌跡

相同

常見輕量主題應用情境

應用情境 1:Multi-Agent 的非同步通訊,解決長耗時調用阻塞痛點

隨著AI需求情境變得更加複雜,大多數單Agent在複雜情境中面臨著局限性:缺乏專業化分工、難以對多領域進行整合;無法實現動態協作決策。單Agent應用和單Agent工作流程會逐步轉向 Multi-Agent 應用。但因為 AI 任務長耗時的特點,同步調用會造成調用者的線程阻塞,存在大規模協作擴充性問題。

image.png

如上圖所示 Multi-Agent的工作流程為:Supervisor Agent 負責將需求拆分給兩個子Agent;兩個子Agent負責各自領域問題解答並將結果返回給Supervisor Agent; Supervisor Agent將結果匯總返回給Web端。使用 RocketMQ 非同步通訊方案流程如下:

  1. 在接收請求的流程中

    1. 為每個子Agent建立一個Topic (Request)用於請求任務的緩衝隊列,可以是優先順序Topic,能先處理高優任務。

    2. Supervisor Agent 將分割任務資訊發送到對應的請求主題中。

  2. 在返迴響應結果流程中

    1. Supervisor Agent 建立 Lite 類型的Topic (Response) 並訂閱這個Topic。

    2. 子 Agent 處理將每個任務的響應結果發送到Topic (Response) 的LiteTopic中。LiteTopic可以用任務ID命名,為每個任務建立一個專屬的LiteTopic。

    3. Supervisor Agent 通過訂閱即時擷取結果,然後通過HTTP SSE協議推送給Web端。

應用情境 2:分布式工作階段狀態管理,終結 AI 應用的工作階段狀態管理難題

AI 應用的互動模式具有特殊性,即長耗時、多輪次且高度依賴高成本計算的會話。當應用依賴 SSE 等長串連時,一旦串連中斷(如網關重啟、連線逾時、網路不穩定觸發),不僅會導致當前會話內容相關的丟失,更會直接造成已投入的 AI 任務作廢,從而浪費寶貴的算力資源。

image.png

如上圖所示,和情境 1的返迴響應結果流程相同,使用Lite 類型的Topic作為即時通知響應結果。這裡每個LiteTopic可以使用 SessionID 命名(例如 chatbot/{sessionID}),這樣會話結果都作為訊息在這個主題中有序傳遞。長串連重連後繼續保持會話的連續性的解決方案如下:

  1. Web端2 和應用服務節點 1 建立長連結,建立會話Session2。

  2. 應用服務節點 1 監聽 LiteTopic [chat/SessionID2]

  3. 大模型任務調度組件,根據請求的SessionID資訊,將返回結果發送到 LiteTopic [chat/SessionID2]

  4. 由於網路等異常,WebSocket 自動重連到了應用服務節點2。

  5. 應用服務節點 1 取消訂閱LiteTopic [chat/SessionID2],應用服務節點 2 訂閱LiteTopic [chat/SessionID2]。

  6. LiteTopic [chat/SessionID2] 根據之前的消費進度,將後續未消費的訊息繼續推送給應用服務節點 2。保證了工作階段狀態和會話資料的連續性。

範例程式碼

完整樣本請查看RocketMQ 5.x gRPC SDK中的範例程式碼。

發送訊息

Producer producer = provider.newProducerBuilder()
    .setTopics(topic)
    .setClientConfiguration(clientConfiguration)
    .build();

final Message message = provider.newMessageBuilder()
    .setTopic(topic)
    //設定訊息索引鍵,可根據關鍵字精確尋找某條訊息。
    .setKeys("messageKey")
    //設定LiteTopic
    .setLiteTopic("lite-topic-1")
    //訊息體
    .setBody("messageBody".getBytes())
    .build();

try {
    final SendReceipt sendReceipt = producer.send(message);
    log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (LiteTopicQuotaExceededException e) {
    // LiteTopic配額超限,評估並增加配額
    log.error("Lite topic quota exceeded", e);
} catch (Throwable t) {
    log.error("Failed to send message", t);
}

消費訊息

需要使用 LitePushConsumer 消費類:

//初始化LitePushConsumer,需要綁定消費者分組ConsumerGroup、目標主題Topic、通訊參數等。
LitePushConsumer litePushConsumer = provider.newLitePushConsumerBuilder()
    .setClientConfiguration(clientConfiguration)
    // 控制台建立ConsumerGroup時綁定的Topic
    .bindTopic(topicName)
    //設定消費者分組
    .setConsumerGroup(consumerGroup)
    .setMessageListener(messageView -> {
        //處理訊息並返回消費結果。
        LOGGER.info("Consume message={}", messageView);
        return ConsumeResult.SUCCESS;
    })
    .build();

try {
    // 訂閱感興趣的LiteTopic集合
    litePushConsumer.subscribeLite("lite-topic-1");
    litePushConsumer.subscribeLite("lite-topic-2");
    litePushConsumer.subscribeLite("lite-topic-3");
} catch (LiteSubscriptionQuotaExceededException e) {
    // LiteTopic配額超限,評估並增加配額
    log.error("Lite subscription quota exceeded", e);
} catch (Throwable t) {
    log.error("Failed to subscribe lite topic", t);
}

// 業務處理完畢後,及時取消不再使用的LiteTopic訂閱
litePushConsumer.unsubscribeLite("lite-topic-3");

// 擷取當前訂閱的LiteTopic集合
Set<String> liteTopicSet = litePushConsumer.getLiteTopicSet();

動態修改訂閱關係

/**
 * 動態添加訂閱關係
 * subscribeLite() 方法會發起網路請求並執行配額驗證,因此可能會調用失敗。
 * 請務必檢查此調用的結果,以確保訂閱被成功添加。
 * 可能的失敗情境包括:
 * 1. 網路請求錯誤,可以重試。
 * 2. 配額驗證失敗,拋出 LiteSubscriptionQuotaExceededException 異常。
 * 請評估配額是否滿足需求,並及時使用 unsubscribeLite() 取消訂閱不再使用的主題以釋放資源。
 */

litePushConsumer.subscribeLite("lite-topic-1");

//動態刪除訂閱關係
litePushConsumer.unsubscribeLite("lite-topic-1");

使用限制

  1. 單個消費者能訂閱的LiteTopic數量上限為2000個(可提工單調整)。

  2. 單個LiteTopic的消費TPS為上限200TPS。

  3. 為保障服務穩定性,單個執行個體對可建立或可訂閱的 LiteTopic 數量設定了上限。具體配額(可提工單調整)請參見下表。

    1. LiteTopic 建立數量

      • 定義:指單個執行個體在其生命週期內,當前已建立並存在的 LiteTopic 總量。

      • 觸發條件與影響:當此數量達到上限時,若用戶端嘗試向一個尚未存在的 LiteTopic 發送訊息(該操作會觸發自動建立),系統將無法建立該 LiteTopic,並返回訊息發送失敗的錯誤。

    2. LiteTopic 訂閱數量

      • 定義:指執行個體下所有線上的消費者用戶端與 LiteTopic 建立的有效訂閱關係的總和。這是一個動態變化的數值。

      • 影響:當此數量達到上限時,任何消費者用戶端嘗試訂閱一個新的 LiteTopic 都會失敗,無法建立新的消費關係。

      • 特殊規則:請注意,即使一個 LiteTopic 已從系統中刪除,但若仍有消費者用戶端保持對其的訂閱關係,該訂閱關係依然會被計入訂閱總數,直至消費者停止訂閱。

Serverless系列執行個體

部署架構

容量模式

規格

可建立或訂閱LiteTopic的最大數量

獨享

預留+彈性

5000

30萬

10000

60萬

15000

72萬

[2萬, 5萬]

100萬

(5萬, 10萬]

150萬

(10萬, 20萬]

240萬

(20萬, 30萬]

470萬

(30萬, 50萬]

630萬

(50萬, 100萬]

1160萬

非Serverless系列(訂用帳戶、隨用隨付)執行個體

標準版

執行個體規格

訊息收發基礎規格TPS上限(次/秒)

可建立或訂閱LiteTopic的最大數量

rmq.s2.2xlarge

2000

15萬

rmq.s2.4xlarge

4000

25萬

rmq.s2.6xlarge

6000

30萬

專業版

執行個體規格

訊息收發基礎規格TPS上限(次/秒)

可建立或訂閱LiteTopic的最大數量

rmq.p2.2xlarge

2000

15萬

rmq.p2.4xlarge

4000

25萬

rmq.p2.6xlarge

6000

30萬

rmq.p2.10xlarge

10000

60萬

rmq.p2.20xlarge

20000

80萬

rmq.p2.30xlarge

30000

100萬

rmq.p2.40xlarge

40000

120w

rmq.p2.50xlarge

50000

140w

rmq.p2.100xlarge

100000

220w

rmq.p2.120xlarge

120000

270w

rmq.p2.150xlarge

150000

330w

rmq.p2.200xlarge

200000

450w

鉑金版

執行個體規格

訊息收發基礎規格TPS上限(次/秒)

可建立或訂閱LiteTopic的最大數量

rmq.u2.10xlarge

10000

60萬

rmq.u2.20xlarge

20000

80萬

rmq.u2.30xlarge

30000

100萬

rmq.u2.40xlarge

40000

120萬

rmq.u2.50xlarge

50000

140萬

rmq.u2.60xlarge

60000

160萬

rmq.u2.70xlarge

70000

170萬

rmq.u2.80xlarge

80000

180萬

rmq.u2.90xlarge

90000

200萬

rmq.u2.100xlarge

100000

220萬

rmq.u2.120xlarge

120000

270萬

rmq.u2.150xlarge

150000

330萬

rmq.u2.200xlarge

200000

450萬

rmq.u2.250xlarge

250000

560萬

rmq.u2.300xlarge

300000

630萬

rmq.u2.350xlarge

350000

750萬

rmq.u2.400xlarge

400000

930萬

rmq.u2.450xlarge

450000

1040萬

rmq.u2.500xlarge

500000

1160萬

rmq.u2.550xlarge

550000

1280萬

rmq.u2.600xlarge

600000

1400萬

rmq.u2.1000xlarge

1000000

2320萬