views: 297
answers: 4

I have a computing map (with soft values) that I am using to cache the results of an expensive computation.

Now I have a situation where I know that a particular key is likely to be looked up within the next few seconds. That key is also more expensive to compute than most.

I would like to compute the value in advance, in a minimum-priority thread, so that when the value is eventually requested it will already be cached, improving the response time.

What is a good way to do this such that:

  1. I have control over the thread (specifically its priority) in which the computation is performed.
  2. Duplicate work is avoided, i.e. the computation is only done once. If the computation task is already running then the calling thread waits for that task instead of computing the value again (FutureTask implements this. With Guava's computing maps this is true if you only call get but not if you mix it with calls to put.)
  3. The "compute value in advance" method is asynchronous and idempotent. If a computation is already in progress it should return immediately without waiting for that computation to finish.
  4. Avoid priority inversion, e.g. if a high-priority thread requests the value while a medium-priority thread is doing something unrelated but the computation task is queued on a low-priority thread, the high-priority thread must not be starved. Maybe this could be achieved by temporarily boosting the priority of the computing thread(s) and/or running the computation on the calling thread.

How could this be coordinated between all the threads involved?


Additional info
The computations in my application are image filtering operations, which means they are all CPU-bound. These operations include affine transforms (ranging from 50µs to 1ms) and convolutions (up to 10ms). Of course the effectiveness of varying thread priorities depends on the ability of the OS to preempt the larger tasks.

+2  A: 

One common way of coordinating this type of situation is to have a map whose values are FutureTask objects. So, taking as an example some code from a web server of mine, the essential idea is that for a given parameter, we see if there is already a FutureTask (meaning that the calculation with that parameter has already been scheduled), and if so we wait for it. In this example, we otherwise schedule the lookup, but that could be done elsewhere with a separate call if that was desirable:

  private final ConcurrentMap<WordLookupJob, Future<CharSequence>> cache = ...

  private Future<CharSequence> getOrScheduleLookup(final WordLookupJob word) {
    Future<CharSequence> f = cache.get(word);
    if (f == null) {
      Callable<CharSequence> ex = new Callable<CharSequence>() {
        public CharSequence call() throws Exception {
          return doCalculation(word);
        }
      };
      Future<CharSequence> ft = executor.submit(ex);
      f = cache.putIfAbsent(word, ft);
      if (f != null) {
        // somebody slipped in with the same word -- cancel the
        // lookup we've just started and return the previous one
        ft.cancel(true);
      } else {
        f = ft;
      }
    }
    return f;
  }

In terms of thread priorities: I wonder if this will achieve what you think it will? I don't quite understand your point about raising the priority of the lookup above the waiting thread: if the thread is waiting, then it's waiting, whatever the relative priorities of other threads... (You might want to have a look at some articles I've written on thread priorities and thread scheduling, but to cut a long story short, I'm not sure that changing the priority will necessarily buy you what you're expecting.)

Neil Coffey
See mdma's answer (and the linked article on priority inversion) to see why I am concerned about thread priorities.
finnw
I notice that you submit the task *then* check whether another `Future` is already in the map and interrupt it if so. Why not create the `Future`, attempt to add it to the map, and then submit it to the executor only if the key was not already present in the map? That way you do not waste CPU cycles if the task is not interruptible.
finnw
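
For illustration, the ordering finnw describes might look like this (a sketch only, reusing `cache`, `executor` and `doCalculation` from the answer above):

  private Future<CharSequence> getOrScheduleLookup(final WordLookupJob word) {
    Future<CharSequence> f = cache.get(word);
    if (f == null) {
      // Create the task first, publish it with putIfAbsent, and submit it to the
      // executor only if this thread won the race -- a losing thread never starts
      // (and so never has to cancel) a duplicate computation.
      FutureTask<CharSequence> ft = new FutureTask<CharSequence>(
          new Callable<CharSequence>() {
            public CharSequence call() throws Exception {
              return doCalculation(word);
            }
          });
      f = cache.putIfAbsent(word, ft);
      if (f == null) {
        executor.execute(ft);
        f = ft;
      }
    }
    return f;
  }
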
+2  A: 

I suspect that you are heading down the wrong path by focusing on thread priorities. Usually the data that a cache holds is expensive to compute due to I/O (out-of-memory data) rather than being CPU-bound (logic computation). If you're prefetching to guess a user's future action, such as looking at unread emails, then it indicates to me that your work is likely I/O-bound. This means that as long as thread starvation does not occur (which schedulers disallow), playing games with thread priority won't offer much of a performance improvement.

If the cost is an I/O call then the background thread is blocked waiting for the data to arrive, and processing that data should be fairly cheap (e.g. deserialization). As the change in thread priority won't offer much of a speed-up, performing the work asynchronously on a background thread pool should be sufficient. If the cache-miss penalty is too high, then using multiple layers of caching tends to help to further reduce the user-perceived latency.

Ben Manes
The computation is CPU-bound (image processing)
finnw
+4  A: 

You can arrange for "once only" execution of the background computation by using a Future with the ComputedMap. The Future represents the task that computes the value. The future is created by the ComputedMap and, at the same time, passed to an ExecutorService for background execution. The executor can be configured with your own ThreadFactory implementation that creates low-priority threads, e.g.

class LowPriorityThreadFactory implements ThreadFactory
{
   public Thread newThread(Runnable r) {
     Thread t = new Thread(r);
     t.setPriority(Thread.MIN_PRIORITY);
     return t;
   }
}
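
For completeness, a sketch of how the factory and a Future-valued map might be wired together (the class and method names here are illustrative, not part of this answer):

import java.util.concurrent.*;

class Precomputer<K, V> {
   private final ConcurrentMap<K, Future<V>> cache =
         new ConcurrentHashMap<K, Future<V>>();
   private final ExecutorService executor = Executors.newFixedThreadPool(
         Runtime.getRuntime().availableProcessors(), new LowPriorityThreadFactory());

   // Idempotent: the first caller schedules the computation; later callers
   // (including the eventual high-priority consumer) get the same Future back.
   Future<V> precompute(K key, Callable<V> computation) {
      FutureTask<V> task = new FutureTask<V>(computation);
      Future<V> existing = cache.putIfAbsent(key, task);
      if (existing != null) {
         return existing;
      }
      executor.execute(task);   // runs on a MIN_PRIORITY thread
      return task;
   }
}
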

When the value is needed, your high-priority thread then fetches the future from the map, and calls the get() method to retrieve the result, waiting for it to be computed if necessary. To avoid priority inversion you add some additional code to the task:

class HandlePriorityInversionTask extends FutureTask<ResultType>
{
   Integer priority;          // non-null once a caller has requested a boost
   Integer originalPriority;  // the worker thread's priority before boosting
   Thread thread;             // the thread currently running the task, if any

   HandlePriorityInversionTask(Callable<ResultType> callable) {
      super(callable);
   }

   public ResultType get() throws InterruptedException, ExecutionException {
      if (!isDone())
         setPriority(Thread.currentThread().getPriority());
      return super.get();
   }

   public void run() {
      synchronized (this) {
         thread = Thread.currentThread();
         originalPriority = thread.getPriority();
         if (priority != null) setPriority(priority);
      }
      super.run();
   }

   protected synchronized void done() {
      if (originalPriority != null) setPriority(originalPriority);
      thread = null;
   }

   synchronized void setPriority(int priority) {
      this.priority = Integer.valueOf(priority);
      if (thread != null)
         thread.setPriority(priority);
   }
}

This takes care of raising the priority of the task to the priority of the thread calling get() if the task has not completed, and returns the priority to the original when the task completes, normally or otherwise. (To keep it brief, the code doesn't check if the priority is indeed greater, but that's easy to add.)

When the high-priority task calls get(), the future may not yet have begun executing. You might be tempted to avoid this by setting a large upper bound on the number of threads used by the executor service, but this may be a bad idea, since each thread could be running at high priority, consuming as much CPU as it can before the OS switches it out. The pool should probably be the same size as the number of hardware threads, e.g. sized to Runtime.getRuntime().availableProcessors(). If the task has not started executing, rather than waiting for the executor to schedule it (which is a form of priority inversion, since your high-priority thread is waiting for the low-priority tasks to complete), you may choose to remove it from the current executor's queue and re-submit it on an executor running only high-priority threads.
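
A sketch of that re-submission idea (a hypothetical helper, not part of this answer; it assumes the task was queued on the low-priority pool with execute(), so the queue element is the FutureTask itself and remove() can find it):

import java.util.concurrent.*;

class Resubmitter {
   private final ThreadPoolExecutor lowPriorityExecutor;
   private final ExecutorService highPriorityExecutor;

   Resubmitter(ThreadPoolExecutor low, ExecutorService high) {
      this.lowPriorityExecutor = low;
      this.highPriorityExecutor = high;
   }

   // If the task is still sitting in the low-priority queue, pull it out and run it
   // on the high-priority pool instead; otherwise just wait for the running task.
   <V> V getUrgently(FutureTask<V> task) throws InterruptedException, ExecutionException {
      if (lowPriorityExecutor.remove(task)) {
         highPriorityExecutor.execute(task);
      }
      return task.get();
   }
}
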

mdma
My project is already using the latest version of Guava so I can use a `ThreadFactoryBuilder` - simpler than the custom thread factory. Thanks for the priority inversion link. I will upvote this later when I get my votes back.
finnw
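
For reference, the Guava `ThreadFactoryBuilder` finnw mentions can produce an equivalent low-priority factory in one expression (a sketch; the name format is made up):

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ThreadFactory;

ThreadFactory lowPriorityFactory = new ThreadFactoryBuilder()
      .setPriority(Thread.MIN_PRIORITY)
      .setNameFormat("precompute-%d")
      .build();
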
I'd not seen the ThreadFactoryBuilder in Guava, it's nice! The rest of the post should still be relevant though, particularly the task that handles priority inversion for started tasks, and the strategy of rescheduling non-started tasks on a high-priority executor. This will ensure that once your high-priority thread wants the result, it is computed at high priority, whether computation has already started or not.
mdma
The other thing I thought of was calling `run` on the consuming thread. The documentation is unclear but in Sun's implementation of `RunnableFuture` the second and subsequent calls to `run` (overlapping or not) are no-ops. Is there another reason why you avoid this?
finnw
I would avoid calling run, since that's not the specified contract. (Run does not return a result, for example. And if the previous run has completed, but with an exception, you will not get the exception.) Finally, as I've implemented it above, you won't get the priority inversion fix, which is in the get() method.
mdma
The call to `run` would be followed immediately by a call to `get` which will rethrow the exception.
finnw
I wrote a `RunnableFuture` implementation similar to yours except with an array of priorities of all waiting threads. I will post a simplified version as an answer if I can figure out a way to shorten it.
finnw
+1  A: 

As an alternative to thread priorities, you could perform a low-priority task only if no high-priority tasks are in progress. Here's a simple way to do that:

AtomicInteger highPriorityCount = new AtomicInteger();

void highPriorityTask() {
  highPriorityCount.incrementAndGet();
  try {
    highPriorityImpl();
  } finally {
    highPriorityCount.decrementAndGet();  
  }
}

void lowPriorityTask() {
  if (highPriorityCount.get() == 0) {
    lowPriorityImpl();
  }
}

In your use case, both Impl() methods would call get() on the computing map, highPriorityImpl() in the same thread and lowPriorityImpl() in a different thread.

You could write a more sophisticated version that defers low-priority tasks until the high-priority tasks complete and limits the number of concurrent low-priority tasks.
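
A sketch of such a version (illustrative, not from the answer): low-priority work is deferred until no high-priority task is running, and a semaphore caps how many low-priority tasks run at once.

import java.util.concurrent.Semaphore;

class PriorityGate {
  private int highPriorityCount = 0;
  private final Semaphore lowPriorityPermits = new Semaphore(2);  // cap is arbitrary

  synchronized void enterHighPriority() {
    highPriorityCount++;
  }

  synchronized void exitHighPriority() {
    if (--highPriorityCount == 0) {
      notifyAll();  // wake any deferred low-priority tasks
    }
  }

  void runLowPriority(Runnable task) throws InterruptedException {
    lowPriorityPermits.acquire();
    try {
      synchronized (this) {
        while (highPriorityCount > 0) {
          wait();   // defer until high-priority work has drained
        }
      }
      task.run();
    } finally {
      lowPriorityPermits.release();
    }
  }
}
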

Jared Levy
My low-priority task takes a long time to run and is usually still running when the next high-priority request arrives. I like this method but to take full advantage of it I would need to split my tasks into smaller subtasks (and by using thread priorities I hope to get the OS to do that for me.)
finnw