UPDATE: Here's my implementation of Hashed Timing Wheels. Please let me know if you have any ideas to improve its performance and concurrency. (20-Jan-2009)

// Sample usage:
public static void main(String[] args) throws Exception {
    Timer timer = new HashedWheelTimer();
    for (int i = 0; i < 100000; i++) {
        timer.newTimeout(new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                // Extend another second.
                timeout.extend();
            }
        }, 1000, TimeUnit.MILLISECONDS);
    }
}

UPDATE: I solved this problem by using Hierarchical and Hashed Timing Wheels. (19-Jan-2009)

I'm trying to implement a special-purpose timer in Java that is optimized for timeout handling. For example, a user can register a task with a deadline, and the timer notifies the user's callback method when the deadline passes. In most cases, a registered task will be done within a very short amount of time, so most tasks will be canceled (e.g. task.cancel()) or rescheduled into the future (e.g. task.rescheduleToLater(1, TimeUnit.SECONDS)).

I want to use this timer to detect idle socket connections (e.g. close the connection when no message is received for 10 seconds) and write timeouts (e.g. raise an exception when a write operation is not finished within 30 seconds). In most cases the timeout will not occur: the client will send a message and the response will be sent, unless there's a weird network issue.

I can't use java.util.Timer or java.util.concurrent.ScheduledThreadPoolExecutor because they assume most tasks will actually time out. If a task is cancelled, the cancelled task is kept in the internal heap until ScheduledThreadPoolExecutor.purge() is called, and that's a very expensive operation (O(N log N), perhaps?).

In the traditional heaps and priority queues I learned in my CS classes, updating the priority of an element was an expensive operation (O(log N) in many cases), because it can only be achieved by removing the element and re-inserting it with the new priority value. Some heaps, like the Fibonacci heap, offer O(1) decreaseKey() and min() operations, but what I need at least is a fast increaseKey() and min() (or decreaseKey() and max()).

Do you know of any data structure that is highly optimized for this particular use case? One strategy I'm considering is just storing all the tasks in a hash table and iterating over them every second or so, but it's not that beautiful.

A: 

You've got a hard limit on the number of items in the queue - there's a limit on the number of TCP sockets.

Therefore the problem is bounded. I suspect any clever data structure will be slower than using built-in types.

Douglas Leeder
A client application does have a limitation of 64k connections due to the number of available ports, but a server-side application can handle more than that as long as it has enough CPU power. And even a single connection can send 10k messages per second, setting a timeout for each.
Trustin Lee
A: 

Is there a good reason not to use java.util.PriorityQueue? Doesn't remove() handle your cancel operations in log(N) time? Then implement your own waiting based on the time until the item at the front of the queue.
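A minimal sketch of that design (the class and method names are mine, not from the answer); as the comments below point out, the catch is that PriorityQueue.remove(Object) is a linear scan:

```java
import java.util.*;

// Sketch of the single-PriorityQueue design: tasks ordered by absolute
// deadline, a waiting loop that fires whatever is due, and cancellation via
// remove(). PriorityQueue.remove(Object) is O(N), not O(log N), which is the
// weak spot of this design when cancellation rates are high.
class QueueTimer {
    static class Task {
        final long deadline;       // absolute deadline in milliseconds
        final Runnable callback;
        Task(long deadline, Runnable callback) {
            this.deadline = deadline;
            this.callback = callback;
        }
    }

    private final PriorityQueue<Task> queue =
            new PriorityQueue<>(Comparator.comparingLong((Task t) -> t.deadline));

    void schedule(Task t) { queue.add(t); }            // O(log N)

    boolean cancel(Task t) { return queue.remove(t); } // O(N) linear scan

    // One pass of the waiting loop: run every task whose deadline has passed.
    // A real timer thread would sleep until queue.peek().deadline between passes.
    void runDue(long now) {
        while (!queue.isEmpty() && queue.peek().deadline <= now)
            queue.poll().callback.run();
    }
}
```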

Nick Fortescue
log(N) is not fast enough. Consider that every connection tries to send messages as fast as possible, setting a timeout for each.
Trustin Lee
java.util.PriorityQueue#remove(Object) is O(N), not O(logN)!
Dimitris Andreou
A: 

I think storing all the tasks in a list and iterating through them would be best.

You must be planning to run the server on some pretty beefy machine to reach the limits where this cost becomes important?

Douglas Leeder
I am actually writing a generic network application framework (http://www.jboss.org/netty/), so it needs to run fine both on commodity hardware and on beefy machines.
Trustin Lee
+7  A: 

How about separating the handling of the normal case, where things complete quickly, from the error cases?

Use both a hash table and a priority queue. When a task is started it gets put in the hash table and if it finishes quickly it gets removed in O(1) time.

Once a second, you scan the hash table, and any tasks that have been waiting a long time, say 0.75 seconds, get moved to the priority queue. The priority queue should always be small and easy to handle. This assumes that one second is much less than the timeout times you are looking for.

If scanning the hash table is too slow, you could use two hash tables, essentially one for even-numbered seconds and one for odd-numbered seconds. When a task gets started it is put in the current hash table. Every second move all the tasks from the non-current hash table into the priority queue and swap the hash tables so that the current hash table is now empty and the non-current table contains the tasks started between one and two seconds ago.
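The two-table variant might be sketched in Java roughly like this (all class and method names are my own, not from the answer):

```java
import java.util.*;

// Sketch of the two-hash-table scheme: new tasks go into the current table;
// once a second everything in the non-current table (started 1-2 seconds ago)
// is promoted to a small priority queue ordered by deadline, and the tables swap.
class TwoTableTimer {
    static class Task {
        final String id;
        final long deadline;   // absolute deadline in milliseconds
        Task(String id, long deadline) { this.id = id; this.deadline = deadline; }
    }

    private Map<String, Task> current = new HashMap<>();
    private Map<String, Task> previous = new HashMap<>();
    private final PriorityQueue<Task> overdue =
            new PriorityQueue<>(Comparator.comparingLong((Task t) -> t.deadline));

    // O(1): register a task that has just started.
    void start(Task t) { current.put(t.id, t); }

    // O(1): a task that finishes quickly never reaches the priority queue.
    void finish(String id) {
        if (current.remove(id) == null) previous.remove(id);
    }

    // Called once a second: promote old tasks, then swap the tables.
    void tick() {
        overdue.addAll(previous.values());
        previous.clear();
        Map<String, Task> tmp = previous;
        previous = current;
        current = tmp;
    }

    // Tasks whose deadline has passed, in deadline order.
    List<Task> expired(long now) {
        List<Task> out = new ArrayList<>();
        while (!overdue.isEmpty() && overdue.peek().deadline <= now)
            out.add(overdue.poll());
        return out;
    }
}
```

A task only touches the O(log N) structure if it survives longer than a full tick, which matches the assumption that almost all tasks finish (and are removed in O(1)) long before their timeout.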

These options are a lot more complicated than just using a priority queue, but they are pretty easily implemented and should be stable.

David Norman
This makes a lot of sense in that it covers most cases with hash tables so that the update / cancellation time per message is O(1). If the normal response time range spans from 0 to 60 seconds, I could create more hash tables. Thanks!
Trustin Lee
+3  A: 

Some combination of hashes and O(logN) structures should do what you ask.

I'm tempted to quibble with the way you're analyzing the problem. In your comment above, you say

Because the update will occur very very frequently. Let's say we are sending M messages per connection then the overall time becomes O(MNlogN), which is pretty big. – Trustin Lee (6 hours ago)

which is absolutely correct as far as it goes. But most people I know would concentrate on the cost per message, on the theory that as your app has more and more work to do, it's obviously going to require more resources.

So if your application has a billion sockets open simultaneously (is that really likely?) the insertion cost is only about 60 comparisons per message.

I'll bet money that this is premature optimization: you haven't actually measured the bottlenecks in your system with a performance analysis tool like CodeAnalyst or VTune.

Anyway, there's probably an infinite number of ways of doing what you ask, once you just decide that no single structure will do what you want, and you want some combination of the strengths and weaknesses of different algorithms.

One possibility is to divide the socket domain N into some number of buckets of size B, and then hash each socket into one of those (N/B) buckets. Each bucket holds a heap (or whatever) with O(log B) update time. If an upper bound on N isn't fixed in advance but can vary, then you can create more buckets dynamically, which adds a little complication but is certainly doable.

In the worst case, the watchdog timer has to search (N/B) queues for expirations, but I assume the watchdog is not required to kill idle sockets in any particular order! That is, if 10 sockets went idle in the last time slice, it doesn't have to find the one that timed out first, deal with it, then find the one that timed out second, and so on. It just has to scan the (N/B) buckets and enumerate all the time-outs.
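The bucket idea might be sketched in Java like this (all names are mine; entries are keyed on start time, as the answer recommends below):

```java
import java.util.*;

// Sketch of the bucket scheme: each socket hashes to one of (N/B) buckets, and
// each bucket is a small heap keyed on start time, so updates cost O(log B).
// The watchdog scans every bucket and enumerates expirations in no particular
// order across buckets.
class BucketedTimeouts {
    private final PriorityQueue<long[]>[] buckets;  // entries are {socketId, startTime}

    @SuppressWarnings("unchecked")
    BucketedTimeouts(int numBuckets) {
        buckets = new PriorityQueue[numBuckets];
        for (int i = 0; i < numBuckets; i++)
            buckets[i] = new PriorityQueue<>(Comparator.comparingLong((long[] e) -> e[1]));
    }

    private PriorityQueue<long[]> bucketFor(long socketId) {
        return buckets[(int) (socketId % buckets.length)];
    }

    // Record activity on a socket; only this socket's own bucket is touched.
    void touch(long socketId, long startTime) {
        PriorityQueue<long[]> b = bucketFor(socketId);
        b.removeIf(e -> e[0] == socketId);  // drop the socket's old entry, if any
        b.add(new long[] { socketId, startTime });
    }

    // Watchdog pass: enumerate every socket idle for at least `timeout` ms.
    List<Long> expired(long now, long timeout) {
        List<Long> out = new ArrayList<>();
        for (PriorityQueue<long[]> b : buckets)
            while (!b.isEmpty() && now - b.peek()[1] >= timeout)
                out.add(b.poll()[0]);
        return out;
    }
}
```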

If you're not satisfied with a linear array of buckets, you can use a priority queue of queues, but then you want to avoid updating that queue on every message, or else you're back where you started. Instead, define some time that's less than the actual time-out (say, 3/4 or 7/8 of it), and only put a low-level queue into the high-level queue when its longest wait exceeds that.

And at the risk of stating the obvious, you don't want your queues keyed on elapsed time; the keys should be start times. Elapsed time would have to be updated constantly for every record, but each record's start time never changes.

Die in Sente
Yes, you are correct that it could be explained much better from a per-message perspective. My bad! However, I don't think it's premature optimization, because I already have experience with a traditional heap - it really taxes the CPU when message throughput is very high.
Trustin Lee
Anyways, I also think your solution is great, along with David's, but it's too bad that I can't choose two answers. Thanks for your insight!
Trustin Lee
I voted David's answer up myself. :-)
Die in Sente
+1  A: 

Your specific scenario suggests a circular buffer to me. If the max. timeout is 30 seconds and we want to reap sockets at least every tenth of a second, then use a buffer of 300 doubly-linked lists, one for each tenth of a second in that period. To 'increaseTime' on an entry, remove it from the list it's in and add it to the one for its new tenth-second period (both constant-time operations). When a period ends, reap anything left over in the current list (maybe by feeding it to a reaper thread) and advance the current-list pointer.
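A rough Java sketch of that circular buffer (all names are mine; I've used a HashSet per slot instead of intrusive doubly-linked lists, which keeps the same O(1) bounds in a shorter sketch):

```java
import java.util.*;

// Sketch of the 300-slot circular buffer: one slot per tenth of a second over
// a 30-second horizon. Moving a task to a later deadline is remove + add, both
// O(1). Real implementations use intrusive doubly-linked lists per slot for
// O(1) unlink; a HashSet per slot gives the same bounds for a sketch.
class TimerWheel {
    static final int SLOTS = 300;  // 30 s horizon / 0.1 s per slot
    private final Set<Object>[] wheel;
    private int currentSlot = 0;

    @SuppressWarnings("unchecked")
    TimerWheel() {
        wheel = new HashSet[SLOTS];
        for (int i = 0; i < SLOTS; i++) wheel[i] = new HashSet<>();
    }

    // Schedule `task` to expire `ticksFromNow` tenths of a second from now.
    // Returns the slot index so the caller can move or cancel it later.
    int schedule(Object task, int ticksFromNow) {
        int slot = (currentSlot + ticksFromNow) % SLOTS;
        wheel[slot].add(task);
        return slot;
    }

    // 'increaseTime': O(1) unlink from the old slot, O(1) add to the new one.
    int reschedule(Object task, int oldSlot, int ticksFromNow) {
        wheel[oldSlot].remove(task);
        return schedule(task, ticksFromNow);
    }

    void cancel(Object task, int slot) { wheel[slot].remove(task); }

    // Called every tenth of a second: reap whatever is left in the current
    // slot (e.g. hand it to a reaper thread), then advance the pointer.
    Set<Object> tick() {
        Set<Object> expired = wheel[currentSlot];
        wheel[currentSlot] = new HashSet<>();
        currentSlot = (currentSlot + 1) % SLOTS;
        return expired;
    }
}
```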

Darius Bacon
A: 

If performance is really that important, why are you using Java?

Die in Sente
+1  A: 

Use a Hashed Timing Wheel - Google 'Hashed Hierarchical Timing Wheels' for more information. It's a generalization of the answers people have made here. I'd prefer a hashed timing wheel with a large wheel size over hierarchical timing wheels.

Trustin Lee
+1  A: 

To the best of my knowledge (I wrote a paper about a new priority queue, which also reviewed past results), no priority queue implementation gets the bounds of Fibonacci heaps, as well as constant-time increase-key.

There is a small problem with getting that literally. If you could get increase-key in O(1), then you could get delete in O(1): just increase the key to +infinity (you can handle the queue being full of lots of +infinities using some standard amortization tricks). But if find-min is also O(1), then delete-min = find-min + delete becomes O(1) as well. That's impossible in a comparison-based priority queue, because the sorting bound (insert everything, then remove it one-by-one) implies that

n * insert + n * delete-min > n log n.

The point here is that if you want a priority-queue to support increase-key in O(1), then you must accept one of the following penalties:

  • Not be comparison based. Actually, this is a pretty good way to get around things, e.g. vEB trees.
  • Accept O(log n) for inserts and also O(n log n) for make-heap (given n starting values). This sucks.
  • Accept O(log n) for find-min. This is entirely acceptable if you never actually do find-min (without an accompanying delete).

But, again, to the best of my knowledge, no one has done the last option. I've always seen it as an opportunity for new results in a pretty basic area of data structures.

A. Rex