views:

58

answers:

2

Is there an implementation with a blocking queue for take but bounded by a maximum size. When the size of the queue reaches a given max-size, instead of blocking 'put', it will remove the head element and insert it. So put is not blocked() but take() is.

One usage is that if I have a very slow consumer, the system will not crash ( runs out of memory ) rather these message will be removed but I do not want to block the producer.

An example of this would stock trading system. When you get a spike in stock trade/quote data, if you haven't consumed data, you want to automatically throw away old stock trade/quote.

+1  A: 

Several strategies are provided in ThreadPoolExecutor. Search for "AbortPolicy" in this javadoc . You can also implement your own policy if you want. Perhaps Discard is similar to what you want. Personally I think CallerRuns is what you want in most cases.

I think using these is a better solution, but if you absolutely want to implement it at the queue, I'd probably do it by composition. Perhaps use a LinkedList or something and wrap it with synchronize keyword.

EDIT:(some clarifications..) "Executor" is basically a thread pool combined with a blocking queue. It is the recommended way to implement a producer/consumer pattern in java. The authors of these libraries provides several strategies to cope with issues like you mentioned. If you are interested, here is another approach to specifically address the OOME issue (the source is framework specific and can't be used as is).

Enno Shioji
AbortPolicy is used when ThreadPool is being shutdown so it needs to drain queued tasks. What I want is a max-size queue that will pop the first message as you add new task.Also, netty sample you attached is 'blocked' put. The implementation I was looking for is not-blocked but rather pop the head of queue.
mjlee
@mjlee: Well, the suggested search was just so that you can find the section easier. I'm not suggesting to use that policy. I thought I'll provide a more generic answer. If you want what you specifically want, I recommend implementing a custom policy. You can use a Deque for the queue implementation so that you can pop from the head. Personally I find it rather weird to pop from the head though. Why not simply discard? There is no fundamental difference, and it's easier and simpler. And it's already implemented by somebody else.
Enno Shioji
btw, no, AbortPolicy is not used to drain tasks. It's a strategy to give up when the producers are too fast!
Enno Shioji
@Zwei - Thank you pointing about AbortPolicy - it is rejection policy and only time I ever account was when Executor was shutting down or shtudown. Yes. It is not for draining. Usually the backend is 'Linked' queue so I never realized it is also used for max-capacity.However, I want the eviction of the earlier task. This is very useful in when you system suppose to react to pseudo real-time.
mjlee
+1  A: 

There currently isnt in Java a thread-safe queue that will do what you are looking for. However, there is a BlockingDequeue (Double Ended Queue) that you can write a wrapper in which you can take from the head and and tail as you see freely.

This class, similar to a BlockingQueue, is thread safe.

John V.