I'm looking for a concurrency object that can assist in the following use case:

  • threads/entities: 1 publisher (unique), 0-many readers
  • publisher frequently / erratically updates a data structure, needs to do so quickly and with minimal latency
  • each reader has read access to the data structure (either through something that doesn't allow writes, or because the reader implicitly promises not to change data)
  • each reader is willing to try repeatedly to access the data structure, as long as it can detect when the publisher has come and changed it, because it knows it will eventually get enough time to read what it needs.

Any suggestions? I can use a ReentrantReadWriteLock but am somewhat concerned about blocking the publisher. I would rather have the publisher be able to ruin a reader's chance to read, than to have a reader be able to hold up the publisher.

publisher thread:

 PublisherSignal ps = new PublisherSignal();
 publishToAllReaders(ps.getReaderSignal());

   ...

 while (inLoop())
 {
      ps.beginEdit();
      data.setSomething(someComputation());
      data.setSomethingElse(someOtherComputation());
      ps.endEdit();

      doOtherStuff();
 }

reader thread:

 PublisherSignal.Reader rs = acquireSignalFromPublisher();

      ...

 while (inLoop())
 {
      readDataWhenWeGetAChance();

      doOtherStuff();
 }

      ...

 public void readDataWhenWeGetAChance()
 {
      while (true)
      {
           rs.beginRead();
           useData(data.getSomething(), data.getSomethingElse());
           if (rs.endRead())
           {
               // we get here if the publisher hasn't done a beginEdit()
               // during our read.
               break;
           }

           // darn, we have to try again. 
           // might as well yield thread if appropriate
           rs.waitToRead();
      }
 }
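For what it's worth, the PublisherSignal/Reader API sketched above could be implemented along these lines (this class is hypothetical, imagined for this question, not an existing library). It is essentially a seqlock: a single version counter that is odd while an edit is in progress, so a reader can detect a concurrent or intervening edit and retry:

```java
import java.util.concurrent.atomic.AtomicLong;

public class PublisherSignal {
    private final AtomicLong version = new AtomicLong(0);

    // Publisher side: the counter becomes odd on beginEdit() and even again
    // on endEdit(). Neither call can block, so readers never hold up the
    // publisher.
    public void beginEdit() { version.incrementAndGet(); }
    public void endEdit()   { version.incrementAndGet(); }

    public Reader getReaderSignal() { return new Reader(); }

    public class Reader {
        private long observed;

        // Wait until no edit is in progress, then record the version we saw.
        public void beginRead() {
            long v = version.get();
            while ((v & 1) != 0) {       // odd => publisher is mid-edit
                Thread.yield();
                v = version.get();
            }
            observed = v;
        }

        // True iff the version is unchanged, i.e. no beginEdit() happened
        // during our read and the values we saw form a consistent set.
        public boolean endRead() {
            return version.get() == observed;
        }

        public void waitToRead() {
            Thread.yield();              // simple backoff; could park instead
        }
    }
}
```

Note this sketch only shows the versioning protocol; for it to be safe in Java, the fields of the shared structure would also need safe publication (e.g. declaring them volatile), since otherwise a reader may see stale values even when endRead() reports a clean read.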

edit: at a higher level, what I am trying to do is have the publisher change data several thousand times a second, and then have readers display the latest update at a much slower rate (5-10 times a second). I would use a ConcurrentLinkedQueue to publish the fact that an update has occurred, except that (a) there may be hundreds of updates on the same item, which I would like to coalesce, because having to copy tons of data repeatedly is a performance problem, and (b) having multiple readers seems to rule out a queue... I suppose I could have one master proxy reader and have it notify each of the real readers.
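One alternative worth comparing (not from the question itself, an assumption on my part): if a full copy of the data is affordable, the publisher can swap immutable snapshots into an AtomicReference. Readers just sample the latest reference at their own 5-10 Hz rate, so hundreds of intermediate updates coalesce automatically and readers can never block (or even have to retry against) the publisher. The trade-off is exactly the copying cost flagged above, so this only fits if each snapshot is small relative to the update rate. The names here are illustrative:

```java
import java.util.concurrent.atomic.AtomicReference;

class Snapshot {
    final int fee, fi, fo, fum;          // immutable: safe to share once published
    Snapshot(int fee, int fi, int fo, int fum) {
        this.fee = fee; this.fi = fi; this.fo = fo; this.fum = fum;
    }
}

class SnapshotPublisher {
    private final AtomicReference<Snapshot> latest =
            new AtomicReference<Snapshot>(new Snapshot(0, 0, 0, 0));

    void publish(Snapshot s) { latest.set(s); }  // O(1), never blocks

    Snapshot read() { return latest.get(); }     // always a consistent snapshot
}
```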

+1  A: 

Why not use a BlockingQueue?

Your publisher can write to this queue independently of whatever is reading. The reader (similarly) can take stuff off the queue and not worry about blocking the writer. Thread safety is dealt with by the queue, so 2 threads can write/read with no further synchronisation etc. required.

From the linked doc:

 class Producer implements Runnable {
   private final BlockingQueue<Object> queue;
   Producer(BlockingQueue<Object> q) { queue = q; }
   public void run() {
     try {
       while (true) { queue.put(produce()); }
     } catch (InterruptedException ex) { /* ... handle ... */ }
   }
   Object produce() { /* ... */ }
 }

 class Consumer implements Runnable {
   private final BlockingQueue<Object> queue;
   Consumer(BlockingQueue<Object> q) { queue = q; }
   public void run() {
     try {
       while (true) { consume(queue.take()); }
     } catch (InterruptedException ex) { /* ... handle ... */ }
   }
   void consume(Object x) { /* ... */ }
 }
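For reference, here is a self-contained, compilable variant of the same pattern, using an ArrayBlockingQueue and a toy integer payload (names are illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueDemo {
    static int run() throws InterruptedException {
        final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(16);

        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < 5; i++) {
                        queue.put(i);       // blocks only if the queue is full
                    }
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();

        int sum = 0;
        for (int i = 0; i < 5; i++) {
            sum += queue.take();            // blocks only if the queue is empty
        }
        producer.join();
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());          // prints 10 (0+1+2+3+4)
    }
}
```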
Brian Agnew
see my comment I added.
Jason S
I would have one consumer grab the data, coalesce it and then pass it on to multiple downstream consumers (perhaps via other queues?)
Brian Agnew
A: 

Hmmm... I suppose my stumbling block is around the shared data structure itself... I've been using something like

 public class LotsOfData
 {
      int fee;
      int fi;
      int fo;
      int fum;

      long[] other = new long[123];

      /* other fields too */
 }

where the publisher updates data often, but only one field at a time.

It sounds like maybe I should find a way to serialize updates in a way that's conducive to using a producer-consumer queue:

 public class LotsOfData
 {
      enum Field { FEE, FI, FO, FUM };
      Map<Field, Integer> feeFiFoFum = new EnumMap<Field, Integer>(Field.class);

      long[] other = new long[123];

      /* other fields too */
 }

and then post change items to a queue, like (FEE, 23) for the feeFiFoFum fields, and (33, 1234567L) for the other array. (Bean-type reflection is almost certainly out, for performance reasons.)
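A sketch of that "change item" idea (the Mutation type and names are assumptions based on the classes above, not an existing API): each mutation records one change and can replay itself against the structure, so mutations can be posted to a queue by the publisher and applied in order by a consumer, with later mutations to the same field simply overwriting earlier ones:

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class Data {
    enum Field { FEE, FI, FO, FUM }
    final Map<Field, Integer> feeFiFoFum =
            new EnumMap<Field, Integer>(Field.class);
    final long[] other = new long[123];
}

interface Mutation {
    void apply(Data d);   // replay this one change against the structure
}

// e.g. new FieldMutation(Data.Field.FEE, 23)
class FieldMutation implements Mutation {
    private final Data.Field field;
    private final int value;
    FieldMutation(Data.Field field, int value) {
        this.field = field; this.value = value;
    }
    public void apply(Data d) { d.feeFiFoFum.put(field, value); }
}

// e.g. new ArrayMutation(33, 1234567L)
class ArrayMutation implements Mutation {
    private final int index;
    private final long value;
    ArrayMutation(int index, long value) {
        this.index = index; this.value = value;
    }
    public void apply(Data d) { d.other[index] = value; }
}

class Applier {
    // Consumer side: drain whatever mutations are queued and apply them.
    static void drain(Queue<Mutation> q, Data d) {
        Mutation m;
        while ((m = q.poll()) != null) {
            m.apply(d);
        }
    }
}
```

The producer would enqueue these on a ConcurrentLinkedQueue<Mutation> and the consumer would call drain() at its own rate.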

Still, it seems like I'm spoiled by the apparent simplicity of just having the publisher write whatever it wants, and knowing that there will be time (eventually) for the reader(s) to go in and get a consistent set of data, if only it had a flag it could use to tell if the data has been modified.

update: interesting. I tried this approach, with a ConcurrentLinkedQueue of Mutation objects (each storing only the state necessary for one change) for a class similar to the first LotsOfData above (4 int fields and an array of 27 longs). The producer produces a total of 10 million mutations, with Thread.sleep(1) between batches of about 10000; the consumer checks the queue every 100 milliseconds and consumes whatever mutations are present. I ran the test in a number of ways:

  • empty action within test framework (just looping 1000 times, calling Thread.sleep(1), and checking whether to use a null object): 1.95 sec on my 3GHz Pentium 4 running jre6u13.
  • test action 1 -- create and apply mutations on the producer end only: 4.3 sec
  • test action 2 -- create and apply mutations on the producer end, place each on queue as they are created: 12 sec

so that's roughly 235 nsec on average to create and apply each mutation object (2.35 sec over 10 million), and 770 nsec on average to put each mutation on the queue in the producer and pull it off in the consumer (7.7 sec over 10 million). The time to execute the mutations themselves for primitive types seems negligible compared to object creation and queue operations, as it should be. Not bad I guess, and it gives me some metrics to estimate the performance cost for this approach.

Jason S