本文介紹雲訊息佇列 RocketMQ 版中輕量主題(LiteTopic)的定義、模型關係、內部屬性、行為約束、版本相容性及使用建議。
前提條件
定義
輕量主題是雲訊息佇列 RocketMQ 版中訊息傳輸和儲存的二級容器,用於標識同一類商務邏輯下不同子類(例如不同會話、任務等粒度)的訊息。
輕量主題的作用主要如下:
實現排他性消費,定義資料的二級隔離
建議將不同子類的資料拆分到不同的輕量主題中管理,通過輕量主題實現更細緻的儲存隔離性和訂閱隔離性。
定義資料的身份和許可權
在基於主題的身份識別和許可權管理基礎上,可以通過輕量主題,進一步細分使用者的身份與許可權。
模型關係
在整個雲訊息佇列 RocketMQ 版的領域模型中,輕量主題所處的流程和位置如下:

主題(Topic)是雲訊息佇列 RocketMQ 版中訊息傳輸和儲存的頂層容器。當類型為Lite類型時,Topic下可建立輕量主題(LiteTopic),由Topic和LiteTopic共同唯一確認訊息的儲存容器。
當類型為Lite類型時,每個儲存容器預設由一個隊列組成。
內部屬性
輕量主題名稱
定義:輕量主題的名稱,用於標識輕量主題,輕量主題名稱在所屬主題內全域唯一。
取值:當主題類型為Lite時,使用者對message進行了setLiteTopic,如對應的輕量主題不存在,系統會自動建立。
約束:請參見參數限制。
到期時間
定義:輕量主題的到期時間,當輕量主題距離最近一次訊息寫入時間超過到期時間後,該輕量主題會被自動刪除。刪除是指釋放輕量主題佔用的個數,即佔用總數-1。
取值:當建立主題類型為Lite時,可以設定到期時間 expiration 值。
約束:請參見參數限制。
版本相容性
服務端版本:5.0-rmq-20251024-1 版本及以上
用戶端版本:RocketMQ gRPC 5.1.0 版本及以上
輕量類型和普通類型主題的差異
情境 | 對比項 | 輕量類型主題 | 普通類型主題 |
訊息儲存 | 一級主題 | 相同。都需要預先建立佈景主題資源。 | |
二級主題 | 可以在Topic下建立百萬量級的二級佈景主題資源LiteTopic,該二級資源有許多新特性。 | 無二級佈景主題資源。 | |
自動化生命週期管理 | 二級主題LiteTopic的生命週期可以自動化管理:
| 無 | |
順序性 | 每個LiteTopic只建立一個隊列,同一個隊列中的訊息儲存是順序的。
| 會建立多個隊列,只有分區順序Topic | |
收發並發TPS上限 | 由於只有一個隊列,每個LiteTopic 的TPS上限是有限的。 但Topic下可以建立百萬量級的LiteTopic,總TPS的上限是可以根據LiteTopic的增加而增加。 | Topic的TPS可以根據隊列數量和叢集機器節點數量橫向擴容。 | |
訊息消費 | 訂閱關係一致性 | 可以不一致。 同一Group下,每個消費者可訂閱不同的LiteTopic集合,Group的限制弱化。 | 需要一致。 同一Group下,每個消費者的訂閱關係需要保持一致,共用目標主題下的訊息。 |
順序性 | 順序消費,一個LiteTopic下的訊息只能被一個消費者線程處理。 | 可選擇並發消費或順序消費。 | |
動態訂閱 | 每個消費者可以動態增加或刪除指定某個LiteTopic的訂閱 | 無 | |
單個消費者可以訂閱的LiteTopic的數量 | 每個消費者可以訂閱千量級的LiteTopic | 無 | |
可觀測 | Metrics指標資料 | 有訊息堆積量指標 無訊息處理延隔時間指標 | 有訊息堆積量指標 有訊息處理之後時間指標 |
訊息軌跡 | 相同 | ||
常見輕量主題應用情境
應用情境 1:Multi-Agent 的非同步通訊,解決長耗時調用阻塞痛點
隨著AI需求情境變得更加複雜,大多數單Agent在複雜情境中面臨著局限性:缺乏專業化分工、難以對多領域進行整合;無法實現動態協作決策。單Agent應用和單Agent工作流程會逐步轉向 Multi-Agent 應用。但因為 AI 任務長耗時的特點,同步調用會造成調用者的線程阻塞,存在大規模協作擴充性問題。

如上圖所示 Multi-Agent的工作流程為:Supervisor Agent 負責將需求拆分給兩個子Agent;兩個子Agent負責各自領域問題解答並將結果返回給Supervisor Agent; Supervisor Agent將結果匯總返回給Web端。使用 RocketMQ 非同步通訊方案流程如下:
在接收請求的流程中
為每個子Agent建立一個Topic (Request)用於請求任務的緩衝隊列,可以是優先順序Topic,能先處理高優任務。
Supervisor Agent 將分割任務資訊發送到對應的請求主題中。
在返迴響應結果流程中
Supervisor Agent 建立 Lite 類型的Topic (Response) 並訂閱這個Topic。
子 Agent 處理將每個任務的響應結果發送到Topic (Response) 的LiteTopic中。LiteTopic可以用任務ID命名,為每個任務建立一個專屬的LiteTopic。
Supervisor Agent 通過訂閱即時擷取結果,然後通過HTTP SSE協議推送給Web端。
應用情境 2:分布式工作階段狀態管理,終結 AI 應用的工作階段狀態管理難題
AI 應用的互動模式具有特殊性,即長耗時、多輪次且高度依賴高成本計算的會話。當應用依賴 SSE 等長串連時,一旦串連中斷(如網關重啟、連線逾時、網路不穩定觸發),不僅會導致當前會話內容相關的丟失,更會直接造成已投入的 AI 任務作廢,從而浪費寶貴的算力資源。

如上圖所示,和情境 1的返迴響應結果流程相同,使用Lite 類型的Topic作為即時通知響應結果。這裡每個LiteTopic可以使用 SessionID 命名(例如 chatbot/{sessionID}),這樣會話結果都作為訊息在這個主題中有序傳遞。長串連重連後繼續保持會話的連續性的解決方案如下:
Web端2 和應用服務節點 1 建立長連結,建立會話Session2。
應用服務節點 1 監聽 LiteTopic [chat/SessionID2]
大模型任務調度組件,根據請求的SessionID資訊,將返回結果發送到 LiteTopic [chat/SessionID2]
由於網路等異常,WebSocket 自動重連到了應用服務節點2。
應用服務節點 1 取消訂閱LiteTopic [chat/SessionID2],應用服務節點 2 訂閱LiteTopic [chat/SessionID2]。
LiteTopic [chat/SessionID2] 根據之前的消費進度,將後續未消費的訊息繼續推送給應用服務節點 2。保證了工作階段狀態和會話資料的連續性。
範例程式碼
完整樣本請查看RocketMQ 5.x gRPC SDK中的範例程式碼。
發送訊息
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(clientConfiguration)
.build();
final Message message = provider.newMessageBuilder()
.setTopic(topic)
//設定訊息索引鍵,可根據關鍵字精確尋找某條訊息。
.setKeys("messageKey")
//設定LiteTopic
.setLiteTopic("lite-topic-1")
//訊息體
.setBody("messageBody".getBytes())
.build();
try {
final SendReceipt sendReceipt = producer.send(message);
log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (LiteTopicQuotaExceededException e) {
// LiteTopic配額超限,評估並增加配額
log.error("Lite topic quota exceeded", e);
} catch (Throwable t) {
log.error("Failed to send message", t);
}
消費訊息
需要使用 LitePushConsumer 消費類:
//初始化LitePushConsumer,需要綁定消費者分組ConsumerGroup、目標主題Topic、通訊參數等。
LitePushConsumer litePushConsumer = provider.newLitePushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// 控制台建立ConsumerGroup時綁定的Topic
.bindTopic(topicName)
//設定消費者分組
.setConsumerGroup(consumerGroup)
.setMessageListener(messageView -> {
//處理訊息並返回消費結果。
LOGGER.info("Consume message={}", messageView);
return ConsumeResult.SUCCESS;
})
.build();
try {
// 訂閱感興趣的LiteTopic集合
litePushConsumer.subscribeLite("lite-topic-1");
litePushConsumer.subscribeLite("lite-topic-2");
litePushConsumer.subscribeLite("lite-topic-3");
} catch (LiteSubscriptionQuotaExceededException e) {
// LiteTopic配額超限,評估並增加配額
log.error("Lite subscription quota exceeded", e);
} catch (Throwable t) {
log.error("Failed to subscribe lite topic", t);
}
// 業務處理完畢後,及時取消不再使用的LiteTopic訂閱
litePushConsumer.unsubscribeLite("lite-topic-3");
// 擷取當前訂閱的LiteTopic集合
Set<String> liteTopicSet = litePushConsumer.getLiteTopicSet();
動態修改訂閱關係
/**
* 動態添加訂閱關係
* subscribeLite() 方法會發起網路請求並執行配額驗證,因此可能會調用失敗。
* 請務必檢查此調用的結果,以確保訂閱被成功添加。
* 可能的失敗情境包括:
* 1. 網路請求錯誤,可以重試。
* 2. 配額驗證失敗,拋出 LiteSubscriptionQuotaExceededException 異常。
* 請評估配額是否滿足需求,並及時使用 unsubscribeLite() 取消訂閱不再使用的主題以釋放資源。
*/
litePushConsumer.subscribeLite("lite-topic-1");
//動態刪除訂閱關係
litePushConsumer.unsubscribeLite("lite-topic-1");
使用限制
單個消費者能訂閱的LiteTopic數量上限為2000個(可提工單調整)。
單個LiteTopic的消費TPS為上限200TPS。
為保障服務穩定性,單個執行個體對可建立或可訂閱的 LiteTopic 數量設定了上限。具體配額(可提工單調整)請參見下表。
LiteTopic 建立數量
定義:指單個執行個體在其生命週期內,當前已建立並存在的 LiteTopic 總量。
觸發條件與影響:當此數量達到上限時,若用戶端嘗試向一個尚未存在的 LiteTopic 發送訊息(該操作會觸發自動建立),系統將無法建立該 LiteTopic,並返回訊息發送失敗的錯誤。
LiteTopic 訂閱數量
定義:指執行個體下所有線上的消費者用戶端與 LiteTopic 建立的有效訂閱關係的總和。這是一個動態變化的數值。
影響:當此數量達到上限時,任何消費者用戶端嘗試訂閱一個新的 LiteTopic 都會失敗,無法建立新的消費關係。
特殊規則:請注意,即使一個 LiteTopic 已從系統中刪除,但若仍有消費者用戶端保持對其的訂閱關係,該訂閱關係依然會被計入訂閱總數,直至消費者停止訂閱。
Serverless系列執行個體
部署架構 | 容量模式 | 規格 | 可建立或訂閱LiteTopic的最大數量 |
獨享 | 預留+彈性 | 5000 | 30萬 |
10000 | 60萬 | ||
15000 | 72萬 | ||
[2萬, 5萬] | 100萬 | ||
(5萬, 10萬] | 150萬 | ||
(10萬, 20萬] | 240萬 | ||
(20萬, 30萬] | 470萬 | ||
(30萬, 50萬] | 630萬 | ||
(50萬, 100萬] | 1160萬 |
非Serverless系列(訂用帳戶、隨用隨付)執行個體
標準版
執行個體規格 | 訊息收發基礎規格TPS上限(次/秒) | 可建立或訂閱LiteTopic的最大數量 |
rmq.s2.2xlarge | 2000 | 15萬 |
rmq.s2.4xlarge | 4000 | 25萬 |
rmq.s2.6xlarge | 6000 | 30萬 |
專業版
執行個體規格 | 訊息收發基礎規格TPS上限(次/秒) | 可建立或訂閱LiteTopic的最大數量 |
rmq.p2.2xlarge | 2000 | 15萬 |
rmq.p2.4xlarge | 4000 | 25萬 |
rmq.p2.6xlarge | 6000 | 30萬 |
rmq.p2.10xlarge | 10000 | 60萬 |
rmq.p2.20xlarge | 20000 | 80萬 |
rmq.p2.30xlarge | 30000 | 100萬 |
rmq.p2.40xlarge | 40000 | 120w |
rmq.p2.50xlarge | 50000 | 140w |
rmq.p2.100xlarge | 100000 | 220w |
rmq.p2.120xlarge | 120000 | 270w |
rmq.p2.150xlarge | 150000 | 330w |
rmq.p2.200xlarge | 200000 | 450w |
鉑金版
執行個體規格 | 訊息收發基礎規格TPS上限(次/秒) | 可建立或訂閱LiteTopic的最大數量 |
rmq.u2.10xlarge | 10000 | 60萬 |
rmq.u2.20xlarge | 20000 | 80萬 |
rmq.u2.30xlarge | 30000 | 100萬 |
rmq.u2.40xlarge | 40000 | 120萬 |
rmq.u2.50xlarge | 50000 | 140萬 |
rmq.u2.60xlarge | 60000 | 160萬 |
rmq.u2.70xlarge | 70000 | 170萬 |
rmq.u2.80xlarge | 80000 | 180萬 |
rmq.u2.90xlarge | 90000 | 200萬 |
rmq.u2.100xlarge | 100000 | 220萬 |
rmq.u2.120xlarge | 120000 | 270萬 |
rmq.u2.150xlarge | 150000 | 330萬 |
rmq.u2.200xlarge | 200000 | 450萬 |
rmq.u2.250xlarge | 250000 | 560萬 |
rmq.u2.300xlarge | 300000 | 630萬 |
rmq.u2.350xlarge | 350000 | 750萬 |
rmq.u2.400xlarge | 400000 | 930萬 |
rmq.u2.450xlarge | 450000 | 1040萬 |
rmq.u2.500xlarge | 500000 | 1160萬 |
rmq.u2.550xlarge | 550000 | 1280萬 |
rmq.u2.600xlarge | 600000 | 1400萬 |
rmq.u2.1000xlarge | 1000000 | 2320萬 |
