views:

736

answers:

3

I'd like to create some sort of Producer/Consumer threading app. But I'm not sure what the best way to implement a queue between the two. So I've some up with two ideas (both of which could be entirely wrong). I would like to know which would be better and if they both suck then what would be the best way to implement the queue. It's mainly my implementation of the queue in these examples that I'm concerned about. I'm extending a Queue class that is an in house class and is thread safe. Below are two examples with 4 classes each.

Main class-

public class SomeApp
{
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        consumer = new Consumer();
        producer = new Producer();
    }
} 

Consumer class-

public class Consumer implements Runnable
{
    public Consumer()
    {
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = QueueHandler.dequeue();
            //do some stuff with the object
        }
    }
}

Producer class-

public class Producer implements Runnable
{
    public Producer()
    {
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {
        while(true)
        {
            //add to the queue some sort of unique object
            QueueHandler.enqueue(new Object());
        }
    }
}

Queue class-

public class QueueHandler
{
    //This Queue class is a thread safe (written in house) class
    public static Queue<Object> readQ = new Queue<Object>(100);

    public static void enqueue(Object object)
    {
        //do some stuff
        readQ.add(object);
    }

    public static Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

OR

Main class-

public class SomeApp
{
    Queue<Object> readQ;
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        readQ = new Queue<Object>(100);
        consumer = new Consumer(readQ);
        producer = new Producer(readQ);
    }
} 

Consumer class-

public class Consumer implements Runnable
{
    Queue<Object> queue;

    public Consumer(Queue<Object> readQ)
    {
        readQ = queue;
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = queue.dequeue();
            //do some stuff with the object
        }
    }
}

Producer class-

public class Producer implements Runnable
{
    Queue<Object> queue;

    public Producer(Queue<Object> readQ)
    {
        readQ = queue;
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {

        while(true)
        {
            //add to the queue some sort of unique object
            queue.enqueue(new Object());
        }
    }
}

Queue class-

//the extended Queue class is a thread safe (written in house) class
public class QueueHandler extends Queue<Object>
{    
    public QueueHandler(int size)
    {
        super(size); //All I'm thinking about now is McDonalds.
    }

    public void enqueue(Object object)
    {
        //do some stuff
        readQ.add();
    }

    public Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

And go!

+1  A: 

You are reinventing the wheel.

If you need persistence and other enterprise features use JMS (I'd suggest ActiveMq).

If you need fast in-memory queues use one of the impementations of java's Queue.

If you need to support java 1.4 or earlier, use Doug Lea's excellent concurrent package.

flybywire
+10  A: 

Java 5+ has all the tools you need for this kind of thing. You will want to:

  1. Put all your Producers in one ExecutorService;
  2. Put all your Consumers in another ExecutorService;
  3. If necessary, communicate between the two using a BlockingQueue.

I say "if necessary" for (3) because from my experience it's an unnecessary step. All you do is submit new tasks to the consumer executor service. So:

final ExecutorService producers = Executors.newFixedThreadPool(100);
final ExecutorService consumers = Executors.newFixedThreadPool(100);
while (/* has more work */) {
  producers.submit(...);
}
producers.shutdown();
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
consumers.shutdown();
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

So the producers submit directly to consumers.

cletus
Ummmm, Wow! Not even sure where to start with this.
Gareth
Cletus is right on the money for more information to help clarify "where to start"http://java.sun.com/docs/books/tutorial/essential/concurrency/
edwardTheGreat
+1  A: 

OK, as others note, the best thing to do is to use java.util.concurrent package. I highly recommend "Java Concurrency in Practice". It's a great book that covers almost everything you need to know.

As for your particular implementation, as I noted in the comments, don't start Threads from Constructors -- it can be unsafe.

Leaving that aside, the second implementation seem "better" (frankly I won't allow that in our team but..). You don't want to put queues in static fields. You are probably just loosing flexibility for nothing.

If you want to go ahead with your own implementation (for learning purpose I guess?), supply a start() method at least. You should construct the object (you can instantiate the Thread object), and then call start() to start the thread.

Edit:ExecutorService have their own queue so this can be confusing.. Here's something to get you started. Have fun!

public class Main {


    public static void main(String[] args){

        //The numbers are just silly tune parameters. Refer to the API.
        //The important thing is, we are passing a bounded queue.
        ExecutorService consumer = new ThreadPoolExecutor(1,4,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(100));

        //No need to bound the queue for this executor.
        //Use utility method instead of the complicated Constructor.
        ExecutorService producer = Executors.newSingleThreadExecutor();

        Runnable produce = new Produce(consumer);
        producer.submit(produce);   
    }


}



class Produce implements Runnable{
    private final ExecutorService consumer;

    public Produce(ExecutorService consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        Pancake cake = Pan.cook();
        Runnable consume = new Consume(cake);
        consumer.submit(consume);
    }

}

class Consume implements Runnable{
    private final Pancake cake;

    public Consume(Pancake cake){
        this.cake = cake;
    }
    @Override
    public void run() {
        cake.eat();
    }
}

Further EDIT: For producer, instead of while(true), you can do something like:

@Override
public void run(){
    while(!Thread.isInterrupted){
        //do stuff
    }
}

This way you can shutdown the executor by calling .shutdownNow(). If you'd use while(true), it won't shutdown.

Also note that the Producer is still vulnerable to RuntimeExceptions. If needed, you might want to consider ScheduledExecutor.scheduleWithFixedDelay()--but if you start thinking this much, you should really read Java Concurrency in Practice!

Enno Shioji
So I should add a start() method to the Consumer and Producer? Are you saying I should put something like this in my main method rather? consumer = new Consumer(); consumer.start(readQ);or this? consumer = new Comsumer(readQ); consumer.start();
Gareth
You'd normally do new Comsumer(readQ); consumer.start();. In your case, it is advisable to declare the queue private final, and if you do that, you need to set the queue in the constructor. If this is production code, I strongly advice you to go with cletus' answer. If you absolutely need to use your in-house queue, then you should use ExecutorService executor = Executors.newSingleThreadExecutor() instead of a raw thread instead. This will among other things, protect you from RuntimeException halting you system.
Enno Shioji
Thanks. very helpful. I've gone with the BlockingQueue like cletus suggested over the in-house queue. Still trying to get my head around the ExecutorService class but when I do I will definitely use it. Thanks for your help.
Gareth
@Gareth: Hope the edit helps!
Enno Shioji