views: 352
answers: 7

There is a folder that contains 1000s of small text files.

I aim to parse and process all of them while more files are being added to the folder.

My intention is to multithread this operation, as the single-threaded prototype took 6 minutes to process 1000 files.

I'd like to have reader and writer thread(s) as follows: while the reader thread(s) are reading the files, the writer thread(s) would process them. Once a reader thread starts reading a file, I'd like to mark it as being processed, for example by renaming it; once it's been read, rename it to completed.

How should I approach such a multithreaded application?

Is it better to use a distributed hash table or a queue?

Which data structure should I use to avoid locks?

Do you have a better approach to this scheme that you'd like to share?

+1  A: 

You could have a central queue; the reader threads would need write access while pushing the in-memory contents of each file onto the queue, and the processing threads would need read access to pop the next memory stream off it. This way you minimize the time spent in locks and don't have to deal with the complexities of lock-free code.
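
For illustration, here's a minimal sketch of such a shared queue guarded by a lock (the class and member names are mine, not part of the answer):

using System.Collections.Generic;
using System.Threading;

public class CentralQueue
{
    readonly Queue<byte[]> queue = new Queue<byte[]>();
    readonly object gate = new object();

    // reader threads call this after loading a file's contents into memory
    public void Push(byte[] contents)
    {
        lock (gate)
        {
            queue.Enqueue(contents);
            Monitor.Pulse(gate); // wake one waiting processing thread
        }
    }

    // processing threads block here until an item is available
    public byte[] Pop()
    {
        lock (gate)
        {
            while (queue.Count == 0)
                Monitor.Wait(gate);
            return queue.Dequeue();
        }
    }
}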

EDIT: Ideally, you'd handle all exceptions/error conditions (if any) gracefully, so you don't have points of failure.

As an alternative, you could have multiple threads that each "claim" a file by renaming it before processing; the filesystem then becomes the implementation of locked access. No clue whether this is any more performant than my original answer; only testing would tell.
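
A minimal sketch of the rename-to-claim idea, assuming the files stay on a single volume (the ".processing"/".completed" suffixes and the TryClaim/Process names are illustrative):

using System.IO;

static bool TryClaim(string path, out string claimedPath)
{
    claimedPath = path + ".processing";
    try
    {
        // File.Move fails if another thread or process already renamed the file,
        // so the filesystem itself arbitrates which thread owns it
        File.Move(path, claimedPath);
        return true;
    }
    catch (IOException)
    {
        return false; // someone else claimed it first
    }
}

// usage:
// string claimed;
// if (TryClaim(file, out claimed))
// {
//     Process(claimed);                        // your parsing logic
//     File.Move(claimed, file + ".completed"); // mark it done
// }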

Chris O
That introduces a single point of failure. I'd like a decentralized approach.
Possibly, but having each thread be "intelligent" and work together with the other threads can be complicated. Cross-threading issues can be a nightmare to debug. A single (or central) queue is simpler.
Pretzel
What do you mean, I don't have points of failure? What happens if the server goes down? A centralized approach always introduces a single point of failure.
A: 

Generally speaking, 1000 small files (how small, by the way?) should not take six minutes to process. As a quick test, run find "foobar" * in the directory containing the files (the first argument in quotes doesn't matter; it can be anything) and see how long it takes to process every file. If it takes more than one second, I'll be disappointed.

Assuming this test confirms my suspicion, then the process is CPU-bound, and you'll get no improvement from separating the reading into its own thread. You should:

  1. Figure out why it takes roughly 360 ms, on average, to process a small input, and hopefully improve the algorithm.
  2. If there's no way to speed up the algorithm and you have a multicore machine (almost everyone does, these days), use a thread pool to assign each of the 1000 tasks the job of processing one file, as sketched below.
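
A sketch of item 2, assuming .NET 4's CountdownEvent for completion tracking (ProcessFile is a placeholder for the real parsing logic):

using System.IO;
using System.Threading;

static void ProcessAll(string folder)
{
    string[] files = Directory.GetFiles(folder);
    using (var done = new CountdownEvent(files.Length))
    {
        foreach (string file in files)
        {
            ThreadPool.QueueUserWorkItem(state =>
            {
                try { ProcessFile((string)state); } // placeholder: parse/process one file
                finally { done.Signal(); }
            }, file);
        }
        done.Wait(); // block until every file has been processed
    }
}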
Marcelo Cantos
:)) "Took 6 minutes to process 1000 files," not just read them. Since this will be a continuous process, I'm interested in the overall running time.
@user177883: Firstly, a continuous process doesn't have an overall running time. Secondly, I am well aware that the six minutes is total processing time. The whole point of the test is to show how much of that time is I/O; I strongly suspect that almost none of it is, in which case there is nothing to be gained by parallelising I/O.
Marcelo Cantos
I guess I should have been more precise about what I meant by a continuous process. I meant that this process will be running for as long as files are being created.
Did you run the test I suggested? I'm curious to know the result.
Marcelo Cantos
I will let you know once I do it. I don't have access to the files right now.
A: 

I realize that this isn't a full/complete answer, but I just wrote something similar to this today and know roughly what it entails. Here's my description of one approach.

  • Thread #1 is your primary thread.
  • Thread #2 runs a FileSystemWatcher object. Set it to monitor a folder and the "OnCreated" event, and have the FileSystemWatcher "enqueue" new files into a Queue object. From my testing, it appears to be thread-safe. (You'll want to check MSDN, though.)
  • Thread #3: I decided to create only one more thread after that, a main processing thread that dequeues file names one at a time as it processes them. (You still need to check that the Queue count is greater than 0, or you'll throw an exception.)

(yes, as some people pointed out, this is a Producer/Consumer pattern.)
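
Here's a rough sketch of that arrangement; I've wrapped the Queue in a lock to be safe, since Queue<T>'s instance members aren't documented as thread-safe. The folder path and ProcessFile are placeholders:

using System.Collections.Generic;
using System.IO;
using System.Threading;

var queue = new Queue<string>();
var gate = new object();

// Thread #2: the watcher enqueues each newly created file
var watcher = new FileSystemWatcher(@"C:\folder"); // placeholder path
watcher.Created += (s, e) => { lock (gate) queue.Enqueue(e.FullPath); };
watcher.EnableRaisingEvents = true;

// Thread #3: the processing thread dequeues one file at a time
var worker = new Thread(() =>
{
    while (true)
    {
        string file = null;
        lock (gate)
        {
            if (queue.Count > 0) file = queue.Dequeue(); // check the count first
        }
        if (file == null) { Thread.Sleep(100); continue; } // nothing queued yet
        ProcessFile(file); // placeholder: parse/process the file
    }
});
worker.IsBackground = true;
worker.Start();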

I did briefly experiment with multithreading, but found it to be unnecessary for what I was processing. I also had a progress bar and didn't want a separate progress bar for each thread.

That said, my experiments with multi-threading did work!

Pretzel
GUI? I didn't mention anything about a GUI.
True enough, sorry about that. Well, Thread #1 is your master control thread, then (no GUI). Also, be sure to check the MSDN docs on the thread-safety of objects. You'll likely have to create callbacks, events, and delegates to get data safely between threads. I also used Control.Invoke or Control.Owner.Invoke (where Control is any of the various controls) to get data to that control's owning thread, and found it quite effective. Though again, you're not using a GUI. Ha! I was only describing my own work.
Pretzel
If someone hasn't posted a fuller answer by tomorrow, I'll post my prototype code tomorrow morning. (It's at work.) My prototype code had no GUI, so perhaps you'll like it better. :-)
Pretzel
A: 

You might consider a queue of files to process. Populate the queue once by scanning the directory when you start, and have a FileSystemWatcher keep it updated so new files are added efficiently without constantly re-scanning the directory.
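
A rough sketch of that seed-then-watch combination (the path is a placeholder, and the queue would be whatever shared structure your worker threads drain):

using System.Collections.Generic;
using System.IO;

var queue = new Queue<string>();

// start watching first, so files created during the initial scan aren't missed
// (deduplicate downstream if a file is picked up by both the watcher and the scan)
var watcher = new FileSystemWatcher(@"C:\folder"); // placeholder path
watcher.Created += (s, e) => { lock (queue) queue.Enqueue(e.FullPath); };
watcher.EnableRaisingEvents = true;

// then seed the queue with everything already in the folder
foreach (var file in Directory.GetFiles(@"C:\folder"))
    lock (queue) queue.Enqueue(file);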

If at all possible, read and write to different physical disks. That will give you maximum IO performance.

If you have an initial burst of many files to process, followed by an uneven pace of new files being added, and this all happens on the same disk (read/write), you could consider buffering the processed files in memory until one of two conditions applies:

  • There are (temporarily) no new files
  • You have buffered so many files that you don't want to use more memory for buffering (ideally a configurable threshold)

If your actual processing of the files is CPU intensive, you could consider having one processing thread per CPU core. However, for "normal" processing CPU time will be trivial compared to IO time and the complexity would not be worth any minor gains.

Eric J.
Writing will be directed to a database server.
In that case a read thread and write thread will improve throughput, since the write happens to the network and not the local drive.
Eric J.
+1  A: 

Design

The Producer/Consumer pattern will probably be the most useful for this situation. You should create enough threads to maximize the throughput.

There are plenty of existing questions about the Producer/Consumer pattern that can give you an idea of how it works.

You should use a blocking queue: the producer adds files to the queue while the consumers process files off it. A blocking queue requires no explicit locking on your part, so it's about the most efficient way to solve your problem.

If you're using .NET 4.0, there are several concurrent collections that you can use out of the box, such as ConcurrentQueue<T> and BlockingCollection<T>.

Threading

A single producer thread will probably be the most efficient way to load the files from disk and push them onto the queue; multiple consumers can then pop items off the queue and process them. I would suggest that you try 2-4 consumer threads per core and take some performance measurements to determine the optimum (i.e. the number of threads that gives you maximum throughput). I would not recommend using the ThreadPool for this specific example.
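
A minimal sketch of that layout using .NET 4's BlockingCollection<T>, with one producer reading file contents and several consumers processing them (the path, the consumer count, and the processing step are placeholders to tune):

using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;

class ProducerConsumerSketch
{
    static void Main()
    {
        var queue = new BlockingCollection<byte[]>(100); // bounded, so the producer can't run away

        // single producer: reads each file into memory and pushes it onto the queue
        var producer = new Thread(() =>
        {
            foreach (var file in Directory.GetFiles(@"C:\folder")) // placeholder path
                queue.Add(File.ReadAllBytes(file));
            queue.CompleteAdding(); // tells consumers no more items are coming
        });
        producer.Start();

        // multiple consumers: pop items until the queue is drained
        int consumerCount = Environment.ProcessorCount * 2; // tune: try 2-4 per core
        var consumers = new Thread[consumerCount];
        for (int i = 0; i < consumerCount; i++)
        {
            consumers[i] = new Thread(() =>
            {
                foreach (var contents in queue.GetConsumingEnumerable())
                    contents.GetHashCode(); // placeholder: process the file contents
            });
            consumers[i].Start();
        }

        producer.Join();
        foreach (var c in consumers) c.Join();
    }
}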

P.S. I don't understand the concern with a single point of failure and the use of distributed hash tables. I know DHTs sound like a really cool thing to use, but I would try the conventional methods first unless you have a specific problem in mind that you're trying to solve.

Lirik
I reckon the producer should also read the files into a buffer that is then passed on to the consumers. This way you won't have multiple threads trying to read from the HDD at the same time (which would cause the head to jump around a lot and slow things down a LOT, rather than speed up the process).
Grant Peters
Correct Grant, one producer and multiple consumers.
Lirik
+5  A: 

Since the comments show some curiosity about how .NET 4 handles this, here's that approach; sorry, it's likely not an option for the OP. Disclaimer: this is not a highly scientific analysis, just a demonstration that there's a clear performance benefit; based on your hardware, your mileage may vary widely.

Here's a quick test (if you see a big mistake in this simple test, it's just an example; please comment and we can fix it to be more useful/accurate). For this, I dropped 12,000 ~60 KB files into a directory as a sample (fire up LINQPad and you can play with it yourself, for free! Be sure to get LINQPad 4, though):

var files = Directory.GetFiles("C:\\temp", "*.*", SearchOption.AllDirectories).ToList();

var sw = Stopwatch.StartNew(); //start timer
files.ForEach(f => File.ReadAllBytes(f).GetHashCode()); //do work - serial
sw.Stop(); //stop
sw.ElapsedMilliseconds.Dump("Run MS - Serial"); //display the duration

sw.Restart();
files.AsParallel().ForAll(f => File.ReadAllBytes(f).GetHashCode()); //parallel
sw.Stop();
sw.ElapsedMilliseconds.Dump("Run MS - Parallel");

Slightly changing your loop to parallelize the query is all that's needed in most simple situations; by "simple" I mostly mean that the result of one action doesn't affect the next. Something to keep in mind is that some collections, for example our handy List<T>, are not thread-safe, so using them in a parallel scenario isn't a good idea :) Luckily, concurrent collections were added in .NET 4 that are thread-safe. Also keep in mind that if you're using a locking collection, it may become a bottleneck as well, depending on the situation.

This uses the .AsParallel<T>(IEnumerable<T>) and .ForAll<T>(ParallelQuery<T>) extensions available in 4.0. The .AsParallel() call wraps the IEnumerable<T> in a ParallelEnumerableWrapper<T> (an internal class) which implements ParallelQuery<T>. This lets you use the parallel extension methods; in this case we're using .ForAll().

.ForAll() internally creates a ForAllOperator<T>(query, action) and runs it synchronously. This handles the threading and the merging of the threads after they run... there's quite a bit going on in there; I'd suggest starting here if you want to learn more, including the additional options.


The results (Computer 1 - Physical Hard Disk):

  • Serial: 1288 - 1333ms
  • Parallel: 461 - 503ms


The results (Computer 2 - Solid State Drive):

  • Serial: 545 - 601ms
  • Parallel: 248 - 278ms

Computer specs - for comparison:

  • Intel Core 2 Quad Q9100 @ 2.26GHz
  • 8GB RAM (DDR 1333)
  • 120GB OCZ Vertex SSD (Standard Version - 1.4 Firmware)

I don't have links for the CPU/RAM this time; these came installed. This is a Dell M6400 laptop (here's a link to the M6500... Dell's own links to the 6400 are broken).


These numbers are from 10 runs, taking the min/max of the inner 8 results (removing the min and max of each set as possible outliers). We hit an I/O bottleneck here, especially on the physical drive, but think about what the serial method does: it reads, processes, reads, processes, rinse, repeat. With the parallel approach you are, even with an I/O bottleneck, reading and processing simultaneously; in the worst bottleneck situation, you're processing one file while reading the next. That alone (on any current computer!) should result in some performance gain. You can see in the results above that we get a bit more than one operation going at a time, giving us a healthy boost.

Another disclaimer: a quad core + .NET 4 parallelism isn't going to give you 4x performance; it doesn't scale linearly... there are other considerations and bottlenecks in play.

Hope this was of interest in showing the approach and the possible benefits. Feel free to criticize or improve... this answer exists solely for those curious, as indicated in the comments :)

Nick Craver
Great to see a concrete example of this. Thank you.
Chris
Nick, wouldn't it be better if the OP used a producer and several consumers, instead of a parallel task for each file? If the OP creates too many parallel tasks, then switching between them will actually degrade performance... great post otherwise!
Lirik
@Lirik - Not sure I understand completely; you're not really context switching here. This creates a number of threads corresponding to the number of cores, so you're not context switching except during an interruption, as always. What would be the producer in your case (clarify a bit, with an example)? Since the ability to scale the file reading depends on the data source (whether it's one physical HD, fibre channel, an SSD, RAM, etc.), its ability to read x files faster depends on the medium... so I'm not sure a single producer is faster; it might actually become a bottleneck :)
Nick Craver
@Nick, I'm trying to figure out how your example creates a number of threads corresponding to the cores? Is it automatic with TPL, or is there some other magic there?
Lirik
@Lirik - The number of workers is (by default) automatically scaled by PLINQ internally, though you can specify a limit (and lots of other options) if you want using MaxDegreeOfParallelism, Reed Copsey has a good explanation here: http://reedcopsey.com/2010/02/11/parallelism-in-net-part-9-configuration-in-plinq-and-tpl/
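For instance, adapting the example above (WithDegreeOfParallelism is the PLINQ knob; the TPL's Parallel.ForEach takes a MaxDegreeOfParallelism via ParallelOptions):

files.AsParallel().WithDegreeOfParallelism(4).ForAll(f => File.ReadAllBytes(f).GetHashCode());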
Nick Craver
+1 that's great to know, good stuff!
Lirik
A: 

I recommend that you queue a thread for each file and keep track of the running threads in a dictionary, launching a new thread when one completes, up to a maximum limit. I prefer to create my own threads when they could be long-running, and to use callbacks to signal when they're done or have encountered an exception. In the sample below I use a dictionary to keep track of the running worker instances; this way I can call into an instance if I want to stop work early. Callbacks can also be used to update a UI with progress and throughput. You can also dynamically throttle the running-thread limit for bonus points.

The example code is an abbreviated demonstrator, but it does run.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        Supervisor super = new Supervisor();
        super.LaunchWaitingThreads();

        while (!super.Done) { Thread.Sleep(200); }
        Console.WriteLine("\nDone");
        Console.ReadKey();
    }
}

public delegate void StartCallbackDelegate(int idArg, Worker workerArg);
public delegate void DoneCallbackDelegate(int idArg);

public class Supervisor
{
    Queue<Thread> waitingThreads = new Queue<Thread>();
    Dictionary<int, Worker> runningThreads = new Dictionary<int, Worker>();
    int maxThreads = 20;
    object locker = new object();

    public bool Done { get { lock (locker) { return ((waitingThreads.Count == 0) && (runningThreads.Count == 0)); } } }

    public Supervisor()
    {
        // queue up a thread for each file
        Directory.GetFiles("C:\\folder").ToList().ForEach(n => waitingThreads.Enqueue(CreateThread(n)));
    }

    Thread CreateThread(string fileNameArg)
    {
        Thread thread = new Thread(new Worker(fileNameArg, WorkerStart, WorkerDone).ProcessFile);
        thread.IsBackground = true;
        return thread;
    }

    // called when a worker starts
    public void WorkerStart(int threadIdArg, Worker workerArg)
    {
        lock (locker)
        {
            // update with worker instance
            runningThreads[threadIdArg] = workerArg;
        }
    }

    // called when a worker finishes
    public void WorkerDone(int threadIdArg)
    {
        lock (locker)
        {
            runningThreads.Remove(threadIdArg);
        }
        Console.WriteLine(string.Format("  Thread {0} done", threadIdArg.ToString()));
        LaunchWaitingThreads();
    }

    // launches workers until max is reached
    public void LaunchWaitingThreads()
    {
        lock (locker)
        {
            while ((runningThreads.Count < maxThreads) && (waitingThreads.Count > 0))
            {
                Thread thread = waitingThreads.Dequeue();
                runningThreads.Add(thread.ManagedThreadId, null); // place holder so count is accurate
                thread.Start();
            }
        }
    }
}

public class Worker
{
    string fileName;
    StartCallbackDelegate startCallback;
    DoneCallbackDelegate doneCallback;
    public Worker(string fileNameArg, StartCallbackDelegate startCallbackArg, DoneCallbackDelegate doneCallbackArg)
    {
        fileName = fileNameArg;
        startCallback = startCallbackArg;
        doneCallback = doneCallbackArg;
    }

    public void ProcessFile()
    {
        startCallback(Thread.CurrentThread.ManagedThreadId, this);
        Console.WriteLine(string.Format("Reading file {0} on thread {1}", fileName, Thread.CurrentThread.ManagedThreadId.ToString()));
        File.ReadAllBytes(fileName);
        doneCallback(Thread.CurrentThread.ManagedThreadId);
    }
}
ebpower