array-based advanced queuing in 10g
Advanced Queuing (AQ) is Oracle's native queuing technology. AQ has been available for many versions of Oracle and more performance features have gradually been added over several releases. This short article introduces the following new functions added to the DBMS_AQ package in 10g.
- ENQUEUE_ARRAY
- DEQUEUE_ARRAY
As their names suggest, these APIs enable us to enqueue or dequeue batches of messages, rather than a single message at a time. As enqueues are INSERTs and dequeues are DELETEs (AQ uses tables for queuing), we can use these APIs to achieve some of the performance gains associated with bulk processing.
This article assumes that readers are familiar with AQ concepts. For an overview, read this oracle-developer.net article. This provides an introduction to queuing concepts, together with various setup requirements such as database objects, privileges etc.
setup
AQ users will be familiar with the concept of defining object types as payload structures. For array-based enqueue/dequeue, we will also require a collection type to enqueue and dequeue multiple payloads. In this article, we will create and use the following simple payload types.
SQL> CREATE TYPE demo_aq_array_ot AS OBJECT
     ( message VARCHAR2(4000) );
     /

Type created.

SQL> CREATE TYPE demo_aq_array_ntt
        AS TABLE OF demo_aq_array_ot;
     /

Type created.
Once we have defined our payload structure, we can set up and start our queue as follows.
SQL> BEGIN
        DBMS_AQADM.CREATE_QUEUE_TABLE (
           queue_table        => 'demo_aq_array_qt',
           queue_payload_type => 'demo_aq_array_ot'
        );
        DBMS_AQADM.CREATE_QUEUE (
           queue_name  => 'demo_aq_array_q',
           queue_table => 'demo_aq_array_qt'
        );
        DBMS_AQADM.START_QUEUE (
           queue_name => 'demo_aq_array_q'
        );
     END;
     /

PL/SQL procedure successfully completed.
dbms_aq.enqueue_array
We'll begin by using the new DBMS_AQ.ENQUEUE_ARRAY function to enqueue 2,500 messages as a single transaction. Oracle developers will be aware that work done within a single bulk transaction will be more efficient than a series of atomic operations. This is particularly true of INSERTs, which is essentially what enqueues are in AQ.
The parameters to ENQUEUE_ARRAY are very similar to those of the existing ENQUEUE procedure, except that we pass an array of messages and corresponding message properties. Our bespoke collection type defines the message array. We can set the properties for each message in the array individually using a new MESSAGE_PROPERTIES_ARRAY_T type. A single record of enqueue options is applied to all messages.
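The per-message properties described above can be sketched as follows. This is a minimal illustration rather than part of the original examples: the three-message batch, the priority values and the 'DEMO-n' correlation identifiers are all invented for the purpose, and the block has not been timed or traced. It assumes the types and queue created in the setup section, and it rolls back at the end so that the message counts used elsewhere in this article are unaffected.

```sql
DECLARE

   r_enqueue_options  DBMS_AQ.ENQUEUE_OPTIONS_T;
   r_msg_properties   DBMS_AQ.MESSAGE_PROPERTIES_T;
   nt_msg_properties  DBMS_AQ.MESSAGE_PROPERTIES_ARRAY_T;
   nt_payloads        demo_aq_array_ntt;
   nt_msg_ids         DBMS_AQ.MSGID_ARRAY_T;
   v_enqueued_cnt     PLS_INTEGER;

BEGIN

   nt_payloads       := demo_aq_array_ntt();
   nt_msg_properties := DBMS_AQ.MESSAGE_PROPERTIES_ARRAY_T();
   nt_msg_ids        := DBMS_AQ.MSGID_ARRAY_T();

   FOR i IN 1 .. 3 LOOP

      nt_payloads.EXTEND;
      nt_payloads(i) := demo_aq_array_ot('Properties demo ' || TO_CHAR(i));

      /* Vary the properties for each message (illustrative values only)... */
      r_msg_properties.priority    := i;
      r_msg_properties.correlation := 'DEMO-' || TO_CHAR(i);

      /* One properties record per payload, at the same index... */
      nt_msg_properties.EXTEND;
      nt_msg_properties(i) := r_msg_properties;

   END LOOP;

   /* The single enqueue options record applies to the whole array... */
   v_enqueued_cnt := DBMS_AQ.ENQUEUE_ARRAY(
                        queue_name               => 'demo_aq_array_q',
                        enqueue_options          => r_enqueue_options,
                        array_size               => nt_payloads.COUNT,
                        message_properties_array => nt_msg_properties,
                        payload_array            => nt_payloads,
                        msgid_array              => nt_msg_ids
                        );

   DBMS_OUTPUT.PUT_LINE(
      'Enqueued [' || TO_CHAR(v_enqueued_cnt) || '] messages.'
      );

   /* Undo the enqueue so the demo queue is left as it was... */
   ROLLBACK;

END;
/
```

Note that a priority only influences dequeue order when the queue table is created with a sort list that includes it; the queue table in this article uses the default ordering.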
Given this, we can now see an example of enqueuing an array of 2,500 messages. We will time this by the wall-clock and compare it to 2,500 individual enqueues. Note that the ENQUEUE_ARRAY function returns the number of messages that were enqueued together with a collection of message IDs as an OUT parameter.
SQL> set timing on
SQL> DECLARE

        r_enqueue_options  DBMS_AQ.ENQUEUE_OPTIONS_T;
        r_msg_properties   DBMS_AQ.MESSAGE_PROPERTIES_T;
        nt_msg_properties  DBMS_AQ.MESSAGE_PROPERTIES_ARRAY_T;
        nt_payloads        demo_aq_array_ntt;
        nt_msg_ids         DBMS_AQ.MSGID_ARRAY_T;
        v_enqueued_cnt     PLS_INTEGER;

     BEGIN

        /* Initialise the collections... */
        nt_payloads := demo_aq_array_ntt();
        nt_msg_properties := DBMS_AQ.MESSAGE_PROPERTIES_ARRAY_T();
        nt_msg_ids := DBMS_AQ.MSGID_ARRAY_T();

        /* Load up 2500 arbitrary messages... */
        FOR i IN 1 .. 2500 LOOP

           nt_payloads.EXTEND;
           nt_payloads(nt_payloads.LAST) := demo_aq_array_ot(
                                               'Message number ' || TO_CHAR(i)
                                               );

           nt_msg_properties.EXTEND;
           nt_msg_properties(nt_msg_properties.LAST) := r_msg_properties;

        END LOOP;

        /* Enqueue the messages in a single transaction... */
        v_enqueued_cnt := DBMS_AQ.ENQUEUE_ARRAY(
                             queue_name               => 'demo_aq_array_q',
                             enqueue_options          => r_enqueue_options,
                             array_size               => nt_payloads.COUNT,
                             message_properties_array => nt_msg_properties,
                             payload_array            => nt_payloads,
                             msgid_array              => nt_msg_ids
                             );
        COMMIT;

        DBMS_OUTPUT.PUT_LINE(
           'Enqueued [' || TO_CHAR(v_enqueued_cnt) || '] messages.'
           );

        DBMS_OUTPUT.PUT_LINE(
           'Received [' || TO_CHAR(nt_msg_ids.COUNT) || '] message IDs.'
           );

     END;
     /

Enqueued [2500] messages.
Received [2500] message IDs.

PL/SQL procedure successfully completed.

Elapsed: 00:00:00.70
Before we compare this with 2,500 single message enqueues, we can confirm that our message array was enqueued as follows.
SQL> SELECT COUNT(*) FROM aq$demo_aq_array_qt;

  COUNT(*)
----------
      2500

1 row selected.
We can now run two comparisons. First we will enqueue 2,500 single messages but commit once after all messages have been enqueued. This emulates the transaction behaviour of the array method.
SQL> set timing on
SQL> DECLARE

        r_enqueue_options     DBMS_AQ.ENQUEUE_OPTIONS_T;
        r_message_properties  DBMS_AQ.MESSAGE_PROPERTIES_T;
        v_message_handle      RAW(16);

     BEGIN

        FOR i IN 1 .. 2500 LOOP

           DBMS_AQ.ENQUEUE(
              queue_name         => 'demo_aq_array_q',
              enqueue_options    => r_enqueue_options,
              message_properties => r_message_properties,
              payload            => demo_aq_array_ot('Message number ' || TO_CHAR(i)),
              msgid              => v_message_handle
              );

        END LOOP;
        COMMIT;

     END;
     /

PL/SQL procedure successfully completed.

Elapsed: 00:00:01.21
By the wall-clock alone, this takes approximately 70% longer than the new array enqueue (note also that the array example timing includes the population of the initial message collection). Of course, we are dealing with very short elapsed times in these examples, but higher-volume tests produce similar comparative results. For a second test, we can enqueue 2,500 single messages but commit each message as it is queued. This emulates the standard queuing behaviour of most AQ applications.
SQL> set timing on
SQL> DECLARE

        r_enqueue_options     DBMS_AQ.ENQUEUE_OPTIONS_T;
        r_message_properties  DBMS_AQ.MESSAGE_PROPERTIES_T;
        v_message_handle      RAW(16);

     BEGIN

        FOR i IN 1 .. 2500 LOOP

           DBMS_AQ.ENQUEUE(
              queue_name         => 'demo_aq_array_q',
              enqueue_options    => r_enqueue_options,
              message_properties => r_message_properties,
              payload            => demo_aq_array_ot('Message number ' || TO_CHAR(i)),
              msgid              => v_message_handle
              );

           COMMIT;

        END LOOP;

     END;
     /

PL/SQL procedure successfully completed.

Elapsed: 00:00:01.73
As is probably to be expected, this is considerably "slower" (over twice the elapsed time of the array enqueue example). Commits can be expensive in highly transactional systems and 2,500 singleton inserts generate much more redo than the same number of rows being inserted in bulk. We can see the effect of the additional resources in these timings. As an exercise for the reader, compare the resource usage of the three methods using Tom Kyte's RUNSTATS utility (a version of which can be found on the Utilities page of this site).
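For a quick look at the redo and commit figures without installing RUNSTATS, a session-level query along the following lines can be run immediately before and after each of the three enqueue blocks. This is only a sketch: it assumes the test user has been granted SELECT on the underlying V$MYSTAT and V$STATNAME views, and it reports cumulative values for the session, so the figure of interest is the difference between the "before" and "after" runs.

```sql
-- Cumulative redo bytes and commits for the current session.
-- Run once before and once after a test block, then subtract.
SELECT sn.name
,      ms.value
FROM   v$mystat   ms
       JOIN v$statname sn ON sn.statistic# = ms.statistic#
WHERE  sn.name IN ('redo size', 'user commits')
ORDER  BY sn.name;
```

RUNSTATS remains the better tool for a full comparison because it also captures latching, which this query ignores.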
a note on options
Before we continue, it is worth noting that there is an enormous range of options with the new array-based functions, in addition to the existing enqueue-dequeue options, consumer options and notifications/listeners etc. For example, we can enqueue messages to a named transaction group which can be dequeued in isolation to increase concurrency. Alternatively, we can dequeue across groups. We can specify a boolean search condition to dictate whether to dequeue a message and also control the part of the queue that Oracle starts dequeuing from.
This introductory article uses default values for all examples, rather than attempt to demonstrate every possible combination of options.
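As one illustration of the non-default options mentioned above, the following sketch browses (rather than removes) up to ten messages that satisfy a search condition, starting from the head of the queue. It is an untested outline, not one of the original examples: the batch size of 10, the LIKE pattern and the choice of browse mode are assumptions made for the illustration. The deq_condition refers to the payload through the tab.user_data alias.

```sql
DECLARE

   r_dequeue_options  DBMS_AQ.DEQUEUE_OPTIONS_T;
   nt_msg_properties  DBMS_AQ.MESSAGE_PROPERTIES_ARRAY_T;
   nt_payloads        demo_aq_array_ntt;
   nt_msg_ids         DBMS_AQ.MSGID_ARRAY_T;
   v_browsed_cnt      PLS_INTEGER;
   v_batch            PLS_INTEGER := 10;   -- illustrative batch size

   x_timeout EXCEPTION;
   PRAGMA EXCEPTION_INIT(x_timeout, -25228);

BEGIN

   /* Prepare collections in the same way as the article's examples... */
   nt_payloads := demo_aq_array_ntt();
   nt_payloads.EXTEND(v_batch);
   nt_msg_properties := DBMS_AQ.MESSAGE_PROPERTIES_ARRAY_T();
   nt_msg_properties.EXTEND(v_batch);
   nt_msg_ids := DBMS_AQ.MSGID_ARRAY_T();

   /* Read without removing, from the start of the queue, no waiting... */
   r_dequeue_options.dequeue_mode  := DBMS_AQ.BROWSE;
   r_dequeue_options.navigation    := DBMS_AQ.FIRST_MESSAGE;
   r_dequeue_options.wait          := DBMS_AQ.NO_WAIT;

   /* Only consider messages whose text ends in "00" (assumed pattern)... */
   r_dequeue_options.deq_condition := 'tab.user_data.message LIKE ''%00''';

   v_browsed_cnt := DBMS_AQ.DEQUEUE_ARRAY(
                       queue_name               => 'demo_aq_array_q',
                       dequeue_options          => r_dequeue_options,
                       array_size               => v_batch,
                       message_properties_array => nt_msg_properties,
                       payload_array            => nt_payloads,
                       msgid_array              => nt_msg_ids
                       );

   DBMS_OUTPUT.PUT_LINE(
      'Browsed [' || TO_CHAR(v_browsed_cnt) || '] matching messages.'
      );

EXCEPTION
   WHEN x_timeout THEN
      DBMS_OUTPUT.PUT_LINE('No matching messages.');
END;
/
```

Because the messages are browsed, they stay on the queue and the counts in the examples that follow are not disturbed.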
dbms_aq.dequeue_array
Having enqueued 7,500 messages in the above examples, we can now dequeue them in batches. We will simply dequeue all messages in batches of up to 500 and write them to a message table. When there are no messages left, we will exit. We can determine whether there are any messages left to dequeue and process in one of two ways:
- scenario one: if the number of messages dequeued is fewer than the requested array size, then this is the last batch to collect; or
- scenario two: if the final fetch is equivalent to the batch size, then there will be 0 messages left to dequeue. Of course, we cannot possibly know that we fetched the last batch under this scenario, so we can add a timeout to avoid waiting indefinitely (we would have to wait until the number of new enqueued messages reached our array size). Remember from our enqueue examples that we enqueued an exact multiple of our dequeue batch size so we will encounter this scenario.
The following example includes the code required for both scenarios (for information), but it is scenario two that we specifically need to cater for. As stated, the dequeued messages will be written to a message table, which is created as follows.
SQL> CREATE TABLE demo_aq_array_mt
     ( message VARCHAR2(4000)
     , ts      TIMESTAMP(3)
     );

Table created.
Now we can dequeue the messages in batches of 500.
SQL> DECLARE

        r_dequeue_options  DBMS_AQ.DEQUEUE_OPTIONS_T;
        nt_msg_properties  DBMS_AQ.MESSAGE_PROPERTIES_ARRAY_T;
        nt_payloads        demo_aq_array_ntt;
        nt_msg_ids         DBMS_AQ.MSGID_ARRAY_T;
        v_dequeued_cnt     PLS_INTEGER;
        v_dequeue_batch    PLS_INTEGER := 500;
        v_continue         BOOLEAN := TRUE;

        x_timeout EXCEPTION;
        PRAGMA EXCEPTION_INIT(x_timeout, -25228);

     BEGIN

        /* Prepare collections... */
        nt_payloads := demo_aq_array_ntt();
        nt_payloads.EXTEND(v_dequeue_batch);
        nt_msg_properties := DBMS_AQ.MESSAGE_PROPERTIES_ARRAY_T();
        nt_msg_properties.EXTEND(v_dequeue_batch);
        nt_msg_ids := DBMS_AQ.MSGID_ARRAY_T();

        /* Set a 5 second timeout for the dequeue... */
        r_dequeue_options.wait := 5;

        /* Dequeue all messages in batches... */
        WHILE v_continue LOOP

           BEGIN

              v_dequeued_cnt := DBMS_AQ.DEQUEUE_ARRAY(
                                   queue_name               => 'demo_aq_array_q',
                                   dequeue_options          => r_dequeue_options,
                                   array_size               => v_dequeue_batch,
                                   message_properties_array => nt_msg_properties,
                                   payload_array            => nt_payloads,
                                   msgid_array              => nt_msg_ids
                                   );

              DBMS_OUTPUT.PUT_LINE(
                 'Dequeued [' || TO_CHAR(v_dequeued_cnt) || '] messages.'
                 );

              INSERT INTO demo_aq_array_mt
              SELECT nt.message, SYSTIMESTAMP
              FROM   TABLE(nt_payloads) nt;

              DBMS_OUTPUT.PUT_LINE(
                 'Inserted [' || TO_CHAR(SQL%ROWCOUNT) || '] messages.'
                 );

              /* Handle exit scenario one from the notes above... */
              v_continue := (v_dequeued_cnt = v_dequeue_batch);

              /* Commit the batch... */
              COMMIT;

           EXCEPTION

              /* Handle exit scenario two from the notes above... */
              WHEN x_timeout THEN
                 DBMS_OUTPUT.PUT_LINE(
                    'No more messages to dequeue.'
                    );
                 v_continue := FALSE;

           END;

        END LOOP;

     END;
     /

Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
No more messages to dequeue.

PL/SQL procedure successfully completed.
We have now dequeued our example messages and written them to the message table. Out of interest, we can verify the number of messages dequeued and see the elapsed time as follows.
SQL> SELECT COUNT(*)
     ,      MIN(ts) AS first_batch_written
     ,      MAX(ts) AS last_batch_written
     FROM   demo_aq_array_mt;

  COUNT(*) FIRST_BATCH_WRITTEN        LAST_BATCH_WRITTEN
---------- -------------------------- --------------------------
      7500 29-JUL-05 21.28.01.031     29-JUL-05 21.28.05.671

1 row selected.
This is a simple confirmation that we collected all of our messages and the entire operation took approximately 5 seconds.
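The elapsed figure can also be computed directly rather than read off the two timestamps. The following query is a small sketch against the same message table; the column aliases are arbitrary. Subtracting one TIMESTAMP from another yields an INTERVAL DAY TO SECOND value.

```sql
-- Time between the first and last batch being written to the message table.
SELECT COUNT(*)          AS messages_written
,      MAX(ts) - MIN(ts) AS write_window
FROM   demo_aq_array_mt;
```

Bear in mind that this window starts when the first batch is inserted and ends when the last batch is inserted, so it excludes both the first dequeue call and the final 5 second timeout.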
The dequeue example was designed to demonstrate the new array feature. Note, however, that dequeuing in batches would possibly be better combined with notification (via callback procedure), rather than managed by a standalone unit. See this introduction to AQ for an example of a callback procedure that automatically dequeues messages via notification. As a strategy note, a callback procedure that dequeues every message from a queue might hit redundant notifications. This is because each enqueue operation will trigger a notification but the first execution of the callback procedure will potentially dequeue everything. Later executions will then find that the message they were notified about has already gone, and the resulting exception (ORA-25263) will need to be handled.
dequeue timings
As with the enqueue examples, we can compare the array method against the wall-clock. We will enqueue 7,500 messages but this time, dequeue them using the three methods we adopted for the enqueue example:
- array dequeue;
- single-row dequeue with one commit; and
- single-row dequeue with a commit on every record.
First we need to enqueue 7,500 messages.
SQL> DECLARE

        r_enqueue_options  DBMS_AQ.ENQUEUE_OPTIONS_T;
        r_msg_properties   DBMS_AQ.MESSAGE_PROPERTIES_T;
        nt_msg_properties  DBMS_AQ.MESSAGE_PROPERTIES_ARRAY_T;
        nt_payloads        demo_aq_array_ntt;
        nt_msg_ids         DBMS_AQ.MSGID_ARRAY_T;
        v_enqueued_cnt     PLS_INTEGER;

     BEGIN

        /* Initialise the collections... */
        nt_payloads := demo_aq_array_ntt();
        nt_msg_properties := DBMS_AQ.MESSAGE_PROPERTIES_ARRAY_T();
        nt_msg_ids := DBMS_AQ.MSGID_ARRAY_T();

        /* Load up 7500 arbitrary messages... */
        FOR i IN 1 .. 7500 LOOP

           nt_payloads.EXTEND;
           nt_payloads(nt_payloads.LAST) := demo_aq_array_ot(
                                               'Message number ' || TO_CHAR(i)
                                               );

           nt_msg_properties.EXTEND;
           nt_msg_properties(nt_msg_properties.LAST) := r_msg_properties;

        END LOOP;

        /* Enqueue the messages in a single transaction... */
        v_enqueued_cnt := DBMS_AQ.ENQUEUE_ARRAY(
                             queue_name               => 'demo_aq_array_q',
                             enqueue_options          => r_enqueue_options,
                             array_size               => nt_payloads.COUNT,
                             message_properties_array => nt_msg_properties,
                             payload_array            => nt_payloads,
                             msgid_array              => nt_msg_ids
                             );
        COMMIT;

        DBMS_OUTPUT.PUT_LINE(
           'Enqueued [' || TO_CHAR(v_enqueued_cnt) || '] messages.'
           );

     END;
     /

Enqueued [7500] messages.

PL/SQL procedure successfully completed.
Now we can dequeue 2,500 messages in a single array. Note that we do not do anything with these messages as this is a wall-clock timing of the dequeue operation itself.
SQL> set timing on
SQL> DECLARE

        r_dequeue_options  DBMS_AQ.DEQUEUE_OPTIONS_T;
        nt_msg_properties  DBMS_AQ.MESSAGE_PROPERTIES_ARRAY_T;
        nt_payloads        demo_aq_array_ntt;
        nt_msg_ids         DBMS_AQ.MSGID_ARRAY_T;
        v_dequeued_cnt     PLS_INTEGER;
        v_dequeue_batch    PLS_INTEGER := 2500;

     BEGIN

        /* Prepare collections... */
        nt_payloads := demo_aq_array_ntt();
        nt_payloads.EXTEND(v_dequeue_batch);
        nt_msg_properties := DBMS_AQ.MESSAGE_PROPERTIES_ARRAY_T();
        nt_msg_properties.EXTEND(v_dequeue_batch);
        nt_msg_ids := DBMS_AQ.MSGID_ARRAY_T();

        /* Dequeue 2,500 messages... */
        v_dequeued_cnt := DBMS_AQ.DEQUEUE_ARRAY(
                             queue_name               => 'demo_aq_array_q',
                             dequeue_options          => r_dequeue_options,
                             array_size               => v_dequeue_batch,
                             message_properties_array => nt_msg_properties,
                             payload_array            => nt_payloads,
                             msgid_array              => nt_msg_ids
                             );

        DBMS_OUTPUT.PUT_LINE(
           'Dequeued [' || TO_CHAR(v_dequeued_cnt) || '] messages.'
           );

        COMMIT;

     END;
     /

Dequeued [2500] messages.

PL/SQL procedure successfully completed.

Elapsed: 00:00:00.89
Now we have our baseline timing, we can dequeue a further 2,500 messages but using the single-message DEQUEUE procedure. We'll only commit when all messages have been dequeued.
SQL> set timing on
SQL> DECLARE

        r_dequeue_options     DBMS_AQ.DEQUEUE_OPTIONS_T;
        r_message_properties  DBMS_AQ.MESSAGE_PROPERTIES_T;
        v_message_handle      RAW(16);
        o_payload             demo_aq_array_ot;

     BEGIN

        FOR i IN 1 .. 2500 LOOP

           DBMS_AQ.DEQUEUE(
              queue_name         => 'demo_aq_array_q',
              dequeue_options    => r_dequeue_options,
              message_properties => r_message_properties,
              payload            => o_payload,
              msgid              => v_message_handle
              );

        END LOOP;

        COMMIT;

     END;
     /

PL/SQL procedure successfully completed.

Elapsed: 00:00:01.21
Going by these timings, this takes approximately 36% longer than the array-based dequeue (1.21 seconds against 0.89 seconds). Finally, we can dequeue the remaining messages but commit each time we collect a message.
SQL> set timing on
SQL> DECLARE

        r_dequeue_options     DBMS_AQ.DEQUEUE_OPTIONS_T;
        r_message_properties  DBMS_AQ.MESSAGE_PROPERTIES_T;
        v_message_handle      RAW(16);
        o_payload             demo_aq_array_ot;

     BEGIN

        FOR i IN 1 .. 2500 LOOP

           DBMS_AQ.DEQUEUE(
              queue_name         => 'demo_aq_array_q',
              dequeue_options    => r_dequeue_options,
              message_properties => r_message_properties,
              payload            => o_payload,
              msgid              => v_message_handle
              );

           COMMIT;

        END LOOP;

     END;
     /

PL/SQL procedure successfully completed.

Elapsed: 00:00:01.62
This method takes approximately twice as long as the array dequeue method. The overall time comparisons are similar to those of the enqueue examples above.
array does not imply bulk
Our understanding of array-based processing is that it is more efficient; in fact, we can see improved elapsed times in our enqueue and dequeue examples. However, if we run our ENQUEUE_ARRAY and DEQUEUE_ARRAY examples with SQL trace enabled, we see something surprising. The following TKProf output shows the ENQUEUE_ARRAY example run at level 12 SQL trace (event 10046).
insert into "SCOTT"."DEMO_AQ_ARRAY_QT" (q_name, msgid, corrid, priority,
  state, delay, expiration, time_manager_info, local_order_no, chain_no,
  enq_time, step_no, enq_uid, enq_tid, retry_count, exception_qschema,
  exception_queue, recipient_key, dequeue_msgid, user_data, sender_name,
  sender_address, sender_protocol, user_prop, cscn, dscn)
values
 (:1, :2, :3, :4, :5, :6, :7, :8, :9, :10, :11, :12, :13, :14, 0, :15, :16,
  :17, :18, :19, :20, :21, :22, :23, :24, :25) returning rowid into :rd

call     count       cpu    elapsed       disk      query    current        rows
------- ------  -------- ---------- ---------- ---------- ----------  ----------
Parse        1      0.00       0.00          0          0          0           0
Execute      1      0.62       0.74         13         84      13371        2500
Fetch        0      0.00       0.00          0          0          0           0
------- ------  -------- ---------- ---------- ---------- ----------  ----------
total        2      0.62       0.74         13         84      13371        2500

Misses in library cache during parse: 1
Misses in library cache during execute: 1
Optimizer mode: CHOOSE
Parsing user id: SYS   (recursive depth: 1)
We can see that our entire message array was inserted into the queue table in a single insert, which is what we expect. However, if we now look at the dequeue trace, we might be surprised.
delete from "SCOTT"."DEMO_AQ_ARRAY_QT"
where
 rowid = :1

call     count       cpu    elapsed       disk      query    current        rows
------- ------  -------- ---------- ---------- ---------- ----------  ----------
Parse        0      0.00       0.00          0          0          0           0
Execute   2500      0.32       0.27          0       2501      13093        2500
Fetch        0      0.00       0.00          0          0          0           0
------- ------  -------- ---------- ---------- ---------- ----------  ----------
total     2500      0.32       0.27          0       2501      13093        2500

Misses in library cache during parse: 0
Optimizer mode: CHOOSE
Parsing user id: SYS   (recursive depth: 1)
This time there is no bulk operation at all. Each message in our array was dequeued individually, as proven by the 2,500 executes of this delete statement. Furthermore, the following select statement precedes the delete, which tells us that Oracle is fetching the data off the queue (i.e. the queue table) first so it can lock it for delete. With 2,504 fetch calls for 2,500 rows, the select is evidently not array-fetching either, and the subsequent deletes are not array-based.
select /*+ INDEX(TAB) */ tab.rowid, tab.msgid, tab.corrid, tab.priority,
  tab.delay, tab.expiration, tab.retry_count, tab.exception_qschema,
  tab.exception_queue, tab.chain_no, tab.local_order_no, tab.enq_time,
  tab.time_manager_info, tab.state, tab.enq_tid, tab.step_no,
  tab.sender_name, tab.sender_address, tab.sender_protocol,
  tab.dequeue_msgid, tab.user_prop, tab.cscn, tab.dscn, tab.user_data
from
 "SCOTT"."DEMO_AQ_ARRAY_QT" tab
where q_name = :1 and state = :2
order by q_name, state, enq_time, step_no, chain_no, local_order_no
for update skip locked

call     count       cpu    elapsed       disk      query    current        rows
------- ------  -------- ---------- ---------- ---------- ----------  ----------
Parse        0      0.00       0.00          0          0          0           0
Execute      4      0.00       0.00          0          0          0           0
Fetch     2504      0.23       0.20          0       5187       2510        2500
------- ------  -------- ---------- ---------- ---------- ----------  ----------
total     2508      0.23       0.20          0       5187       2510        2500

Misses in library cache during parse: 0
Optimizer mode: CHOOSE
Parsing user id: SYS   (recursive depth: 1)
This is one to look out for in future releases.
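To reproduce the traces in this section, the session can be traced along the following lines. This is a sketch rather than the exact method used for the article: the tracefile identifier 'aq_array' is an arbitrary label, the user needs the ALTER SESSION privilege, and the trace file is written to the server's trace directory.

```sql
-- Optional: tag the trace file name so it is easy to find.
ALTER SESSION SET tracefile_identifier = 'aq_array';

-- Enable extended SQL trace (binds and waits) for this session.
ALTER SESSION SET EVENTS '10046 trace name context forever, level 12';

-- ... run the ENQUEUE_ARRAY or DEQUEUE_ARRAY example here ...

-- Switch tracing off again.
ALTER SESSION SET EVENTS '10046 trace name context off';
```

When formatting the trace file with TKProf, the recursive statements must not be filtered out: the insert, delete and select shown above are parsed as SYS at recursive depth 1, so running TKProf with sys=no would hide them.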
further reading
For general information on Advanced Queuing, refer to the Advanced Queuing Guide. For information on the new ENQUEUE_ARRAY and DEQUEUE_ARRAY functions described in this article, read the DBMS_AQ reference.
acknowledgements
Credit goes to Job Miller for pointing out the underlying behaviour of the DEQUEUE_ARRAY function.
source code
The source code for the examples in this article can be downloaded from here.
Adrian Billington, July 2005