In my application I'm performing somewhat heavy lookup operations. These operations must be done within a single thread (persistence framework limitation).
I want to cache the results. Thus, I have a class UMRCache, with an inner class Worker:
public class UMRCache {
    private Worker worker;
    private List<String> requests = Collections.synchronizedList(new ArrayList<String>());
    private Map<String, Object> cache = Collections.synchronizedMap(new HashMap<String, Object>());

    public UMRCache(Repository repository) {
        this.worker = new Worker(repository);
        this.worker.start();
    }

    public Object get(String key) {
        if (this.cache.containsKey(key)) {
            // If the element is already cached, get value from cache
            return this.cache.get(key);
        }
        synchronized (this.requests) {
            // Add request to queue
            this.requests.add(key);
            // Notify the Worker thread that there's work to do
            this.requests.notifyAll();
        }
        synchronized (this.cache) {
            // Wait until Worker has updated the cache
            this.cache.wait();
            // Now, cache should contain a value for key
            return this.cache.get(key);
        }
    }

    private class Worker extends Thread {
        private final Repository repository;

        Worker(Repository repository) {
            this.repository = repository;
        }

        public void run() {
            boolean doRun = true;
            while (doRun) {
                synchronized (requests) {
                    while (requests.isEmpty() && doRun) {
                        requests.wait(); // Wait until there's work to do
                    }
                    synchronized (cache) {
                        Set<String> processed = new HashSet<String>();
                        for (String key : requests) {
                            // Do the lookup
                            Object result = repository.lookup(key);
                            // Save to cache
                            cache.put(key, result);
                            processed.add(key);
                        }
                        // Remove processed requests from queue
                        requests.removeAll(processed);
                        // Notify all threads waiting for their requests to be served
                        cache.notifyAll();
                    }
                }
            }
        }
    }
}
I have a test case for this:
public class UMRCacheTest extends TestCase {
    private UMRCache umrCache;

    public void setUp() throws Exception {
        super.setUp();
        umrCache = new UMRCache(repository);
    }

    public void testGet() throws Exception {
        for (int i = 0; i < 10000; i++) {
            final List<Object> fetched = Collections.synchronizedList(new ArrayList<Object>());
            final String[] keys = new String[]{"key1", "key2"};
            final String[] expected = new String[]{"result1", "result2"};
            final Random random = new Random();
            // Fetches the keys in ascending order
            Runnable run1 = new Runnable() {
                public void run() {
                    for (int i = 0; i < keys.length; i++) {
                        final String key = keys[i];
                        final Object result = umrCache.get(key);
                        assertEquals(expected[i], result);
                        fetched.add(result);
                        try {
                            Thread.sleep(random.nextInt(3));
                        } catch (InterruptedException ignore) {
                        }
                    }
                }
            };
            // Fetches the keys in descending order
            Runnable run2 = new Runnable() {
                public void run() {
                    for (int i = keys.length - 1; i >= 0; i--) {
                        final String key = keys[i];
                        final Object result = umrCache.get(key);
                        assertEquals(expected[i], result);
                        fetched.add(result);
                        try {
                            Thread.sleep(random.nextInt(3));
                        } catch (InterruptedException ignore) {
                        }
                    }
                }
            };
            final Thread thread1 = new Thread(run1);
            thread1.start();
            final Thread thread2 = new Thread(run2);
            thread2.start();
            final Thread thread3 = new Thread(run1);
            thread3.start();
            thread1.join();
            thread2.join();
            thread3.join();
            umrCache.dispose();
            assertEquals(6, fetched.size());
        }
    }
}
The test fails randomly, in about 1 out of 10 runs. It fails either at the last assertion, assertEquals(6, fetched.size()), or at the per-key assertEquals inside the test threads; sometimes the test runner never finishes at all.
So there's something buggy about my thread logic. Any tips?
EDIT:
I might have cracked it now, thanks to all who have helped. The solution seems to be:
public Object get(String key) {
    if (this.cache.containsKey(key)) {
        // If the element is already cached, get value from cache
        return this.cache.get(key);
    }
    synchronized (this.requests) {
        // Add request to queue
        this.requests.add(key);
        // Notify the Worker thread that there's work to do
        this.requests.notifyAll();
    }
    synchronized (this.cache) {
        // Wait until Worker has updated the cache
        while (!this.cache.containsKey(key)) {
            this.cache.wait();
        }
        // Now, cache should contain a value for key
        return this.cache.get(key);
    }
}
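
To convince myself why the loop matters, here is a minimal self-contained sketch. It is not my real code: GuardedWaitDemo, getUnguarded, getGuarded, put and runScenario are made-up names for illustration. It assumes one of the failures was a waiter being woken by a notifyAll() that was meant for a different key, so that the single wait() returned before the requested key was in the cache.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; the map doubles as the monitor, as in UMRCache.
public class GuardedWaitDemo {
    private final Map<String, Object> cache = new HashMap<String, Object>();

    // Original pattern: one wait(), no check that the key has arrived.
    Object getUnguarded(String key) throws InterruptedException {
        synchronized (cache) {
            cache.wait();
            return cache.get(key);
        }
    }

    // Pattern from the edit: re-check the condition after every wakeup.
    Object getGuarded(String key) throws InterruptedException {
        synchronized (cache) {
            while (!cache.containsKey(key)) {
                cache.wait();
            }
            return cache.get(key);
        }
    }

    // Stand-in for the Worker: store a value and wake all waiters.
    void put(String key, Object value) {
        synchronized (cache) {
            cache.put(key, value);
            cache.notifyAll();
        }
    }

    // Starts a thread waiting for "key1", then publishes "key2" first.
    public static Object runScenario(final boolean guarded) throws InterruptedException {
        final GuardedWaitDemo demo = new GuardedWaitDemo();
        final Object[] result = new Object[1];
        Thread waiter = new Thread(new Runnable() {
            public void run() {
                try {
                    result[0] = guarded ? demo.getGuarded("key1")
                                        : demo.getUnguarded("key1");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        waiter.start();
        // Poll until the waiter is parked inside wait()
        while (waiter.getState() != Thread.State.WAITING) {
            Thread.sleep(1);
        }
        // Notification for a different key than the one being waited for
        demo.put("key2", "result2");
        if (guarded) {
            // The guarded waiter keeps waiting until its own key arrives
            demo.put("key1", "result1");
        }
        waiter.join();
        return result[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("unguarded: " + runScenario(false)); // prints "unguarded: null"
        System.out.println("guarded: " + runScenario(true));    // prints "guarded: result1"
    }
}
```

The unguarded variant returns null because it wakes up on the notification for key2. The loop also covers the opposite ordering: if the Worker has already stored the value and called notifyAll() before the caller reaches the synchronized block, the condition is already true and wait() is never entered, whereas a bare wait() would block until some later notification, or forever.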