本文介紹雲訊息佇列 RocketMQ 版中消費者(Consumer)的定義、模型關係、內部屬性、行為約束、版本相容性及使用建議。
定義
消費者是雲訊息佇列 RocketMQ 版中用來接收並處理訊息的運行實體。
消費者通常被整合在業務系統中,從雲訊息佇列 RocketMQ 版服務端擷取訊息,並將訊息轉化成業務可理解的資訊,供商務邏輯處理。
在訊息消費端,可以定義如下傳輸行為:
- 消費者身份:消費者必須關聯一個指定的消費者分組,以擷取分組內統一定義的行為配置和消費狀態。
- 消費者類型:雲訊息佇列 RocketMQ 版面向不同的開發情境提供了多樣的消費者類型,包括PushConsumer類型、SimpleConsumer類型等。具體資訊,請參見消費者分類。
- 消費者本地回合組態:消費者根據不同的消費者類型,控制消費者用戶端本地的回合組態。例如消費者用戶端的線程數,消費並發度等,實現不同的傳輸效果。
模型關係
在雲訊息佇列 RocketMQ 版的領域模型中,消費者的位置和流程如下:

- 訊息由生產者初始化並發送到雲訊息佇列 RocketMQ 版服務端。
- 訊息按照到達雲訊息佇列 RocketMQ 版服務端的順序儲存到主題的指定隊列中。
- 消費者按照指定的訂閱關係從雲訊息佇列 RocketMQ 版服務端中擷取訊息並消費。
內部屬性
消費者分組名稱
- 定義:當前消費者關聯的消費者分組名稱,消費者必須關聯到指定的消費者分組,通過消費者分組擷取消費行為。更多資訊,請參見消費者分組(ConsumerGroup)。
- 取值:消費者分組為雲訊息佇列 RocketMQ 版的邏輯資源,需要您提前通過控制台或OpenAPI建立。具體命名格式,請參見配額與限制。
用戶端ID
- 定義:消費者用戶端的標識,用於區分不同的消費者。叢集內全域唯一。
- 取值:用戶端ID由雲訊息佇列 RocketMQ 版的SDK自動產生,主要用於日誌查看、問題定位等營運情境,不支援修改。
通訊參數
- 存取點資訊(必選):串連服務端的接入地址,用於識別服務端叢集。
存取點必須按格式配置,建議使用網域名稱,避免使用IP地址,防止節點變更無法進行熱點遷移。
- 身份認證資訊(可選):用戶端用於身分識別驗證的憑證資訊。
僅在服務端開啟身份識別和認證時需要傳輸。
- 請求逾時時間(可選):用戶端網路請求調用的逾時時間。取值範圍和預設值,請參見參數限制。
預綁定訂閱關係列表
- 定義:指定消費者的訂閱關係列表。
雲訊息佇列 RocketMQ 版服務端可在消費者初始化階段,根據預綁定的訂閱關係列表對目標主題進行許可權及合法性校正,無需等到應用啟動後才能校正。
- 取值:建議在消費者初始化階段明確訂閱關係即要訂閱的主題列表,若未設定,或訂閱的主題動態變更,雲訊息佇列 RocketMQ 版會對目標主題進行動態補充校正。
消費監聽器
- 定義:雲訊息佇列 RocketMQ 版服務端將訊息推送給消費者後,消費者調用訊息消費邏輯的監聽器。
- 取值:由消費者用戶端本地配置。
- 約束:使用PushConsumer類型的消費者消費訊息時,消費者用戶端必須設定消費監聽器。消費者類型的具體資訊,請參見消費者分類。
行為約束
在雲訊息佇列 RocketMQ 版領域模型中,消費者的管理通過消費者分組實現,同一分組內的消費者共同分攤訊息進行消費。因此,為了保證分組內訊息的正常負載和消費,雲訊息佇列 RocketMQ 版要求同一分組下的所有消費者以下消費行為保持一致:
- 投遞順序
- 消費重試策略
版本相容性
如行為約束中所述,同一分組內所有消費者的投遞順序和消費重試策略需要保持一致。
- 雲訊息佇列 RocketMQ 版服務端5.x版本:上述消費者的消費行為從關聯的消費者分組中統一擷取,因此,同一分組內所有消費者的消費行為必然是一致的,用戶端無需關注。
- 雲訊息佇列 RocketMQ 版服務端3.x/4.x歷史版本:上述消費邏輯由消費者用戶端介面定義,因此,您需要自己在消費者用戶端設定時保證同一分組下的消費者的消費行為一致。
若您使用雲訊息佇列 RocketMQ 版服務端5.x版本,用戶端使用歷史版本SDK,則消費者的消費邏輯以消費者用戶端介面的設定為準。
使用建議
不建議在單一進程內建立大量消費者
雲訊息佇列 RocketMQ 版的消費者在通訊協定層面支援非阻塞傳輸模式,網路通訊效率較高,並且支援多線程並發訪問。因此,大部分情境下,單一進程內同一個消費分組只需要初始化唯一的一個消費者即可,開發過程中應避免以相同的配置初始化多個消費者。
不建議頻繁建立和銷毀消費者
雲訊息佇列 RocketMQ 版的消費者是可以重複利用的底層資源,類似資料庫的串連池。因此不需要在每次接收訊息時動態建立消費者,且在消費完成後銷毀消費者。這樣頻繁地建立銷毀會在服務端產生大量短串連請求,嚴重影響系統效能。
- 正確樣本
Consumer c = ConsumerBuilder.build(); for (int i =0;i<n;i++) { Message m= c.receive(); //process message } c.shutdown(); - 典型錯誤樣本
for (int i =0;i<n;i++) { Consumer c = ConsumerBuilder.build(); Message m= c.receive(); //process message c.shutdown(); }