Consider the following shell script:

gzip -dc in.gz | sed -e 's/@/_at_/g' | gzip -c > out.gz 

This has three processes working in parallel to decompress a stream, modify it, and re-compress it. Running it under time, I can see that my user time is about twice my real time, which indicates the pipeline is effectively working in parallel.

I've attempted to create the same program in Java by placing each task in its own thread. Unfortunately, the multithreaded Java program is only about 30% faster than the single-threaded version on the above sample. I've tried using both an Exchanger and a ConcurrentLinkedQueue. The ConcurrentLinkedQueue causes a lot of contention, although all three threads are generally kept busy. The Exchanger has lower contention, but is more complicated, and it doesn't seem to keep the slowest worker running 100% of the time.
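
For concreteness, the Exchanger variant I tried amounts to a double-buffer swap between adjacent stages. The sketch below is a stripped-down illustration of that pattern, not my actual code (the real sources are on Bitbucket), and all names and sizes are made up:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

// Hypothetical two-stage handoff: the producer fills a batch of lines, then
// swaps it for the consumer's drained batch. An empty batch marks end of stream.
public class ExchangerHandoffSketch {
  private static final int BATCH = 1024;

  public static void main(String[] args) throws InterruptedException {
    final Exchanger<List<String>> ex = new Exchanger<List<String>>();

    new Thread(new Runnable() {
      public void run() {
        try {
          List<String> batch = new ArrayList<String>(BATCH);
          for (int i = 0; i < 10000; i++) {
            batch.add("line " + i); // stand-in for a line read from the input
            if (batch.size() == BATCH) {
              batch = ex.exchange(batch); // swap the full batch for a drained one
              batch.clear(); // defensive; the consumer hands back an emptied batch
            }
          }
          if (!batch.isEmpty()) {
            batch = ex.exchange(batch); // flush the final partial batch
            batch.clear();
          }
          ex.exchange(batch); // empty batch signals end of stream
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    }).start();

    // Consumer: trade a drained batch for a full one until the sentinel arrives.
    List<String> batch = new ArrayList<String>(BATCH);
    while (true) {
      batch = ex.exchange(batch);
      if (batch.isEmpty()) {
        break; // end of stream
      }
      for (String line : batch) {
        // stand-in for the real per-line work (the sed-style replace)
      }
      batch.clear();
    }
  }
}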

I'm trying to figure out a pure Java solution to this problem, without reaching for one of the bytecode-weaving frameworks or a JNI-based MPI.

Most of the concurrency research and APIs concern themselves with divide-and-conquer algorithms, giving each node work which is orthogonal and independent of prior calculations. Another approach to concurrency is the pipeline approach, where each worker does some work and passes the data on to the next worker.

I'm not trying to find the most efficient way to sed a gzip'd file, but rather I'm looking at how to efficiently break down tasks in a pipeline, in order to reduce the runtime to that of the slowest task.

Current timings for a 10m line file are as follows:

Testing via shell

real    0m31.848s
user    0m58.946s
sys     0m1.694s

Testing SerialTest

real    0m59.997s
user    0m59.263s
sys     0m1.121s

Testing ParallelExchangerTest

real    0m41.573s
user    1m3.436s
sys     0m1.830s

Testing ConcurrentQueueTest

real    0m44.626s
user    1m24.231s
sys     0m10.856s

I'm offering a bounty for a 10% improvement in Java, as measured by real time on a four core system with 10m rows of test data. Current sources are available on Bitbucket.

+11  A: 

Firstly, the process will only be as fast as the slowest piece. If the timing breakdown is:

  • gunzip: 1 second
  • sed: 5 seconds
  • gzip: 1 second

by going multi-threaded you'll be done in at best 5 seconds instead of 7.

Secondly, rather than the queues you're using, try to replicate the functionality of the shell pipeline: use PipedInputStream and PipedOutputStream to chain the stages together.

Edit: there are a few ways of handling related tasks with the Java concurrency utilities, dividing the pipeline into threads. First, create a common interface:

public interface Worker {
  void run(InputStream in, OutputStream out);
}

This interface represents an arbitrary job that processes input and generates output. Chain these together and you have a pipeline. You can abstract away the boilerplate too. For that we need a class:

public class UnitOfWork implements Runnable {
  private final InputStream in;
  private final OutputStream out;
  private final Worker worker;

  public UnitOfWork(InputStream in, OutputStream out, Worker worker) {
    if (in == null) {
      throw new NullPointerException("in is null");
    }
    if (out == null) {
      throw new NullPointerException("out is null");
    }
    if (worker == null) {
      throw new NullPointerException("worker is null");
    }
    this.in = in;
    this.out = out;
    this.worker = worker;
  }

  public final void run() {
    worker.run(in, out);
  }
}

So, for example, the Unzip worker:

public class Unzip implements Worker {
  public void run(InputStream in, OutputStream out) {
    ...
  }
}

and so on for Sed and Zip. What then binds it together is this:

public static void pipe(InputStream in, OutputStream out, Worker... workers)
    throws IOException {
  if (workers.length == 0) {
    throw new IllegalArgumentException("no workers");
  }
  List<UnitOfWork> work = new ArrayList<UnitOfWork>(workers.length);
  PipedOutputStream last = null;
  // every worker but the last writes into a pipe that feeds the next worker
  for (int i = 0; i < workers.length - 1; i++) {
    PipedOutputStream next = new PipedOutputStream();
    work.add(new UnitOfWork(
        last == null ? in : new PipedInputStream(last), next, workers[i]));
    last = next;
  }
  // the last worker reads from the final pipe and writes to the real output
  work.add(new UnitOfWork(
      last == null ? in : new PipedInputStream(last), out,
      workers[workers.length - 1]));
  ExecutorService exec = Executors.newFixedThreadPool(work.size());
  for (UnitOfWork w : work) {
    exec.submit(w);
  }
  exec.shutdown();
  try {
    exec.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
  } catch (InterruptedException e) {
    // do whatever
  }
}

I'm not sure you can do much better than that and there is minimal code to write for each job. Then your code becomes:

public static void processFile(String inputName, String outputName)
    throws IOException {
  pipe(new FileInputStream(inputName), new FileOutputStream(outputName),
      new Unzip(), new Sed(), new Zip());
}
cletus
On some tests I did on multithreaded encryption, I did something very similar, but the real increase in performance came when I wrote my own buffering implementation. Piped streams and buffered streams are already buffered, but the buffer size introduces a lot of overhead unless the buffers are aligned to whatever the transform algorithm expects. So, if zip works on 1 KB at a time, use a custom buffer of that size to feed data to the zipping stage; if sed consumes a whole line, fetch 128 lines at a time. This sort of thing greatly increases speed by reducing contention and overhead (while increasing complexity, on the other hand...).
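
Roughly, the buffering idea is something like the sketch below (illustrative only; the 1 KB block size is made up, and should match whatever unit the downstream transform actually consumes):

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Feed the downstream stage in fixed, aligned blocks so that each write
// matches the unit the transform works on (1 KB here is a made-up figure).
public final class AlignedFeeder {
  private static final int BLOCK = 1024;

  public static void copyAligned(InputStream in, OutputStream out) throws IOException {
    byte[] block = new byte[BLOCK];
    int filled = 0;
    int n;
    // keep topping the block up until it is full, then hand it downstream whole
    while ((n = in.read(block, filled, BLOCK - filled)) != -1) {
      filled += n;
      if (filled == BLOCK) {
        out.write(block, 0, BLOCK);
        filled = 0;
      }
    }
    if (filled > 0) {
      out.write(block, 0, filled); // flush the final partial block
    }
    out.flush();
  }
}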
Lorenzo Boccaccia
+2  A: 

You can use pipes in Java, too. They are implemented as streams; see PipedInputStream and PipedOutputStream for more details.

To prevent blocking, I would recommend setting a proper pipe size.
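
For example (the 64 KB figure below is arbitrary, something to experiment with; the sized constructor requires Java 6):

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class PipeSizeExample {
  public static void main(String[] args) throws IOException {
    // The pipe's internal buffer defaults to 1024 bytes; a larger buffer lets
    // the producing thread run further ahead before it blocks on the consumer.
    PipedOutputStream src = new PipedOutputStream();
    PipedInputStream sink = new PipedInputStream(src, 64 * 1024);
    src.write(new byte[] { 1, 2, 3 });
    System.out.println(sink.read()); // prints 1
    src.close();
    sink.close();
  }
}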

Hardcoded
PipedOutputStream and PipedInputStream are useful when you need to connect an output stream to an input stream. The 'Sed' class in the serial test is effectively doing what those two classes would do. I'm not trying to achieve pipes, but rather concurrent pipelines. A pipeline is like a car assembly line, passing work from one stage to the next. Running each stage in its own thread offers the possibility of concurrency in tasks which can't themselves be run in parallel.
brianegge
Besides, I just tested the performance of the piped streams, and they're effectively just a semaphore: only one thread at a time can work on the underlying buffer. I guess it would be possible to rewrite the classes to use more than one buffer, though, and increase the throughput.
KarlP
+3  A: 

Since you don't say how you are measuring the elapsed time, I am assuming that you are using something like:

time java org.egge.concurrent.SerialTest < in.gz > out.gz
time java org.egge.concurrent.ConcurrentQueueTest < in.gz > out.gz

The problem with this is that you are measuring two things here:

  1. How long the JVM takes to start up, and
  2. How long the program takes to run.

You can only change the second one with your code changes. Using the figures that you gave:

Testing SerialTest
real    0m6.736s
user    0m6.924s
sys     0m0.245s

Testing ParallelExchangerTest
real    0m4.967s
user    0m7.491s
sys     0m0.850s

If we assume that the JVM startup takes three seconds, then the "program run time" is 3.7 and 1.9 seconds respectively, which is pretty much a 100% speedup. I would strongly suggest that you test with a larger dataset, so that you can minimise the impact of JVM startup on your timing results.

Edit: based on your answers to this question, you may well be suffering from lock contention. The best way to resolve that in Java is probably to use the piped readers and writers: read from the pipe a byte at a time and replace any '@' characters in the input stream with "_at_" in the output stream. At the moment you may be suffering from the fact that each string is scanned three times, and any replacement requires a new object to be built, so the string ends up getting copied again. Hope this helps...
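
In sketch form, that byte-at-a-time replacement would look something like this (hypothetical code; in practice the streams should be wrapped in BufferedInputStream/BufferedOutputStream, or the per-byte reads will dominate):

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Stream bytes through, expanding '@' to "_at_" on the fly, so no
// intermediate String objects are built per line.
public final class AtReplacer {
  private static final byte[] REPLACEMENT = { '_', 'a', 't', '_' };

  public static void replace(InputStream in, OutputStream out) throws IOException {
    int b;
    while ((b = in.read()) != -1) {
      if (b == '@') {
        out.write(REPLACEMENT);
      } else {
        out.write(b);
      }
    }
    out.flush();
  }
}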

Paul Wagland
If I run the test with a single record, I see real times of 0m0.292s. The test isn't ideal, as the last stage is more than twice as intensive as the first two.
brianegge
The test harness I'm using is in http://bitbucket.org/brianegge/java-concurrent/src/tip/bin/test. Running 10m rows, the ParallelExchanger shows a 'user' time almost the same as the shell script, but a real time which is 10 seconds longer. If I can improve the efficiency, it may be able to execute in the same time as the shell script.
brianegge
+5  A: 

I individually verified the time taken. It seems that reading takes less than 10% of the time, and reading plus processing takes less than 30% of the whole time. So I took ParallelExchangerTest (the best performer among your code) and modified it to have just two threads: the first thread does the reading and replacing, and the second thread does the writing.

Here are the figures to compare (on my machine: Intel dual core (not Core 2) running Ubuntu with 1 GB of RAM):

Testing via shell

real    0m41.601s
user    0m58.604s
sys     0m1.032s

Testing ParallelExchangerTest

real    1m55.424s
user    2m14.160s
sys     0m4.768s

Testing ParallelExchangerTestMod (2 threads)

real    1m35.524s
user    1m55.319s
sys     0m3.580s

I knew that the string processing takes the longest, so I replaced line.replace with matcher.replaceAll and got these figures:

Testing ParallelExchangerTestMod_Regex (2 threads)

real    1m12.781s
user    1m33.382s
sys     0m2.916s
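
The likely reason for the gain: in the JDKs of this era, String.replace(CharSequence, CharSequence) compiles a fresh literal Pattern on every call, whereas a precompiled Pattern with a reused Matcher avoids that work. A sketch of the change (class and method names are made up, and the reused Matcher means you need one instance per thread):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Compile the pattern once and reuse a single Matcher per thread, rather than
// letting String.replace build a new literal Pattern on every call.
public final class LineReplacer {
  private static final Pattern AT = Pattern.compile("@", Pattern.LITERAL);
  private final Matcher matcher = AT.matcher("");

  public String replaceAt(String line) {
    return matcher.reset(line).replaceAll("_at_");
  }
}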

Next I went a step further: instead of reading one line at a time, I read char[] buffers of various sizes and timed it (still with the regex search/replace), and got these figures:

Testing ParallelExchangerTestMod_Regex_Buff (100 bytes at a time)

real    1m13.804s
user    1m32.494s
sys     0m2.676s

Testing ParallelExchangerTestMod_Regex_Buff (500 bytes at a time)

real    1m6.286s
user    1m29.334s
sys     0m2.324s

Testing ParallelExchangerTestMod_Regex_Buff (800 bytes at a time)

real    1m12.309s
user    1m33.910s
sys     0m2.476s

It looks like a 500-byte buffer is optimal for this data.
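
The chunked read amounts to something like this sketch (names are illustrative; chunking is only safe here because the pattern is a single character, so a match can never straddle a chunk boundary):

import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.util.regex.Pattern;

// Process fixed-size chunks instead of lines, applying the replace per chunk.
public final class ChunkedReplace {
  private static final Pattern AT = Pattern.compile("@");

  public static void run(Reader reader, Writer writer, int chunkSize) throws IOException {
    char[] buf = new char[chunkSize];
    int len;
    while ((len = reader.read(buf, 0, buf.length)) != -1) {
      writer.write(AT.matcher(new String(buf, 0, len)).replaceAll("_at_"));
    }
    writer.flush();
  }
}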

I forked the repository; a copy of my changes is here: https://bitbucket.org/chinmaya/java-concurrent_response/

chinmaya
I checked out your changes and ran them on a Solaris machine; the results were quite a bit different from Ubuntu. The fastest one, ParallelExchangerTestMod_Regex, ran 1.5 seconds faster than my ParallelExchangerTest: real 0m40.418s, user 0m56.314s, sys 0m1.374s. Running the same test on Ubuntu, Cygwin, and OS X shows the results vary quite a bit from platform to platform.
brianegge
Certainly; the JVM implementation changes from platform to platform. You may want to try optimizations at compile time (javac -O) and at runtime (java -X).
chinmaya
A: 

Reducing the number of reads and objects gives me more than 10% better performance.

But the performance of java.util.concurrent is still a bit disappointing.

ConcurrentQueueTest:

private static class Reader implements Runnable {

  // reader, pipe, and POISON are fields of the enclosing test class
  @Override
  public void run() {
    final char[] buf = new char[8192];
    try {
      // read large chunks rather than individual lines to cut down on the
      // number of reads and intermediate String objects
      int len;
      while ((len = reader.read(buf)) != -1) {
        pipe.put(new String(buf, 0, len));
      }
      pipe.put(POISON); // sentinel telling the consumer the input is exhausted
    } catch (IOException e) {
      throw new RuntimeException(e);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
}
KarlP