I have a pool of threads which are fed tasks from a queue. Generally, a small number of threads are able to keep the queue empty. Occasionally, a particularly large burst of events will keep the queue size above zero for some time, but not for long.

My concern is with events that are duplicates or that carry data which obsoletes previous events. During times of high volume, such events can coexist in the queue for a short period. I would like to conflate them so that I spend less time on wasted work.

What is a good way to conflate such a queue? I could conflate at insertion time by iterating from tail to head and searching for a candidate to replace, but this seems too brute-force. If you have code or library recommendations, please keep in mind that I am using Java.

+2  A: 

Why not implement hashCode() and equals() on your task class, then remove any matching task before adding the new one? For example:

queue.remove(task);
queue.offer(task);

Then the queue will not hold duplicates. Alternatively:

if (!queue.contains(task)) {
    queue.offer(task);
}

This avoids enqueuing the task if it's already in the queue.
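A minimal sketch of both options, assuming a hypothetical KeyedTask class whose equality depends only on a key field (the class, its fields, and the helper method names are illustrative, not from the question). Note that contains() and remove(Object) are linear scans on most Queue implementations, and that neither helper is atomic on its own, so producers sharing a queue would need a common lock around each call.

```java
import java.util.Queue;

// Hypothetical task type: two tasks are considered equal when their keys match.
final class KeyedTask {
    final String key;      // identity used for conflation (assumed field)
    final String payload;  // data carried by the event (assumed field)

    KeyedTask(String key, String payload) {
        this.key = key;
        this.payload = payload;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof KeyedTask)) return false;
        return key.equals(((KeyedTask) o).key);
    }

    @Override
    public int hashCode() {
        return key.hashCode();
    }
}

final class DedupingEnqueue {
    // First option: drop the stale task (if any) and append the new one at the tail.
    // The newest payload wins, but the task loses its original position.
    static void replaceAndOffer(Queue<KeyedTask> queue, KeyedTask task) {
        queue.remove(task);
        queue.offer(task);
    }

    // Second option: skip the new task when an equal one is already queued.
    // The queued task keeps its position, but the new payload is discarded,
    // so this only suits exact duplicates.
    static boolean offerIfAbsent(Queue<KeyedTask> queue, KeyedTask task) {
        if (queue.contains(task)) {
            return false;
        }
        return queue.offer(task);
    }
}
```

Which option fits depends on whether the newer event's data must survive: replaceAndOffer keeps the new data, offerIfAbsent keeps the old position.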

ng
The remove() method starts searching from the head of the queue. If it finds a match, it removes it and I then add the new item at the end, but I need the new item to take the same position. remove() also creates an ordering problem if there are multiple duplicates in the queue.
omerkudat
Then don't remove, just check before you add. Notice the two options I provided.
ng
Whatever I do, it modifies the ordering of the queue anyway. I've opted for this solution, but instead of checking contains(), I do: if isEmpty(), just offer(); otherwise remove() and then offer(). Thanks.
omerkudat
A: 

If you use LinkedHashMap, you can preserve the ordering in which the entries were added to the queue.

When a matching entry arrives, I gather that you want to append some of its data to the original queue entry. In this case, you can either update the hashed object in place, or use HashMap.put(key, value) to replace the queued item with a new object. (I think that this preserves the order of the original item, but I have not tested this.)

Note that your code will need to explicitly synchronize read and write access to the LinkedHashMap and the data inside it. You don't want to be updating an item in the queue at the same time that another thread is grabbing it for processing. The easiest way to synchronize is probably by accessing the LinkedHashMap through Collections.synchronizedMap().
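As a rough sketch of this idea, the hypothetical ConflatingQueue class below keeps at most one pending value per key in an insertion-ordered LinkedHashMap. The class and method names are assumptions made for illustration. It uses synchronized methods instead of Collections.synchronizedMap(), because taking the head entry is a compound operation (iterate, copy, remove) that the synchronized wrapper alone would not make atomic.

```java
import java.util.AbstractMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical conflating queue: at most one pending value per key.
final class ConflatingQueue<K, V> {
    // Default LinkedHashMap iterates in insertion order.
    private final LinkedHashMap<K, V> pending = new LinkedHashMap<>();

    // Adds a new entry at the tail, or overwrites the value of an entry that
    // is already queued. Re-inserting an existing key does not change its
    // position in an insertion-ordered LinkedHashMap.
    public synchronized void offer(K key, V value) {
        pending.put(key, value);
    }

    // Removes and returns the oldest entry, or null when nothing is pending.
    public synchronized Map.Entry<K, V> poll() {
        Iterator<Map.Entry<K, V>> it = pending.entrySet().iterator();
        if (!it.hasNext()) {
            return null;
        }
        // Copy the entry before removing it so the caller gets a stable snapshot.
        Map.Entry<K, V> head = new AbstractMap.SimpleImmutableEntry<>(it.next());
        it.remove();
        return head;
    }

    public synchronized int size() {
        return pending.size();
    }
}
```

With this shape, offering ("a", 1), ("b", 2), then ("a", 3) leaves two entries, and the first poll() returns key "a" with the newer value 3. A worker pool would still need its own waiting strategy when poll() returns null, which this sketch leaves out.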

Dan Breslau