Hi Folks,

I need to process images in multiple threads using Java. The number of images varies, whereas the number of threads is fixed. I have to process all the images using that fixed set of threads.

I am stuck on how to do it. I had a look at ThreadPoolExecutor, BlockingQueue, etc., but I am still not clear. What I am doing is:
- Get the images and add them to a LinkedBlockingQueue, wrapped in the Runnable code of the image processor.
- Create a ThreadPoolExecutor, passing that LinkedBlockingQueue as one of the arguments.
- Iterate in a for loop up to the queue size and call threadpoolexecutor.execute(linkedblockingqueue.poll()).
- All I see is that it processes only 100 images, which is the minimum thread size passed in as the LinkedBlockingQueue size.

I see I am seriously wrong in my understanding somewhere. How do I process all the images in sets of 100 (threads) until they are all done? Any examples or pseudocode would be very helpful.

Thanks! J

A: 

Sun's tutorials are really good, so I will just post the link: Defining and Starting a Thread

Quote: Threads are sometimes called lightweight processes. Both processes and threads provide an execution environment, but creating a new thread requires fewer resources than creating a new process. Threads exist within a process — every process has at least one. Threads share the process's resources, including memory and open files. This makes for efficient, but potentially problematic, communication.
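
To make the quoted point about shared memory concrete, here is a minimal sketch of defining and starting two threads that update the same counter. The class and method names (SharedCounterDemo, countWithTwoThreads) are made up for illustration. AtomicInteger is used on the assumption that you want the shared update to be safe; a plain int field would make the "potentially problematic" part visible as lost updates.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the tutorial being quoted.
class SharedCounterDemo {

    // Starts two threads that increment the same counter, then waits for both.
    static int countWithTwoThreads(int incrementsPerThread) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger(); // shared by both threads

        Runnable work = () -> {
            for (int i = 0; i < incrementsPerThread; i++) {
                counter.incrementAndGet(); // atomic, so no updates are lost
            }
        };

        Thread first = new Thread(work);
        Thread second = new Thread(work);
        first.start();   // run() now executes on a new thread
        second.start();
        first.join();    // wait until both threads have finished
        second.join();
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countWithTwoThreads(1000)); // prints 2000
    }
}
```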

while (queue is not empty)
  start new set of image-processing threads
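
One way the loop above could look in Java, as a rough sketch only: take up to batchSize items from the queue, start one thread per item, and join the whole batch before starting the next one. The names BatchDemo and processInBatches are invented here, the image names are plain strings, and the "processing" is just a placeholder that records the name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class illustrating batch-by-batch processing.
class BatchDemo {

    // Processes all names in batches of at most batchSize threads; returns how many were done.
    static int processInBatches(List<String> imageNames, int batchSize)
            throws InterruptedException {
        Queue<String> pending = new ArrayDeque<>(imageNames);  // only touched by the calling thread
        Queue<String> done = new ConcurrentLinkedQueue<>();    // written to by the worker threads

        while (!pending.isEmpty()) {
            List<Thread> batch = new ArrayList<>();
            for (int i = 0; i < batchSize && !pending.isEmpty(); i++) {
                String name = pending.poll();
                // Placeholder for the real image processing.
                Thread worker = new Thread(() -> done.add("processed " + name));
                batch.add(worker);
                worker.start();
            }
            // Wait for the whole batch before starting the next one.
            for (Thread worker : batch) {
                worker.join();
            }
        }
        return done.size();
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> names = Arrays.asList("a.png", "b.png", "c.png", "d.png", "e.png");
        System.out.println(processInBatches(names, 2)); // prints 5
    }
}
```

Note that this creates a new thread for every image and each batch waits for its slowest member, so a reusable thread pool is usually the better fit once the idea is clear.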
Joelmob
A: 

You can think of each processing operation as a 'task'. Place these tasks in a single queue, and have each thread take the next task from this queue each time it completes one.
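
A fixed thread pool from java.util.concurrent does this bookkeeping for you: it keeps its own internal task queue and a fixed number of worker threads that pull from it. Below is a minimal sketch, assuming the images can be identified by name and that 100 threads is what you want. FixedPoolDemo, processAll and processImage are hypothetical names, and processImage is an empty placeholder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: one task per image, a fixed number of threads.
class FixedPoolDemo {

    // Placeholder for the real image processing.
    static void processImage(String name) {
        // load, transform and save the image here
    }

    // Submits one task per image and waits until all of them have run.
    static int processAll(List<String> imageNames, int threadCount)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        AtomicInteger processed = new AtomicInteger();

        for (String name : imageNames) {
            // Tasks beyond threadCount wait in the pool's internal queue.
            pool.execute(() -> {
                processImage(name);
                processed.incrementAndGet();
            });
        }

        pool.shutdown(); // accept no new tasks, but finish the queued ones
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            pool.shutdownNow(); // give up if the work takes too long
        }
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            names.add("image-" + i + ".png");
        }
        System.out.println("Processed " + processAll(names, 100) + " images");
    }
}
```

With this shape there is no need to create your own LinkedBlockingQueue or to call poll() yourself; you hand every task to execute() and the pool works through them with the threads it has.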

Will
A: 

Here is a sample class that I wrote. The whole thing runs standalone and prints the numbers 0 to 99, each from a thread in a thread pool. Pretty much all you need to do is update the Request class to pass in what you want and re-implement ImageProcessor.

package com.rch.test;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class Executor
{
    /**
     * Class to encapsulate a request
     * 
     * @author romain
     */
    static class Request
    {
        String someText;

        Request(String someText)
        {
            this.someText = someText;
        }

        public String getSomeText()
        {
            return someText;
        }
    }

    /**
     * Creates a Thread that listens on a queue to process messages
     * 
     * @author romain
     */
    static class ServerThread implements Runnable
    {
        private BlockingQueue<Request> queue = new LinkedBlockingQueue<Request>();
        // volatile so that the write in stopProcessing() is visible to the thread running run()
        volatile boolean stop = false;

        /**
         * Does all the work
         */
        @Override
        public void run()
        {
            ExecutorService pool = Executors.newFixedThreadPool(3);
            try
            {
                while (!stop)
                {
                    Request req = queue.poll(1000L, TimeUnit.MILLISECONDS);
                    if (req != null)
                    {
                        Runnable runnable = new Executor.ImageProcessor(req);
                        pool.execute(runnable);
                    }
                }
            }
            catch (InterruptedException ie)
            {
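                // Restore the interrupt flag so code further up can see the interruption
                Thread.currentThread().interrupt();
                System.out.println("Log something here");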
            }
            finally
            {
                pool.shutdown();
            }
        }

        /**
         * Accepts a message on the queue
         * @param request
         */
        public void accept(Request request)
        {
            queue.add(request);
        }

        public void stopProcessing()
        {
            stop = true;
        }
    }

    /**
     * class to do the actual work
     * @author romain
     */
    static class ImageProcessor implements Runnable
    {
        String someText;

        ImageProcessor(Request req)
        {
            this.someText = req.getSomeText();
        }

        @Override
        public void run()
        {
            System.out.println(someText);
            // Process Image here
        }
    }

    /**
     * Test Harness
     * @param args
     */
    public static void main(String[] args)
    {
        // Initialize 
        ServerThread processor = new ServerThread();
        Thread aThread = new Thread(processor);
        aThread.start();

        // Wait for Thread to start
        try
        {
            Thread.sleep(500L);
        }
        catch (InterruptedException e1)
        {
            e1.printStackTrace();
        }

        for (int i = 0; i < 100; i++)
        {
            String text = "" + i;
            Request aRequest = new Request(text);
            processor.accept(aRequest);
        }

        // Give it enough time to finish
        try
        {
            Thread.sleep(500L);
        }
        catch (InterruptedException e1)
        {
            e1.printStackTrace();
        }

        // Tell the thread to finish processing
        processor.stopProcessing();

        // Wait for the Thread to complete
        try
        {
            aThread.join();
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}
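
The harness above relies on Thread.sleep to give the pool time to finish, which may be too short for real image work. If all requests are known up front, one possible alternative is ExecutorService.invokeAll, which blocks until every task has completed. This is only a sketch: InvokeAllDemo and processAll are made-up names, and the task body just returns a string where the image processing would go.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class: wait for completion without sleeping.
class InvokeAllDemo {

    // Runs requestCount placeholder tasks on threadCount threads and collects their results.
    static List<String> processAll(int requestCount, int threadCount)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (int i = 0; i < requestCount; i++) {
                final String text = "" + i;
                tasks.add(() -> "done " + text); // placeholder for processing one image
            }

            List<String> results = new ArrayList<>();
            // invokeAll returns only after every task has completed;
            // the futures come back in the same order as the tasks.
            for (Future<String> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> results = processAll(100, 3);
        System.out.println(results.size() + " tasks finished"); // prints "100 tasks finished"
    }
}
```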
Romain Hippeau