雲訊息佇列 RocketMQ 版通過消費位點管理消費進度,本文為您介紹雲訊息佇列 RocketMQ 版的消費進度管理機制。
背景資訊
雲訊息佇列 RocketMQ 版的生產者和消費者在進行訊息收發時,必然會涉及以下情境,訊息先生產後訂閱或先訂閱後生產。這兩種情境下,消費者用戶端啟動後從哪裡開始消費?如何標記已消費的訊息?這些都是由雲訊息佇列 RocketMQ 版的消費進度管理機制來定義的。
通過瞭解雲訊息佇列 RocketMQ 版的消費進度管理機制,可以協助您解答以下問題:
消費者啟動後從哪裡開始消費訊息?
消費者每次消費成功後如何標記訊息狀態,確保下次不會再重複處理該訊息?
某訊息被指定消費者消費過一次後,如果業務出現異常需要做故障恢複,該訊息能否被重新消費?
消費進度原理
訊息位點(Offset)
參考雲訊息佇列 RocketMQ 版主題和隊列的定義,訊息是按到達服務端的先後順序儲存在指定主題的多個隊列中,每條訊息在隊列中都有一個唯一的Long類型座標,這個座標被定義為訊息位點。
任意一個訊息佇列在邏輯上都是無限儲存,即訊息位點會從0到Long.MAX無限增加。通過主題、隊列和位點就可以定位任意一條訊息的位置,具體關係如下圖所示:
雲訊息佇列 RocketMQ 版定義隊列中最早一條訊息的位點為最小訊息位點(MinOffset);最新一條訊息的位點為最大訊息位點(MaxOffset)。雖然訊息佇列邏輯上是無限儲存,但由於服務端物理節點的儲存空間有限,雲訊息佇列 RocketMQ 版會滾動刪除隊列中儲存最早的訊息。因此,訊息的最小消費位點和最大消費位點會一直遞增變化。
消費位點(ConsumerOffset)
雲訊息佇列 RocketMQ 版領域模型為發布訂閱模式,每個主題的隊列都可以被多個消費者分組訂閱。若某條訊息被某個消費者消費後直接被刪除,則其他訂閱了該主題的消費者將無法消費該訊息。
因此,雲訊息佇列 RocketMQ 版通過消費位點管理訊息的消費進度。每條訊息被某個消費者消費完成後不會立即在隊列中刪除,雲訊息佇列 RocketMQ 版會基於每個消費者分組維護一份消費記錄,該記錄指定消費者分組消費某一個隊列時,消費過的最新一條訊息的位點,即消費位點。
當消費者用戶端離線,又再次重新上線時,會嚴格按照服務端儲存的消費進度繼續處理訊息。如果服務端儲存的歷史位點資訊已到期被刪除,此時消費位點向前移動至服務端儲存的最小位點。
消費位點的儲存和恢複是基於雲訊息佇列 RocketMQ 版服務端的儲存實現,和任何消費者無關。因此雲訊息佇列 RocketMQ 版支援跨消費者的消費進度恢複。
隊列中訊息位點MinOffset、MaxOffset和每個消費者分組的消費位點ConsumerOffset的關係如下:
ConsumerOffset≤MaxOffset:
當消費速度和生產速度一致,且全部訊息都處理完成時,最大訊息位點和消費位點相同,即ConsumerOffset=MaxOffset。
當消費速度較慢小於生產速度時,隊列中會有部分訊息未消費,此時消費位點小於最大訊息位點,即ConsumerOffset<MaxOffset,兩者之差就是該隊列中堆積的訊息量。
ConsumerOffset≥MinOffset:正常情況下有效消費位點ConsumerOffset必然大於等於最小訊息位點MinOffset。消費位點小於最小訊息位點時是無效的,相當於消費者要消費的訊息已經從隊列中刪除了,是無法消費到的,此時服務端會將消費位點強制糾正到合法的訊息位點。
消費位點初始值
消費位點初始值指的是消費者分組初次開機消費者消費訊息時,服務端儲存的消費位點的初始值。
雲訊息佇列 RocketMQ 版定義消費位點的初始值為消費者首次擷取訊息時,該時刻隊列中的最大訊息位點。相當於消費者將從隊列中最新的訊息開始消費。
重設消費位點
若消費者分組的初始消費位點或當前消費位點不符合您的業務預期,您可以通過重設消費位點調整您的消費進度。
適用情境
初始消費位點不符合需求:因初始消費位點為當前隊列的最大訊息位點,即用戶端會直接從最新訊息開始消費。若業務上線時需要消費部分歷史訊息,您可以通過重設消費位點功能消費到指定時刻前的訊息。
消費堆積快速清理:當下遊消費系統效能不足或消費速度小於生產速度時,會產生大量堆積訊息。若這部分堆積訊息可以丟棄,您可以通過重設消費位點快速將消費位點更新到指定位置,繞過這部分堆積的訊息,減少下遊處理壓力。
業務回溯,糾正處理:由於業務消費邏輯出現異常,訊息被錯誤處理。若您希望重新消費這些已被處理的訊息,可以通過重設消費位點快速將消費位點更新到歷史指定位置,實現消費回溯。
重設功能
雲訊息佇列 RocketMQ 版的重設消費位點提供以下能力:
從最新位點開始消費
Group ID消費指定Topic中的訊息時,會跳過當前堆積(未被消費)的所有訊息,從重設操作時間後發送的最新訊息開始消費。
從指定時間的位點開始消費:
消費者將從重設時間對應的消費位點之後的訊息進行消費,不管這些訊息是否已被消費過。
可選時間範圍中的起始和終止時間分別是該Topic中儲存的最早的和最晚的一條訊息的生產時間,不能選擇超過可選時間範圍的時間點。
重設到某一時刻對應的消費位點,匹配位點時,服務端會根據自動匹配到該時刻最接近的消費位點。
設定方式
控制台操作入口:
登入雲訊息佇列 RocketMQ 版控制台並在左側導覽列選擇实例列表。
在实例列表中選擇指定執行個體進入实例详情頁面,然後在左側導覽列選擇Group 管理。
在Group列表中選擇指定Group進入Group 详情頁面進行重設消費位點操作。
OpenAPI介面:ResetConsumeOffset - 重設消費位點
使用限制
版本相容性
關於消費者分組的消費位點初始值,不同的服務端版本中定義如下:
服務端歷史版本(4.x/3.x版本):訊息位點初始值受當前隊列訊息狀態的影響。
服務端5.x版本:明確定義消費位點初始值為消費者擷取訊息時刻隊列中的最大訊息位點。
因此,若您將服務端版本從歷史版本升級到最新的5.x版本時,需要自行對消費者初次開機時的情況做相容性判斷。
使用建議
嚴格控制消費位點重設的許可權
重設消費位點會給系統帶來額外處理壓力,可能會影響新訊息的讀寫效能。 因此該操作請在適用情境下謹慎執行,並提前做好合理性和必要性評估。