I am creating a graphing calculator.
In an attempt to squeeze some more performance out of it, I added some multithreaded to the line calculator. Essentially what my current implementation does is construct a thread-safe Queue
of X values, then start however many threads it needs, each one calculating a point on the line using the queue to get its values, and then ordering the points using a HashMap
when the calculations are done. This implementation works great, and that's not where my race condition is (merely some background info).
In examining the performance results from this, I found that the HashMap
is a performance bottleneck, since I do that synchronously on one thread. So I figured that ordering each point as its calculated would work best. I tried a PriorityQueue
, but that was slower than the HashMap
.
I ended up creating an algorithm that essentially works like this:
I construct a list of X values to calculate, like in my current algorithm.
I then copy that list of values into another class, unimaginatively and temporarily named BlockingList
, which is responsible for ordering the points as they are calculated.
BlockingList
contains a put()
method, which takes in two BigDecimal
s as parameters, the first the X value, the second the calculated Y value. put()
will only accept a value if the X value is the next one on the list to be accepted in the list of X values, and will block until another thread gives it the next excepted value.
For example, since that can be confusing, say I have two threads, Thread-1
and Thread-2
. Thread-2
gets the X value 10.0
from the values queue, and Thread-1
gets 9.0
. However, Thread-1
completes its calculations first, and calls put()
before Thread-2
does. Because BlockingList
is expecting to get 10.0
first, and not 9.0
, it will block on Thread-1
until Thread-2
finishes and calls put()
. Once Thread-2
gives BlockingList
10.0
, it notify()
s all waiting threads, and expects 9.0
next. This continues until BlockingList
gets all of its expected values.
(I apologise if that was hard to follow, if you need more clarification, just ask.)
As expected by the question title, there is a race condition in here. If I run it without any System.out.println
s, it will sometimes lock because of conflicting wait()
and notifyAll()
s, but if I put a println
in, it will run great.
A small implementation of this is included below, and exhibits the same behavior:
import java.math.BigDecimal;
import java.util.concurrent.ConcurrentLinkedQueue;
public class Example {
public static void main(String[] args) throws InterruptedException {
// Various scaling values, determined based on the graph size
// in the real implementation
BigDecimal xMax = new BigDecimal(10);
BigDecimal xStep = new BigDecimal(0.05);
// Construct the values list, from -10 to 10
final ConcurrentLinkedQueue<BigDecimal> values = new ConcurrentLinkedQueue<BigDecimal>();
for (BigDecimal i = new BigDecimal(-10); i.compareTo(xMax) <= 0; i = i.add(xStep)) {
values.add(i);
}
// Contains the calculated values
final BlockingList list = new BlockingList(values);
for (int i = 0; i < 4; i++) {
new Thread() {
public void run() {
BigDecimal x;
// Keep looping until there are no more values
while ((x = values.poll()) != null) {
PointPair pair = new PointPair();
pair.realX = x;
try {
list.put(pair);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}.start();
}
}
private static class PointPair {
public BigDecimal realX;
}
private static class BlockingList {
private final ConcurrentLinkedQueue<BigDecimal> _values;
private final ConcurrentLinkedQueue<PointPair> _list = new ConcurrentLinkedQueue<PointPair>();
public BlockingList(ConcurrentLinkedQueue<BigDecimal> expectedValues) throws InterruptedException {
// Copy the values into a new queue
BigDecimal[] arr = expectedValues.toArray(new BigDecimal[0]);
_values = new ConcurrentLinkedQueue<BigDecimal>();
for (BigDecimal dec : arr) {
_values.add(dec);
}
}
public void put(PointPair item) throws InterruptedException {
while (item.realX.compareTo(_values.peek()) != 0) {
synchronized (this) {
// Block until someone enters the next desired value
wait();
}
}
_list.add(item);
_values.poll();
synchronized (this) {
notifyAll();
}
}
}
}
My question is can anybody help me find the threading error?
Thanks!