PolarDB-O provide message queueing and message processing. User-defined messages are stored in a queue. A collection of queues is stored in a queue table. Procedures in the DBMS_AQADM package can be used to create and manage message queues and queue tables. You can use the DBMS_AQ package to add messages to a queue or remove messages from a queue, or register or unregister a PL/SQL callback procedure.
PolarDB-O also provide extended non-compatible features for the DBMS_AQ package by running the following SQL commands:
- ALTER QUEUE
- ALTER QUEUE TABLE
- CREATE QUEUE
- CREATE QUEUE TABLE
- DROP QUEUE
- DROP QUEUE TABLE
The DBMS_AQADM package provides stored procedures that allow you to create and manage queues and queue tables.
Function/Procedure | Return type | Description |
---|---|---|
ALTER_QUEUE | N/A | Modifies an existing queue. |
ALTER_QUEUE_TABLE | N/A | Modifies an existing queue table. |
CREATE_QUEUE | N/A | Creates a queue. |
CREATE_QUEUE_TABLE | N/A | Creates a queue table. |
DROP_QUEUE | N/A | Drops an existing queue. |
DROP_QUEUE_TABLE | N/A | Drops an existing queue table. |
PURGE_QUEUE_TABLE | N/A | Removes one or more messages from a queue table. |
START_QUEUE | N/A | Makes a queue available for enqueuing and dequeuing procedures. |
STOP_QUEUE | N/A | Makes a queue unavailable for enqueuing and dequeuing procedures. |
The implementation of DBMS_AQADM in PolarDB-O is a partial implementation when compared with native Oracle. Only those functions and procedures listed in the preceding table are supported.
The following table lists the constants supported by PolarDB-O.
Constant | Description | Applicable parameter |
---|---|---|
DBMS_AQADM.TRANSACTIONAL(1) | This constant is defined. An error message is returned if this constant is used. | message_grouping |
DBMS_AQADM.NONE(0) | Specifies message grouping for a queue table. | message_grouping |
DBMS_AQADM.NORMAL_QUEUE(0) | Used with create_queue to specify queue_type. | queue_type |
DBMS_AQADM.EXCEPTION_QUEUE (1) | Used with create_queue to specify queue_type. | queue_type |
DBMS_AQADM.INFINITE(-1) | Used with create_queue to specify retention_time. | retention_time |
DBMS_AQADM.PERSISTENT (0) | The message must be stored in a table. | enqueue_options_t.delivery_mode |
DBMS_AQADM.BUFFERED (1) | This constant is defined. An error message is returned if this constant is used. | enqueue_options_t.delivery_mode |
DBMS_AQADM.PERSISTENT_OR_BUFFERED (2) | This constant is defined. An error message is returned if this constant is used. | enqueue_options_t.delivery_mode |
ALTER_QUEUE
You can use the ALTER_QUEUE procedure to modify an existing queue. The procedure has the following signature:
ALTER_QUEUE(
max_retries IN NUMBER DEFAULT NULL,
retry_delay IN NUMBER DEFAULT 0
retention_time IN NUMBER DEFAULT 0,
auto_commit IN BOOLEAN DEFAULT TRUE)
comment IN VARCHAR2 DEFAULT NULL,
Parameters
Parameter | Description |
---|---|
queue_name | The name of the new queue. |
max_retries | The maximum number of failed attempts allowed before a message is removed with the DEQUEUE statement. The value of max_retries is incremented with each ROLLBACK statement. When the number of failed attempts reaches the value specified by max_retries, the message is moved to the exception queue. A value of 0 means that no retries are allowed. |
retry_delay | The number of seconds elapsed between a rollback and message scheduling for re-processing. A value of 0 means that the message must be re-processed immediately. This is the default value. |
retention_time | The number of seconds elapsed between dequeuing and storage for a message. A value of 0 means that the message cannot be retained after being dequeued. A value of INFINITE means that a message is retained forever. Default value: 0. |
auto_commit | This parameter is ignored, but is included for compatibility. |
comment | A comment associated with a queue. |
Examples
The following command is used to alter a queue named work_order and set the retry_delay parameter to 5 seconds:
EXEC DBMS_AQADM.ALTER_QUEUE(queue_name => 'work_order', retry_delay => 5);
ALTER_QUEUE_TABLE
You can use the ALTER_QUEUE_TABLE procedure to modify an existing queue table. The procedure has the following signature:
ALTER_QUEUE_TABLE (
queue_table IN VARCHAR2,
comment IN VARCHAR2 DEFAULT NULL,
primary_instance IN BINARY_INTEGER DEFAULT 0,
secondary_instance IN BINARY_INTEGER DEFAULT 0,
Parameters
Parameter | Description |
---|---|
queue_table | The name of a queue table. This may be a schema-qualified name. |
comment | A comment about a queue table. |
primary_instance | This parameter is ignored, but is included for compatibility. |
secondary_instance | This parameter is ignored, but is included for compatibility. |
Examples
The following command is used to modify a queue table named work_order_table:
EXEC DBMS_AQADM.ALTER_QUEUE_TABLE
(queue_table => 'work_order_table', comment => 'This queue table contains work orders for the shipping department.') ;
The name of the queue table is work_order_table. The command is used to add a comment to the definition of the queue table.
CREATE_QUEUE
You can use the CREATE_QUEUE procedure to create a queue in an existing queue table. The procedure has the following signature:
CREATE_QUEUE(
queue_name IN VARCHAR2
queue_table IN VARCHAR2,
queue_type IN BINARY_INTEGER DEFAULT NORMAL_QUEUE,
max_retries IN NUMBER DEFAULT 5,
retry_delay IN NUMBER DEFAULT 0
retention_time IN NUMBER DEFAULT 0,
dependency_tracking IN BOOLEAN DEFAULT FALSE,
comment IN VARCHAR2 DEFAULT NULL,
auto_commit IN BOOLEAN DEFAULT TRUE)
Parameters
Parameter | Description |
---|---|
queue_name | The name of the new queue. |
queue_table | The name of the table where the new queue is located. |
queue_type | The type of the new queue. Valid values:
|
max_retries | The maximum number of failed attempts allowed before a message is removed with the DEQUEUE statement. The value of max_retries is incremented with each ROLLBACK statement. When the number of failed attempts reaches the value specified by max_retries, the message is moved to the exception queue. The default value for a system table is 0. The default value for a user-defined table is 5. |
retry_delay | The number of seconds elapsed between a rollback and message scheduling for re-processing. A value of 0 means that the message must be re-processed immediately. This is the default value. |
retention_time | The number of seconds elapsed between dequeuing and storage for a message. A value of 0 means that the message cannot be retained after being dequeued. A value of INFINITE means that a message is retained forever. Default value: 0. |
dependency_tracking | This parameter is ignored, but is included for compatibility. |
comment | A comment associated with a queue. |
auto_commit | This parameter is ignored, but is included for compatibility. |
Examples
The following anonymous block is used to create a queue named work_order in the work_order_table table:
BEGIN
DBMS_AQADM.CREATE_QUEUE ( queue_name => 'work_order', queue_table => 'work_order_table', comment => 'This queue contains pending work orders.') ;
END;
CREATE_QUEUE_TABLE
You can use the CREATE_QUEUE_TABLE procedure to create a queue table. The procedure has the following signature:
CREATE_QUEUE_TABLE (
queue_table IN VARCHAR2,
queue_payload_type IN VARCHAR2,
storage_clause IN VARCHAR2 DEFAULT NULL,
sort_list IN VARCHAR2 DEFAULT NULL,
multiple_consumers IN BOOLEAN DEFAULT FALSE,
message_grouping IN BINARY_INTEGER DEFAULT NONE,
comment IN VARCHAR2 DEFAULT NULL,
auto_commit IN BOOLEAN DEFAULT TRUE,
primary_instance IN BINARY_INTEGER DEFAULT 0,
secondary_instance IN BINARY_INTEGER DEFAULT 0,
compatible IN VARCHAR2 DEFAULT NULL,
secure IN BOOLEAN DEFAULT FALSE)
Parameters
Parameter | Description |
---|---|
queue_table | The name of a queue table. This may be a schema-qualified name. |
queue_payload_type | The user-defined type of the data to be stored in the queue table. To specify a RAW data type, you must create a user-defined type that identifies a RAW type. |
storage_clause | Specifies the attributes for the queue table. Only the TABLESPACE option is enforced.
All other options are ignored, but are included for compatibility. You can use the
TABLESPACE clause to specify the name of a tablespace in which a table is created.
|
sort_list | This parameter controls the dequeueing order of the queue and specifies the names
of the columns that are used to sort the queue in ascending order. The following combinations
of enq_time and priority are supported:
|
multiple_consumers | This parameter must be set to FALSE if required. |
message_grouping | This parameter must be set to NONE if required. |
comment | A comment about a queue table. |
auto_commit | This parameter is ignored, but is included for compatibility. |
primary_instance | This parameter is ignored, but is included for compatibility. |
secondary_instance | This parameter is ignored, but is included for compatibility. |
compatible | This parameter is ignored, but is included for compatibility. |
secure | This parameter is ignored, but is included for compatibility. |
Examples
The following anonymous block is used to create the work_order type with the attributes that hold the VARCHAR2 name and the TEXT project description. Then, the block uses this type to create a queue table.
BEGIN
CREATE TYPE work_order AS (name VARCHAR2, project TEXT, completed BOOLEAN);
EXEC DBMS_AQADM.CREATE_QUEUE_TABLE
(queue_table => 'work_order_table',
queue_payload_type => 'work_order',
comment => 'Work order message queue table');
END;
The queue table is named work_order_table and contains a payload of the work_order type. A comment is added to indicate that this is the work order message queue table.
DROP_QUEUE
You can use the DROP_QUEUE procedure to drop a queue. The procedure has the following signature:
DROP_QUEUE(
queue_name IN VARCHAR2,
auto_commit IN BOOLEAN DEFAULT TRUE)
Parameters
Parameter | Description |
---|---|
queue_name | The name of the queue that you want to drop. |
auto_commit | This parameter is ignored, but is included for compatibility. |
Examples
The following anonymous block drops the queue named work_order:
BEGIN
DBMS_AQADM.DROP_QUEUE(queue_name => 'work_order');
END;
DROP_QUEUE_TABLE
You can use the DROP_QUEUE_TABLE procedure to drop a queue table. The procedure has the following signature:
DROP_QUEUE_TABLE(
queue_table IN VARCHAR2,
force IN BOOLEAN default FALSE,
auto_commit IN BOOLEAN default TRUE)
Parameters
Parameter | Description |
---|---|
queue_table | The name of a queue table. This may be a schema-qualified name. |
force | The force keyword specifies the behavior of the DROP_QUEUE_TABLE command when the
command is used to drop a table that contain entries:
|
auto_commit | This parameter is ignored, but is included for compatibility. |
Examples
The following anonymous block is used to drop a table named work_order_table:
BEGIN
DBMS_AQADM.DROP_QUEUE_TABLE ('work_order_table', force => TRUE);
END;
PURGE_QUEUE_TABLE
You can use the PURGE_QUEUE_TABLE procedure to delete messages from a queue table. The procedure has the following signature:
PURGE_QUEUE_TABLE(
queue_table IN VARCHAR2,
purge_condition IN VARCHAR2,
purge_options IN aq$_purge_options_t)
Parameters
Parameter | Description |
---|---|
queue_table | The name of the queue table from which you want to delete a message. |
purge_condition | Specifies as the condition that the server evaluates when the server determines the messages to be deleted. The condition is specified in a SQL WHERE clause. |
purge_options | An object of the aq$_purge_options_t type. An aq$_purge_options_t object contains certain attributes. For more information, see Table 1. |
Attribute | Type | Description |
---|---|---|
Block | Boolean | A value of TRUE means that an exclusive lock must be held on all queues within the table. Default value: FALSE. |
delivery_mode | INTEGER | Specifies the type of message to be deleted. The only supported value is dbms_aq.percent. |
Examples
The following anonymous block is used to remove any messages from work_order_table where the value of the column named completed is YES:
DECLARE
purge_options dbms_aqadm.aq$_purge_options_t;
BEGIN
dbms_aqadm.purge_queue_table('work_order_table', 'completed = YES', purge_options);
END;
START_QUEUE
You can use the START_QUEUE procedure to make a queue available for enqueuing and dequeuing. The procedure has the following signature:
START_QUEUE(
queue_name IN VARCHAR2,
enqueue IN BOOLEAN DEFAULT TRUE,
dequeue IN BOOLEAN DEFAULT TRUE)
Parameters
Parameter | Description |
---|---|
queue_name | The name of the queue that you want to start. |
enqueue | A value of TRUE means that enqueuing is enabled. A value of FALSE means that the current setting is unchanged. Default value: TRUE. |
dequeue | A value of TRUE means that dequeuing is enabled. A value of FALSE means that the current setting is unchanged. Default value: TRUE. |
Examples
The following anonymous block is used to make a queue named work_order available for enqueuing:
BEGIN
DBMS_AQADM.START_QUEUE
(queue_name => 'work_order);
END;
STOP_QUEUE
You can use the STOP_QUEUE procedure to disable enqueuing or dequeuing on a specified queue. The procedure has the following signature:
STOP_QUEUE(
queue_name IN VARCHAR2,
enqueue IN BOOLEAN DEFAULT TRUE,
dequeue IN BOOLEAN DEFAULT TRUE,
wait IN BOOLEAN DEFAULT TRUE)
Parameters
Parameter | Description |
---|---|
queue_name | The name of the queue that you want to stop. |
enqueue | A value of TRUE means that enqueuing is disabled. A value of FALSE means that the current setting is unchanged. Default value: TRUE. |
dequeue | A value of TRUE means that dequeuing is disabled. A value of FALSE means that the current setting is unchanged. Default value: TRUE. |
wait | A value of TRUE means that the server waits for any uncompleted transactions to complete before the server applies the specified changes. When the server waits to stop the queue, no transactions are allowed to be enqueued to or dequeued from the specified queue. A value of FALSE means that the queue is stopped immediately. |
Examples
The following anonymous block is used disable enqueuing to and dequeuing from the queue named work_order:
BEGIN
DBMS_AQADM.STOP_QUEUE(queue_name =>'work_order', enqueue=>TRUE, dequeue=>TRUE, wait=>TRUE);
END;
Enqueuing and dequeuing are stopped after all outstanding transactions are completed.