I'm writing a message processing application (email) that I want to have an outgoing queue. The way I've designed this is with a singleton queue class, ThreadedQueueSender, backed by an ExecutorService and a BlockingQueue. Additionally, a pool of javax.mail.Transport objects is used to obtain and release connections to the outgoing SMTP server.

This class exposes a method, add(MimeMessage), that adds messages to the work queue (BlockingQueue).

At instantiation of the class, the ExecutorService is initialized to a ThreadPoolExecutor with a fixed number of threads, let's say 5. Each thread's run() method is an infinite loop that only exits when it detects an interrupt (when ExecutorService.shutdownNow() is called).

This run method uses BlockingQueue.poll() to take messages from the work queue until no more are available without blocking, then requests a Transport object from the connection pool, opens the connection, sends all the messages it has retrieved, closes the connection and returns the Transport object.

This works, but I feel I am not taking full advantage of the ExecutorService by having a fixed number of threads that run for the life of the application. Also, I am managing the work queue myself instead of letting the concurrency frameworks handle it. How would others implement this functionality? Is it better to wrap each incoming message in a Runnable, then execute the sending logic?

Thank you, any comments are appreciated.

Ryan

A: 

Wrapping up the messages in a Runnable would force you to either make the work queue unbounded or deal with what happens when the queue is full. ThreadPoolExecutor gives you a few policies for dealing with this situation: give up (reject the task), run it yourself in the submitting thread, or discard something. See the ThreadPoolExecutor javadoc for details.

Another thing you can do is allow the thread pool to create threads beyond its core size. How threads are spawned and when they are reaped is controlled by the first four arguments to the ThreadPoolExecutor constructor (corePoolSize, maximumPoolSize, keepAliveTime and its TimeUnit). How well this works in practice will depend on the resource bottleneck.
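As a rough illustration of the two points above, here is a minimal sketch of a ThreadPoolExecutor with a bounded queue and a saturation policy. The class name BoundedSenderPool and all of the sizes are hypothetical and chosen only for the example; the right values depend on your SMTP server and workload.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical factory; the numbers below are placeholders, not recommendations.
class BoundedSenderPool {
    static ThreadPoolExecutor create() {
        return new ThreadPoolExecutor(
                2,                                          // corePoolSize: threads kept around even when idle
                5,                                          // maximumPoolSize: upper bound on threads
                30, TimeUnit.SECONDS,                       // keepAliveTime + unit: idle time before extra threads are reaped
                new ArrayBlockingQueue<Runnable>(100),      // bounded work queue
                new ThreadPoolExecutor.CallerRunsPolicy()); // when saturated, the submitting thread runs the task itself
    }
}
```

Note that with a bounded queue, threads beyond the core size are only started once the queue is full. The other built-in handlers are AbortPolicy (throws RejectedExecutionException), DiscardPolicy and DiscardOldestPolicy.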

Also, what's the advantage of BlockingQueue.poll in your situation, rather than BlockingQueue.take? Both are interruptible, and your thread only has one task, so blocking is not undesirable.

daveb
Thanks for your reply! The idea behind using poll() was to take as many messages off the queue as possible without blocking, then send those all at once.
purecharger
A: 

You should create tasks for every piece of work that should be done by your executor service.

For example, you could create a Callable "MailSendingTask" that holds the MimeMessage and wraps the mail sending. Queue these MailSendingTasks by submitting them to your executor. Now your executor decides how many threads will be created (configure it by setting the lower and upper thread pool bounds).

You only need to create 2 or 3 classes/interfaces

  • one MailService interface that provides a simple send(MimeMessage msg) method
  • one MailServiceImplementation class that implements MailService and holds a reference to a configured executor
  • one class MailSenderTask implementing the Callable interface that holds a reference to the MimeMessage object and does the mail sending.
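The three pieces listed above might look roughly like the sketch below. To keep it compilable without JavaMail on the classpath, the type parameter M stands in for MimeMessage, and MailTransport is a hypothetical abstraction over "open a Transport, send, close" that is not part of the original suggestion. Returning a Future from send() is also an assumption made for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// M is a stand-in for javax.mail.internet.MimeMessage.
interface MailService<M> {
    Future<Boolean> send(M msg);
}

// Hypothetical hook for the real sending logic (e.g. a pooled Transport).
interface MailTransport<M> {
    void deliver(M msg) throws Exception;
}

// One task per message; the executor decides which thread runs it.
class MailSenderTask<M> implements Callable<Boolean> {
    private final M msg;
    private final MailTransport<M> transport;

    MailSenderTask(M msg, MailTransport<M> transport) {
        this.msg = msg;
        this.transport = transport;
    }

    @Override
    public Boolean call() throws Exception {
        transport.deliver(msg);
        return Boolean.TRUE;
    }
}

// Holds the configured executor and turns each message into a task.
class ExecutorMailService<M> implements MailService<M> {
    private final ExecutorService executor;
    private final MailTransport<M> transport;

    ExecutorMailService(ExecutorService executor, MailTransport<M> transport) {
        this.executor = executor;
        this.transport = transport;
    }

    @Override
    public Future<Boolean> send(M msg) {
        return executor.submit(new MailSenderTask<M>(msg, transport));
    }
}

class MailDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        MailService<String> service =
                new ExecutorMailService<String>(executor, msg -> System.out.println("sending " + msg));
        service.send("hello").get(); // wait until the task has run
        executor.shutdown();
    }
}
```

One trade-off of this shape is that each task sends a single message, so any batching over one connection would have to live inside the transport rather than in the task.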

You could even go further by creating an extra service that manages the mail socket connections, which could be used by the MailSenderTask.

If you want to add "cancellation" you should look at the classes Future and FutureTask.
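A minimal sketch of cancellation through Future, assuming a placeholder task that just sleeps in place of a slow send. The class and method names (CancelDemo, submitAndCancel) are made up for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class CancelDemo {
    // Submits a long-running placeholder task, cancels it, and reports whether the cancel took effect.
    static boolean submitAndCancel(ExecutorService executor) {
        Future<?> pending = executor.submit(() -> {
            try {
                TimeUnit.SECONDS.sleep(60); // stands in for a slow send
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // cancel(true) interrupts the worker thread
            }
        });
        // cancel(true) returns true if the task had not already completed.
        return pending.cancel(true) && pending.isCancelled();
    }
}
```

Cancelling only helps if the sending code responds to interruption, so whether an in-flight SMTP send actually stops depends on the transport.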

MrWhite
Ok, thinking about it from a task-centric approach, my task is: "take as many messages from the queue as possible without blocking, and send using a single Transport connection". So then I could implement the run() (or call()) method as:
if (at least one message can be taken without blocking) continue taking until it would block, then send
else (block until a message arrives) continue taking until it would block again?
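One way to express "block for the first message, then take whatever else is already queued" is BlockingQueue.take() followed by drainTo(). The sketch below is only an illustration of that idea; BatchDrain, nextBatch and the maxBatch cap are hypothetical names, and strings stand in for MimeMessage objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BatchDrain {
    // Blocks until at least one element is available, then collects whatever else
    // is already queued (up to maxBatch elements in total) without blocking again.
    static <M> List<M> nextBatch(BlockingQueue<M> queue, int maxBatch) throws InterruptedException {
        List<M> batch = new ArrayList<M>();
        batch.add(queue.take());            // interruptible blocking wait for the first message
        queue.drainTo(batch, maxBatch - 1); // non-blocking grab of the rest
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        queue.add("msg-1");
        queue.add("msg-2");
        queue.add("msg-3");
        // A real worker would loop here and send each batch over a single connection.
        System.out.println(nextBatch(queue, 10)); // prints [msg-1, msg-2, msg-3]
    }
}
```

Because take() is interruptible, a worker looping on nextBatch() would still exit when shutdownNow() interrupts it.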
purecharger
Ok, I think I understand. You could use a ConcurrentLinkedQueue that is shared among your sending threads. Just add the MimeMessages to the queue and let your threads poll() until it is empty. You don't have to worry about synchronization when choosing this implementation.
MrWhite