訂閱推送可以將就緒的資料立即推送到用戶端,降低通過Get API輪詢造成的時延和通過佇列服務查詢帶來的佇列服務負載。然而,由於涉及較多額外概念,訂閱推送具有一定的複雜性。本文將介紹佇列服務訂閱推送功能的使用。
消費者
消費者是指從佇列服務中訂閱資料的用戶端程式,當用戶端使用Watch API進行資料調用時,會在佇列服務中產生消費者對象。API中的參數(如Window的大小、Tags),將作為消費者的屬性。通過Attribute API查看消費者狀態,樣本如下:
[OK] Attributes:
consumers.list.[0] : Id: default_group.u1, Index: 0, Pending: 0, Status: Complete, Idle: 2.091s, Window: 0, Slots: 0, AutoCommit: true
consumers.list.[1] : Id: default_group.u2, Index: 0, Pending: 0, Status: Complete, Idle: 1.124s, Window: 0, Slots: 0, AutoCommit: true
consumers.stats.total : 2其中:consumers.stats.total表示消費者總數量。consumers.list是消費者列表,各列說明如下:
參數 | 說明 |
Id | 為消費者的ID全稱,格式為 |
Index | 為當前消費者正在消費的資料index。 |
Pending | 指示當前消費者正在處理,但沒有進行Commit的資料數量。 |
Status | 消費者的狀態,主要的狀態有:
|
Window | 消費者視窗大小,即允許的最巨量資料推送數量。 |
Slots | 視窗空閑數量,如果Slots為0,則視窗已經佔滿。 |
AutoCommit | 是否在資料發出後,自動Commit資料。 |
Tags | 該消費者的tags過濾條件。 說明 當您使用帶有tags的watch API時,需要確保同消費者組的消費者使用的tags都是相同的。 |
使用Watch API時,如果執行了資料的tag,則會看到額外的Tags列,例如:
consumers.list.[0] : Id: ..., Pending: 0, ..., Window: ..., Tags: tags[foo=bar]表示該consumer關注的資料tags,只有當資料滿足該條件時,資料才會送達該消費者。
消費者組
消費者組是以相同過濾條件訂閱佇列服務的消費者集合。同組內的消費者不可以同名,不同組內的消費者可以同名。
在同一消費者組內,資料均衡分發給各個消費者;在不同組間,資料並列推送給每個組的消費者。例如:
同組內消費者會收到不同的資料。
不同組的消費者會收到相同的資料。
如果某個消費者通過API刪除資料,該資料會立即刪除,其他組內的消費者將無法收到。
您可以通過Attribute API看到佇列服務中的消費者狀態,樣本如下:
groups.list.[0] : Id: default_group, Index: 0, Pending: 0, Delivered: 0, Consumers: 1
groups.list.[1] : Id: group, Index: 0, Pending: 0, Delivered: 1, Consumers: 0groups.list是消費者列表,各列說明如下:
參數 | 說明 |
Id | 為消費者組的ID。 |
Index | 為當前消費者組正在消費的Index,為所有組內消費者最大的Index。 |
Pending | 指示當前消費者組正在處理,但沒有進行Commit的資料數量。 |
Delivered | 以及推送出去的訊息數量。 |
Consumers | 消費者組內的消費數量。 |
消費者組數量沒有上限,但不會自動清理,建立後狀態會一直保留。
消費者與消費者組的使用
您可以在Watch API調用中通過HTTP Header聲明所使用的消費者與消費者組,或者在各個語言的SDK中,在client初始化時進行聲明。相關HTTP Header的key也可以通過Attributes API進行查看。
meta.header.group : X-EAS-QueueService-Gid
meta.header.user : X-EAS-QueueService-Uid通過X-EAS-QueueService-Uid,X-EAS-QueueService-Gid分別聲明使用的消費者ID和加入的消費者組ID。
Commit與Negative
佇列服務支援Commit與Negative兩種消費方式,操作對象都是資料的Index,但語義不同。
Commit表示該消費者已收到資料並處理完畢,可推送下一批。
Negative表示消費者已經收到資料但無法處理,佇列服務根據錯誤Code決定是否推送下一批。可以在Negative的同時以文本方式聲明原因與錯誤Code,該資料會被推送給其他消費者。下表列出了佇列服務能夠處理的特殊錯誤Code:
Code
說明
Shutdown
表明該消費者正在執行退出,佇列服務不會繼續推送資料。
資料重平衡
在很多情境下,消費者無法進行資料Commit,比如:
在預測服務變換時,部分消費者被終止,導致正在處理的資料無法Commit。
消費者遇到了某些內部錯誤導致崩潰。
消費者無法處理收到的資料而執行Negative Commit。
這些無法處理的資料會被佇列服務重新分發給其他消費者,這種機制稱為資料重平衡。資料重平衡會在以下時間點發生:
任一消費者進入Exit狀態。
消費者在視窗有閒置情況下沒有收到新的資料推送。
佇列服務為每條資料維護投遞計數器,每次資料執行重平衡且被分發出去後,計數器加一。當在重平衡過程中,發現某資料的投遞計數器已經超過了最大投遞次數,該資料被作為死信進行處理。佇列服務會執行您配置的死信策略,預設情況下會將資料投遞到尾隊列。
尾隊列
尾隊列是用於存放不推送給消費者的資料(如死信或自訂的控制資料)的輔助隊列。它是佇列服務內的一個隊列執行個體,具有相同的API。每個輸入和輸出隊列都有一個尾隊列。
尾隊列和普通隊列共用最大隊列長度。如果最大長度為10,普通隊列佔用6,則尾隊列最多為4。若尾隊列已滿,再寫入會返回隊列過長錯誤。因此,需定期觀察和清理尾隊列。
在API調用時您可以通過增加以下HTTP Header來聲明訪問尾隊列:
X-EAS-QueueService-Access-Rear: true