I am trying to (simply) make a blocking thread queue, where submitting a task makes the method wait until it's finished executing. The hard part, though, is the wait.

Here's my 12:30 AM code that I think is overkill:

public void sendMsg(final BotMessage msg) {
    try {
        Future task;
        synchronized(msgQueue) {
            task = msgQueue.submit(new Runnable() {
                public void run() {
                    sendRawLine("PRIVMSG " + msg.channel + " :" + msg.message);
                }
            });
            //Add a separate wait so the next runnable doesn't get executed yet but
            //the one above unblocks
            msgQueue.submit(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(Controller.msgWait);
                    } catch (InterruptedException e) {
                        log.error("Wait to send message interrupted", e);
                    }
                }
            });
        }
        //Block until done
        task.get();
    } catch (ExecutionException e) {
        log.error("Couldn't schedule send message to be executed", e);
    } catch (InterruptedException e) {
        log.error("Wait to send message interrupted", e);
    }
}

As you can see, there's a lot of extra code there just to make it wait 1.7 seconds between tasks. Is there an easier and cleaner solution out there, or is this it?


UPDATE

After lots of weird code changes and other fluff, I have settled on modifying ArrayBlockingQueue so that put() blocks until done. Thanks for all of the help!

public final CustBlockingQueue<BotMessage> msgQueue = new CustBlockingQueue<BotMessage>(1);


    // A consumer thread
    new Thread(new Runnable() {
        public void run() {
            while (true)
                try {
                    // Blocks until there is something in the queue
                    BotMessage msg = msgQueue.take();
                    sendRawLine("PRIVMSG " + msg.getChannel() + " :" + msg.getMessage());
                    //Release lock so that put() unblocks
                    msgQueue.lock.lockInterruptibly();
                    msgQueue.doneProcessing.signal();
                    msgQueue.lock.unlock();
                    //Wait before continuing
                    Thread.sleep(Controller.msgWait);
                } catch (InterruptedException e) {
                    log.error("Wait for sending message interrupted", e);
                }
        }
    }).start();

public class CustBlockingQueue<E> extends ArrayBlockingQueue<E> {
    public ReentrantLock lock = new ReentrantLock(false);
    public Condition doneProcessing = lock.newCondition();

    public CustBlockingQueue(int capacity) {
        super(capacity);
    }

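    @Override
    public void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            super.put(e);
            //Wait for the consumer to signal that the message was sent
            doneProcessing.await();
        } finally {
            //Always release the lock, even if interrupted
            lock.unlock();
        }
    }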
}

At least this actually does make sense. Unless someone has a really, really good idea, this is what I'm sticking with.

A: 

I am not sure why you need the wait at all: I think the queue implementation picks tasks in order of their submission, i.e. it is FIFO, so the tasks should be executed (when you invoke msgQueue.get()) in the order they were submitted. Please correct me if this is not the behaviour.
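
To illustrate the FIFO point, here is a minimal sketch, assuming msgQueue is created with Executors.newSingleThreadExecutor() (the question never shows the declaration). The class and method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class FifoOrderDemo {
    // Submits n numbered tasks to a single-thread executor and
    // returns the order in which they actually ran.
    static List<Integer> runInOrder(int n) throws InterruptedException {
        final List<Integer> executed =
                Collections.synchronizedList(new ArrayList<Integer>());
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < n; i++) {
            final int id = i;
            executor.submit(new Runnable() {
                public void run() {
                    executed.add(id);
                }
            });
        }
        // shutdown() stops new submissions; already queued tasks still run
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return executed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runInOrder(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

With one worker thread the tasks run one at a time in submission order, so ordering alone does not need an extra wait task. The wait only matters for spacing the messages out.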

However, if we still need to pause after submission, we can do the following.

Can't you make the submitting thread sleep right after you submit the message to the queue, instead of submitting the wait task to the queue?

So it will be

synchronized(msgQueue) {
            task = msgQueue.submit(new Runnable() {
                public void run() {
                    sendRawLine("PRIVMSG " + msg.channel + " :" + msg.message);
                }
            });
            Thread.sleep(Controller.msgWait); // no other thread can submit a new task until this sleep is over.
        }
Nrj
But that would lock msgQueue for a minimum of 2 seconds every time, which could be a problem depending on how many tasks are submitted.
Albinoswordfish
@Albinoswordfish: That's true, but this is what the OP desired. The intent, however, is not clear to me.
Nrj
The intent is to block until the message is sent, while having a 1.7 second wait between being sent. What you have would block until it's sent + 1.7 seconds, which isn't exactly what I would like.
TheLQ
+1  A: 

If your declaration is as follows :

ExecutorService msgQueue = Executors.newSingleThreadExecutor();

you can simply use this code to achieve what you are looking for :

msgQueue.submit(new Runnable() {
        public void run() {
            sendRawLine("PRIVMSG " + msg.channel + " :" + msg.message);
            try {
                Thread.sleep(Controller.msgWait);
            } catch (InterruptedException e) {
                log.error("Wait to send message interrupted", e);
            }
        }
    });

as a single threaded executor can only execute one task at once.

Things to note :

  1. There is no need to synchronize around submits to an ExecutorService
  2. There is no need for a Future if you do not need the result of the execution
  3. Your handling of the InterruptedException is a tad ropey.
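
On point 3, one common way to handle the interrupt is to restore the thread's interrupt flag after catching the exception, so that code further up can still see it. This is only a sketch of that idiom, not the OP's code; pause() and flagSurvives() are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;

class InterruptDemo {
    // Sleeps for the given time; if interrupted, restores the interrupt
    // flag instead of silently swallowing the exception.
    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Starts a worker, interrupts it, and reports whether the worker
    // still saw its interrupt flag set after pause() returned.
    static boolean flagSurvives() throws InterruptedException {
        final CountDownLatch started = new CountDownLatch(1);
        final boolean[] seen = new boolean[1];
        Thread worker = new Thread(new Runnable() {
            public void run() {
                started.countDown();
                pause(10000);
                seen[0] = Thread.currentThread().isInterrupted();
            }
        });
        worker.start();
        started.await();    // make sure the worker is running
        worker.interrupt(); // cuts the sleep short
        worker.join();      // join() makes the write to seen[0] visible
        return seen[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(flagSurvives()); // prints true
    }
}
```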
The OP doesn't want to have his main thread wait for the Controller.msgWait timeout period before it can proceed. This would force his main thread to wait.
Tim Bender
How so? The executor service runs in a separate thread. The submission of the task returns instantly and the actual execution of the task will be delayed. Of course, this all depends on how exactly msgQueue is defined. Although I think I missed "where when a task is submitted the method waits until its finished executing", in which case a CountDownLatch is probably more appropriate.
@tillerman123, "task.get();" The main thread will block on task.get() until the executor finishes executing the task. Please go read about Future.
Tim Bender
I want the runnable that sends the message to block, but NOT the wait afterwards. That's why I split them. And the reason I used synchronized was because multithreaded calls kept adding tasks where they shouldn't.
TheLQ
A: 

Instead of a Future, use a CountDownLatch.

Basically:

public void sendMsg(final BotMessage msg) {
    try {
        final CountDownLatch latch = new CountDownLatch(1);
        msgQueue.submit(new Runnable() {
            public void run() {
                sendRawLine("PRIVMSG " + msg.channel + " :" + msg.message);
                latch.countDown();
                try {
                    Thread.sleep(Controller.msgWait);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); //This is usually a best practice
                    log.error("Wait to send message interrupted", e);
                }
            }
        });
        //Block until the message has been sent (but not for the sleep afterwards)
        latch.await();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); //Preserve the interrupt for callers
        log.error("Wait to send message interrupted", e);
    }
}
Tim Bender
That seems like an extra 3 lines of code that 2 lines of task.get() would solve easily. See above for how I would like it.
TheLQ
+1  A: 

I'm not really sure if I understand your goal: why do you need a queue of threads? Why can't you just queue the messages?

You can use a Producer/Consumer... have a single consumer that reads off the queue and multiple producers populating it.

Update

OK, here is an updated version that blocks until a message is queued and sends at most one message every 1700 milliseconds.

int delay = 1700; // milliseconds
StopWatch stopwatch = new StopWatch(); // helper class, see the link below
BlockingQueue<BotMessage> msgQueue = new LinkedBlockingQueue<BotMessage>();

public void main()
{
    // A consumer thread
    new Thread(new Runnable() {
        public void run() {
            try {
                while(true)
                {
                    // Blocks until there is something in the queue
                    BotMessage msg = msgQueue.take();
                    stopwatch.stop();

                    // Sleeps until the minimum delay time has passed (if it hasn't passed)
                    if(stopwatch.getElapsedTime() < delay)
                    {
                        Thread.sleep(delay-stopwatch.getElapsedTime());
                    }
                    stopwatch.start();

                    sendRawLine("PRIVMSG " + msg.channel + " :" + msg.message);
                }
            } catch (InterruptedException e) {
                // take() and sleep() are interruptible; stop consuming when interrupted
                Thread.currentThread().interrupt();
            }
        }
    }).start();
}

// Producers call sendMsg
public void sendMsg(final BotMessage msg) throws InterruptedException {
    msgQueue.put(msg);
}

You can find an implementation of the stopwatch here.

Lirik
The whole point of all of the spaghetti code above was to block sendMsg until the command was executed. From the docs, put would only block if the queue was full. If I set the queue capacity to 1, put would block until space was freed, then 1.7 seconds later it would send, which isn't what I want.
TheLQ
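
The point about put() only blocking on a full queue can be checked with a small sketch. It assumes an ArrayBlockingQueue of capacity 1, and it uses the timed offer() as a stand-in for put() so that the demo terminates (put() itself would wait indefinitely on a full queue). The class and method names are made up.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class PutBlocksWhenFullDemo {
    // Returns {first insert accepted, second insert accepted} for a
    // queue of capacity 1 that nobody is consuming from.
    static boolean[] offerTwice() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
        // Queue is empty: succeeds immediately, no waiting on a consumer
        boolean first = queue.offer("first", 100, TimeUnit.MILLISECONDS);
        // Queue is full: waits up to 100 ms for space, then gives up
        boolean second = queue.offer("second", 100, TimeUnit.MILLISECONDS);
        return new boolean[] { first, second };
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] result = offerTwice();
        System.out.println(result[0] + " " + result[1]); // prints true false
    }
}
```

So a plain bounded queue tells the producer that there was room for the message, not that the message has been processed.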
Tried this again, still not working. For some unknown reason take() never finishes, gives null values, and other weirdness. I think I'm going to stick with a SingleThreadExecutor (unfortunately).
TheLQ
@Lord.QUackstar... take blocks until there is something in the queue. There is no reason take should return null, especially since put does not accept a null element (if you can't put null, you can't take null). Please see my last update, it should take care of your problems.
Lirik
Put would only block for a small amount of time and not for the whole possible 1.7 seconds afterwards. While that is a nice way of doing it, I still need put() to block the whole time until the message is sent. But I'm still trying to find something in the API that works, mainly a take()-like method that doesn't block.
TheLQ
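
For reference, the non-blocking counterpart of take() in the BlockingQueue API is poll(), which returns null right away when the queue is empty. A minimal sketch, with made-up class and method names:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PollDemo {
    // poll() on an empty queue returns null instead of blocking
    static String pollEmpty() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        return queue.poll();
    }

    // poll() on a non-empty queue removes and returns the head
    static String pollAfterOffer() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        queue.offer("hello");
        return queue.poll();
    }

    public static void main(String[] args) {
        System.out.println(pollEmpty());      // prints null
        System.out.println(pollAfterOffer()); // prints hello
    }
}
```

There is also a timed variant, poll(timeout, unit), which waits up to the given time before returning null.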
@Lord... I'm confused... as far as I understood your constraint is that you must have at least 1700 milliseconds between each message being sent. Why do you want to constrain the put and not the get? Do you discard messages that are put "too soon"?
Lirik
Well, in the end I did take a slightly different approach, but still using queues. See it in my first post.
TheLQ
+2  A: 

OK, here's a thought. You can use a ScheduledExecutorService and remember the last time you scheduled a runnable, delaying the next execution accordingly, up to the maximum sleep time (hard-coded here at 1700).

//@GuardedBy("msgQueue")
Date mostRecentUpdate = new Date();

public void sendMsg(final BotMessage msg) {
    try {
        Future task;
        synchronized (msgQueue) {               
            long delta = new Date().getTime() - mostRecentUpdate.getTime();
            task = msgQueue.schedule(new Runnable() {
                public void run() {
                    sendRawLine("PRIVMSG " + msg.channel + " :" + msg.message);
                }
            }, delta <= 1700 ? 1700 : 0, TimeUnit.MILLISECONDS);

            mostRecentUpdate = new Date();
        }
        // Block until done
        task.get();
    } catch (ExecutionException e) {
        log.error("Couldn't schedule send message to be executed", e);
    } catch (InterruptedException e) {
        log.error("Wait to send message interrupted", e);
    }
}
John V.
O_o, I would have never thought of using ScheduledExecutorService that way. It actually works, makes sense, and removes some spaghetti. Thanks! (I'm going to keep this open in case someone can do it with BlockingQueue, which is the preferred way that I could never get working.)
TheLQ
+1  A: 

If you're willing to tolerate inheriting from ThreadPoolExecutor, you could define a new Executor that delays after completing each execution as follows (with one thread in the pool, this throttles the maximum rate of execution of tasks to every 1700 ms) :

final ThreadPoolExecutor msgQueue = new ThreadPoolExecutor(1, 1,0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {
        protected void afterExecute(Runnable r, Throwable t) {
            try {
                Thread.sleep(Controller.msgWait);
            } catch (InterruptedException e) {
                log.error("Wait to send message interrupted", e);
            }
        }
    };

and then use it as per normal :

Future task = msgQueue.submit(new Runnable() {
        public void run() {
            sendRawLine("PRIVMSG " + msg.channel + " :" + msg.message);
        }
    });
    try {
        task.get();
    } catch (ExecutionException e) {
        log.error("Couldn't schedule send message to be executed", e);
    } catch (InterruptedException e) {
        log.error("Wait to send message interrupted", e);
    }

I wouldn't necessarily say that it is cleaner than using a ScheduledExecutorService, although it does avoid the synchronized block, having to declare the future outside the block, and the introduction of a Date field.

A: 

As suggested here, I am posting this as the answer since it's the cleanest solution I have come up with, but not an actual copy of any of the answers posted here.

Thanks for all the help

 public final CustBlockingQueue<BotMessage> msgQueue = new CustBlockingQueue<BotMessage>(1);


     // A consumer thread
     new Thread(new Runnable() {
         public void run() {
             while (true)
                 try {
                     // Blocks until there is something in the queue
                     BotMessage msg = msgQueue.take();
                     sendRawLine("PRIVMSG " + msg.getChannel() + " :" + msg.getMessage());
                     //Release lock so that put() unblocks
                     msgQueue.lock.lockInterruptibly();
                     msgQueue.doneProcessing.signal();
                     msgQueue.lock.unlock();
                     //Wait before continuing
                     Thread.sleep(Controller.msgWait);
                 } catch (InterruptedException e) {
                     log.error("Wait for sending message interrupted", e);
                 }
         }
     }).start();

 public class CustBlockingQueue<E> extends ArrayBlockingQueue<E> {
     public ReentrantLock lock = new ReentrantLock(false);
     public Condition doneProcessing = lock.newCondition();

     public CustBlockingQueue(int capacity) {
         super(capacity);
     }

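     @Override
     public void put(E e) throws InterruptedException {
         lock.lockInterruptibly();
         try {
             super.put(e);
             //Wait for the consumer to signal that the message was sent
             doneProcessing.await();
         } finally {
             //Always release the lock, even if interrupted
             lock.unlock();
         }
     }
 }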
TheLQ
You can replace the lock and condition with a CountDownLatch. It will have the same effect, but it guards against spurious wakeups and against the potential for the lock acquisition/release to throw an exception.
John V.
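
A rough sketch of how that CountDownLatch suggestion might look, under some assumptions: each message is wrapped together with its own latch, the consumer opens the latch right after sending, and only then sleeps. LatchedSender, Pending, sentLines and gapMillis are hypothetical names, and sentLines stands in for the real sendRawLine() call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

class LatchedSender {
    // Pairs a message with a latch that opens once the message is sent
    private static final class Pending {
        final String text;
        final CountDownLatch sent = new CountDownLatch(1);
        Pending(String text) { this.text = text; }
    }

    private final BlockingQueue<Pending> queue = new LinkedBlockingQueue<Pending>();
    // Stand-in for sendRawLine(): records what was "sent", in order
    final List<String> sentLines = Collections.synchronizedList(new ArrayList<String>());

    LatchedSender(final long gapMillis) {
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    while (true) {
                        Pending next = queue.take(); // blocks until a message is queued
                        sentLines.add(next.text);     // "send" the message
                        next.sent.countDown();        // release the waiting caller
                        Thread.sleep(gapMillis);      // gap before the next message
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop consuming
                }
            }
        });
        consumer.setDaemon(true); // do not keep the JVM alive for the demo
        consumer.start();
    }

    // Blocks until this message has been sent, but not for the gap after it
    void send(String text) throws InterruptedException {
        Pending pending = new Pending(text);
        queue.put(pending);
        pending.sent.await();
    }

    public static void main(String[] args) throws InterruptedException {
        LatchedSender sender = new LatchedSender(1700);
        sender.send("first");
        sender.send("second"); // returns roughly 1.7 s after the first
        System.out.println(sender.sentLines);
    }
}
```

Because await() returns immediately once the count has reached zero, a caller cannot miss the signal or wake up early, and no explicit lock has to be released in a finally block.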