views:

108

answers:

2

I wrote a multithreaded application for .NET and in a very important portion of code I have the following:

// Holds a list of elements and a background worker (the nested class below)
// that copies the list, processes the copy, and then removes the processed
// elements from the front of the list.
public class ContainerClass {
    // Guards every access to 'list' in the code shown here.
    // NOTE(review): no initializer is visible for 'list_lock' or 'list' in this
    // snippet; lock(null) throws ArgumentNullException, so both must be assigned
    // before the worker starts - confirm where that happens.
    private object list_lock;
    private ArrayList list;
    // Serializes Start() and Stop() so the worker is created/torn down once.
    private object init_lock = new object();
    // NOTE(review): the field type is 'ThreadClass' but the nested class below is
    // declared as 'ThreadedClass' - presumably the same type; confirm the name.
    private ThreadClass thread;

    // Creates the worker if none exists yet; does nothing if one is already set.
    public void Start() {
        lock(init_lock) {
            if (thread == null) {
                // NOTE(review): the only constructor visible below takes a
                // ContainerClass argument, but none is passed here - confirm.
                thread = new ThreadClass();
                ...
            }
        }
    }

    // Drains the list, asks the worker to finish and waits for it to exit.
    // Throws ApplicationException if no worker is currently set.
    // Note that init_lock is held for the whole duration of the blocking calls.
    public void Stop() {
        lock(init_lock) {
            if (thread != null) {
                // Blocks until the list count is no longer greater than 0.
                thread.processList(0);
                // finish() is not part of this snippet; presumably it sets
                // 'finished' and pulses the worker - TODO confirm.
                thread.finish();
                // Blocks until the worker has set 'actually_finished'.
                thread.waitUntilFinished();
                thread = null;
            } else {
                throw new ApplicationException("Assertion failed - already stopped.");
            }

            ...
        }
    }

    // Background worker. All of its own state is guarded by lock(this), which is
    // also the monitor used for Wait/PulseAll signalling between the worker
    // thread and the callers of processList()/waitUntilFinished().
    private class ThreadedClass {
        private ContainerClass container;
        private Thread thread;
        // Request flag: when true, run() leaves its loop after the current pass.
        private bool finished;
        // Set by run() just before it returns; waitUntilFinished() waits on it.
        private bool actually_finished;

        // Stores the owning container and immediately starts a background
        // thread executing run().
        public ThreadedClass(ContainerClass container) {
            this.container = container;
            thread = new Thread(run);
            thread.IsBackground = true;
            thread.Start();
        }

        // Worker loop: snapshot the list, process the snapshot, remove the
        // processed elements, pulse waiters; repeat until 'finished' is seen.
        private void run() {
            bool local_finished = false;
            while (!local_finished) {
                // Snapshot of the container list; stays null when the list is empty.
                ArrayList to_process = null;
                lock (container.list_lock) {
                    if (container.list.Count > 0) {
                        to_process = new ArrayList();
                        to_process.AddRange(container.list);
                    }
                }
                if (to_process == null) {
                    // Nothing to process so wait
                    lock (this) {
                        if (!finished) {
                            try {
                                // Monitor.Wait releases the lock on 'this' while
                                // blocked and reacquires it before returning, so
                                // other threads can still enter lock(this) and pulse.
                                Monitor.Wait(this);
                            } catch (ThreadInterruptedException) {
                                // Interruption is ignored; the loop simply re-checks.
                            }
                        }
                    }
                } else if (to_process.Count > 0) {
                    // Something to process, so go ahead and process the journals,
                    int sz = to_process.Count;
                    // For all elements
                    for (int i = 0; i < sz; ++i) {
                        // Pick the lowest element to process
                        object obj = to_process[i];
                        try {
                            // process the element...
                            ...
                        } catch (IOException e) {
                            ...
                            // If there is an error processing the best thing to do is finish
                            // NOTE(review): no 'break' is visible here, so the
                            // remaining elements of this pass still appear to be
                            // processed - confirm against the elided code.
                            lock (this) {
                                finished = true;
                            }
                        }
                        // NOTE(review): only IOException is caught. Any other
                        // exception type propagates out of run(), skipping the
                        // removal and PulseAll below, so threads blocked in
                        // processList()/waitUntilFinished() are not woken by
                        // this thread.
                    }
                }

                lock (this) {
                    local_finished = finished;
                    // Remove the elements that we have just processed.
                    // Removes to_process.Count items from the front of the live
                    // list; assumes other code only appends to the list, so the
                    // snapshot is still its prefix - TODO confirm.
                    if (to_process != null) {
                        lock (container.list_lock) {
                            int sz = to_process.Count;
                            for (int i = 0; i < sz; ++i) {
                                container.list.RemoveAt(0);
                            }
                        }
                    }
                    // Notify any threads waiting
                    Monitor.PulseAll(this);
                }
            }

            // NOTE(review): once this point is reached nothing in this class
            // removes elements or pulses again after the PulseAll below. If the
            // list still holds (or later receives) more than 'until_size'
            // elements, processList() would keep waiting - possibly the hang
            // seen from Stop(); verify whether 'finished' was already set
            // (e.g. by the IOException path) before Stop() was called.
            lock (this) {
                actually_finished = true;
                Monitor.PulseAll(this);
            }
        }

        // Blocks the caller until run() has set 'actually_finished'.
        // Throws ApplicationException if the waiting thread is interrupted.
        public void waitUntilFinished() {
            lock (this) {
                try {
                    while (!actually_finished) {
                        Monitor.Wait(this);
                    }
                } catch (ThreadInterruptedException e) {
                    throw new ApplicationException("Interrupted: " + e.Message);
                }
            }
        }

        // Wakes the worker and blocks the caller until the container list holds
        // no more than 'until_size' elements. The count is re-read each time
        // this thread is woken from Monitor.Wait.
        public void processList(int until_size) {
            lock (this) {
                // Wake the worker in case it is parked in Monitor.Wait inside
                // run(); it can only resume once this thread releases the lock,
                // which happens in the Monitor.Wait below or on leaving the block.
                Monitor.PulseAll(this);
                int sz;
                lock (container.list_lock) {
                    sz = container.list.Count;
                }
                // Wait until sz is no longer greater than 'until_size'
                while (sz > until_size) {
                    try {
                        Monitor.Wait(this);
                    } catch (ThreadInterruptedException ) {
                        // Interruption is ignored; the size is re-checked below.
                    }
                    lock (container.list_lock) {
                        sz = container.list.Count;
                    }
                }
            }
        }
    }
}

As you can see, the thread waits until the collection is empty, but it seems that a synchronization clash prevents the thread from reaching the point (the only one in the whole code) where an element is removed from the collection list in the ContainerClass. This clash causes the call to never return, and the application to keep running, if the method processList is called with an until_size value of 0.

I beg any better developer than me (and I guess there are a lot out there) to help me fix this small piece of code, since I really can't understand why the list isn't decremented...

Thank you very much from the bottom of my heart.

PS. I would like to underline that the code works perfectly the rest of the time: the only situation in which it breaks is when calling thread.processList(0) from ContainerClass.Stop().

+1  A: 

Could the problem be that you are locking the ThreadClass object itself rather than a synchronizing object?

Try adding another private variable to lock on:

private static readonly object lockObject = new object()

and replace all the calls of lock(this) with lock(lockObject)

MSDN clearly advises against what you're doing:

In general, avoid locking on a public type, or instances beyond your code's control. The common constructs lock (this), lock (typeof (MyType)), and lock ("myLock") violate this guideline:

  lock (this) is a problem if the instance can be accessed publicly.

Edit:

I think I see a deadlock condition. If you call run() when there are no objects to process, or you get to no objects to process, you lock(this), then call Monitor.Wait(this) and the thread waits:

        if (to_process == null) {
            // Nothing to process so wait
            lock (this) { /* nothing's going to get this lock again until Monitor.PulseAll(this) is called from somewhere */
                if (!finished) {
                    try {
                        Monitor.Wait(this); /* thread is waiting for Pulse(this) or PulseAll(this) */
                    } catch (ThreadInterruptedException) {
                    }
                }
            }
        }

If you are in this condition when you call Container.Stop(), when ThreadProcess.processList(int) is called, you call lock(this) again, which can't enter the section because the run() method still has the lock:

 lock (this) { /* run still holds this lock, waiting for PulseAll(this) to be called */
                Monitor.PulseAll(this); /* this isn't called so run() never continues */
                int sz;
                lock (container.list_lock) {
                    sz = container.list.Count;
                }

So, Monitor.PulseAll() can't be called to free the waiting thread in the run() method to exit the lock(this) area, so they are deadlocked waiting on each other. Right?

scottm
sorry to say that this solution doesn't change the situation... thank you anyway for trying... :)
Antonello
@Antonello, no problem. Remove the community wiki (click edit, uncheck community wiki).
scottm
A: 

I think you should try to explain better what you actually want to achieve.

 public void processList(int until_size) {
        lock (this) {
            Monitor.PulseAll(this);

This looks very strange as you should call the Monitor.Pulse when changing the lock state and not when beginning with locking. Where are you creating the worker threads - this section is not clear as I see only Thread.Start()? Btw I would advise you to look at PowerCollections - maybe you find what you need there.

weismat
I updated the code example to include the points in the ContainerClass where the ThreadClass is instantiated and called. By the way, the method "processList" notifies all the threads waiting right after acquiring the lock to access other portions of the code locked on "this".
Antonello
Your statement does not work. The description for PulseAll says "Notifies a thread in the waiting queue of a change in the locked object's state." - but it only changes if the lock is released.
weismat
I might have done something not coherent (although I followed the example in the MSDN library for Monitor.Pulse: http://msdn.microsoft.com/en-us/library/system.threading.monitor.pulse.aspx), but it works and when I tried to remove it, everything crashed...
Antonello
Isn't processList(0) the same as waitUntilFinished? I think you are killing yourself with your boolean variables for the state - instead I would suggest using functions with proper locking on a locker object.
weismat