全部產品
Search
文件中心

PolarDB:DBMS_AQ

更新時間:Jul 06, 2024

您可以使用DBMS_AQ包中的預存程序添加訊息到隊列、從隊列中刪除訊息、註冊或登出PL/SQL回調預存程序。

PolarDB通過以下SQL命令為DBMS_AQ包提供擴充功能:
  • ALTER QUEUE
  • ALTER QUEUE TABLE
  • CREATE QUEUE
  • CREATE QUEUE TABLE
  • DROP QUEUE
  • DROP QUEUE TABLE
表 1. DBMS_AQ 函數/預存程序
函數/預存程序傳回型別說明
ENQUEUEN/A發布訊息到隊列。
DEQUEUEN/A如果有訊息可用或者在訊息可用時,從隊列檢索訊息。
REGISTERN/A註冊回調過程。
UNREGISTERN/A登出回調過程。
PolarDB支援使用下表列出的常量:
常量說明適用參數
DBMS_AQ.BROWSE (0)讀取訊息而不鎖定。dequeue_options_t.dequeue_mode
DBMS_AQ.LOCKED (1)常量,必須指定在PL/SQL常量包的範圍內的常量。dequeue_options_t.dequeue_mode
DBMS_AQ.REMOVE (2)讀取之後刪除訊息,該參數為預設值。dequeue_options_t.dequeue_mode
DBMS_AQ.REMOVE_NODATA (3)常量,必須指定在PL/SQL常量包的範圍內的常量。dequeue_options_t.dequeue_mode
DBMS_AQ.FIRST_MESSAGE (0)返回與搜尋字詞匹配的第一個可用訊息。dequeue_options_t.navigation
DBMS_AQ.NEXT_MESSAGE (1)返回與搜尋字詞匹配的下一個可用訊息。dequeue_options_t.navigation
DBMS_AQ.NEXT_TRANSACTION (2)常量,必須指定在PL/SQL常量包的範圍內的常量。dequeue_options_t.navigation
DBMS_AQ.FOREVER (0)如果找不到與搜尋字詞匹配的訊息,則持續等待,該參數為預設值。dequeue_options_t.wait
DBMS_AQ.NO_WAIT (1)如果找不到與搜尋字詞匹配的訊息,則不等待。dequeue_options_t.wait
DBMS_AQ.ON_COMMIT (0)常量,必須指定在PL/SQL常量包的範圍內的常量。enqueue_options_t.visibility,dequeue_options_t.visibility
DBMS_AQ.IMMEDIATE (1)常量,必須指定在PL/SQL常量包的範圍內的常量。enqueue_options_t.visibility,dequeue_options_t.visibility
DBMS_AQ.PERSISTENT (0)此訊息應儲存在表中。enqueue_options_t.delivery_mode
DBMS_AQ.BUFFERED (1)常量,必須指定在PL/SQL常量包的範圍內的常量。enqueue_options_t.delivery_mode
DBMS_AQ.READY (0)指定訊息已經準備好進行處理。message_properties_t.state
DBMS_AQ.WAITING (1)指定訊息正在等待處理。message_properties_t.state
DBMS_AQ.PROCESSED (2)指定訊息已處理。message_properties_t.state
DBMS_AQ.EXPIRED (3)指定訊息處於異常隊列中。message_properties_t.state
DBMS_AQ.NO_DELAY (0)常量,必須指定在PL/SQL常量包的範圍內的常量。message_properties_t.delay
DBMS_AQ.NEVER (NULL)常量,必須指定在PL/SQL常量包的範圍內的常量。message_properties_t.expiration
DBMS_AQ.NAMESPACE_AQ (0)接收來自DBMS_AQ隊列的通知。sys.aq$_reg_info.namespace
DBMS_AQ.NAMESPACE_ANONYMOUS (1)常量,必須指定在PL/SQL常量包的範圍內的常量。sys.aq$_reg_info.namespace

ENQUEUE

ENQUEUE預存程序將一個條目添加到隊列。文法如下:
ENQUEUE(
  queue_name IN VARCHAR2,
  enqueue_options IN DBMS_AQ.ENQUEUE_OPTIONS_T,
  message_properties IN DBMS_AQ.MESSAGE_PROPERTIES_T,
  payload IN <type_name>,
  msgid OUT RAW)
參數
  • queue_name
    現有隊列的名稱(可能是schema限定的)。
    說明
    • 如果您省略schema名稱,伺服器將使用在SEARCH_PATH中指定的schema。
    • 使用特殊字元或者區分大小寫名稱時,請添加雙引號。
  • enqueue_options
    enqueue_options是類型為enqueue_options_t的值:
    DBMS_AQ.ENQUEUE_OPTIONS_T IS RECORD(
      visibility BINARY_INTEGER DEFAULT ON_COMMIT,
      relative_msgid RAW(16) DEFAULT NULL,
      sequence_deviation BINARY INTEGER DEFAULT NULL,
      transformation VARCHAR2(61) DEFAULT NULL,
      delivery_mode PLS_INTEGER NOT NULL DEFAULT PERSISTENT);
    目前,enqueue_options_t僅支援下表中的參數值:
    參數名稱預設值
    visibilityON_COMMIT
    delivery_modePERSISTENT
    sequence_deviationNULL
    transformationNULL
    relative_msgidNULL
  • message_properties
    message_properties 是類型為message_properties_t的值:
          message_properties_t IS RECORD(
        priority INTEGER,
        delay INTEGER,
        expiration INTEGER,
        correlation CHARACTER VARYING(128) COLLATE pg_catalog.”C”,
        attempts INTEGER,
        recipient_list“AQ$_RECIPIENT_LIST_T”,
        exception_queue CHARACTER VARYING(61) COLLATE pg_catalog.”C”,
        enqueue_time TIMESTAMP WITHOUT TIME ZONE,
          state INTEGER,
         original_msgid BYTEA,
          transaction_group CHARACTER VARYING(30) COLLATE pg_catalog.”C”,
          delivery_mode INTEGER
        DBMS_AQ.PERSISTENT);
    message_properties_t支援的參數值如下:
    參數說明
    priority如果隊列表定義包括sort_list並引用了priority,則此參數影響訊息出隊的順序。較低的值表示較高的出隊優先順序。
    delay指定訊息可用於出隊之前將經過的秒數,或者NO_DELAY。
    expiration使用expiration參數指定訊息到期的秒數。
    correlation使用 correlation參數指定與條目關聯的訊息,預設值為NULL。
    attempts系統維護的值,指定訊息出隊的嘗試次數。
    exception_queue使用exception_queue參數指定異常隊列的名稱,如果有訊息到期或者回退太多次數的事務出隊,則將訊息移動到該隊列中。
    enqueue_time記錄添加到隊列的時間。
    state此參數由DBMS_AQ指定,狀態取值如下:
    • DBMS_AQ.READY : 未達到延遲。
    • DBMS_AQ.WAITING : 隊列條目已準備好處理。
    • DBMS_AQ.PROCESSED :隊列條目已處理。
    • DBMS_AQ.EXPIRED : 隊列條目已移動到異常隊列。
    original_msgid為了實現相容性而支援此參數。
    transaction_group為了實現相容性而支援此參數。
  • payload

    使用 payload參數提供將與隊列條目關聯的資料。承載類型必須與建立對應的隊列表時指定的類型匹配(詳情請參見CREATE_QUEUE_TABLE)。

  • msgid

    使用msgid 參數檢索唯一(系統產生的)訊息標識符。

樣本

以下匿名塊通過調用DBMS_AQ.ENQUEUE,將訊息添加到名為work_order的隊列:
DECLARE

  enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  message_handle     raw(16);
  payload            work_order;

BEGIN

  payload := work_order('Smith', 'system upgrade');

DBMS_AQ.ENQUEUE(
  queue_name         => 'work_order',
  enqueue_options    => enqueue_options,
  message_properties => message_properties,
  payload            => payload,
  msgid              => message_handle
    );
 END;

DEQUEUE

DEQUEUE預存程序讓訊息出隊。文法如下:
DEQUEUE(
  queue_name IN VARCHAR2,
  dequeue_options IN DBMS_AQ.DEQUEUE_OPTIONS_T,
  message_properties OUT DBMS_AQ.MESSAGE_PROPERTIES_T,
  payload OUT type_name,
  msgid OUT RAW)
參數
  • queue_name
    現有隊列的名稱(可能是schema限定的)。
    說明
    • 如果您省略schema名稱,則伺服器將使用在SEARCH_PATH中指定的schema。
    • 如果要使用特殊字元或者區分大小寫名稱,請添加雙引號。
  • dequeue_options
    dequeue _options是類型為dequeue_options_t的值:
    DEQUEUE_OPTIONS_T IS RECORD(
      consumer_name CHARACTER VARYING(30),
      dequeue_mode INTEGER,
      navigation INTEGER,
      visibility INTEGER,
      wait INTEGER,
      msgid BYTEA,
      correlation CHARACTER VARYING(128),
      deq_condition CHARACTER VARYING(4000),
      transformation CHARACTER VARYING(61),
      delivery_mode INTEGER);
    目前,dequeue_options_t支援的參數值為:
    參數說明
    consumer_name必須為NULL。
    dequeue_mode出隊操作的鎖定行為。取值如下:
    • DBMS_AQ.BROWSE:讀取訊息而不擷取鎖定。
    • DBMS_AQ.LOCKED: 擷取鎖定之後讀取訊息。
    • DBMS_AQ.REMOVE:刪除訊息之前讀取訊息。
    • DBMS_AQ.REMOVE_NODATA:讀取訊息,但不刪除訊息。
    navigation標識將檢索的訊息。取值如下:
    • FIRST_MESSAGE:隊列中與搜尋字詞匹配的第一條訊息。
    • NEXT_MESSAGE:與第一個詞語匹配的下一條可用訊息。
    visibility必須為ON_COMMIT, 如果您回退當前事務,出隊專案將保持在隊列中。
    wait必須為大於0的數字,或者設定為以下參數:
    • DBMS_AQ.FOREVER:無限期等待。
    • DBMS_AQ.NO_WAIT:不等待。
    msgid出隊訊息的ID。
    correlation為了實現相容性而提供的參數。
    deq_condition一個VARCHAR2運算式,取值為BOOLEAN值,表示訊息是否應出隊。
    transformation為了實現相容性而提供的參數。
    delivery_mode必須為PERSISTENT,此模式下不支援緩衝的訊息。
  • message_properties
    message_properties是類型為message_properties_t的值:
        message_properties_t IS RECORD(
        priority INTEGER,
        delay INTEGER,
        expiration INTEGER,
        correlation CHARACTER VARYING(128) COLLATE pg_catalog.”C”,
        attempts INTEGER,
        recipient_list“AQ$_RECIPIENT_LIST_T”,
        exception_queue CHARACTER VARYING(61) COLLATE pg_catalog.”C”,
        enqueue_time TIMESTAMP WITHOUT TIME ZONE,
        state INTEGER,
        original_msgid BYTEA,
        transaction_group CHARACTER VARYING(30) COLLATE pg_catalog.”C”,
        delivery_mode INTEGER
      DBMS_AQ.PERSISTENT);
    message_properties_t支援的參數值為:
    參數說明
    priority如果隊列表定義包括sort_list並引用了priority,則此參數影響訊息出隊的順序。較低的值表示較高的出隊優先順序。
    delay指定訊息出隊或NO_DELAY之前經過的秒數。
    expiration指定訊息到期的秒數。
    correlation 指定與條目關聯的訊息,預設值為NULL。
    attempts系統維護的值,指定訊息出隊的嘗試次數。
    exception_queue指定異常隊列的名稱,如果有訊息到期或者回退太多次數的事務出隊,則將訊息移動到該隊列中。
    enqueue_time記錄添加到隊列的時間。
    state此參數由DBMS_AQ指定,狀態取值如下:
    • DBMS_AQ.WAITING:未達到延遲。
    • DBMS_AQ.READY:隊列條目已準備好處理。
    • DBMS_AQ.PROCESSED:隊列條目已處理。
    • DBMS_AQ.EXPIRED:隊列條目已移動到異常隊列。
    original_msgid為了實現相容性而提供的參數。
    transaction_group為了實現相容性而提供的參數。
    delivery_mode不支援此參數,指定DBMS_AQ.PERSISTENT的值。
  • payload

    使用payload參數檢索具有出隊操作的訊息的承載。承載類型必須與建立隊列表時指定的類型匹配。

  • msgid

    使用msgid參數檢索唯一訊息標識符。

樣本

以下匿名塊通過調用DBMS_AQ.DEQUEUE,從隊列和承載中檢索訊息:
DECLARE

  dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  message_handle     raw(16);
  payload            work_order;

BEGIN
  dequeue_options.dequeue_mode := DBMS_AQ.BROWSE;

  DBMS_AQ.DEQUEUE(
    queue_name         => 'work_queue',
    dequeue_options    => dequeue_options,
    message_properties => message_properties,
    payload            => payload,
    msgid              => message_handle
  );

  DBMS_OUTPUT.PUT_LINE(
  'The next work order is [' || payload.subject || '].'
  );
END;
承載由DBMS_OUTPUT.PUT_LINE顯示。

REGISTER

使用REGISTER預存程序用於在訊息入隊或出隊時接收通知。文法如下:
REGISTER(
  reg_list IN SYS.AQ$_REG_INFO_LIST,
  count IN NUMBER)
參數
  • reg_list
    reg_list是類型為AQ$_REG_INFO_LIST的列表,提供有關您要註冊的各種訂閱資訊。列表中每個條目的類型都是AQ$_REG_INFO,包含的屬性有:
    屬性類型說明
    nameVARCHAR2 (128)訂閱的名稱(可能是 schema 限定的)。
    namespaceNUMERIC唯一支援的值為DBMS_AQ.NAMESPACE_AQ (0)。
    callbackVARCHAR2 (4000)描述對通知執行的操作。目前,僅支援調用PL/SQL預存程序。調用應採取以下形式:plsql://schema.procedure其中:
    • schema:指定預存程序所在的schema。
    • procedure:指定待通知的預存程序的名稱。
    contextRAW (16)回調預存程序需要的使用者定義的值。
  • count

    countreg_list中的條目數。

樣本

以下匿名塊通過調用DBMS_AQ.REGISTER註冊預存程序,用於在隊列中添加或刪除專案時接收通知。為在DECLARE部分標識的每個訂閱資訊提供一組屬性(類型為 sys.aq$_reg_info):
DECLARE
   subscription1 sys.aq$_reg_info;
   subscription2 sys.aq$_reg_info;
   subscription3 sys.aq$_reg_info;
   subscriptionlist sys.aq$_reg_info_list;
BEGIN
   subscription1 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://assign_worker?PR=0',HEXTORAW('FFFF'));
   subscription2 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://add_to_history?PR=1',HEXTORAW('FFFF'));
   subscription3 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://reserve_parts?PR=2',HEXTORAW('FFFF'));

   subscriptionlist := sys.aq$_reg_info_list(subscription1, subscription2, subscription3);
   dbms_aq.register(subscriptionlist, 3);
   commit;
  END;
   /
subscriptionlist的類型為sys.aq$_reg_info_list,包含以前描述的sys.aq$_reg_info對象。列表名稱和對象計數傳遞到dbms_aq.register

UNREGISTER

使用UNREGISTER預存程序關閉與入隊和出隊相關的通知。文法如下:
UNREGISTER(
 reg_list IN SYS.AQ$_REG_INFO_LIST,
 count
IN NUMBER)
參數
  • reg_list

    reg_list是類型為AQ$_REG_INFO_LIST的列表,提供有關您要註冊的各個訂閱資訊。列表中每個條目的類型都是AQ$_REG_INFO,包含的屬性有:

    屬性類型說明
    nameVARCHAR2 (128)訂閱的名稱(可能是schema限定的)。
    namespaceNUMERIC唯一支援的值為DBMS_AQ.NAMESPACE_AQ (0)。
    callbackVARCHAR2 (4000)描述對通知執行的操作。目前,僅支援調用PL/SQL預存程序。調用應採取以下形式:plsql://schema.procedure其中:
    • schema:指定預存程序所在的schema。
    • procedure:指定將通知的預存程序的名稱。
    contextRAW (16)該預存程序需要的任何使用者定義的值。
  • count

    countreg_list中的條目數。

樣本

以下匿名塊通過調用DBMS_AQ.UNREGISTER關閉樣本中DBMS_AQ.REGISTER指定的通知:

subscriptionlist的類型為sys.aq$_reg_info_list,包含之前描述的sys.aq$_reg_info對象、列表名稱和對象數量將傳遞到dbms_aq.unregister
DECLARE
   subscription1 sys.aq$_reg_info;
   subscription2 sys.aq$_reg_info;
   subscription3 sys.aq$_reg_info;
   subscriptionlist sys.aq$_reg_info_list;
BEGIN
   subscription1 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://assign_worker?PR=0',HEXTORAW('FFFF'));
   subscription2 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://add_to_history?PR=1',HEXTORAW('FFFF'));
   subscription3 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://reserve_parts?PR=2',HEXTORAW('FFFF'));

   subscriptionlist := sys.aq$_reg_info_list(subscription1, subscription2, subscription3);
   dbms_aq.unregister(subscriptionlist, 3);
   commit;
  END;
   /