您可以使用DBMS_AQ包中的預存程序添加訊息到隊列、從隊列中刪除訊息、註冊或登出PL/SQL回調預存程序。
PolarDB通過以下SQL命令為
DBMS_AQ包提供擴充功能:ALTER QUEUEALTER QUEUE TABLECREATE QUEUECREATE QUEUE TABLEDROP QUEUEDROP QUEUE TABLE
| 函數/預存程序 | 傳回型別 | 說明 |
| ENQUEUE | N/A | 發布訊息到隊列。 |
| DEQUEUE | N/A | 如果有訊息可用或者在訊息可用時,從隊列檢索訊息。 |
| REGISTER | N/A | 註冊回調過程。 |
| UNREGISTER | N/A | 登出回調過程。 |
PolarDB支援使用下表列出的常量:
| 常量 | 說明 | 適用參數 |
| DBMS_AQ.BROWSE (0) | 讀取訊息而不鎖定。 | dequeue_options_t.dequeue_mode |
| DBMS_AQ.LOCKED (1) | 常量,必須指定在PL/SQL常量包的範圍內的常量。 | dequeue_options_t.dequeue_mode |
| DBMS_AQ.REMOVE (2) | 讀取之後刪除訊息,該參數為預設值。 | dequeue_options_t.dequeue_mode |
| DBMS_AQ.REMOVE_NODATA (3) | 常量,必須指定在PL/SQL常量包的範圍內的常量。 | dequeue_options_t.dequeue_mode |
| DBMS_AQ.FIRST_MESSAGE (0) | 返回與搜尋字詞匹配的第一個可用訊息。 | dequeue_options_t.navigation |
| DBMS_AQ.NEXT_MESSAGE (1) | 返回與搜尋字詞匹配的下一個可用訊息。 | dequeue_options_t.navigation |
| DBMS_AQ.NEXT_TRANSACTION (2) | 常量,必須指定在PL/SQL常量包的範圍內的常量。 | dequeue_options_t.navigation |
| DBMS_AQ.FOREVER (0) | 如果找不到與搜尋字詞匹配的訊息,則持續等待,該參數為預設值。 | dequeue_options_t.wait |
| DBMS_AQ.NO_WAIT (1) | 如果找不到與搜尋字詞匹配的訊息,則不等待。 | dequeue_options_t.wait |
| DBMS_AQ.ON_COMMIT (0) | 常量,必須指定在PL/SQL常量包的範圍內的常量。 | enqueue_options_t.visibility,dequeue_options_t.visibility |
| DBMS_AQ.IMMEDIATE (1) | 常量,必須指定在PL/SQL常量包的範圍內的常量。 | enqueue_options_t.visibility,dequeue_options_t.visibility |
| DBMS_AQ.PERSISTENT (0) | 此訊息應儲存在表中。 | enqueue_options_t.delivery_mode |
| DBMS_AQ.BUFFERED (1) | 常量,必須指定在PL/SQL常量包的範圍內的常量。 | enqueue_options_t.delivery_mode |
| DBMS_AQ.READY (0) | 指定訊息已經準備好進行處理。 | message_properties_t.state |
| DBMS_AQ.WAITING (1) | 指定訊息正在等待處理。 | message_properties_t.state |
| DBMS_AQ.PROCESSED (2) | 指定訊息已處理。 | message_properties_t.state |
| DBMS_AQ.EXPIRED (3) | 指定訊息處於異常隊列中。 | message_properties_t.state |
| DBMS_AQ.NO_DELAY (0) | 常量,必須指定在PL/SQL常量包的範圍內的常量。 | message_properties_t.delay |
| DBMS_AQ.NEVER (NULL) | 常量,必須指定在PL/SQL常量包的範圍內的常量。 | message_properties_t.expiration |
| DBMS_AQ.NAMESPACE_AQ (0) | 接收來自DBMS_AQ隊列的通知。 | sys.aq$_reg_info.namespace |
| DBMS_AQ.NAMESPACE_ANONYMOUS (1) | 常量,必須指定在PL/SQL常量包的範圍內的常量。 | sys.aq$_reg_info.namespace |
ENQUEUE
ENQUEUE預存程序將一個條目添加到隊列。文法如下:ENQUEUE(
queue_name IN VARCHAR2,
enqueue_options IN DBMS_AQ.ENQUEUE_OPTIONS_T,
message_properties IN DBMS_AQ.MESSAGE_PROPERTIES_T,
payload IN <type_name>,
msgid OUT RAW)參數
queue_name現有隊列的名稱(可能是schema限定的)。說明- 如果您省略schema名稱,伺服器將使用在SEARCH_PATH中指定的schema。
- 使用特殊字元或者區分大小寫名稱時,請添加雙引號。
enqueue_optionsenqueue_options是類型為enqueue_options_t的值:
目前,DBMS_AQ.ENQUEUE_OPTIONS_T IS RECORD( visibility BINARY_INTEGER DEFAULT ON_COMMIT, relative_msgid RAW(16) DEFAULT NULL, sequence_deviation BINARY INTEGER DEFAULT NULL, transformation VARCHAR2(61) DEFAULT NULL, delivery_mode PLS_INTEGER NOT NULL DEFAULT PERSISTENT);enqueue_options_t僅支援下表中的參數值:參數名稱 預設值 visibility ON_COMMIT delivery_mode PERSISTENT sequence_deviation NULL transformation NULL relative_msgid NULL message_propertiesmessage_properties是類型為message_properties_t的值:message_properties_t IS RECORD( priority INTEGER, delay INTEGER, expiration INTEGER, correlation CHARACTER VARYING(128) COLLATE pg_catalog.”C”, attempts INTEGER, recipient_list“AQ$_RECIPIENT_LIST_T”, exception_queue CHARACTER VARYING(61) COLLATE pg_catalog.”C”, enqueue_time TIMESTAMP WITHOUT TIME ZONE, state INTEGER, original_msgid BYTEA, transaction_group CHARACTER VARYING(30) COLLATE pg_catalog.”C”, delivery_mode INTEGER DBMS_AQ.PERSISTENT);message_properties_t支援的參數值如下:參數 說明 priority 如果隊列表定義包括sort_list並引用了priority,則此參數影響訊息出隊的順序。較低的值表示較高的出隊優先順序。 delay 指定訊息可用於出隊之前將經過的秒數,或者NO_DELAY。 expiration 使用expiration參數指定訊息到期的秒數。 correlation 使用 correlation參數指定與條目關聯的訊息,預設值為NULL。 attempts 系統維護的值,指定訊息出隊的嘗試次數。 exception_queue 使用exception_queue參數指定異常隊列的名稱,如果有訊息到期或者回退太多次數的事務出隊,則將訊息移動到該隊列中。 enqueue_time 記錄添加到隊列的時間。 state 此參數由DBMS_AQ指定,狀態取值如下: - DBMS_AQ.READY : 未達到延遲。
- DBMS_AQ.WAITING : 隊列條目已準備好處理。
- DBMS_AQ.PROCESSED :隊列條目已處理。
- DBMS_AQ.EXPIRED : 隊列條目已移動到異常隊列。
original_msgid 為了實現相容性而支援此參數。 transaction_group 為了實現相容性而支援此參數。 payload使用
payload參數提供將與隊列條目關聯的資料。承載類型必須與建立對應的隊列表時指定的類型匹配(詳情請參見CREATE_QUEUE_TABLE)。msgid使用
msgid參數檢索唯一(系統產生的)訊息標識符。
樣本
以下匿名塊通過調用
DBMS_AQ.ENQUEUE,將訊息添加到名為work_order的隊列:DECLARE
enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
message_handle raw(16);
payload work_order;
BEGIN
payload := work_order('Smith', 'system upgrade');
DBMS_AQ.ENQUEUE(
queue_name => 'work_order',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => payload,
msgid => message_handle
);
END;DEQUEUE
DEQUEUE預存程序讓訊息出隊。文法如下:DEQUEUE(
queue_name IN VARCHAR2,
dequeue_options IN DBMS_AQ.DEQUEUE_OPTIONS_T,
message_properties OUT DBMS_AQ.MESSAGE_PROPERTIES_T,
payload OUT type_name,
msgid OUT RAW)參數
queue_name現有隊列的名稱(可能是schema限定的)。說明- 如果您省略schema名稱,則伺服器將使用在SEARCH_PATH中指定的schema。
- 如果要使用特殊字元或者區分大小寫名稱,請添加雙引號。
dequeue_optionsdequeue _options是類型為dequeue_options_t的值:
目前,DEQUEUE_OPTIONS_T IS RECORD( consumer_name CHARACTER VARYING(30), dequeue_mode INTEGER, navigation INTEGER, visibility INTEGER, wait INTEGER, msgid BYTEA, correlation CHARACTER VARYING(128), deq_condition CHARACTER VARYING(4000), transformation CHARACTER VARYING(61), delivery_mode INTEGER);dequeue_options_t支援的參數值為:參數 說明 consumer_name 必須為NULL。 dequeue_mode 出隊操作的鎖定行為。取值如下: - DBMS_AQ.BROWSE:讀取訊息而不擷取鎖定。
- DBMS_AQ.LOCKED: 擷取鎖定之後讀取訊息。
- DBMS_AQ.REMOVE:刪除訊息之前讀取訊息。
- DBMS_AQ.REMOVE_NODATA:讀取訊息,但不刪除訊息。
navigation 標識將檢索的訊息。取值如下: - FIRST_MESSAGE:隊列中與搜尋字詞匹配的第一條訊息。
- NEXT_MESSAGE:與第一個詞語匹配的下一條可用訊息。
visibility 必須為ON_COMMIT, 如果您回退當前事務,出隊專案將保持在隊列中。 wait 必須為大於0的數字,或者設定為以下參數: - DBMS_AQ.FOREVER:無限期等待。
- DBMS_AQ.NO_WAIT:不等待。
msgid 出隊訊息的ID。 correlation 為了實現相容性而提供的參數。 deq_condition 一個VARCHAR2運算式,取值為BOOLEAN值,表示訊息是否應出隊。 transformation 為了實現相容性而提供的參數。 delivery_mode 必須為PERSISTENT,此模式下不支援緩衝的訊息。 message_propertiesmessage_properties是類型為message_properties_t的值:message_properties_t IS RECORD( priority INTEGER, delay INTEGER, expiration INTEGER, correlation CHARACTER VARYING(128) COLLATE pg_catalog.”C”, attempts INTEGER, recipient_list“AQ$_RECIPIENT_LIST_T”, exception_queue CHARACTER VARYING(61) COLLATE pg_catalog.”C”, enqueue_time TIMESTAMP WITHOUT TIME ZONE, state INTEGER, original_msgid BYTEA, transaction_group CHARACTER VARYING(30) COLLATE pg_catalog.”C”, delivery_mode INTEGER DBMS_AQ.PERSISTENT);message_properties_t支援的參數值為:參數 說明 priority 如果隊列表定義包括sort_list並引用了priority,則此參數影響訊息出隊的順序。較低的值表示較高的出隊優先順序。 delay 指定訊息出隊或NO_DELAY之前經過的秒數。 expiration 指定訊息到期的秒數。 correlation 指定與條目關聯的訊息,預設值為NULL。 attempts 系統維護的值,指定訊息出隊的嘗試次數。 exception_queue 指定異常隊列的名稱,如果有訊息到期或者回退太多次數的事務出隊,則將訊息移動到該隊列中。 enqueue_time 記錄添加到隊列的時間。 state 此參數由DBMS_AQ指定,狀態取值如下: - DBMS_AQ.WAITING:未達到延遲。
- DBMS_AQ.READY:隊列條目已準備好處理。
- DBMS_AQ.PROCESSED:隊列條目已處理。
- DBMS_AQ.EXPIRED:隊列條目已移動到異常隊列。
original_msgid 為了實現相容性而提供的參數。 transaction_group 為了實現相容性而提供的參數。 delivery_mode 不支援此參數,指定DBMS_AQ.PERSISTENT的值。 payload使用
payload參數檢索具有出隊操作的訊息的承載。承載類型必須與建立隊列表時指定的類型匹配。msgid使用
msgid參數檢索唯一訊息標識符。
樣本
以下匿名塊通過調用
DBMS_AQ.DEQUEUE,從隊列和承載中檢索訊息:DECLARE
dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
message_handle raw(16);
payload work_order;
BEGIN
dequeue_options.dequeue_mode := DBMS_AQ.BROWSE;
DBMS_AQ.DEQUEUE(
queue_name => 'work_queue',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => payload,
msgid => message_handle
);
DBMS_OUTPUT.PUT_LINE(
'The next work order is [' || payload.subject || '].'
);
END;承載由DBMS_OUTPUT.PUT_LINE顯示。REGISTER
使用REGISTER預存程序用於在訊息入隊或出隊時接收通知。文法如下:
REGISTER(
reg_list IN SYS.AQ$_REG_INFO_LIST,
count IN NUMBER)參數
reg_listreg_list是類型為AQ$_REG_INFO_LIST的列表,提供有關您要註冊的各種訂閱資訊。列表中每個條目的類型都是AQ$_REG_INFO,包含的屬性有:屬性 類型 說明 name VARCHAR2 (128) 訂閱的名稱(可能是 schema 限定的)。 namespace NUMERIC 唯一支援的值為DBMS_AQ.NAMESPACE_AQ (0)。 callback VARCHAR2 (4000) 描述對通知執行的操作。目前,僅支援調用PL/SQL預存程序。調用應採取以下形式: plsql://schema.procedure其中:- schema:指定預存程序所在的schema。
- procedure:指定待通知的預存程序的名稱。
context RAW (16) 回調預存程序需要的使用者定義的值。 countcount是reg_list中的條目數。
樣本
以下匿名塊通過調用
DBMS_AQ.REGISTER註冊預存程序,用於在隊列中添加或刪除專案時接收通知。為在DECLARE部分標識的每個訂閱資訊提供一組屬性(類型為 sys.aq$_reg_info):DECLARE
subscription1 sys.aq$_reg_info;
subscription2 sys.aq$_reg_info;
subscription3 sys.aq$_reg_info;
subscriptionlist sys.aq$_reg_info_list;
BEGIN
subscription1 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://assign_worker?PR=0',HEXTORAW('FFFF'));
subscription2 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://add_to_history?PR=1',HEXTORAW('FFFF'));
subscription3 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://reserve_parts?PR=2',HEXTORAW('FFFF'));
subscriptionlist := sys.aq$_reg_info_list(subscription1, subscription2, subscription3);
dbms_aq.register(subscriptionlist, 3);
commit;
END;
/subscriptionlist的類型為sys.aq$_reg_info_list,包含以前描述的sys.aq$_reg_info對象。列表名稱和對象計數傳遞到dbms_aq.register。UNREGISTER
使用
UNREGISTER預存程序關閉與入隊和出隊相關的通知。文法如下:UNREGISTER(
reg_list IN SYS.AQ$_REG_INFO_LIST,
count
IN NUMBER)參數
reg_listreg_list是類型為AQ$_REG_INFO_LIST的列表,提供有關您要註冊的各個訂閱資訊。列表中每個條目的類型都是AQ$_REG_INFO,包含的屬性有:屬性 類型 說明 name VARCHAR2 (128) 訂閱的名稱(可能是schema限定的)。 namespace NUMERIC 唯一支援的值為DBMS_AQ.NAMESPACE_AQ (0)。 callback VARCHAR2 (4000) 描述對通知執行的操作。目前,僅支援調用PL/SQL預存程序。調用應採取以下形式: plsql://schema.procedure其中:- schema:指定預存程序所在的schema。
- procedure:指定將通知的預存程序的名稱。
context RAW (16) 該預存程序需要的任何使用者定義的值。 countcount是reg_list中的條目數。
樣本
以下匿名塊通過調用DBMS_AQ.UNREGISTER關閉樣本中DBMS_AQ.REGISTER指定的通知:
subscriptionlist的類型為sys.aq$_reg_info_list,包含之前描述的sys.aq$_reg_info對象、列表名稱和對象數量將傳遞到dbms_aq.unregister。DECLARE
subscription1 sys.aq$_reg_info;
subscription2 sys.aq$_reg_info;
subscription3 sys.aq$_reg_info;
subscriptionlist sys.aq$_reg_info_list;
BEGIN
subscription1 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://assign_worker?PR=0',HEXTORAW('FFFF'));
subscription2 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://add_to_history?PR=1',HEXTORAW('FFFF'));
subscription3 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://reserve_parts?PR=2',HEXTORAW('FFFF'));
subscriptionlist := sys.aq$_reg_info_list(subscription1, subscription2, subscription3);
dbms_aq.unregister(subscriptionlist, 3);
commit;
END;
/