PolarDB provides message queuing and message processing. User-defined messages are stored in a queue. A collection of queues is stored in a queue table. Stored procedures in the DBMS_AQADM package can be used to create and manage message queues and queue tables. You can use the DBMS_AQ package to add messages to a queue or delete messages from a queue, or register or unregister a Procedural Language for SQL (PL/SQL) callback procedure.
PolarDB also provides extended (non-compatible) features for the DBMS_AQ package by running the following Structured Query Language (SQL) commands:
- ALTER QUEUE
- ALTER QUEUE TABLE
- CREATE QUEUE
- CREATE QUEUE TABLE
- DROP QUEUE
- DROP QUEUE TABLE
The DBMS_AQ package provides procedures that allow you to enqueue a message, dequeue a message, and manage callback procedures. The following table describes the supported stored procedures.
Function or stored procedure | Return type | Description |
---|---|---|
ENQUEUE | n/a | Posts a message to a queue. |
DEQUEUE | n/a | Retrieves a message from a queue if a message is available or when a message becomes available. |
REGISTER | n/a | Registers a callback procedure. |
UNREGISTER | n/a | Unregisters a callback procedure. |
The implementation of DBMS_AQ in PolarDB is a partial implementation compared with the Oracle version. Only those stored procedures listed in the preceding table are supported.
The following table describes the constants that are supported by PolarDB.
Constant | Description | Applicable parameter |
---|---|---|
DBMS_AQ.BROWSE (0) | Reads a message without locking. | dequeue_options_t.dequeue_mode |
DBMS_AQ.LOCKED (1) | This constant is defined. If this constant is used, an error is returned. | dequeue_options_t.dequeue_mode |
DBMS_AQ.REMOVE (2) | Deletes a message after it is read. This parameter is the default value. | dequeue_options_t.dequeue_mode |
DBMS_AQ.REMOVE_NODATA (3) | This constant is defined. If this constant is used, an error is returned. | dequeue_options_t.dequeue_mode |
DBMS_AQ.FIRST_MESSAGE (0) | Returns the first available message that matches the search terms. | dequeue_options_t.navigation |
DBMS_AQ.NEXT_MESSAGE (1) | Returns the next available message that matches the search terms. | dequeue_options_t.navigation |
DBMS_AQ.NEXT_TRANSACTION (2) | This constant is defined. If this constant is used, an error is returned. | dequeue_options_t.navigation |
DBMS_AQ.FOREVER (0) | Keeps waiting if a message that matches the search term is not found. This parameter is the default value. | dequeue_options_t.wait |
DBMS_AQ.NO_WAIT (1) | Does not wait if a message that matches the search term is not found. | dequeue_options_t.wait |
DBMS_AQ.ON_COMMIT (0) | Dequeuing is part of the current transaction. | enqueue_options_t.visibility, dequeue_options_t.visibility |
DBMS_AQ.IMMEDIATE (1) | This constant is defined. If this constant is used, an error is returned. | enqueue_options_t.visibility, dequeue_options_t.visibility |
DBMS_AQ.PERSISTENT (0) | The message must be stored in a table. | enqueue_options_t.delivery_mode |
DBMS_AQ.BUFFERED (1) | This constant is defined. If this constant is used, an error is returned. | enqueue_options_t.delivery_mode |
DBMS_AQ.READY (0) | Specifies that the message is ready to be processed. | message_properties_t.state |
DBMS_AQ.WAITING (1) | Specifies that the message is waiting to be processed. | message_properties_t.state |
DBMS_AQ.PROCESSED (2) | Specifies that the message has been processed. | message_properties_t.state |
DBMS_AQ.EXPIRED (3) | Specifies that the message is in an exception queue. | message_properties_t.state |
DBMS_AQ.NO_DELAY (0) | This constant is defined. If this constant is used, an error is returned. | message_properties_t.delay |
DBMS_AQ.NEVER (NULL) | This constant is defined. If this constant is used, an error is returned. | message_properties_t.expiration |
DBMS_AQ.NAMESPACE_AQ (0) | Accepts notifications from DBMS_AQ queues. | sys.aq$_reg_info.namespace |
DBMS_AQ.NAMESPACE_ANONYMOUS (1) | This constant is defined. If this constant is used, an error is returned. | sys.aq$_reg_info.namespace |
ENQUEUE
You can use the stored procedure ENQUEUE
to add an entry to a queue. The following features are available:
ENQUEUE(
queue_name IN VARCHAR2,
enqueue_options IN DBMS_AQ.ENQUEUE_OPTIONS_T,
message_properties IN DBMS_AQ.MESSAGE_PROPERTIES_T,
payload IN <type_name>,
msgid OUT RAW)
Parameters
queue_name
The name of an existing queue. This name may be a schema-qualified name. If you omit the schema name, the server uses the schema that is specified by SEARCH_PATH. Take note of the following item: Unquoted identifiers are converted to be lowercase before the identifiers are stored. This operation is not performed in Oracle. To include special characters or use a case-sensitive name, enclose the name in double quotation marks.
enqueue_options
enqueue_options is a value of the enqueue_options_t type.
DBMS_AQ.ENQUEUE_OPTIONS_T IS RECORD( visibility BINARY_INTEGER DEFAULT ON_COMMIT, relative_msgid RAW(16) DEFAULT NULL, sequence_deviation BINARY INTEGER DEFAULT NULL, transformation VARCHAR2(61) DEFAULT NULL, delivery_mode PLS_INTEGER NOT NULL DEFAULT PERSISTENT);
The following table lists the only parameter values that are supported by enqueue_options_t.
visibility ON_COMMIT delivery_mode PERSISTENT sequence_deviation NULL transformation NULL relative_msgid NULL message_properties
message_properties
is a value of themessage_properties_t
type.message_properties_t IS RECORD( priority INTEGER, delay INTEGER, expiration INTEGER, correlation CHARACTER VARYING(128) COLLATE pg_catalog." C", attempts INTEGER, recipient_list"AQ$_RECIPIENT_LIST_T", exception_queue CHARACTER VARYING(61) COLLATE pg_catalog." C", enqueue_time TIMESTAMP WITHOUT TIME ZONE, state INTEGER, original_msgid BYTEA, transaction_group CHARACTER VARYING(30) COLLATE pg_catalog." C", delivery_mode INTEGER DBMS_AQ.PERSISTENT);
The following table describes the values that are supported by
message_properties_t
.Parameter Description priority If the queue table definition includes sort_list that references priority, this parameter affects the order in which messages are dequeued. A lower value specifies a higher dequeuing priority. delay The number of seconds elapsed before a message becomes available for dequeuing. Alternatively, you can set this parameter to NO_DELAY. expiration The number of seconds elapsed before a message expires. correlation The message associated with the entry. Default value: NULL. attempts The number of attempts to dequeue the message. This value is a value that is maintained by the system. recipient_list This parameter is not supported. exception_queue The name of an exception queue. If the message expires or is dequeued by a transaction that is rolled back for a number of times, the message is moved to this queue. enqueue_time The time when the entry was added to the queue. This value is provided by the system. state This parameter is maintained by DBMS_AQ. Valid values: - DBMS_AQ.READY: The delay has not been reached.
- DBMS_AQ.WAITING: The queue entry is ready for processing.
- DBMS_AQ.PROCESSED: The queue entry has been processed.
- DBMS_AQ.EXPIRED: The queue entry has been moved to the exception queue.
original_msgid This parameter is supported for compatibility, but this parameter is ignored. transaction_group This parameter is supported for compatibility, but this parameter is ignored. delivery_mode This parameter is not supported. Specify a value of DBMS_AQ.PERSISTENT. payload
You can use the payload parameter to provide the data that is associated with the queue entry. The payload type must match the type that is specified when you create the corresponding queue table. For more information, see DBMS_AQADM.CREATE_QUEUE_TABLE.
msgid
You can use the msgid parameter to retrieve a unique message identifier that is generated by the system.
Examples
The following anonymous block calls DBMS_AQ.ENQUEUE to add a message to a queue that is named work_order:
DECLARE
enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
message_handle raw(16);
payload work_order;
BEGIN
payload := work_order('Smith', 'system upgrade');
DBMS_AQ.ENQUEUE(
queue_name => 'work_order',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => payload,
msgid => message_handle
);
END;
DEQUEUE
The stored procedure DEQUEUE
dequeues a message. The following features are available:
DEQUEUE(
queue_name IN VARCHAR2,
dequeue_options IN DBMS_AQ.DEQUEUE_OPTIONS_T,
message_properties OUT DBMS_AQ.MESSAGE_PROPERTIES_T,
payload OUT type_name,
msgid OUT RAW)
Parameters
queue_name
The name of an existing queue. This name may be a schema-qualified name. If you omit the schema name, the server uses the schema that is specified by SEARCH_PATH. Take note of the following item: Unquoted identifiers are converted to be lowercase before the identifiers are stored. This operation is not performed in Oracle. To include special characters or use a case-sensitive name, enclose the name in double quotation marks.
dequeue_options
dequeue _options
is a value of thedequeue_options_t
type.DEQUEUE_OPTIONS_T IS RECORD( consumer_name CHARACTER VARYING(30), dequeue_mode INTEGER, navigation INTEGER, visibility INTEGER, wait INTEGER, msgid BYTEA, correlation CHARACTER VARYING(128), deq_condition CHARACTER VARYING(4000), transformation CHARACTER VARYING(61), delivery_mode INTEGER);
The following table describes the only parameter values that are supported by dequeue_options_t.
Parameter Description consumer_name Must be NULL. dequeue_mode The locking behavior of the dequeuing operation. Valid values: - DBMS_AQ.BROWSE: reads a message without obtaining a lock.
- DBMS_AQ.LOCKED: reads a message after a lock is obtained.
- DBMS_AQ.REMOVE: reads a message before the message is deleted.
- DBMS_AQ.REMOVE_NODATA: reads a message but does not delete the message.
navigation Identifies the message to be retrieved. Valid values: - FIRST_MESSAGE: the first message in the queue that matches the search term.
- NEXT_MESSAGE: the next available message that matches the first term.
visibility Must be ON_COMMIT: If you roll back the current transaction, the dequeued item remains in the queue. wait Must be a number larger than 0, or be set to: - DBMS_AQ.FOREVER: waits indefinitely.
- DBMS_AQ.NO_WAIT: does not wait.
msgid The ID of the message to be dequeued. correlation This parameter is supported for compatibility, but this parameter is ignored. deq_condition A VARCHAR2 expression that calculates a BOOLEAN value and specifies whether a message must be dequeued. transformation This parameter is supported for compatibility, but this parameter is ignored. delivery_mode Must be PERSISTENT. Buffered messages are not supported in this mode. message_properties
message_properties
is a value of themessage_properties_t
type.message_properties_t IS RECORD( priority INTEGER, delay INTEGER, expiration INTEGER, correlation CHARACTER VARYING(128) COLLATE pg_catalog." C", attempts INTEGER, recipient_list"AQ$_RECIPIENT_LIST_T", exception_queue CHARACTER VARYING(61) COLLATE pg_catalog." C", enqueue_time TIMESTAMP WITHOUT TIME ZONE, state INTEGER, original_msgid BYTEA, transaction_group CHARACTER VARYING(30) COLLATE pg_catalog." C", delivery_mode INTEGER DBMS_AQ.PERSISTENT);
The following table describes the values that are supported by message_properties_t.
Parameter Description priority If the queue table definition includes sort_list that references priority, this parameter affects the order in which messages are dequeued. A lower value specifies a higher dequeuing priority. delay The number of seconds elapsed before a message becomes available for dequeuing. The NO_DELAY constant specifies that a message is immediately dequeued after the message is available. expiration The number of seconds elapsed before a message expires. correlation The message associated with the entry. Default value: NULL. attempts The number of attempts to dequeue the message. This value is a value that is maintained by the system. recipient_list This parameter is not supported. exception_queue The name of an exception queue. If the message expires or is dequeued by a transaction that is rolled back for a number of times, the message is moved to this queue. enqueue_time The time when the entry was added to the queue. This value is provided by the system. state This parameter is maintained by DBMS_AQ. Valid values: - DBMS_AQ.WAITING: The delay has not been reached.
- DBMS_AQ.READY: The queue entry is ready for processing.
- DBMS_AQ.PROCESSED: The queue entry has been processed.
- DBMS_AQ.EXPIRED: The queue entry has been moved to the exception queue.
original_msgid This parameter is supported for compatibility, but this parameter is ignored. transaction_group This parameter is supported for compatibility, but this parameter is ignored. delivery_mode This parameter is not supported. Specify a value of DBMS_AQ.PERSISTENT. payload
You can use the
payload
parameter to retrieve the payload of a message that is involved in a dequeuing operation. The payload type must match the type that is specified when you create the queue table.msgid
You can use the
msgid
parameter to retrieve a unique message identifier.
Examples
The following anonymous block calls DBMS_AQ.DEQUEUE
to retrieve a message from the queue and payload:
DECLARE
dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
message_handle raw(16);
payload work_order;
BEGIN
dequeue_options.dequeue_mode := DBMS_AQ.BROWSE;
DBMS_AQ.DEQUEUE(
queue_name => 'work_queue',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => payload,
msgid => message_handle
);
DBMS_OUTPUT.PUT_LINE(
'The next work order is [' || payload.subject || '].'
);
END;
The payload is displayed by DBMS_OUTPUT.PUT_LINE
.
REGISTER
You can use the stored procedure REGISTER to register an email address, procedure, or URL that is used to receive notifications when an item is enqueued or dequeued. The following features are available:
REGISTER(
reg_list IN SYS.AQ$_REG_INFO_LIST,
count IN NUMBER)
Parameters
reg_list
The reg_list parameter specifies a list of the AQ$_REG_INFO_LIST type. This list provides the information about each subscription that you want to register. The type of each entry in the list is AQ$_REG_INFO. The following table describes the included attributes.
Attribute Type Description name VARCHAR2 (128) The name of a subscription. This name may be a schema-qualified name. namespace NUMERIC The only supported value is DBMS_AQ.NAMESPACE_AQ (0). callback VARCHAR2 (4000) Describes the action to be performed upon notification. Only PL/SQL stored procedures are supported. The procedures are called in the following form: plsql://schema.procedure
. The following items describe the fields in this form:- The schema field specifies the schema in which the stored procedure resides.
- The procedure field specifies the name of the stored procedure to be notified.
context RAW (16) A user-defined value required by the callback procedure. count
The
count
parameter specifies the number of entries inreg_list
.
Examples
The following anonymous block calls DBMS_AQ.REGISTER to register stored procedures
that are notified when an item is added to or deleted from a queue. A set of attributes
of the sys.aq$_reg_info
type is provided for each subscription that is identified in the DECLARE section.
DECLARE
subscription1 sys.aq$_reg_info;
subscription2 sys.aq$_reg_info;
subscription3 sys.aq$_reg_info;
subscriptionlist sys.aq$_reg_info_list;
BEGIN
subscription1 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://assign_worker? PR=0',HEXTORAW('FFFF'));
subscription2 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://add_to_history? PR=1',HEXTORAW('FFFF'));
subscription3 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://reserve_parts? PR=2',HEXTORAW('FFFF'));
subscriptionlist := sys.aq$_reg_info_list(subscription1, subscription2, subscription3);
dbms_aq.register(subscriptionlist, 3);
commit;
END;
/
The type of subscriptionlist
is sys.aq$_reg_info_list
and contains the sys.aq$_reg_info
objects that were described. The list name and the object count are passed to dbms_aq.register
.
UNREGISTER
You can use the stored procedure UNREGISTER
to disable notifications that are related to enqueuing and dequeuing. The following
features are available:
UNREGISTER(
reg_list IN SYS.AQ$_REG_INFO_LIST,
count
IN NUMBER)
Parameters
reg_list
The
reg_list
parameter specifies a list of theAQ$_REG_INFO_LIST
type and provides the information about each subscription that you want to register. The type of each entry in the list isAQ$_REG_INFO
. The following table describes the attributes that can be included in each entry.Attribute Type Description name VARCHAR2 (128) The name of a subscription. This name may be a schema-qualified name. namespace NUMERIC The only supported value is DBMS_AQ.NAMESPACE_AQ (0). callback VARCHAR2 (4000) Describes the action to be performed upon notification. Only PL/SQL stored procedures are supported. The procedures are called in the following form: plsql://schema.procedure
. The following items describe the fields in this form:- The schema field specifies the schema in which the stored procedure resides.
- The procedure field specifies the name of the stored procedure to be notified.
context RAW (16) A user-defined value required by the stored procedure. count
The
count
parameter specifies the number of entries inreg_list
.
Examples
The following anonymous block calls DBMS_AQ.UNREGISTER
to disable the notifications that are specified in the example for DBMS_AQ.REGISTER
:
DECLARE
subscription1 sys.aq$_reg_info;
subscription2 sys.aq$_reg_info;
subscription3 sys.aq$_reg_info;
subscriptionlist sys.aq$_reg_info_list;
BEGIN
subscription1 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://assign_worker? PR=0',HEXTORAW('FFFF'));
subscription2 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://add_to_history? PR=1',HEXTORAW('FFFF'));
subscription3 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://reserve_parts? PR=2',HEXTORAW('FFFF'));
subscriptionlist := sys.aq$_reg_info_list(subscription1, subscription2, subscription3);
dbms_aq.unregister(subscriptionlist, 3);
commit;
END;
/
The type of subscriptionlist
is sys.aq$_reg_info_list
and contains the sys.aq$_reg_info
objects that were described. The list name and the object count are passed to dbms_aq.unregister
.