I am working on a practical scenario in Java: a socket program. The existing system and the expected system are as follows.

Existing system: the system checks whether a certain condition is satisfied. If so, it creates a message to be sent and puts it into a queue.

The queue processor is a separate thread. It periodically checks the queue for items. If it finds any items (messages), it sends each message to a remote host (hardcoded) and removes the item from the queue.

Expected system: the message is still created when a certain condition is satisfied, but the recipient is not the same in every case. There are several possible approaches:

  1. Put the message into the same queue, but with its receiver ID. The second thread can then identify the receiver and send the message to it.

  2. Use multiple threads. When the condition is satisfied and the receiver is new, create a new queue, put the message into it, and start a new thread to process that queue. Later messages for the same recipient go into the same queue; a message for a different recipient gets a new queue and a new thread.

Now I want to implement the second one, but I am a bit stuck. How should I do that? A skeleton would be sufficient, and you don't need to worry about showing how to create the queues etc. :)

Update: I also think that approach 1 is the best way to do this. I read some articles on threading and came to that decision. But it is still worth learning how to implement approach 2 as well.

+4  A: 

Consider using Java Message Service (JMS) rather than re-inventing the wheel?

Paul
+2  A: 

Can I suggest that you look at BlockingQueue? Your dispatch process can write to this queue (put), and clients can take or peek in a thread-safe manner. So you don't need to write the queue implementation at all.

If you have one queue containing different message types, then you will need to implement some peek-type mechanism for each client (i.e. they will have to check the head of the queue and only take what is theirs). For this to work effectively, consumers will have to extract the data intended for them in a timely and robust fashion.

If you have one queue/thread per message/consumer type, then that's going to be easier/more reliable.

Your client implementation will simply have to loop on:

while (!done) {
   Object item = queue.take();
   // process item
}

Note that the queue can make use of generics, and take() is blocking.
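To make the generics point concrete, here is a minimal sketch of a typed queue, assuming a made-up AddressedMessage class that carries a receiver ID alongside the payload. The class names and the deliver() helper are illustrations only, not anything from the question's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical message type: a payload plus the ID of its receiver.
class AddressedMessage {
    final String receiverId;
    final String body;

    AddressedMessage(String receiverId, String body) {
        this.receiverId = receiverId;
        this.body = body;
    }
}

class TypedQueueDemo {
    // Puts the messages on a typed queue while a consumer thread takes
    // exactly that many items, then returns what the consumer saw.
    static List<String> deliver(List<AddressedMessage> messages) {
        final BlockingQueue<AddressedMessage> queue =
                new LinkedBlockingQueue<AddressedMessage>();
        final List<String> log = new ArrayList<String>();
        final int expected = messages.size();

        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < expected; i++) {
                        // take() blocks until an item is available; no cast needed
                        AddressedMessage m = queue.take();
                        log.add(m.receiverId + " <- " + m.body);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();

        try {
            for (AddressedMessage m : messages) {
                queue.put(m);
            }
            consumer.join(); // after join() it is safe to read the log
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }
}
```

In a real program the consumer would loop until told to stop rather than counting items; the count is only there to keep the sketch short.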

Of course, with multiple consumers taking messages of different types, you may want to consider a space-based architecture. This won't have queue (FIFO) characteristics, but will allow you multiple consumers in a very easy fashion.

Brian Agnew
+1  A: 

First of all, if you are planning to have a lot of receivers, I would not use the ONE-THREAD-AND-QUEUE-PER-RECEIVER approach. You could end up with a lot of threads doing nothing most of the time, and that could hurt you performance-wise. An alternative is a thread pool of worker threads picking tasks from a shared queue, each task carrying its own receiver ID, and perhaps a shared dictionary of socket connections to each receiver for the worker threads to use.

Having said that, if you still want to pursue your approach, what you could do is:

1) Create a new class to handle your new thread execution:

import java.util.LinkedList;
import java.util.Queue;

public class Worker implements Runnable {
   // Queue is an interface, so back it with a concrete LinkedList
   private final Queue<String> myQueue = new LinkedList<String>();
   public void run()
   {
       while (true) {
          String messageToProcess = null;
          synchronized (myQueue) {
             if (!myQueue.isEmpty()) {
                 // get your data from queue
                 messageToProcess = myQueue.poll();
             }
          }
          if (messageToProcess != null) {
             // do your stuff
          }
          try {
             Thread.sleep(500); // to avoid spinning
          } catch (InterruptedException e) {
             return; // stop the worker when interrupted
          }
       }
   }
   public void queueMessage(String message)
   {
      synchronized(myQueue) {
         myQueue.add(message);
      }
   }
}

2) On your main thread, create the messages and use a dictionary (hash table) to see whether the receiver's thread has already been created. If it has, just queue the new message. If not, create a new thread, put it in the hash table, and queue the new message:

// myHash is assumed to be a Map<Integer, Worker>, e.g. a HashMap
while (true) {
   String msg = getNewCreatedMessage(); // you get your messages from here
   int id = getNewCreatedMessageId();   // you get your rec's id from here
   Worker w = myHash.get(id);
   if (w == null) {   // create new Worker thread
      w = new Worker();
      myHash.put(id, w);   // remember it for later messages
      new Thread(w).start();
   }
   w.queueMessage(msg);
}

Good luck.

Edit: you can improve this solution by using the BlockingQueue that Brian mentioned.
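As a rough sketch of that improvement (the class name BlockingWorker, the stop sentinel and the counter are assumptions added for illustration), the worker can block on take() instead of sleeping and polling:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical variant of Worker that waits on a BlockingQueue.
class BlockingWorker implements Runnable {
    // Sentinel compared by identity, so no real message can collide with it.
    private static final String STOP = new String("stop");

    private final BlockingQueue<String> myQueue = new LinkedBlockingQueue<String>();
    private final AtomicInteger processed = new AtomicInteger();

    public void run() {
        try {
            while (true) {
                // Blocks until a message arrives: no sleep, no explicit locking
                String messageToProcess = myQueue.take();
                if (messageToProcess == STOP) {
                    return;
                }
                // do your stuff (e.g. write to the receiver's socket)
                processed.incrementAndGet();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // treat interruption as shutdown
        }
    }

    public void queueMessage(String message) {
        myQueue.add(message); // queue is unbounded, so add() will not fail on capacity
    }

    // Asks the worker to exit once everything queued before this call is handled.
    public void requestStop() {
        myQueue.add(STOP);
    }

    public int processedCount() {
        return processed.get();
    }
}
```

The main loop from step 2 can stay the same; only the inside of the worker changes.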

Pablo Santa Cruz
+2  A: 

You have to weigh up slightly whether you have lots of end machines and occasional messages to each, or a few end machines and frequent messages to each.

If you have lots of end machines, then literally having one thread per end machine sounds a bit over the top unless you're really going to be constantly streaming messages to all of those machines. I would suggest having a pool of threads which will only grow between certain bounds. To do this, you could use a ThreadPoolExecutor. When you need to post a message, you actually submit a runnable to the executor which will send the message:

Executor msgExec = new ThreadPoolExecutor(...);

public void sendMessage(final String machineId, final byte[] message) {
  msgExec.execute(new Runnable() {
    public void run() {
      sendMessageNow(machineId, message);
    }
  });
}

private void sendMessageNow(String machineId, byte[] message) {
  // open connection to machine and send message, thinking
  // about the case of two simultaneous messages to a machine,
  // and whether you want to cache connections.
}
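For what the elided constructor arguments might look like, here is one possible configuration. All the numbers (2 core threads, 8 maximum, 60 second keep-alive) and the class name PooledSender are assumptions for illustration, not recommendations. A SynchronousQueue is used because a ThreadPoolExecutor with an unbounded queue never grows past its core size; CallerRunsPolicy makes the submitting thread do the work itself when all pool threads are busy.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper around a pool that grows between 2 and 8 threads.
class PooledSender {
    private final ThreadPoolExecutor msgExec = new ThreadPoolExecutor(
            2, 8,                      // core and maximum pool size (assumed values)
            60L, TimeUnit.SECONDS,     // idle threads above core die after 60 s
            new SynchronousQueue<Runnable>(),
            new ThreadPoolExecutor.CallerRunsPolicy());

    // Stand-in for the network: records what would have been sent.
    private final ConcurrentLinkedQueue<String> sent = new ConcurrentLinkedQueue<String>();

    void sendMessage(final String machineId, final byte[] message) {
        msgExec.execute(new Runnable() {
            public void run() {
                sendMessageNow(machineId, message);
            }
        });
    }

    private void sendMessageNow(String machineId, byte[] message) {
        // A real version would open (or reuse) a connection here.
        sent.add(machineId + ":" + message.length);
    }

    int sentCount() {
        return sent.size();
    }

    // Stops accepting work and waits for running tasks; true if all finished in time.
    boolean shutdownAndWait(long timeoutMillis) {
        msgExec.shutdown();
        try {
            return msgExec.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Note that with a pool like this, two messages for the same machine can be in flight at once and may arrive out of order, which is the "two simultaneous messages" case mentioned in the comment above.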

If you just have a few end machines, then you could have a BlockingQueue per machine, and a thread per blocking queue sitting waiting for the next message. In this case, the pattern is more like this (beware untested off-top-of-head Sunday morning code):

ConcurrentHashMap<String,BlockingQueue<Message>> queuePerMachine =
    new ConcurrentHashMap<String,BlockingQueue<Message>>();

public void sendMessage(String machineId, byte[] message)
    throws InterruptedException {
  BlockingQueue<Message> q = queuePerMachine.get(machineId);
  if (q == null) {
    // BlockingQueue is an interface; LinkedBlockingQueue is one implementation
    q = new LinkedBlockingQueue<Message>();
    BlockingQueue<Message> prev = queuePerMachine.putIfAbsent(machineId, q);
    if (prev != null) {
      q = prev;   // another thread registered a queue first; use theirs
    } else {
      (new QueueProcessor(q)).start();
    }
  }
  q.put(new Message(message));
}

private class QueueProcessor extends Thread {
  private final BlockingQueue<Message> q;
  QueueProcessor(BlockingQueue<Message> q) {
    this.q = q;
  }
  public void run() {
    Socket s = null;
    try {
      for (;;) {
        boolean needTimeOut = (s != null);
        Message m = needTimeOut ?
           q.poll(60000, TimeUnit.MILLISECONDS) :
           q.take();
        if (m == null) {
          if (s != null) {
            // close s and null
          }
        } else {
          if (s == null) {
            // open s
          }
          // send message down s
        }
      }
    } catch (InterruptedException e) {
      // interrupted while waiting: let the thread exit
    }
    // add appropriate error handling and finally
  }
}

In this case, we close the connection if no message for that machine arrives within 60 seconds.

Should you use JMS instead? Well, you have to weigh up whether this sounds complicated to you. My personal feeling is that it isn't a complicated enough task to warrant a special framework. But I'm sure opinions differ.

P.S. In reality, now I look at this, you'd probably put the queue inside the thread object and just map machine ID -> thread object. Anyway, you get the idea.
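A minimal sketch of that P.S., assuming made-up names MachineThread and MachineRegistry and leaving the socket handling as a placeholder:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical thread object that owns the queue for one machine.
class MachineThread extends Thread {
    private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<byte[]>();

    MachineThread(String machineId) {
        super("sender-" + machineId);
        setDaemon(true); // assumption: sender threads should not keep the JVM alive
    }

    void enqueue(byte[] message) {
        queue.add(message);
    }

    @Override
    public void run() {
        try {
            for (;;) {
                send(queue.take()); // blocks until the next message arrives
            }
        } catch (InterruptedException e) {
            // interrupted while waiting: let the thread exit
        }
    }

    private void send(byte[] message) {
        // Placeholder: open or reuse the connection and write the message.
    }
}

// Maps machine ID -> thread object, creating and starting threads on demand.
class MachineRegistry {
    private final ConcurrentHashMap<String, MachineThread> threads =
            new ConcurrentHashMap<String, MachineThread>();

    void sendMessage(String machineId, byte[] message) {
        MachineThread t = threads.get(machineId);
        if (t == null) {
            MachineThread fresh = new MachineThread(machineId);
            MachineThread prev = threads.putIfAbsent(machineId, fresh);
            if (prev == null) {
                fresh.start(); // we won the race, so we start the thread
                t = fresh;
            } else {
                t = prev;      // someone else registered one first
            }
        }
        t.enqueue(message);
    }

    int threadCount() {
        return threads.size();
    }
}
```

The idle-timeout logic from the QueueProcessor above would go inside run() in the same way.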

Neil Coffey
+2  A: 

You might try SomnifugiJMS, an in-VM JMS implementation that uses java.util.concurrent as the actual "engine" of sorts.

It will probably be somewhat overkill for your purposes, but it may well let your application become distributed with little to no additional programming (if applicable): you just plug in a different JMS implementation such as ActiveMQ and you're done.

Gerco Dries