This question concerns dequeueing messages in Oracle Streams Advanced Queuing (AQ).

I need to ensure that messages which are related to each other are processed sequentially.

For example, assume the queue is seeded with four messages, each carrying a business-related field called transaction reference (txn_ref), and that two of the messages (1 and 3) belong to the same transaction (000001):

id | txn_ref | 
---+---------+
 1 | 000001  |
 2 | 000002  |
 3 | 000001  |
 4 | 000003  |

Assume also that I am running 4 threads/processes that wish to dequeue from this queue. The following should occur:

  1. thread 1 dequeues message #1
  2. thread 2 dequeues message #2
  3. thread 3 dequeues message #4 (because message #3 is related to #1 and #1 has not yet completed)
  4. thread 4 blocks waiting for a message
  5. thread 1 commits its work for message #1
  6. thread 4 (or perhaps thread 1) dequeues message #3
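
To make the required ordering concrete, here is a minimal in-memory sketch of the behaviour described in the steps above. It is not the Oracle AQ API: the class `SequentialByTxnRef`, its `Message` type, and the `tryDequeue`/`commit` methods are hypothetical names used only to model "skip any message whose txn_ref is still being processed".

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// In-memory model of the desired behaviour; this is NOT the Oracle AQ API.
final class SequentialByTxnRef {

    // Hypothetical message shape: an id plus the business txn_ref.
    static final class Message {
        final int id;
        final String txnRef;

        Message(int id, String txnRef) {
            this.id = id;
            this.txnRef = txnRef;
        }
    }

    private final List<Message> pending = new ArrayList<>(); // in enqueue order
    private final Set<String> inFlight = new HashSet<>();    // txn_refs being processed

    synchronized void enqueue(Message m) {
        pending.add(m);
    }

    // Hands out the oldest message whose txn_ref is not currently in flight.
    synchronized Optional<Message> tryDequeue() {
        for (int i = 0; i < pending.size(); i++) {
            Message m = pending.get(i);
            if (inFlight.add(m.txnRef)) { // add() returns false if already in flight
                pending.remove(i);
                return Optional.of(m);
            }
        }
        return Optional.empty(); // a real worker would block here instead
    }

    // Called when a worker commits; frees the txn_ref for its next message.
    synchronized void commit(Message m) {
        inFlight.remove(m.txnRef);
    }

    public static void main(String[] args) {
        SequentialByTxnRef q = new SequentialByTxnRef();
        q.enqueue(new Message(1, "000001"));
        q.enqueue(new Message(2, "000002"));
        q.enqueue(new Message(3, "000001"));
        q.enqueue(new Message(4, "000003"));

        Message first = q.tryDequeue().get();            // message #1
        System.out.println(first.id);
        System.out.println(q.tryDequeue().get().id);     // message #2
        System.out.println(q.tryDequeue().get().id);     // message #4, #3 is skipped
        System.out.println(q.tryDequeue().isPresent());  // false: #3 is held back
        q.commit(first);
        System.out.println(q.tryDequeue().get().id);     // message #3
    }
}
```

The question is whether a dequeue condition in AQ can express the same rule without an external in-flight set.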

My initial thought was that I could achieve this with a dequeue condition requiring that a message's ENQ_TIME (enqueue time) be no later than the ENQ_TIME of any other message with the same TXN_REF. My problem is how to reference, inside the condition, the TXN_REF of a message that I have not yet selected. For example:

// Java API
String condition = "ENQ_TIME = (select min(ENQ_TIME) from AQ_TABLE1 where ??)";
dequeueOption.setCondition(condition);

Is it possible to achieve what I want here?

+1  A: 

To answer your direct question, this can be achieved using the correlation field (called CORRID in the table), which is designed for this purpose.

So, on the enqueue, you'd use the AQMessageProperties.setCorrelation() method with the TXN_REF value as the parameter. Then, in your condition you would do something like this:

// Java API
String condition = "tab.ENQ_TIME = (select min(q.ENQ_TIME) from AQ_TABLE1 q where q.CORRID = tab.CORRID)";
dequeueOption.setCondition(condition);
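
As a rough illustration of what such a condition selects, the following in-memory sketch applies the same "earliest ENQ_TIME per CORRID" rule to a list of rows. The `EarliestPerCorrelation` class and its `Row` type are hypothetical stand-ins for the queue table, and the sketch does not model how Oracle treats rows that another session has dequeued but not yet committed.

```java
import java.util.List;
import java.util.stream.Collectors;

// Models the predicate only; this is not the Oracle AQ API.
final class EarliestPerCorrelation {

    // Hypothetical stand-in for a queue-table row.
    static final class Row {
        final int id;
        final String corrId;
        final long enqTime;

        Row(int id, String corrId, long enqTime) {
            this.id = id;
            this.corrId = corrId;
            this.enqTime = enqTime;
        }
    }

    // Ids of rows whose enqTime equals the minimum enqTime for their corrId.
    static List<Integer> eligibleIds(List<Row> rows) {
        return rows.stream()
                .filter(r -> r.enqTime == rows.stream()
                        .filter(o -> o.corrId.equals(r.corrId))
                        .mapToLong(o -> o.enqTime)
                        .min()
                        .getAsLong()) // never empty: r itself always matches
                .map(r -> r.id)
                .collect(Collectors.toList());
    }
}
```

With the four messages from the question, rows 1, 2 and 4 are eligible; row 3 becomes eligible only once row 1 is no longer in the table.
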
Adam Hawkes
What I want to do is dequeue a message unless the correlation id matches a message that is currently being processed by a different worker. If I can identify the correlation IDs currently being processed then I can set the condition to be where corrid not in {a,b ... z}.
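
A sketch of how that exclusion condition could be assembled, assuming the set of in-flight correlation IDs is somehow known to the dequeuer (how workers would share that set is the open problem). The `InFlightCondition` helper is hypothetical, and the `tab.CORRID` column reference simply follows the condition in the answer above.

```java
import java.util.Collection;
import java.util.stream.Collectors;

// Hypothetical helper; it only builds the condition text.
final class InFlightCondition {

    // Returns null when nothing is in flight, because "not in ()" is not valid SQL;
    // the caller would then dequeue without a condition.
    static String excluding(Collection<String> inFlightCorrIds) {
        if (inFlightCorrIds.isEmpty()) {
            return null;
        }
        return inFlightCorrIds.stream()
                .map(id -> "'" + id.replace("'", "''") + "'") // escape embedded quotes
                .collect(Collectors.joining(", ", "tab.CORRID not in (", ")"));
    }
}
```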
Synesso
Then maybe you need two queues: one for the messages themselves as above, and another to act like a "header" message (1 per TXN_REF). A thread would first dequeue the header to get the TXN_REF/CORRID, and then that thread would process ALL of the records with that CORRID.
Adam Hawkes
Interesting idea. The header message queue is like a token system. The enqueueing process should enqueue the header message *only* if it does not already exist on the queue. (Is that possible, I wonder?) Then, because I'd like to process no more than 1 message per transaction at a time, the dequeueing process could use the header to selectively dequeue a *single* message from the messages queue. When it had processed that message, it would put the header message back on the queue if there was another related message to process. Complex, but with some promise.
Synesso
PS. I greatly appreciate your help. It surely deserves the bounty.
Synesso
+1  A: 

A strategy you can try, if possible, is using Message Groups. The Oracle Documentation describes it briefly, but I found this Toad World article to be far more useful. Basically, you set up the queue table to treat all messages committed at the same time as one "group". When dequeueing, only one user at a time can dequeue from a "group" of messages.

Adam Hawkes
This is promising, but still not quite. My messages are logically related, but they are not enqueued in the same transaction. Can I still use message groups?
Synesso
I'm nearly positive that they have to be part of the same transaction.
Adam Hawkes