In my application I'm performing somewhat heavy lookup operations. These operations must be done within a single thread (persistence framework limitation).

I want to cache the results. Thus, I have a class UMRCache, with an inner class Worker:

public class UMRCache {
  private Worker worker;
  private List<String> requests = Collections.synchronizedList(new ArrayList<String>());
  private Map<String, Object> cache = Collections.synchronizedMap(new HashMap<String, Object>());
  public UMRCache(Repository repository) {
    this.worker = new Worker(repository);
    this.worker.start();
  }

 public Object get(String key) {
   if (this.cache.containsKey(key)) {
    // If the element is already cached, get value from cache
     return this.cache.get(key);
   }
   synchronized (this.requests) {
     // Add request to queue
     this.requests.add(key);
     // Notify the Worker thread that there's work to do
     this.requests.notifyAll();
   }
   synchronized (this.cache) {
     // Wait until Worker has updated the cache
     this.cache.wait();
    // Now, cache should contain a value for key
     return this.cache.get(key);
   }
 }

 private class Worker extends Thread {
   private final Repository repository;

   Worker(Repository repository) {
     this.repository = repository;
   }

   public void run() {
      boolean doRun = true;
      while (doRun) {
         synchronized (requests) {
            while (requests.isEmpty() && doRun) {
               try {
                  requests.wait(); // Wait until there's work to do
               } catch (InterruptedException e) {
                  doRun = false; // Stop the worker when interrupted
               }
            }
            synchronized (cache) {
               Set<String> processed = new HashSet<String>();
               for (String key : requests) {
                 // Do the lookup
                 Object result = repository.lookup(key);
                 // Save to cache
                 cache.put(key, result);
                 processed.add(key);
               }
               // Remove processed requests from queue
               requests.removeAll(processed);
               // Notify all threads waiting for their requests to be served
               cache.notifyAll();
            }
         }
      }
   }
 }

}

I have a test case for this:

public class UMRCacheTest extends TestCase {
private UMRCache umrCache;

public void setUp() throws Exception {
    super.setUp();
    umrCache = new UMRCache(repository);
}

public void testGet() throws Exception {
    for (int i = 0; i < 10000; i++) {
        final List fetched = Collections.synchronizedList(new ArrayList());
        final String[] keys = new String[]{"key1", "key2"};
        final String[] expected = new String[]{"result1", "result2"};
        final Random random = new Random();

        Runnable run1 = new Runnable() {
            public void run() {
                for (int i = 0; i < keys.length; i++) {
                    final String key = keys[i];
                    final Object result = umrCache.get(key);
                    // Compare the fetched value with the expected result for this key
                    assertEquals(expected[i], result);
                    fetched.add(result);
                    try {
                        Thread.sleep(random.nextInt(3));
                    } catch (InterruptedException ignore) {
                    }
                }
            }
        };
        Runnable run2 = new Runnable() {
            public void run() {
                for (int i = keys.length - 1; i >= 0; i--) {
                    final String key = keys[i];
                    final Object result = umrCache.get(key);
                    // Compare the fetched value with the expected result for this key
                    assertEquals(expected[i], result);
                    fetched.add(result);
                    try {
                        Thread.sleep(random.nextInt(3));
                    } catch (InterruptedException ignore) {
                    }
                }
            }
        };

        final Thread thread1 = new Thread(run1);
        thread1.start();
        final Thread thread2 = new Thread(run2);
        thread2.start();
        final Thread thread3 = new Thread(run1);
        thread3.start();
        thread1.join();
        thread2.join();
        thread3.join();
        umrCache.dispose();
        assertEquals(6, fetched.size());
    }
}

}

The test fails randomly, in roughly 1 out of 10 runs. It fails either at the last assertion, assertEquals(6, fetched.size()), or at the per-key assertEquals inside the runnables; sometimes the test runner never finishes at all.

So there's something buggy about my thread logic. Any tips?

EDIT:

I might have cracked it now, thanks to all who have helped. The solution seems to be:

 public Object get(String key) {
   if (this.cache.containsKey(key)) {
    // If the element is already cached, get value from cache
     return this.cache.get(key);
   }
   synchronized (this.requests) {
     // Add request to queue
     this.requests.add(key);
     // Notify the Worker thread that there's work to do
     this.requests.notifyAll();
   }
   synchronized (this.cache) {
     // Wait until Worker has updated the cache
     while (!this.cache.containsKey(key)) {
       this.cache.wait();
     }
    // Now, cache should contain a value for key
     return this.cache.get(key);
   }
 }
+1  A: 

The variable fetched in your test is an ArrayList and is accessed and updated from your two anonymous Runnable instances.

ArrayList is not thread-safe. From the documentation:

Note that this implementation is not synchronized. If multiple threads access an ArrayList instance concurrently, and at least one of the threads modifies the list structurally, it must be synchronized externally. (A structural modification is any operation that adds or deletes one or more elements, or explicitly resizes the backing array; merely setting the value of an element is not a structural modification.) This is typically accomplished by synchronizing on some object that naturally encapsulates the list. If no such object exists, the list should be "wrapped" using the Collections.synchronizedList method. This is best done at creation time, to prevent accidental unsynchronized access to the list:

Hence I think your test needs a little adjusting.
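
As a rough illustration of the wrapping the documentation describes, here is a minimal sketch. The class name SynchronizedListSketch and the method fillConcurrently are made up for this example and are not part of the question's code. Several threads add to one shared list, and because the list is wrapped with Collections.synchronizedList, no add should be lost.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper class, only for illustration.
class SynchronizedListSketch {

    // Starts `threads` threads that each add `perThread` items to one
    // shared, synchronized list, then returns the final list size.
    static int fillConcurrently(final int threads, final int perThread) {
        final List<Integer> shared =
                Collections.synchronizedList(new ArrayList<Integer>());
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(new Runnable() {
                public void run() {
                    for (int i = 0; i < perThread; i++) {
                        shared.add(i); // each add() is synchronized by the wrapper
                    }
                }
            });
            workers[t].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join(); // wait for every writer to finish
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", e);
        }
        return shared.size();
    }

    public static void main(String[] args) {
        System.out.println(fillConcurrently(4, 1000));
    }
}
```

Note that the wrapper only makes individual calls such as add() safe; iterating over the list still needs an explicit synchronized block on the list.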

djna
Thank you, should have spotted that. Added a Collections.synchronizedList wrap to the 'fetched' list.
Vidar S. Ramdal
But the problem of the occasional test runner hanging is still visible. When I pause the hanging runner, I can see that "Thread-0" is stuck on requests.wait() in the Worker.run() method, while "Thread-1" is stuck on this.cache.wait() in UMRCache.get().
Vidar S. Ramdal
+1  A: 

I noticed that your cache lookup isn't an atomic operation:

if (this.cache.containsKey(key)) {
    // If the element is already cached, get value from cache
    return this.cache.get(key);
}

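Since you never delete from the cache in your code, this lookup will always return a value once containsKey() has returned true. But if you plan to clear the cache in the future, the lack of atomicity will become a problem: an entry could be removed between the containsKey() call and the get() call.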
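
One possible way to make the lookup atomic is to call get() once and test the result for null, since a single call on a synchronized map is one atomic operation. This is only a sketch under the assumption that null is never stored as a cached value; the class AtomicLookupSketch and its methods are hypothetical names, not part of the original code.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical class, only for illustration.
class AtomicLookupSketch {
    private final Map<String, Object> cache =
            Collections.synchronizedMap(new HashMap<String, Object>());

    void put(String key, Object value) {
        cache.put(key, value);
    }

    void evict(String key) {
        cache.remove(key);
    }

    // Returns the cached value, or null if the key is not cached.
    // Assumption: null is never stored as a value, so null means "absent".
    Object lookup(String key) {
        // One get() instead of containsKey() followed by get(): a concurrent
        // evict() can no longer slip in between the check and the read.
        return cache.get(key);
    }

    public static void main(String[] args) {
        AtomicLookupSketch sketch = new AtomicLookupSketch();
        sketch.put("key1", "result1");
        System.out.println(sketch.lookup("key1"));
        sketch.evict("key1");
        System.out.println(sketch.lookup("key1"));
    }
}
```

The caller then checks the returned value for null instead of asking containsKey() first.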

Victor Sorokin
Thank you for your input. Clearing the cache is not an issue currently, but will certainly be in the future.
Vidar S. Ramdal
+1  A: 

The get() method can miss the result and get stuck:

   synchronized (this.requests) {
     // Add request to queue
     this.requests.add(key);
     // Notify the Worker thread that there's work to do
     this.requests.notifyAll();
   }

   // ----- MOMENT1.  If the Worker puts the result into the cache at this
   // moment, it will be missed, since the notification is lost

   synchronized (this.cache) {
     // Wait until Worker has updated the cache
     this.cache.wait();

    // ----- MOMENT2.  May be too late, since the cache notification already happened at MOMENT1

    // Now, cache should contain a value for key
     return this.cache.get(key);
   }
maxim_ge
Ah, of course! It seems the problem can be solved by wrapping the entire block in synchronized (this.cache), like this: synchronized (this.cache) { synchronized (this.requests) { this.requests.add(key); this.requests.notifyAll(); } this.cache.wait(); return this.cache.get(key); } It doesn't seem to have a significant performance impact, either. Thanks sir, well spotted!
Vidar S. Ramdal
This can cause a deadlock, since the getter locks cache and then requests, whereas the Worker locks requests and then cache. To avoid deadlock, all threads must lock the objects in the same order.
maxim_ge
Dammit, you're right. I have occasional hanging test runs. But then, I'm totally blank on what should be the sequence of synchronized and wait()s. Can you give me a hint?
Vidar S. Ramdal
OK, think I got it now. See edit of the original question.
Vidar S. Ramdal
Well, it looks like get() is more or less OK now. But for this purpose I'd use the ExecutorService and Future interfaces from the Java concurrency package. See the Future example, which is almost exactly what you are doing.
maxim_ge
Thanks for the tip! I'll have a deeper look into java.util.concurrent.
Vidar S. Ramdal
Actually, the get() method needed one more modification: the do { cache.wait(); } while (!cache.containsKey(key)); loop needs to be while (!cache.containsKey(key)) { cache.wait(); }
Vidar S. Ramdal
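
For readers curious about the ExecutorService and Future suggestion made in the comments above, here is a minimal sketch of how it might look. It is not the asker's final code: the class FutureCacheSketch is a made-up name, and the Function passed to the constructor is a hypothetical stand-in for repository.lookup(key). A single-thread executor keeps every lookup on one thread, which is the constraint stated in the question, and callers block on Future.get() instead of using wait() and notifyAll().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical class, only for illustration.
class FutureCacheSketch {
    // One worker thread, so all lookups run on the same thread.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final ConcurrentMap<String, Future<Object>> cache =
            new ConcurrentHashMap<String, Future<Object>>();
    // Assumption: stands in for repository.lookup(key) from the question.
    private final Function<String, Object> lookup;

    FutureCacheSketch(Function<String, Object> lookup) {
        this.lookup = lookup;
    }

    Object get(String key) {
        // Submit the lookup at most once per key; later callers reuse the Future.
        Future<Object> future = cache.computeIfAbsent(key, k -> {
            Callable<Object> task = () -> lookup.apply(k);
            return executor.submit(task);
        });
        try {
            return future.get(); // blocks until the worker thread has the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for " + key, e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("lookup failed for " + key, e.getCause());
        }
    }

    // Stops the worker thread once no more lookups are needed.
    void dispose() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        FutureCacheSketch cache = new FutureCacheSketch(k -> "result for " + k);
        System.out.println(cache.get("key1"));
        cache.dispose();
    }
}
```

Because the Future itself is what gets cached, there is no window in which a notification can be lost: a caller that arrives late simply finds a Future that is already completed.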