views:

251

answers:

4

So this is a continuation from my last question - So the question was "What is the best way to build a program that is thread safe in terms that it needs to write double values to a file. If the function that saves the values via streamwriter is being called by multiple threads? Whats the best way of doing it?"

And I modified some code found at MSDN, how about the following? This one correctly writes everything to the file.

namespace SafeThread
{
    class Program
    {
        static void Main()
        {
            Threading threader = new Threading();

            AutoResetEvent autoEvent = new AutoResetEvent(false);

            Thread regularThread =
                new Thread(new ThreadStart(threader.ThreadMethod));
            regularThread.Start();

            ThreadPool.QueueUserWorkItem(new WaitCallback(threader.WorkMethod),
                autoEvent);

            // Wait for foreground thread to end.
            regularThread.Join();

            // Wait for background thread to end.
            autoEvent.WaitOne();
        }
    }


    class Threading
    {
        List<double> Values = new List<double>();
        static readonly Object locker = new Object();
        StreamWriter writer = new StreamWriter("file");
        static int bulkCount = 0;
        static int bulkSize = 100000;

        public void ThreadMethod()
        {
            lock (locker)
            {
                while (bulkCount < bulkSize)
                    Values.Add(bulkCount++);
            }
            bulkCount = 0;
        }

        public void WorkMethod(object stateInfo)
        {
            lock (locker)
            {
                foreach (double V in Values)
                {
                    writer.WriteLine(V);
                    writer.Flush();
                }
            }
            // Signal that this thread is finished.
            ((AutoResetEvent)stateInfo).Set();
        }
    }
}
+2  A: 

The code you have there is subtly broken - in particular, if the queued work item runs first, then it will flush the (empty) list of values immediately, before terminating, after which point your worker goes and fills up the List (which will end up being ignored). The auto-reset event also does nothing, since nothing ever queries or waits on its state.

Also, since each thread uses a different lock, the locks have no meaning! You need to make sure you hold a single, shared lock whenever accessing the streamwriter. You don't need a lock between the flushing code and the generation code; you just need to make sure the flush runs after the generation finishes.

You're probably on the right track, though - although I'd use a fixed-size array instead of a list, and flush all entries from the array when it gets full. This avoids the possibility of running out of memory if the thread is long-lived.

bdonlan
How come the flushing will happen if the list is empty?
Wajih
I explained this in my answer - the queued work item `WorkMethod` can run _before_ the thread `ThreadMethod`. And it can also run after. You can't predict which, because you haven't set any kind of explicit ordering here.
bdonlan
+4  A: 

Thread and QueueUserWorkItem are the lowest available APIs for threading. I wouldn't use them unless I absolutely, finally, had no other choice. Try the Task class for a much higher-level abstraction. For details, see my recent blog post on the subject.

You can also use BlockingCollection<double> as a proper producer/consumer queue instead of trying to build one by hand with the lowest available APIs for synchronization.

Reinventing these wheels correctly is surprisingly difficult. I highly recommend using the classes designed for this type of need (Task and BlockingCollection, to be specific). They are built-in to the .NET 4.0 framework and are available as an add-on for .NET 3.5.

Stephen Cleary
+1  A: 
  • the code has the writer as an instance var but using a static locker. If you had multiple instances writing to different files, there's no reason they would need to share the same lock
  • on a related note, since you already have the writer (as a private instance var), you can use that for locking instead of using a separate locker object in this case - that makes things a little simpler.

The 'right answer' really depends on what you're looking for in terms of locking/blocking behavior. For instance, the simplest thing would be to skip the intermediate data structure just have a WriteValues method such that each thread 'reporting' its results goes ahead and writes them to the file. Something like:

StreamWriter writer = new StreamWriter("file");
public void WriteValues(IEnumerable<double> values)
{
    lock (writer)
    {
        foreach (var d in values)
        {
            writer.WriteLine(d);
        }
        writer.Flush();
    }
}

Of course, this means worker threads serialize during their 'report results' phases - depending on the performance characteristics, that may be just fine though (5 minutes to generate, 500ms to write, for example).

On the other end of the spectrum, you'd have the worker threads write to a data structure. If you're in .NET 4, I'd recommend just using a ConcurrentQueue rather than doing that locking yourself.

Also, you may want to do the file i/o in bigger batches than those being reported by the worker threads, so you might choose to just do writing in a background thread on some frequency. That end of the spectrum looks something like the below (you'd remove the Console.WriteLine calls in real code, those are just there so you can see it working in action)

public class ThreadSafeFileBuffer<T> : IDisposable
{
    private readonly StreamWriter m_writer;
    private readonly ConcurrentQueue<T> m_buffer = new ConcurrentQueue<T>();
    private readonly Timer m_timer;

    public ThreadSafeFileBuffer(string filePath, int flushFrequencyInSeconds = 5)
    {
        m_writer = new StreamWriter(filePath);
        var flushFrequency = TimeSpan.FromSeconds(flushFrequencyInSeconds);
        m_timer = new Timer(FlushBuffer, null, flushFrequency, flushFrequency);
    }

    public void AddResult(T result)
    {
        m_buffer.Enqueue(result);
        Console.WriteLine("Buffer is up to {0} elements", m_buffer.Count);
    }

    public void Dispose()
    {
        Console.WriteLine("Turning off timer");
        m_timer.Dispose();
        Console.WriteLine("Flushing final buffer output");
        FlushBuffer(); // flush anything left over in the buffer
        Console.WriteLine("Closing file");
        m_writer.Dispose();
    }

    /// <summary>
    /// Since this is only done by one thread at a time (almost always the background flush thread, but one time via Dispose), no need to lock
    /// </summary>
    /// <param name="unused"></param>
    private void FlushBuffer(object unused = null)
    {
        T current;
        while (m_buffer.TryDequeue(out current))
        {
            Console.WriteLine("Buffer is down to {0} elements", m_buffer.Count);
            m_writer.WriteLine(current);
        }
        m_writer.Flush();
    }
}

class Program
{
    static void Main(string[] args)
    {
        var tempFile = Path.GetTempFileName();
        using (var resultsBuffer = new ThreadSafeFileBuffer<double>(tempFile))
        {
            Parallel.For(0, 100, i =>
            {
                // simulate some 'real work' by waiting for awhile
                var sleepTime = new Random().Next(10000);
                Console.WriteLine("Thread {0} doing work for {1} ms", Thread.CurrentThread.ManagedThreadId, sleepTime);
                Thread.Sleep(sleepTime);
                resultsBuffer.AddResult(Math.PI*i);
            });
        }
        foreach (var resultLine in File.ReadAllLines(tempFile))
        {
            Console.WriteLine("Line from result: {0}", resultLine);
        }
    }
}
James Manning
+2  A: 

So you're saying you want a bunch of threads to write data to a single file using a StreamWriter? Easy. Just lock the StreamWriter object.

The code here will create 5 threads. Each thread will perform 5 "actions," and at the end of each action it will write 5 lines to a file named "file."

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

namespace ConsoleApplication1 {
    class Program {
        static void Main() {
            StreamWriter Writer = new StreamWriter("file");

            Action<int> ThreadProcedure = (i) => {
                // A thread may perform many actions and write out the result after each action
                // The outer loop here represents the multiple actions this thread will take
                for (int x = 0; x < 5; x++) {
                    // Here is where the thread would generate the data for this action
                    // Well simulate work time using a call to Sleep
                    Thread.Sleep(1000);
                    // After generating the data the thread needs to lock the Writer before using it.
                    lock (Writer) {
                        // Here we'll write a few lines to the Writer
                        for (int y = 0; y < 5; y++) {
                            Writer.WriteLine("Thread id = {0}; Action id = {1}; Line id = {2}", i, x, y);
                        }
                    }
                }
            };

            //Now that we have a delegate for the thread code lets make a few instances

            List<IAsyncResult> AsyncResultList = new List<IAsyncResult>();
            for (int w = 0; w < 5; w++) {
                AsyncResultList.Add(ThreadProcedure.BeginInvoke(w, null, null));
            }

            // Wait for all threads to complete
            foreach (IAsyncResult r in AsyncResultList) {
                r.AsyncWaitHandle.WaitOne();
            }

            // Flush/Close the writer so all data goes to disk
            Writer.Flush();
            Writer.Close();
        }
    }
}

The result should be a file "file" with 125 lines in it with all "actions" performed concurrently and the result of each action written synchronously to the file.

Lunatic Experimentalist
You shouldn't be locking objects where you don't control their implementation like that - what if it internally locks itself in another thread? - you should create a new `Object` to use as a lock.
bdonlan
Do not use a separate object for a lock. Locking an object directly is the only way to ensure that all threads that can see an object can get an exclusive lock on the object.
Lunatic Experimentalist
Pretty cool, exactly what I wanted, thanks indeed!!!
Wajih