views:

86

answers:

3

Hi,

I'm trying to write a simple video manipulator, so several times a second I need to start a new thread (currently implementing Runnable) to process the current frame but I have no guarantee how long each thread will take to finish and as such I want to limit the number of threads that can run at once to the number of processors on the computer:

Runtime runtime = Runtime.getRuntime();
int nP = runtime.availableProcessors();  

But I need to guarantee that all threads that are created are run in sequence so no frames are dropped.

I would also like to show the user how long it will take to finish processing, based on the number of threads left to run when they cancel the job so that they don't end up with a video file with no trailer.

Would this be possible using any combination of futureTask, Exector or ExecutorService?

Thanks.

EDIT:

Hi guys, sorry yeah that was rather badly phrased. So what I'm actually trying to do is get the frame, perform some image manipulation and then save the editted footage back out to a new file. At the moment I'm doing this during playback so each frame is manipulated when it's called by a timer, the timer then launches a thread to process the image as quickly as possible but depending on the number of manipulations being done this time will vary.

I was then wanting to make sure that if the processing takes longer than the interval that only the maximum efficient number of threads are used for processing and that any threads that are created after this limit is reached are still dealt with and not dropped or garbage collected.

Having read the first 3 comments I can see this is probably a less efficient way of doing it and I guess having the one thread just to keep the UI responsive would work but I'm not sure how to keep adding the images to the thread for it to process without using a huge list. I'm assuming it'll be something like:

In the main class:

Timer actionPerformed {
    List.add(decodedImage);
}

In the runnable class:

run() {
   while( timer.isRunning() ) {
     if( runCount >= list.size()-1 ) {
        try {
          Thread.sleep(500);
        } catch() {
             /* Catchy stuff */
        }
     } else {
        BufferedImage toProcess = list.get(runCount);
        /* Do Processing here */
        writeImageToStream();
        list.remove(runCount);
        runCount++;
     }
   }
}

Is this correct?

EDIT 2:

So this is what I have so far:

public class timerEncode {

     private long startTime;

     ActionListener goAction = new ActionListener() {
         public void actionPerformed( ActionEvent evt ) {
             BufferedImage decoded = getNextImage();
             long write_time = System.nanoTime();
             new doImages(decoded, write_time).run();
         }        
     };
     Timer goTimer = new Timer(40,goAction);

     private BufferedImage getNextImage() {
        /* Does inconsequential stuff to retrieve image from the stream*/
     }

     private void recBtnActionPerformed(java.awt.event.ActionEvent evt) {                                       
        startTime = System.nanoTime();
        goTimer.start();
     }

     private class doImages implements Runnable {
        final BufferedImage image;
        final long write_time;

        public doImages(BufferedImage image, long write_time) {
           this.image = image;
           this.write_time = write_time;
        }

        public void run() {
            BufferedImage out = toXuggleType(image, BufferedImage.TYPE_3BYTE_BGR);
            /* Other time consuming processy stuff goes here */
            /* Encode the frame to a video stream */
            writer.encodeVideo(0,out,write_time-startTime, TimeUnit.NANOSECONDS);
        }

        private BufferedImage toType(BufferedImage source, int type) {
            if( source.getType() != type ) {
                BufferedImage temp = new BufferedImage(source.getWidth(),source.getHeight(),type);
                temp.getGraphics().drawImage(source, 0, 0, null);
                source = temp;
            }
            return source;
        }
    }

}

This works fine when the image processing is simple but you soon run into tens of concurrent threads trying to do their thing as it gets a bit more complex hence why I was asking how to limit the concurrent number of threads without dropping any. I'm not sure order particularly matters in this case as I think writing the frames out of order will put them in the correct place as the write time is specified with each frame but this needs testing.

+1  A: 

It is a bad idea to be launching threads continuously - it's a big performance hit. What you want is a thread pool and a bunch of jobs to do (the Runnables). If you create a Thread pool of size = number of processors and just keep adding frames (as Jobs) to a job queue, your Threads will be able to efficiently process their way through the queue in order.

jowierun
Thanks, I've taken your advice on board about continuously creating new threads and tried to improve the solution in the original post.
drent
A: 

[see history for previous responses]

I see what's going on now. I'm not completely sure about how Timer or ActionListeners work (with respect to what happens if the previous call hasn't finished when another one arrives), but it seems that you may not actually be running your doImages objects concurrently - to run a Runnable object concurrently you need to do Thread t = new Thread(runnableObject); t.start(); If you just call the run() method, it'll be done sequentially (like any other method call) so your actionPerformed() method won't finish until run() does. I'm not sure if this will prevent (or delay) other ActionEvents being handled.

As has been suggested by others, to cap the number of threads, you should use a ThreadPoolExcecutor object. This will make your actionPerformed() method return quickly, run doImages objects concurrently and ensure you don't use too many threads through a queue. All you'd need to do is replace new doImages(decoded, write_time).run(); with threadPool.execute(new doImages(decoded, write_time)).

As to how to monitor the process, you could use the getQueue() method of ThreadPoolExcecutor to retrieve and examine the size of the queue to see how many frames are waiting to be processed.

Scott
Hi, Thanks. I've editted the original post for clarity.
drent
Hi, Updated with a cut down example of what I did originally
drent
+4  A: 

But I need to guarantee that all threads that are created are run in sequence so no frames are dropped.

Do you mean this as you worded it here? If so, then you can't really multithread this at all, as I understand that you can't start processing frame 2 until frame 1 has finished. At which point, you may as well process the frames sequentially and ignore threading.

Alternatively if you mean something else, such as that the frames can be processed independently but need to be collated in sequence, then this could be doable.

In any case - resorting to using "raw" threads is rarely required or beneficial. As others have pointed out, use the higher-level concurrency utilities (in this case a ThreadPoolExecutor would be perfect) to oversee this.

It sounds like a Runnable isn't quite the right choice either, as that implies you're returning the "result" of the processing by mutating some global variable. Instead it may be better to turn this processing into a Callable and return the result. This would likely remove thread-safety issues, might allow different frames to be processed at once with less problems and would allow you to defer the collation of each result until whatever point you deemed fit.

If you wanted to go this way, you could do something like the following:

// Create a thread pool with the given concurrency level
ExecutorService executor = Executors.newFixedThreadPool(Runtime.availableProcessors);

// Submit all tasks to the pool, storing the futures for further reference
// The ? here should be the class of object returned by your Callables
List<Future<?>> futures = new ArrayList<Future<?>>(NUM_FRAMES);
for (int i = 0; i < NUM_FRAMES; i++)
{
    futures.add(executor.submit(createCallableForFrame(i)));
}

// Combine results using future.get()
// e.g. do something with frames 2 and 3:
mergeFrames(futures.get(2).get(), futures.get(3).get());

// In practice you'd probably iterate through the Futures but it's your call!
Andrzej Doyle
The forkjoin frame work offered in java 7 does what you are suggesting, but I like the parallelism!
John V.