This is more of a general question than a specific one. I'm trying to set up a multithreaded environment that stays active so that I can just submit tasks and run them, without the hassle of running inside a web server or application server. The idea was to use a Java thread pool for this, but the pool only stays open until my main method finishes, after which it closes and the program exits. How can I prevent this from happening? I'm sure there are several options, some more naive than others (while (true) loops come to mind). Any ideas? Thanks.

A: 

What you definitely can do is loop, something like this:

while (true) {
  Thread.sleep(1000);
}

If you want to stop the process, you just kill it. It's not an elegant solution, however.

A better approach would be to listen on some port and wait until you receive a command on that port:

ServerSocket socket = new ServerSocket(4444);
while (true) {
  Socket clientSocket = socket.accept();
  // get input stream, etc.
  // if (STOP keyword read) break
}
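
To make the port idea above concrete, here is a minimal sketch. The class name `StopPortDemo`, the method `serveUntilStop`, and the one-line-per-connection protocol are assumptions for illustration and not part of the original answer. Every line other than `STOP` is treated as a task name and handed to the pool.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class StopPortDemo {
    /**
     * Accepts connections until a client sends the line "STOP".
     * Hypothetical protocol: each connection carries a single line of text.
     */
    static void serveUntilStop(ServerSocket server, ExecutorService pool) throws IOException {
        try {
            while (true) {
                try (Socket client = server.accept();
                     BufferedReader in = new BufferedReader(new InputStreamReader(
                             client.getInputStream(), StandardCharsets.UTF_8))) {
                    String line = in.readLine();
                    if ("STOP".equals(line)) {
                        break; // leave the accept loop; the pool is shut down below
                    }
                    if (line != null) {
                        final String task = line;
                        // Placeholder task: a real program would do its work here
                        pool.execute(() -> System.out.println("running " + task));
                    }
                }
            }
        } finally {
            // Already submitted tasks still run; no new tasks are accepted
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        // Bind to the loopback address only, so other machines cannot send STOP
        try (ServerSocket server = new ServerSocket(4444, 50, InetAddress.getLoopbackAddress())) {
            serveUntilStop(server, pool);
        }
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

In this sketch an I/O error on a single connection ends the loop; a longer-running program would probably want to catch and log it per connection instead.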
inflagranti
+1  A: 

How are your tasks being accepted?

In many cases I've seen, there is one thread waiting for or polling for tasks and passing them on. This thread keeps your application alive, and it can also wait for a signal to shut down the application, then wait for the current jobs to finish and clean up.
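
A rough sketch of that pattern follows, assuming a queue-fed dispatcher thread and a sentinel ("poison pill") task as the shutdown signal. The names `DispatcherDemo`, `POISON`, and `shutdownAndWait` are made up for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class DispatcherDemo {
    // Sentinel task: when the dispatcher takes it from the queue, it stops
    private static final Runnable POISON = () -> { };

    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final ExecutorService pool = Executors.newFixedThreadPool(3);
    // Non-daemon thread, so it keeps the JVM alive after main() returns
    private final Thread dispatcher = new Thread(this::dispatchLoop, "dispatcher");

    void start() {
        dispatcher.start();
    }

    void submit(Runnable task) {
        queue.add(task);
    }

    /** Signals shutdown and waits; returns true if all work finished in time. */
    boolean shutdownAndWait(long timeout, TimeUnit unit) {
        queue.add(POISON);
        try {
            dispatcher.join(unit.toMillis(timeout));
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    private void dispatchLoop() {
        try {
            while (true) {
                Runnable task = queue.take(); // blocks until a task arrives
                if (task == POISON) {
                    break;
                }
                pool.execute(task);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown(); // tasks already handed to the pool still complete
        }
    }

    public static void main(String[] args) {
        DispatcherDemo demo = new DispatcherDemo();
        demo.start();
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < 10; i++) {
            demo.submit(done::incrementAndGet);
        }
        boolean clean = demo.shutdownAndWait(5, TimeUnit.SECONDS);
        System.out.println("clean=" + clean + " done=" + done.get());
    }
}
```

Because the sentinel goes through the same queue as the tasks, everything submitted before the shutdown request is dispatched before the pool is closed.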

All in all, I find that the point where the hassle of dealing with these application lifecycle events exceeds the hassle of deploying to a simple container like Jetty is easily reached. Especially for things running in the background, I find a lot of value in a couple of silly JSP pages to verify that it is still working (to integrate with our automatic monitoring) and to get some statistics.

Peter Tillemans
A: 

Here is a sample I wrote for another post that lets you have a thread pool on another thread that you can post messages to. main() creates the threads and also lets you stop the thread when you want. To keep main from finishing, just remove the processor.stopProcessing(); line in main.

package com.rch.test;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class Executor
{
    /**
     * Class to encapsulate a request
     * 
     * @author romain
     */
    static class Request
    {
        String someText;

        Request(String someText)
        {
            this.someText = someText;
        }

        public String getSomeText()
        {
            return someText;
        }
    }

    /**
     * Creates a Thread that listens on a queue to process messages
     * 
     * @author romain
     */
    static class ServerThread implements Runnable
    {
        private BlockingQueue<Request> queue = new LinkedBlockingQueue<Request>();
        volatile boolean stop = false;

        /**
         * Does all the work
         */
        @Override
        public void run()
        {
            ExecutorService pool = Executors.newFixedThreadPool(3);
            try
            {
                while (!stop)
                {
                    Request req = queue.poll(1000L, TimeUnit.MILLISECONDS);
                    if (req != null)
                    {
                        Runnable runnable = new Executor.ImageProcessor(req);
                        pool.execute(runnable);
                    }
                }
            }
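            catch (InterruptedException ie)
            {
                // Restore the interrupt flag so code further up can still see it
                Thread.currentThread().interrupt();
                System.out.println("Log something here");
            }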
            finally
            {
                pool.shutdown();
            }
        }

        /**
         * Accepts a message on the queue
         * @param request
         */
        public void accept(Request request)
        {
            queue.add(request);
        }

        public void stopProcessing()
        {
            stop = true;
        }
    }

    /**
     * class to do the actual work
     * @author romain
     */
    static class ImageProcessor implements Runnable
    {
        String someText;

        ImageProcessor(Request req)
        {
            this.someText = req.getSomeText();
        }

        @Override
        public void run()
        {
            System.out.println(someText);
            // Process Image here
        }
    }

    /**
     * Test Harness
     * @param args
     */
    public static void main(String[] args)
    {
        // Initialize 
        ServerThread processor = new ServerThread();
        Thread aThread = new Thread(processor);
        aThread.start();

        // Wait for Thread to start
        try
        {
            Thread.sleep(500L);
        }
        catch (InterruptedException e1)
        {
            e1.printStackTrace();
        }

        for (int i = 0; i < 100; i++)
        {
            String text = "" + i;
            Request aRequest = new Request(text);
            processor.accept(aRequest);
        }

        // Give it enough time to finish
        try
        {
            Thread.sleep(500L);
        }
        catch (InterruptedException e1)
        {
            e1.printStackTrace();
        }

        // Tell the thread to finish processing
        processor.stopProcessing();

        // Wait for the Thread to complete
        try
        {
            aThread.join();
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}
Romain Hippeau
At minimum, field `ServerThread#stop` needs to be declared *volatile*.
seh
@seh Not really. In my example only one thread can write to it and only one thread can read from it. I do not care about thread safety, nor whether it is cached for a period of time. A boolean value is atomic when being read or written anyway.
Romain Hippeau
The important part of your position is not caring if "[the boolean field] is cached for a period of time". It could be cached -- and hence not visible to the thread you'd like to "interrupt" -- forever. While assigning the boolean field may happen "atomically" -- in the sense that no reader could see a "partial value" -- the *propagation* of that write across thread boundaries is subject to much leeway, and you should not present an example depending on that propagation without noting or eliminating the vulnerability.
seh
@seh - I added volatile since you feel that strongly about it :)
Romain Hippeau
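
To illustrate the visibility point from the comments above, here is a small sketch of a worker that spins on a stop flag. `StopFlagDemo` and `stopsWithin` are hypothetical names used only for this example. With `volatile`, the write in `stopProcessing()` is guaranteed to become visible to the spinning thread; without it, the Java memory model gives no such guarantee and the loop may never exit.

```java
class StopFlagDemo {
    static class Worker implements Runnable {
        // volatile guarantees the write below becomes visible to run()
        private volatile boolean stop = false;
        private long iterations = 0;

        @Override
        public void run() {
            while (!stop) {
                iterations++; // stand-in for real work
            }
        }

        void stopProcessing() {
            stop = true;
        }

        long iterations() {
            return iterations;
        }
    }

    /** Starts a worker, requests a stop, and reports whether it ended in time. */
    static boolean stopsWithin(long millis) {
        Worker worker = new Worker();
        Thread thread = new Thread(worker);
        thread.start();
        worker.stopProcessing();
        try {
            thread.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !thread.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker stopped: " + stopsWithin(5000));
    }
}
```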
A: 

Here is some code that I call at the end of my main() in some programs. If the user types "quit" on the command line, the program cleans up and then exits. You can also modify the "action" branch to do something else when the user enters that command.

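   // Requires imports: java.io.BufferedReader, java.io.IOException,
   // java.io.InputStreamReader, java.io.OutputStreamWriter,
   // java.io.PrintWriter, java.util.StringTokenizer.
   // bTerminate, myClient and myOutputStream are assumed to be fields of the
   // enclosing class (not shown here).
   public void serve() throws IOException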
   {
      BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
      PrintWriter out = new PrintWriter(new OutputStreamWriter(System.out));
      String line = null;

      for (;;)
      {
         out.print("> ");
         out.flush();
         line = in.readLine();
         if (line == null)
         {
            break;                  // QUIT if we get EOF
         }

         try
         {
            // Use a StringTokenizer to parse the user's command
            StringTokenizer t = new StringTokenizer(line);

            // blank line
            if (!t.hasMoreTokens())
            {
               continue;
            }

            // get the first word of the input and convert to lower case
            String command = t.nextToken().toLowerCase();

            if (command.equals("quit"))
            {
               bTerminate = true;
               // Do all cleanup here
               myClient.close();
               break;
            }
            else if (command.equals("action"))
            {
               if (line.length() > command.length())
               {
                  // get data from rest of line
                  String data = line.substring(command.length()).trim();
                  // perform action
                  myOutputStream.writeUTF(data);
               }
            }
         }
         catch(Exception e)
         {
            e.printStackTrace();
         }
      }
      out.close();
      in.close();
   }
Romain Hippeau