PolarDB provides message queuing and message processing. User-defined messages are stored in a queue. A collection of queues is stored in a queue table. Stored procedures in the DBMS_AQADM package can be used to create and manage message queues and queue tables. You can use the DBMS_AQ package to add messages to a queue or delete messages from a queue, or register or unregister a Procedural Language for SQL (PL/SQL) callback procedure.

PolarDB also extends the functionality of the DBMS_AQ package with the following Structured Query Language (SQL) commands, which are not compatible with Oracle:

  • ALTER QUEUE
  • ALTER QUEUE TABLE
  • CREATE QUEUE
  • CREATE QUEUE TABLE
  • DROP QUEUE
  • DROP QUEUE TABLE

The DBMS_AQ package provides procedures that allow you to enqueue a message, dequeue a message, and manage callback procedures. The following table describes the supported stored procedures.

Function or stored procedure | Return type | Description
ENQUEUE | n/a | Posts a message to a queue.
DEQUEUE | n/a | Retrieves a message from a queue if a message is available or when a message becomes available.
REGISTER | n/a | Registers a callback procedure.
UNREGISTER | n/a | Unregisters a callback procedure.

PolarDB implements only part of the Oracle DBMS_AQ package. Only the stored procedures listed in the preceding table are supported.

The following table describes the constants that are supported by PolarDB.

Constant | Description | Applicable parameter
DBMS_AQ.BROWSE (0) | Reads a message without locking. | dequeue_options_t.dequeue_mode
DBMS_AQ.LOCKED (1) | Defined, but returns an error if used. | dequeue_options_t.dequeue_mode
DBMS_AQ.REMOVE (2) | Deletes a message after it is read. This is the default value. | dequeue_options_t.dequeue_mode
DBMS_AQ.REMOVE_NODATA (3) | Defined, but returns an error if used. | dequeue_options_t.dequeue_mode
DBMS_AQ.FIRST_MESSAGE (0) | Returns the first available message that matches the search terms. | dequeue_options_t.navigation
DBMS_AQ.NEXT_MESSAGE (1) | Returns the next available message that matches the search terms. | dequeue_options_t.navigation
DBMS_AQ.NEXT_TRANSACTION (2) | Defined, but returns an error if used. | dequeue_options_t.navigation
DBMS_AQ.FOREVER (0) | Keeps waiting if no message that matches the search terms is found. This is the default value. | dequeue_options_t.wait
DBMS_AQ.NO_WAIT (1) | Does not wait if no message that matches the search terms is found. | dequeue_options_t.wait
DBMS_AQ.ON_COMMIT (0) | The enqueuing or dequeuing operation is part of the current transaction. | enqueue_options_t.visibility, dequeue_options_t.visibility
DBMS_AQ.IMMEDIATE (1) | Defined, but returns an error if used. | enqueue_options_t.visibility, dequeue_options_t.visibility
DBMS_AQ.PERSISTENT (0) | The message is stored in a table. | enqueue_options_t.delivery_mode
DBMS_AQ.BUFFERED (1) | Defined, but returns an error if used. | enqueue_options_t.delivery_mode
DBMS_AQ.READY (0) | Specifies that the message is ready to be processed. | message_properties_t.state
DBMS_AQ.WAITING (1) | Specifies that the message is waiting to be processed. | message_properties_t.state
DBMS_AQ.PROCESSED (2) | Specifies that the message has been processed. | message_properties_t.state
DBMS_AQ.EXPIRED (3) | Specifies that the message is in an exception queue. | message_properties_t.state
DBMS_AQ.NO_DELAY (0) | Defined, but returns an error if used. | message_properties_t.delay
DBMS_AQ.NEVER (NULL) | Defined, but returns an error if used. | message_properties_t.expiration
DBMS_AQ.NAMESPACE_AQ (0) | Accepts notifications from DBMS_AQ queues. | sys.aq$_reg_info.namespace
DBMS_AQ.NAMESPACE_ANONYMOUS (1) | Defined, but returns an error if used. | sys.aq$_reg_info.namespace

ENQUEUE

You can use the ENQUEUE stored procedure to add an entry to a queue. The procedure has the following signature:

ENQUEUE(
  queue_name IN VARCHAR2,
  enqueue_options IN DBMS_AQ.ENQUEUE_OPTIONS_T,
  message_properties IN DBMS_AQ.MESSAGE_PROPERTIES_T,
  payload IN <type_name>,
  msgid OUT RAW)

Parameters

  • queue_name

    The name of an existing queue. The name can be schema-qualified. If you omit the schema name, the server uses the schema that is specified by SEARCH_PATH. Unlike in Oracle, unquoted identifiers are converted to lowercase before they are stored. To include special characters or preserve case, enclose the name in double quotation marks.

  • enqueue_options

    enqueue_options is a value of the enqueue_options_t type.

    DBMS_AQ.ENQUEUE_OPTIONS_T IS RECORD(
      visibility BINARY_INTEGER DEFAULT ON_COMMIT,
      relative_msgid RAW(16) DEFAULT NULL,
      sequence_deviation BINARY_INTEGER DEFAULT NULL,
      transformation VARCHAR2(61) DEFAULT NULL,
      delivery_mode PLS_INTEGER NOT NULL DEFAULT PERSISTENT);

    Only the following parameter values are supported for enqueue_options_t.

    Parameter | Supported value
    visibility | ON_COMMIT
    delivery_mode | PERSISTENT
    sequence_deviation | NULL
    transformation | NULL
    relative_msgid | NULL

  • message_properties

    message_properties is a value of the message_properties_t type.

    message_properties_t IS RECORD(
      priority INTEGER,
      delay INTEGER,
      expiration INTEGER,
      correlation CHARACTER VARYING(128) COLLATE pg_catalog."C",
      attempts INTEGER,
      recipient_list "AQ$_RECIPIENT_LIST_T",
      exception_queue CHARACTER VARYING(61) COLLATE pg_catalog."C",
      enqueue_time TIMESTAMP WITHOUT TIME ZONE,
      state INTEGER,
      original_msgid BYTEA,
      transaction_group CHARACTER VARYING(30) COLLATE pg_catalog."C",
      delivery_mode INTEGER DBMS_AQ.PERSISTENT);

    The following table describes the fields of message_properties_t.

    Parameter | Description
    priority | If the queue table definition includes a sort_list that references priority, this parameter affects the order in which messages are dequeued. A lower value specifies a higher dequeuing priority.
    delay | The number of seconds that must elapse before the message becomes available for dequeuing. Alternatively, you can set this parameter to NO_DELAY.
    expiration | The number of seconds that must elapse before the message expires.
    correlation | The message associated with the entry. Default value: NULL.
    attempts | The number of attempts to dequeue the message. This value is maintained by the system.
    recipient_list | This parameter is not supported.
    exception_queue | The name of an exception queue. If the message expires or is dequeued by a transaction that is rolled back too many times, the message is moved to this queue.
    enqueue_time | The time when the entry was added to the queue. This value is provided by the system.
    state | This parameter is maintained by DBMS_AQ. Valid values:
    • DBMS_AQ.WAITING: The delay has not been reached.
    • DBMS_AQ.READY: The queue entry is ready for processing.
    • DBMS_AQ.PROCESSED: The queue entry has been processed.
    • DBMS_AQ.EXPIRED: The queue entry has been moved to the exception queue.
    original_msgid | Accepted for compatibility, but ignored.
    transaction_group | Accepted for compatibility, but ignored.
    delivery_mode | Only DBMS_AQ.PERSISTENT is supported.

  • payload

    You can use the payload parameter to provide the data that is associated with the queue entry. The payload type must match the type that is specified when you create the corresponding queue table. For more information, see DBMS_AQADM.CREATE_QUEUE_TABLE.

  • msgid

    You can use the msgid parameter to retrieve a unique message identifier that is generated by the system.

Examples

The following anonymous block calls DBMS_AQ.ENQUEUE to add a message to a queue that is named work_order:

DECLARE
  enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  message_handle     RAW(16);
  payload            work_order;
BEGIN
  payload := work_order('Smith', 'system upgrade');

  DBMS_AQ.ENQUEUE(
    queue_name         => 'work_order',
    enqueue_options    => enqueue_options,
    message_properties => message_properties,
    payload            => payload,
    msgid              => message_handle
  );
END;
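
The message_properties fields described in the Parameters section can be set before ENQUEUE is called. The following block is a minimal sketch rather than a definitive implementation: it assumes the same work_order type and queue as the preceding example, and the priority, delay, and expiration values are arbitrary illustrations.

```sql
DECLARE
  enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  message_handle     RAW(16);
  payload            work_order;
BEGIN
  payload := work_order('Smith', 'system upgrade');

  -- Illustrative values (assumptions, not recommendations).
  -- A lower priority value is dequeued earlier.
  message_properties.priority   := 1;
  -- Seconds that must elapse before the message can be dequeued.
  message_properties.delay      := 60;
  -- Seconds that must elapse before the message expires.
  message_properties.expiration := 3600;

  DBMS_AQ.ENQUEUE(
    queue_name         => 'work_order',
    enqueue_options    => enqueue_options,
    message_properties => message_properties,
    payload            => payload,
    msgid              => message_handle
  );
END;
```

The priority value affects the dequeuing order only if the sort_list of the queue table references priority.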

DEQUEUE

The DEQUEUE stored procedure dequeues a message. The procedure has the following signature:

DEQUEUE(
  queue_name IN VARCHAR2,
  dequeue_options IN DBMS_AQ.DEQUEUE_OPTIONS_T,
  message_properties OUT DBMS_AQ.MESSAGE_PROPERTIES_T,
  payload OUT <type_name>,
  msgid OUT RAW)

Parameters

  • queue_name

    The name of an existing queue. The name can be schema-qualified. If you omit the schema name, the server uses the schema that is specified by SEARCH_PATH. Unlike in Oracle, unquoted identifiers are converted to lowercase before they are stored. To include special characters or preserve case, enclose the name in double quotation marks.

  • dequeue_options

    dequeue_options is a value of the dequeue_options_t type.

    DEQUEUE_OPTIONS_T IS RECORD(
      consumer_name CHARACTER VARYING(30),
      dequeue_mode INTEGER,
      navigation INTEGER,
      visibility INTEGER,
      wait INTEGER,
      msgid BYTEA,
      correlation CHARACTER VARYING(128),
      deq_condition CHARACTER VARYING(4000),
      transformation CHARACTER VARYING(61),
      delivery_mode INTEGER);

    The following table describes the parameter values that are supported by dequeue_options_t.

    Parameter | Description
    consumer_name | Must be NULL.
    dequeue_mode | The locking behavior of the dequeuing operation. Valid values:
    • DBMS_AQ.BROWSE: reads a message without obtaining a lock.
    • DBMS_AQ.LOCKED: reads a message after a lock is obtained. This constant is defined, but an error is returned if it is used.
    • DBMS_AQ.REMOVE: deletes a message after it is read. This is the default value.
    • DBMS_AQ.REMOVE_NODATA: this constant is defined, but an error is returned if it is used.
    navigation | Identifies the message to be retrieved. Valid values:
    • DBMS_AQ.FIRST_MESSAGE: the first message in the queue that matches the search term.
    • DBMS_AQ.NEXT_MESSAGE: the next available message that matches the search term.
    visibility | Must be ON_COMMIT. If you roll back the current transaction, the dequeued item remains in the queue.
    wait | Must be a number greater than 0 or one of the following constants:
    • DBMS_AQ.FOREVER: waits indefinitely.
    • DBMS_AQ.NO_WAIT: does not wait.
    msgid | The ID of the message to be dequeued.
    correlation | Accepted for compatibility, but ignored.
    deq_condition | A VARCHAR2 expression that evaluates to a BOOLEAN value and determines whether a message is dequeued.
    transformation | Accepted for compatibility, but ignored.
    delivery_mode | Must be PERSISTENT. Buffered messages are not supported.

  • message_properties

    message_properties is a value of the message_properties_t type.

    message_properties_t IS RECORD(
      priority INTEGER,
      delay INTEGER,
      expiration INTEGER,
      correlation CHARACTER VARYING(128) COLLATE pg_catalog."C",
      attempts INTEGER,
      recipient_list "AQ$_RECIPIENT_LIST_T",
      exception_queue CHARACTER VARYING(61) COLLATE pg_catalog."C",
      enqueue_time TIMESTAMP WITHOUT TIME ZONE,
      state INTEGER,
      original_msgid BYTEA,
      transaction_group CHARACTER VARYING(30) COLLATE pg_catalog."C",
      delivery_mode INTEGER DBMS_AQ.PERSISTENT);

    The following table describes the fields of message_properties_t.

    Parameter | Description
    priority | If the queue table definition includes a sort_list that references priority, this parameter affects the order in which messages are dequeued. A lower value specifies a higher dequeuing priority.
    delay | The number of seconds that must elapse before the message becomes available for dequeuing. NO_DELAY specifies that the message is available for dequeuing immediately.
    expiration | The number of seconds that must elapse before the message expires.
    correlation | The message associated with the entry. Default value: NULL.
    attempts | The number of attempts to dequeue the message. This value is maintained by the system.
    recipient_list | This parameter is not supported.
    exception_queue | The name of an exception queue. If the message expires or is dequeued by a transaction that is rolled back too many times, the message is moved to this queue.
    enqueue_time | The time when the entry was added to the queue. This value is provided by the system.
    state | This parameter is maintained by DBMS_AQ. Valid values:
    • DBMS_AQ.WAITING: The delay has not been reached.
    • DBMS_AQ.READY: The queue entry is ready for processing.
    • DBMS_AQ.PROCESSED: The queue entry has been processed.
    • DBMS_AQ.EXPIRED: The queue entry has been moved to the exception queue.
    original_msgid | Accepted for compatibility, but ignored.
    transaction_group | Accepted for compatibility, but ignored.
    delivery_mode | Only DBMS_AQ.PERSISTENT is supported.

  • payload

    You can use the payload parameter to retrieve the payload of a message that is involved in a dequeuing operation. The payload type must match the type that is specified when you create the queue table.

  • msgid

    You can use the msgid parameter to retrieve a unique message identifier.

Examples

The following anonymous block calls DBMS_AQ.DEQUEUE to retrieve a message and its payload from a queue that is named work_queue:

DECLARE

  dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  message_handle     raw(16);
  payload            work_order;

BEGIN
  dequeue_options.dequeue_mode := DBMS_AQ.BROWSE;

  DBMS_AQ.DEQUEUE(
    queue_name         => 'work_queue',
    dequeue_options    => dequeue_options,
    message_properties => message_properties,
    payload            => payload,
    msgid              => message_handle
  );

  DBMS_OUTPUT.PUT_LINE(
  'The next work order is [' || payload.subject || '].'
  );
END;

The payload is displayed by DBMS_OUTPUT.PUT_LINE.
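
The dequeue_options fields can be combined to control how a message is consumed. The following block is a hedged sketch under stated assumptions: it reuses the work_queue queue and the work_order payload type (with a subject field) from the preceding example, and it removes the first matching message without waiting. This topic does not describe how an empty queue is reported to the caller, so the sketch does not handle that case.

```sql
DECLARE
  dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  message_handle     RAW(16);
  payload            work_order;
BEGIN
  -- Delete the message from the queue after it is read.
  dequeue_options.dequeue_mode := DBMS_AQ.REMOVE;
  -- Start from the first message that matches the search terms.
  dequeue_options.navigation   := DBMS_AQ.FIRST_MESSAGE;
  -- Do not block if no matching message is found.
  dequeue_options.wait         := DBMS_AQ.NO_WAIT;

  DBMS_AQ.DEQUEUE(
    queue_name         => 'work_queue',
    dequeue_options    => dequeue_options,
    message_properties => message_properties,
    payload            => payload,
    msgid              => message_handle
  );

  DBMS_OUTPUT.PUT_LINE(
    'Removed work order [' || payload.subject || '] from the queue.'
  );
END;
```

Because visibility must be ON_COMMIT, the removal takes effect only when the current transaction commits. If the transaction is rolled back, the message remains in the queue.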

REGISTER

You can use the REGISTER stored procedure to register an email address, procedure, or URL that receives notifications when an item is enqueued or dequeued. The procedure has the following signature:

REGISTER(
  reg_list IN SYS.AQ$_REG_INFO_LIST,
  count IN NUMBER)

Parameters

  • reg_list

    The reg_list parameter specifies a list of the AQ$_REG_INFO_LIST type. This list provides the information about each subscription that you want to register. The type of each entry in the list is AQ$_REG_INFO. The following table describes the included attributes.

    Attribute | Type | Description
    name | VARCHAR2(128) | The name of a subscription. The name can be schema-qualified.
    namespace | NUMERIC | The only supported value is DBMS_AQ.NAMESPACE_AQ (0).
    callback | VARCHAR2(4000) | The action to perform upon notification. Only PL/SQL stored procedures are supported. Specify the procedure in the following form: plsql://schema.procedure
    • schema: the schema in which the stored procedure resides.
    • procedure: the name of the stored procedure to be notified.
    context | RAW(16) | A user-defined value required by the callback procedure.

  • count

    The count parameter specifies the number of entries in reg_list.

Examples

The following anonymous block calls DBMS_AQ.REGISTER to register stored procedures that are notified when an item is added to or deleted from a queue. A set of attributes of the sys.aq$_reg_info type is provided for each subscription that is identified in the DECLARE section.

DECLARE
   subscription1 sys.aq$_reg_info;
   subscription2 sys.aq$_reg_info;
   subscription3 sys.aq$_reg_info;
   subscriptionlist sys.aq$_reg_info_list;
BEGIN
   subscription1 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://assign_worker?PR=0', HEXTORAW('FFFF'));
   subscription2 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://add_to_history?PR=1', HEXTORAW('FFFF'));
   subscription3 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://reserve_parts?PR=2', HEXTORAW('FFFF'));

   subscriptionlist := sys.aq$_reg_info_list(subscription1, subscription2, subscription3);
   dbms_aq.register(subscriptionlist, 3);
   commit;
END;
/

subscriptionlist is of the sys.aq$_reg_info_list type and contains the sys.aq$_reg_info objects that are defined in the block. The list and the object count are passed to dbms_aq.register.
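
A single subscription can be registered in the same way. The following block is a minimal sketch, not a definitive implementation: it writes the callback in the plsql://schema.procedure form described in the Parameters section, and the public schema and the assign_worker procedure are hypothetical names used only for illustration.

```sql
DECLARE
   subscription     sys.aq$_reg_info;
   subscriptionlist sys.aq$_reg_info_list;
BEGIN
   -- 'q' is the subscription name; public.assign_worker is a hypothetical
   -- callback procedure written in the plsql://schema.procedure form.
   subscription := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://public.assign_worker', HEXTORAW('FFFF'));

   -- The count argument must match the number of entries in the list.
   subscriptionlist := sys.aq$_reg_info_list(subscription);
   dbms_aq.register(subscriptionlist, 1);
   commit;
END;
/
```

The second argument of dbms_aq.register is 1 here because the list contains exactly one entry.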

UNREGISTER

You can use the UNREGISTER stored procedure to disable notifications that are related to enqueuing and dequeuing. The procedure has the following signature:

UNREGISTER(
  reg_list IN SYS.AQ$_REG_INFO_LIST,
  count IN NUMBER)

Parameters

  • reg_list

    The reg_list parameter specifies a list of the AQ$_REG_INFO_LIST type. This list provides the information about each subscription that you want to unregister. The type of each entry in the list is AQ$_REG_INFO. The following table describes the attributes that can be included in each entry.

    Attribute | Type | Description
    name | VARCHAR2(128) | The name of a subscription. The name can be schema-qualified.
    namespace | NUMERIC | The only supported value is DBMS_AQ.NAMESPACE_AQ (0).
    callback | VARCHAR2(4000) | The action to perform upon notification. Only PL/SQL stored procedures are supported. Specify the procedure in the following form: plsql://schema.procedure
    • schema: the schema in which the stored procedure resides.
    • procedure: the name of the stored procedure to be notified.
    context | RAW(16) | A user-defined value required by the callback procedure.

  • count

    The count parameter specifies the number of entries in reg_list.

Examples

The following anonymous block calls DBMS_AQ.UNREGISTER to disable the notifications that are specified in the example for DBMS_AQ.REGISTER:

DECLARE
   subscription1 sys.aq$_reg_info;
   subscription2 sys.aq$_reg_info;
   subscription3 sys.aq$_reg_info;
   subscriptionlist sys.aq$_reg_info_list;
BEGIN
   subscription1 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://assign_worker?PR=0', HEXTORAW('FFFF'));
   subscription2 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://add_to_history?PR=1', HEXTORAW('FFFF'));
   subscription3 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://reserve_parts?PR=2', HEXTORAW('FFFF'));

   subscriptionlist := sys.aq$_reg_info_list(subscription1, subscription2, subscription3);
   dbms_aq.unregister(subscriptionlist, 3);
   commit;
END;
/

subscriptionlist is of the sys.aq$_reg_info_list type and contains the sys.aq$_reg_info objects that are defined in the block. The list and the object count are passed to dbms_aq.unregister.