15. Using Oracle Advanced Queuing (AQ)

Oracle Advanced Queuing is a highly configurable and scalable messaging feature of Oracle Database. It has interfaces in various languages, letting you integrate multiple tools in your architecture.

Note

Oracle Advanced Queuing is only supported in the python-oracledb Thick mode. See Enabling python-oracledb Thick mode.

Python-oracledb uses the updated interface for Oracle Advanced Queuing that was first introduced in cx_Oracle 7.2.

There are Advanced Queuing examples in the GitHub examples directory.

15.1. Creating a Queue

Before being used, queues need to be created in the database, for example in SQL*Plus:

begin
    dbms_aqadm.create_queue_table('MY_QUEUE_TABLE', 'RAW');
    dbms_aqadm.create_queue('DEMO_RAW_QUEUE', 'MY_QUEUE_TABLE');
    dbms_aqadm.start_queue('DEMO_RAW_QUEUE');
end;
/

This example creates a RAW queue suitable for sending string or raw bytes messages.

15.2. Enqueuing Messages

To send messages in Python, you connect and get a queue. The queue can be used for enqueuing, dequeuing, or both as needed.

queue = connection.queue("DEMO_RAW_QUEUE")

Now messages can be queued using enqone(). To send three messages:

PAYLOAD_DATA = [
    "The first message",
    "The second message",
    "The third message"
]
for data in PAYLOAD_DATA:
    queue.enqone(connection.msgproperties(payload=data))
connection.commit()

Since the queue sending the messages is a RAW queue, the strings in this example will be internally encoded to bytes using Connection.encoding before being enqueued.

15.3. Dequeuing Messages

Dequeuing is performed similarly. To dequeue a message, call the method deqone() as shown. Note that if the message is expected to be a string, the bytes must be decoded using Connection.encoding.

queue = connection.queue("DEMO_RAW_QUEUE")
msg = queue.deqone()
connection.commit()
print(msg.payload.decode(connection.encoding))
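A RAW payload comes back as bytes, and deqone() returns None when no message is dequeued (which can happen when the wait option has been changed from its default). A small wrapper can keep both cases in one place. This is a minimal sketch: the helper name payload_text and its UTF-8 default are assumptions for illustration, not part of python-oracledb.

```python
def payload_text(msg, encoding="UTF-8"):
    """Return the payload of a dequeued message as a str, or None.

    payload_text is a hypothetical helper, not a python-oracledb API.
    """
    if msg is None:              # nothing was dequeued
        return None
    payload = msg.payload
    if isinstance(payload, (bytes, bytearray)):
        return payload.decode(encoding)
    return str(payload)          # already text (or another printable type)
```

It could be used as print(payload_text(queue.deqone(), connection.encoding)).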

15.4. Using Object Queues

Named Oracle objects can be enqueued and dequeued as well. Given an object type called UDT_BOOK:

CREATE OR REPLACE TYPE udt_book AS OBJECT (
    Title   VARCHAR2(100),
    Authors VARCHAR2(100),
    Price   NUMBER(5,2)
);
/

And a queue that accepts this type:

begin
    dbms_aqadm.create_queue_table('BOOK_QUEUE_TAB', 'UDT_BOOK');
    dbms_aqadm.create_queue('DEMO_BOOK_QUEUE', 'BOOK_QUEUE_TAB');
    dbms_aqadm.start_queue('DEMO_BOOK_QUEUE');
end;
/

You can enqueue messages:

book_type = connection.gettype("UDT_BOOK")
queue = connection.queue("DEMO_BOOK_QUEUE", book_type)

book = book_type.newobject()
book.TITLE = "Quick Brown Fox"
book.AUTHORS = "The Dog"
book.PRICE = 123

queue.enqone(connection.msgproperties(payload=book))
connection.commit()

Dequeuing is done like this:

book_type = connection.gettype("UDT_BOOK")
queue = connection.queue("DEMO_BOOK_QUEUE", book_type)

msg = queue.deqone()
connection.commit()
print(msg.payload.TITLE)        # will print Quick Brown Fox
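The attribute names are upper case on the Python side because Oracle stores unquoted identifiers such as Title in upper case. When the book data already lives in a dict, the assignments can be driven from it. This is a minimal sketch: fill_object is a hypothetical helper name, and it assumes the type was created with unquoted attribute names.

```python
def fill_object(obj, values):
    """Copy dict entries onto an Oracle object, upper-casing the keys.

    fill_object is a hypothetical helper, not a python-oracledb API.
    Assumes the type's attributes were declared without quotes, so the
    database stores their names in upper case.
    """
    for name, value in values.items():
        setattr(obj, name.upper(), value)
    return obj
```

For example, fill_object(book_type.newobject(), {"title": "Quick Brown Fox", "authors": "The Dog", "price": 123}) would produce an object that can be passed as the payload to connection.msgproperties().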

15.5. Using Recipient Lists

You can associate a list of recipient names with a message at the time of enqueuing the message. This feature limits the set of recipients that have to dequeue each message.

The recipient list associated with a message overrides the queue subscriber list, if one exists. A recipient name does not have to appear in the subscriber list, although it may.

To dequeue a message, the consumername attribute can be set to one of the recipient names. The original message recipient list is not available on dequeued messages. All recipients have to dequeue a message before it gets removed from the queue.

The distinction is that a subscriber can dequeue every message placed into the queue, whereas a recipient is a designated target of one particular message.

For example:

props = connection.msgproperties(payload=book, recipients=["sub2", "sub3"])
queue.enqone(props)

Later, when dequeuing messages, a specific recipient can be set to get messages intended for that recipient:

queue.deqoptions.consumername = "sub3"
m = queue.deqone()
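Because every recipient has to dequeue a message before it is removed, code that acts on behalf of several recipients typically switches consumername for each one. This is a sketch under assumptions: dequeue_for is a hypothetical helper name, the queue is assumed to be a multi-consumer queue, and its dequeue wait option is assumed to already suit the caller.

```python
def dequeue_for(queue, consumer_names):
    """Dequeue one message per named consumer.

    Returns a dict mapping each consumer name to the message it
    received (or None if deqone() returned nothing for that name).
    dequeue_for is a hypothetical helper, not a python-oracledb API.
    """
    received = {}
    for name in consumer_names:
        queue.deqoptions.consumername = name   # act as this recipient
        received[name] = queue.deqone()
    return received
```

A call such as dequeue_for(queue, ["sub2", "sub3"]) would then be followed by connection.commit().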

15.6. Changing Queue and Message Options

Refer to the python-oracledb AQ API and Oracle Advanced Queuing documentation for details on all of the enqueue and dequeue options available.

Enqueue options can be set. For example, to enqueue messages without needing an explicit call to commit() on the connection:

queue = connection.queue("DEMO_RAW_QUEUE")
queue.enqoptions.visibility = oracledb.ENQ_IMMEDIATE

Dequeue options can also be set. For example, to specify not to block on dequeuing if no messages are available:

queue = connection.queue("DEMO_RAW_QUEUE")
queue.deqoptions.wait = oracledb.DEQ_NO_WAIT
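With no-wait dequeuing, deqone() returns None once nothing is available, which gives a natural loop condition. This is a minimal sketch: drain_queue is a hypothetical helper name, and the no-wait constant is passed in by the caller (for example oracledb.DEQ_NO_WAIT) so that the function itself needs no imports.

```python
def drain_queue(queue, no_wait):
    """Dequeue messages until none are immediately available.

    no_wait should be the driver's no-wait constant, for example
    oracledb.DEQ_NO_WAIT. drain_queue is a hypothetical helper,
    not a python-oracledb API.
    """
    queue.deqoptions.wait = no_wait
    messages = []
    while True:
        msg = queue.deqone()
        if msg is None:          # queue is (currently) empty
            break
        messages.append(msg)
    return messages
```

The caller would iterate over drain_queue(queue, oracledb.DEQ_NO_WAIT) and then call connection.commit().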

Message properties can be set when enqueuing. For example, to set an expiration of 60 seconds on a message:

queue.enqone(connection.msgproperties(payload="Message", expiration=60))

This means that if no dequeue operation occurs within 60 seconds, the message will be dropped from the queue.

15.7. Bulk Enqueue and Dequeue

The enqmany() and deqmany() methods can be used for efficient bulk message handling.

enqmany() is similar to enqone() but accepts a list of messages:

messages = [
    "The first message",
    "The second message",
    "The third message",
]
queue = connection.queue("DEMO_RAW_QUEUE")
queue.enqmany([connection.msgproperties(payload=m) for m in messages])
connection.commit()

Warning

Calling enqmany() in parallel on different connections acquired from the same pool may fail due to Oracle bug 29928074. To avoid this, do not run enqmany() in parallel, use standalone connections or connections from different pools, or make multiple calls to enqone() instead. Calls to deqmany() are not affected.
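One way to apply the last workaround is to hide the choice between enqmany() and repeated enqone() calls behind a flag. This is a minimal sketch: enqueue_all and its bulk parameter are hypothetical names, and the caller remains responsible for committing.

```python
def enqueue_all(connection, queue, payloads, bulk=True):
    """Enqueue every payload, either in one bulk call or one by one.

    Pass bulk=False where parallel enqmany() calls on pooled
    connections are a concern. enqueue_all is a hypothetical helper,
    not a python-oracledb API. Returns the number of messages enqueued.
    """
    props = [connection.msgproperties(payload=p) for p in payloads]
    if bulk:
        queue.enqmany(props)     # single call for the whole batch
    else:
        for prop in props:
            queue.enqone(prop)   # one call per message
    return len(props)
```

Code running on pooled connections could call enqueue_all(connection, queue, messages, bulk=False), while single-connection scripts keep the default.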

To dequeue multiple messages at one time, use deqmany(). This takes an argument specifying the maximum number of messages to dequeue at one time:

for m in queue.deqmany(10):
    print(m.payload.decode(connection.encoding))

Depending on the queue properties and the number of messages available to dequeue, this code will print out from zero to ten messages.
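To process more messages than one call returns, deqmany() can be called repeatedly until it comes back empty. This is a sketch under assumptions: iter_messages is a hypothetical helper name, and the queue's wait option is assumed to have been set to no-wait beforehand, since otherwise a call on an empty queue would block.

```python
def iter_messages(queue, batch_size=10):
    """Yield messages from repeated deqmany() calls until none remain.

    iter_messages is a hypothetical helper, not a python-oracledb API.
    It assumes queue.deqoptions.wait was already set to no-wait by
    the caller.
    """
    while True:
        batch = queue.deqmany(batch_size)
        if not batch:            # nothing left to dequeue right now
            return
        yield from batch
```

The generator keeps at most one batch in memory at a time, so the caller can commit at whatever point suits the application.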