Hi there, folks.

Here I am again with questions about multi-threading and an exercise of my Concurrent Programming class.

I have a multi-threaded server - implemented using .NET Asynchronous Programming Model - with GET (download) and PUT (upload) file services. This part is done and tested.

It happens that the statement of the problem says this server must have logging activity with the minimum impact on the server response time, and it should be supported by a low priority thread - logger thread - created for this effect. All logging messages shall be passed by the threads that produce them to this logger thread, using a communication mechanism that may not lock the thread that invokes it (besides the necessary locking to ensure mutual exclusion) and assuming that some logging messages may be ignored.

Here is my solution, which I would like your help validating as a solution to the stated problem:

using System;
using System.IO;
using System.Threading;

// Multi-threaded Logger
public class Logger {
    // textwriter to use as logging output
    protected readonly TextWriter _output;
    // logger thread
    protected Thread _loggerThread;
    // logger thread wait timeout
    protected int _timeOut = 500; //500ms
    // amount of log requests attended
    protected volatile int reqNr = 0;
    // logging queue
    protected readonly object[] _queue;
    protected struct LogObj {
        public DateTime _start;
        public string _msg;
        public LogObj(string msg) {
            _start = DateTime.Now;
            _msg = msg;
        }
        public LogObj(DateTime start, string msg) {
            _start = start;
            _msg = msg;
        }
        public override string ToString() {
            return String.Format("{0}: {1}", _start, _msg);
        }
    }

    public Logger(int dimension,TextWriter output) {
        // initialize queue with parameterized dimension
        this._queue = new object[dimension];
        // initialize logging output
        this._output = output;
        // initialize logger thread
        Start();
    }
    public Logger() {
        // initialize queue with 10 positions
        this._queue = new object[10];
        // initialize logging output to use console output
        this._output = Console.Out;
        // initialize logger thread
        Start();
    }

    public void Log(string msg) {
        lock (this) {
            for (int i = 0; i < _queue.Length; i++) {
                // seek for the first available position on queue
                if (_queue[i] == null) {
                    // insert pending log into queue position
                    _queue[i] = new LogObj(DateTime.Now, msg);
                    // notify logger thread for a pending log on the queue
                    Monitor.Pulse(this);
                    break;
                }
                // if there aren't any available positions on logging queue, this
                // log is not considered and the thread returns
            }
        }
    }

    public void GetLog() {
        lock (this) {
            while(true) {
                for (int i = 0; i < _queue.Length; i++) {
                    // seek all occupied positions on queue (those who have logs)
                    if (_queue[i] != null) {
                        // log
                        LogObj obj = (LogObj)_queue[i];
                        // makes this position available
                        _queue[i] = null;
                        // print log into output stream
                        _output.WriteLine(String.Format("[Thread #{0} | {1}ms] {2}",
                                                        Thread.CurrentThread.ManagedThreadId,
                                                        DateTime.Now.Subtract(obj._start).TotalMilliseconds,
                                                        obj.ToString()));
                    }
                }
                // after printing all pending log's (or if there aren't any pending log's),
                // the thread waits until another log arrives
                //Monitor.Wait(this, _timeOut);
                Monitor.Wait(this);
            }
        }
    }

    // Starts logger thread activity
    public void Start() {
        // Create the thread object, passing in the Logger.GetLog method
        // via a ThreadStart delegate. This does not start the thread.
        _loggerThread = new Thread(this.GetLog);
        _loggerThread.Priority = ThreadPriority.Lowest;
        _loggerThread.Start();
    }

    // Stops logger thread activity
    public void Stop() {
        _loggerThread.Abort();
        _loggerThread = null;
    }

    // Increments number of attended log requests
    public void IncReq() { reqNr++; }

}

Basically, here are the main points of this code:

  1. Start a low-priority thread that loops over the logging queue and prints pending logs to the output. After this, the thread is suspended until a new log arrives;
  2. When a log arrives, the logger thread is awakened and does its work.

Is this solution thread-safe? I have been reading about the Producers-Consumers problem and its solution algorithm, but in this problem, although I have multiple producers, I only have one reader.

Thanks in advance for all your attention.

+1  A: 

Actually, you ARE introducing locking here. You hold the lock while pushing a log entry onto the queue (the Log method): if 10 threads simultaneously push 10 items into the queue and wake up the logger thread, then an 11th thread will wait until the logger thread has logged all the items...

If you want something really scalable, implement a lock-free queue (an outline is below). With a lock-free queue, the synchronization mechanism becomes really straightforward (you can even use a single wait handle for notifications).

If you can't find a lock-free queue implementation on the web, here is an idea of how to build one: use a linked list. Each node in the linked list contains a value and a volatile reference to the next node, so the enqueue and dequeue operations can be implemented with the Interlocked.CompareExchange method. I hope the idea is clear. If not, let me know and I'll provide more details.
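The linked-list idea above might look roughly like the following. This is only a minimal sketch in the style of the well-known Michael-Scott queue, not code from this answer: the name LockFreeQueue and its members are hypothetical, and it assumes .NET 4.5 or later for Volatile.Read. On .NET 4.0 and later, the built-in System.Collections.Concurrent.ConcurrentQueue<T> is probably a better choice than rolling your own.

```csharp
using System;
using System.Threading;

// Hypothetical sketch of a lock-free FIFO queue built on a linked list.
public sealed class LockFreeQueue<T>
{
    private sealed class Node
    {
        public T Value;
        public Node Next; // only read/written through Volatile/Interlocked
    }

    // _head always points at a dummy node; the first real item is _head.Next.
    private Node _head;
    private Node _tail;

    public LockFreeQueue()
    {
        _head = _tail = new Node();
    }

    public void Enqueue(T value)
    {
        Node node = new Node { Value = value };
        while (true)
        {
            Node tail = Volatile.Read(ref _tail);
            Node next = Volatile.Read(ref tail.Next);
            if (next != null)
            {
                // _tail is lagging behind; help advance it and retry.
                Interlocked.CompareExchange(ref _tail, next, tail);
                continue;
            }
            // Try to link the new node after the current last node.
            if (Interlocked.CompareExchange(ref tail.Next, node, null) == null)
            {
                // Linked; try to swing _tail (failure is fine, others will help).
                Interlocked.CompareExchange(ref _tail, node, tail);
                return;
            }
        }
    }

    public bool TryDequeue(out T value)
    {
        while (true)
        {
            Node head = Volatile.Read(ref _head);
            Node tail = Volatile.Read(ref _tail);
            Node next = Volatile.Read(ref head.Next);
            if (next == null)
            {
                value = default(T); // queue is empty
                return false;
            }
            if (head == tail)
            {
                // An enqueue is half done; help advance _tail and retry.
                Interlocked.CompareExchange(ref _tail, next, tail);
                continue;
            }
            // Try to make 'next' the new dummy node.
            if (Interlocked.CompareExchange(ref _head, next, head) == head)
            {
                value = next.Value;
                return true;
            }
        }
    }
}
```

Producers would call Enqueue and the single logger thread would call TryDequeue in a loop. Note that this sketch is unbounded, so the "some messages may be ignored" policy from the exercise would need a separate counter.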

Vitaliy Liptchinsky
Hi there. Thank you very much for your reply, but I would really like to avoid a solution that uses Interlocked mechanisms. It is said in the exercise statement that I can use locking, although only for mutual exclusion purposes. Can't I consider my solution's "locking" mechanism to fit this purpose, as I'm simply trying to ensure mutual exclusion accessing the logging queue?
XpiritO
@XpiritO: You cannot do any multiprocessor safe lock-free work without the Interlocked instructions (or their GCC equivalents). If you are using a mutex lock then you don't need Interlocked. But Vitaliy's suggestion was for building a lock-free queue.
Zan Lynx
OK, if you do not have critical performance requirements, your code looks thread-safe at first glance.
Vitaliy Liptchinsky
+3  A: 

It seems it should work. Producers-Consumers shouldn't change much in the single-consumer case. A few nitpicks:

  • Acquiring a lock may be an expensive operation (as @Vitaliy Liptchinsky says). I'd recommend benchmarking your logger against a naive 'write-through' logger and a logger using interlocked operations. Another alternative would be to exchange the existing queue with an empty one in GetLog and leave the critical section immediately. This way no producer will be blocked by long operations in the consumer.

  • Make LogObj a reference type (class). There's no point in making it a struct since you are boxing it anyway. Or else make the _queue field of type LogObj[] (that's better anyway).

  • Make your thread a background thread so that it won't prevent your program from closing if Stop is never called.

  • Flush your TextWriter. Otherwise you risk losing even those records that managed to fit into the queue (10 items is a bit small IMHO).

  • Implement IDisposable and/or a finalizer. Your logger owns a thread and a text writer, and those should be freed (and flushed, see above).
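Several of the points above (swapping the queue inside the lock, a background thread, flushing, IDisposable) could be combined roughly as follows. This is a minimal sketch under my own assumptions, not the asker's code: the name SwapLogger, the string payload, and the drop-when-full policy are all hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

// Hypothetical sketch: a bounded logger that swaps queues inside the lock.
public sealed class SwapLogger : IDisposable
{
    private readonly object _sync = new object();
    private readonly TextWriter _output;
    private readonly int _capacity;
    private readonly Thread _thread;
    private Queue<string> _pending = new Queue<string>();
    private bool _stopping;

    public SwapLogger(TextWriter output, int capacity)
    {
        _output = output;
        _capacity = capacity;
        _thread = new Thread(Run);
        _thread.IsBackground = true;              // won't keep the process alive
        _thread.Priority = ThreadPriority.Lowest;
        _thread.Start();
    }

    // Returns false when the message is dropped (queue full or logger stopping).
    public bool Log(string msg)
    {
        lock (_sync)
        {
            if (_stopping || _pending.Count >= _capacity) return false;
            _pending.Enqueue(msg);
            Monitor.Pulse(_sync);                 // wake the logger thread
            return true;
        }
    }

    private void Run()
    {
        while (true)
        {
            Queue<string> batch;
            lock (_sync)
            {
                while (_pending.Count == 0 && !_stopping) Monitor.Wait(_sync);
                if (_pending.Count == 0) return;  // stopping and fully drained
                batch = _pending;                 // take the whole queue...
                _pending = new Queue<string>();   // ...and leave an empty one
            }
            // Slow I/O happens outside the lock, so producers are not blocked.
            foreach (string msg in batch) _output.WriteLine(msg);
            _output.Flush();
        }
    }

    public void Dispose()
    {
        lock (_sync)
        {
            _stopping = true;
            Monitor.Pulse(_sync);
        }
        _thread.Join();                           // wait for pending logs to be written
        _output.Flush();
    }
}
```

The sketch only flushes the writer on Dispose; whether the logger should also dispose the writer depends on who owns it (disposing Console.Out, for instance, is usually not what you want).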

elder_george
Thanks for your considerations. Definitely going to change my code accordingly.
XpiritO
+1  A: 

Hey there. I had a quick look, and while it appears to be thread-safe, I don't believe it is particularly optimal. I would suggest a solution along these lines.

NOTE: I just read the other responses. What follows is a fairly optimal, optimistic locking solution based on your own. The major differences are locking on a private internal object, minimizing 'critical sections', and providing graceful thread termination. If you want to avoid locking altogether, then you can try some of that volatile "non-locking" linked list stuff, as @Vitaliy Liptchinsky suggests.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

...

public class Logger
{
    // BEST PRACTICE: private synchronization object. 
    // lock on _syncRoot - you should have one for each critical
    // section - to avoid locking on public 'this' instance
    private readonly object _syncRoot = new object ();

    // synchronization device for stopping our log thread.
    // initialized to unsignaled state - when set to signaled
    // we stop!
    private readonly AutoResetEvent _isStopping = 
        new AutoResetEvent (false);

    // use a Queue<>, cleaner and less error prone than
    // manipulating an array. btw, check your indexing
    // on your array queue, while starvation will not
    // occur in your full pass, ordering is not preserved
    private readonly Queue<LogObj> _queue = new Queue<LogObj>();

    ...

    public void Log (string message)
    {
        // you want to lock ONLY when absolutely necessary
        // which in this case is accessing the ONE resource
        // of _queue.
        lock (_syncRoot)
        {
            _queue.Enqueue (new LogObj (DateTime.Now, message));
        }
    }

    public void GetLog ()
    {
        // while not stopping
        // 
        // NOTE: _loggerThread is polling. to increase poll
        // interval, increase wait period. for a more event
        // driven approach, consider using another
        // AutoResetEvent at end of loop, and signal it
        // from Log() method above
        for (; !_isStopping.WaitOne(1); )
        {
            List<LogObj> logs = null;
            // again lock ONLY when you need to. because our log
            // operations may be time-intensive, we do not want
            // to block pessimistically. what we really want is 
            // to dequeue all available messages and release the
            // shared resource.
            lock (_syncRoot)
            {
                // copy messages for local scope processing!
                // 
                // NOTE: .Net3.5 extension method. if not available
                // logs = new List<LogObj> (_queue);
                logs = _queue.ToList ();
                // clear the queue for new messages
                _queue.Clear ();
                // release!
            }
            foreach (LogObj log in logs)
            {
                // do your thang
                ...
            }
        }
    }
}
...
public void Stop ()
{
    // graceful thread termination. give threads a chance!
    _isStopping.Set ();
    _loggerThread.Join (100);
    if (_loggerThread.IsAlive)
    {
        _loggerThread.Abort ();
    }
    _loggerThread = null;
}
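The code comments above mention a more event-driven alternative to polling: signal a second wait handle from Log() and have the logger thread wait on both handles. A minimal sketch of that variant follows. It is not part of the original answer: the name EventDrivenLogger, the _hasWork handle, and the Action<string> sink are hypothetical, and a ManualResetEvent is used for the stop signal here as an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch of the event-driven variant; all names are assumptions.
public sealed class EventDrivenLogger
{
    private readonly object _syncRoot = new object();
    private readonly Queue<string> _queue = new Queue<string>();
    private readonly ManualResetEvent _isStopping = new ManualResetEvent(false);
    private readonly AutoResetEvent _hasWork = new AutoResetEvent(false);
    private readonly Action<string> _sink;
    private readonly Thread _loggerThread;

    public EventDrivenLogger(Action<string> sink)
    {
        _sink = sink;
        _loggerThread = new Thread(GetLog);
        _loggerThread.IsBackground = true;
        _loggerThread.Priority = ThreadPriority.Lowest;
        _loggerThread.Start();
    }

    public void Log(string message)
    {
        // Lock only around the shared queue, then signal outside the lock.
        lock (_syncRoot)
        {
            _queue.Enqueue(message);
        }
        _hasWork.Set();
    }

    private void GetLog()
    {
        WaitHandle[] handles = { _isStopping, _hasWork };
        bool stopping = false;
        while (!stopping)
        {
            // Blocks until either handle is signaled; index 0 means "stop".
            stopping = WaitHandle.WaitAny(handles) == 0;
            string[] logs;
            lock (_syncRoot)
            {
                // Copy and clear, then release the lock before processing.
                logs = _queue.ToArray();
                _queue.Clear();
            }
            foreach (string log in logs) _sink(log);
        }
    }

    public void Stop()
    {
        _isStopping.Set();
        _loggerThread.Join(); // the thread drains the queue once more, then exits
    }
}
```

Because the thread drains the whole queue each time it wakes, several Set() calls collapsing into one wake-up is harmless; at worst the thread wakes up once to find an empty queue.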
johnny g
Hi there. Thanks for reminding me to use graceful thread termination instead of the forceful "abort" method. I'll definitely adopt that!
XpiritO
At first glance I didn't notice some problems that I found in your code when I tried to adapt my solution following your suggestions. In fact, you may not call "Pulse" or "Wait" on a synchronization object outside the corresponding "lock" scope. Considering this, there is no gain in creating a local copy of the logger queue in your "GetLog" method; nor is it possible to reduce lock acquisition over the queue in your "Log" method. Also, some methods you refer to don't exist. Queue doesn't have a "ToList()" method (only a "ToArray()"), and "Monitor.Pulse()" must receive a parameter (the synchronization object)...
XpiritO
Ahem, important lesson: never "wing" a response. Using a Monitor [correctly] is synonymous with using lock. Since we are using lock to safeguard our queue, Monitor is completely superfluous. Remove it [from both the solution above and/or your previous solution]. It's not needed. There is a lot of gain from creating a local copy of the queue. NEVER perform any processing in a critical section. You want to access your shared resource and release it. Processing [i.e. writing to console, file, RPC, whatever] may be performed later. ToList is an extension method, available in .NET 3.5 via using System.Linq.
johnny g
Er, by "wing" a response, I was referring to myself, obviously. I should have looked up Monitor earlier and would have spotted the redundancy. :S
johnny g
A: 

I'm just doing a thought experiment here, since I don't have time to actually try code right now, but I think you can do this without locks at all if you're creative.

Have your logging class contain a method that allocates a queue and a semaphore each time it's called (and another that deallocates the queue and semaphore when the thread is done). The threads that want to do logging will call this method when they start. When they want to log, they push the message onto their own queue and signal the semaphore. The logger thread has a big loop that runs through the queues and checks the associated semaphores. If the semaphore associated with a queue is greater than zero, then a message gets popped off that queue and the semaphore is decremented.

Because you're not attempting to pop things off the queue until after the semaphore is set, and you're not setting the semaphore until after you've pushed things onto the queue, I think this will be safe. According to the MSDN documentation for the queue class, if you are enumerating the queue and another thread modifies the collection, an exception is thrown. Catch that exception and you should be good.

jfawcett