views:

835

answers:

4

I have two threads. The producer is producing pieces of data (String objects), where the consumer processes these strings. The catch is that my application only needs the most recent data object to be processed. In other words, if the producer managed to produce two strings "s1" and then "s2" then I want the consumer to process only "s2". "s1" can be safely discarded.

Of course there's no problem implementing a class that realizes this behavior, but I want to use a standard mechanism from java.util.concurrent (if such a mechanism exists). Note that SynchronousQueue is not a good solution: the consumer will block when enqueueing "s1" and will not get the chance to produce "s2".

(In short, I am looking for a single-element collection with a blocking remove operation and a non-blocking set operation)

Any ideas?

A: 

You could use an array of size one for that:

String[] oeq = new String[1];

Sample source:

public class Test {
    private static final String[] oeq = new String[1];
    public static void main(String[] args) {
        (new Producer()).start();
        (new Consumer()).start();
        (new Consumer()).start();
        (new Consumer()).start();
        (new Consumer()).start();
        (new Consumer()).start();
        (new Consumer()).start();
    }

    private static class Producer extends Thread {
        public void run() {
            int i=0;
            while(true) {
                i++;
                synchronized(oeq) {
                    oeq[0] = ""+i;
                    oeq.notifyAll();
                }
            }
        }
    }

    private static class Consumer extends Thread {
        public void run() {
            String workload = null;
            while(true) {
                synchronized(oeq) {
                    try {
                        oeq.wait();
                    } catch(InterruptedException ie) {
                        ie.printStackTrace();
                    }
                    if(oeq[0] != null) {
                        workload = oeq[0];
                        oeq[0] = null;
                    }
                }
                if(workload != null) {
                    System.out.println(workload);
                }
            }
        }
    }
}
Johannes Weiß
This will be very inefficient. Consumer thread should not block CPU while waiting for work.
Peter Štibraný
Actually, consumer here would consume all CPU, as there's no wait() in the loop (this may be what you meant by "block"). Which is why, I think, the OP wanted to use an existing JDK class -- it's easy to write a broken homegrown concurrent object.
kdgregory
Both hints are true, I fixed the first one, thanks.
Johannes Weiß
OK, to the point of homegrown concurrency objects being broken: why are you extending Thread?
kdgregory
implementing Runnable would have been better probably. It's a quick sample
Johannes Weiß
or do you mean anything else?
Johannes Weiß
Exchanger seems to be the way to go!
Johannes Weiß
+2  A: 

I think your best answer is probably to use ArrayBlockingQueue, where the producer (you only have one producer, right?) removes any existing element before adding the new element.

Sure, there are race conditions in this implementation: the consumer could start processing an element just before the producer removes it. But those race conditions will always exist, no matter what data structure you use.

kdgregory
+2  A: 

What about the Exchanger class? This is the standard way of exchanging objects between threads. Specialize it with your class, may be a list of strings. Make the consumer only use the first/last one.

kgiannakakis
probably a better approach than mine, although you do need to set the producer's timeout to 0
kdgregory
Exchanger also blocks producer :-(
Peter Štibraný
unless you set the timeout to 0 (or a negative number)
kdgregory
A: 

Well, if you only want the most recently produced string, then you don't need a queue at all - all you need is a string reference: the producer sets it, the consumer reads it. If the consumer takes so long to read it that the producer re-sets it ... so what?

Setting and reading references are atomic. The only issue is if you want the consumer to somehow be notified that there's a string available. But even then ... if the consumer is doing something that takes a while, then you really don't need any fancy-pants stuff from the concurrency libraries.

Note, btw, that this example works with any number of producer and/or consumer threads.

import java.util.Random;

public class Example {
    public static void main(String[] av) {
     new Example().go();
    }

    Object mutex  = new Object();
    String theString = null;

    void go() {
     Runnable producer = new Runnable() {
      public void run() {
       Random rnd = new Random();
       try {
        for (;;) {
         Thread.sleep(rnd.nextInt(10000));
         synchronized (mutex) {
          theString = "" + System.currentTimeMillis();
          System.out.println("Producer: Setting string to " + theString);
          mutex.notify();
         }
        }
       } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
       }

      }
     };

     Runnable consumer = new Runnable() {
      public void run() {
       try {
        String mostRecentValue = null;
        Random rnd = new Random();
        for (;;) {
         synchronized (mutex) {
          // we use == because the producer
          // creates new string
          // instances
          if (theString == mostRecentValue) {
           System.out.println("Consumer: Waiting for new value");
           mutex.wait();
           System.out.println("Consumer: Producer woke me up!");
          } else {
           System.out.println("Consumer: There's a new value waiting for me");
          }
          mostRecentValue = theString;
         }
         System.out.println("Consumer: processing " + mostRecentValue);
         Thread.sleep(rnd.nextInt(10000));
        }
       } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
       }
      }
     };


     new Thread(producer).start();
     new Thread(consumer).start();
    }
}
paulmurray
you need to make the reference volatile, and be running in a 1.5+ JDK, for this to be guaranteed (the Java Memory Model allows a thread to maintain its own copy indefinitely otherwise)
kdgregory
Yup - forgot that bit.
paulmurray