全部產品
Search
文件中心

ApsaraMQ for RocketMQ:生產者(Producer)

更新時間:Jun 30, 2024

本文介紹雲訊息佇列 RocketMQ 版中生產者(Producer)的定義、模型關係、內部屬性、版本相容性及使用建議。

定義

生產者是雲訊息佇列 RocketMQ 版系統中用來構建並傳輸訊息到服務端的運行實體。

生產者通常被整合在業務系統中,將業務訊息按照要求封裝成雲訊息佇列 RocketMQ 版訊息(Message)並發送至服務端。

在訊息生產者中,可以定義如下傳輸行為:

  • 發送方式:生產者可通過API介面設定訊息發送的方式。雲訊息佇列 RocketMQ 版支援同步傳輸和非同步傳輸。更多資訊,請參見訊息傳輸模式

  • 事務行為:雲訊息佇列 RocketMQ 版支援事務訊息,對於事務訊息需要生產者配合進行事務檢查等行為保障事務的最終一致性。具體資訊,請參見事務訊息

生產者和主題的關係為多對多關係,即同一個生產者可以向多個主題發送訊息,對於平台類情境如果需要發送訊息到多個主題,並不需要建立多個生產者;同一個主題也可以接收多個生產者的訊息,以此可以實現生產者效能的水平擴充和容災。

生產者主題關聯

模型關係

雲訊息佇列 RocketMQ 版的領域模型中,生產者的位置和流程如下:生產者

  1. 訊息由生產者初始化並發送到雲訊息佇列 RocketMQ 版服務端。
  2. 訊息按照到達雲訊息佇列 RocketMQ 版服務端的順序儲存到主題的指定隊列中。
  3. 消費者按照指定的訂閱關係從雲訊息佇列 RocketMQ 版服務端中擷取訊息並消費。

內部屬性

用戶端ID

  • 定義:生產者用戶端的標識,用於區分不同的生產者。叢集內全域唯一。

  • 取值:用戶端ID由雲訊息佇列 RocketMQ 版的SDK自動產生,主要用於日誌查看、問題定位等營運情境,不支援修改。

通訊參數
  • 存取點資訊(必選):串連服務端的接入地址,用於識別服務端叢集。

    存取點必須按格式配置,建議使用網域名稱,避免使用IP地址,防止節點變更無法進行熱點遷移。

  • 身份認證資訊(可選):用戶端用於身分識別驗證的憑證資訊。

    僅在服務端開啟身份識別和認證時需要傳輸。

  • 請求逾時時間(可選):用戶端網路請求調用的逾時時間。取值範圍和預設值,請參見參數限制

預綁定主題列表

  • 定義:雲訊息佇列 RocketMQ 版的生產者需要將訊息發送到的目標主題列表,主要作用如下:

    • 事務訊息(必須設定):事務訊息情境下,生產者在故障、重啟恢複時,需要檢查事務訊息的主題中是否有未提交的事務訊息。避免生產者發送新訊息後,主題中的舊事務訊息一直處於未提交狀態,造成業務延遲。

    • 非事務訊息(建議設定):服務端會在生產者初始化時根據預綁定主題列表,檢查目標主題的存取權限和合法性,而不需要等到應用啟動後再檢查。

      若未設定,或後續訊息發送的目標主題動態變更,雲訊息佇列 RocketMQ 版會對目標主題進行動態補充檢驗。

  • 約束:對於事務訊息,預繫結資料行表必須設定,且需要和事務檢查器一起配合使用。

事務檢查器

  • 定義:雲訊息佇列 RocketMQ 版的事務訊息機制中,為保證異常情境下事務的最終一致性,生產者需要主動實現事務檢查器的介面。具體資訊,請參見事務訊息

  • 發送事務訊息時,事務檢查器必須設定,且需要和預綁定主題列表一起配合使用。

發送重試策略定義:生產者在訊息發送失敗時的重試策略。具體資訊,請參見訊息發送重試機制

版本相容性

雲訊息佇列 RocketMQ 版服務端5.x版本開始,生產者是匿名的,無需管理生產者分組(ProducerGroup);對於歷史版本服務端3.x和4.x版本,已經使用的生產者分組可以廢棄無需再設定,且不會對當前業務產生影響。

使用建議

不建議單一進程建立大量生產者

雲訊息佇列 RocketMQ 版的生產者和主題是多對多的關係,支援同一個生產者向多個主題發送訊息。對於生產者的建立和初始化,建議遵循夠用即可、最大化複用原則,如果有需要發送訊息到多個主題的情境,無需為每個主題都建立一個生產者。

不建議頻繁建立和銷毀生產者

雲訊息佇列 RocketMQ 版的生產者是可以重複利用的底層資源,類似資料庫的串連池。因此不需要在每次發送訊息時動態建立生產者,且在發送結束後銷毀生產者。這樣頻繁的建立銷毀會在服務端產生大量短串連請求,嚴重影響系統效能。

  • 正確樣本

    Producer p = ProducerBuilder.build();
      for (int i =0;i<n;i++)
        {
          Message m= MessageBuilder.build();
          p.send(m);
        }
    p.shutdown();
  • 典型錯誤樣本

    for (int i =0;i<n;i++)
      {
        Producer p = ProducerBuilder.build();
        Message m= MessageBuilder.build();
        p.send(m);
        p.shutdown();
      }