views:

1062

answers:

1

I'm having a problem with interlocked Monitor.Wait and Monitor.Pulse in a multi-threaded TCP server. To demonstrate my issues, here is my server code:

/// <summary>
/// TCP server that runs an accept loop on its own thread and passes each
/// accepted client's stream to the supplied <see cref="IHandler"/>.
/// </summary>
public class Server
{
    TcpListener listener;
    // Monitor object shared by Stop, ThreadStart and Accept for lock/Wait/Pulse.
    Object sync;
    IHandler handler;
    // Loop condition for ThreadStart; set there and cleared in Stop.
    bool running;

    /// <summary>
    /// Creates a server bound to the first address reported for the local
    /// host name and the given port. The listener is not started here.
    /// </summary>
    /// <param name="handler">Receives the stream of every accepted client.</param>
    /// <param name="port">TCP port to listen on.</param>
    public Server(IHandler handler, int port)
    {
        this.handler = handler;
        IPAddress address = Dns.GetHostEntry(Dns.GetHostName()).AddressList[0];
        listener = new TcpListener(address, port);
        sync = new Object();
        running = false;
    }

    /// <summary>
    /// Starts a new thread that executes <c>ThreadStart</c>.
    /// </summary>
    public void Start()
    {
        Thread thread = new Thread(ThreadStart);
        thread.Start();
    }

    /// <summary>
    /// Stops the listener, clears the running flag and pulses the monitor,
    /// all while holding the lock on <c>sync</c>.
    /// </summary>
    public void Stop()
    {
        lock (sync)
        {
            listener.Stop();
            running = false;
            Monitor.Pulse(sync);
        }
    }

    /// <summary>
    /// Accept loop: starts the listener, then repeatedly begins an
    /// asynchronous accept and waits on the monitor until pulsed.
    /// Does nothing if <c>running</c> is already true.
    /// </summary>
    void ThreadStart()
    {
        if (!running)
        {
            listener.Start();
            running = true;
            lock (sync)
            {
                while (running)
                {
                    try
                    {
                        // NOTE(review): if the Accept callback runs on this same thread
                        // (which already holds the lock), its Pulse happens before the
                        // Wait below and is lost, because Pulse only affects threads
                        // that are already waiting.
                        listener.BeginAcceptTcpClient(new AsyncCallback(Accept), listener);
                        Monitor.Wait(sync);  // Release lock and wait for a pulse
                    }
                    catch (Exception e)
                    {
                        // Any failure is only reported; the loop then re-checks running.
                        Console.WriteLine(e.Message);
                    }
                }
            }
        }
    }

    /// <summary>
    /// Callback for BeginAcceptTcpClient: pulses the monitor, then, if the
    /// server is still running, completes the accept and hands the client's
    /// stream to the handler. The client is disposed when the handler returns.
    /// </summary>
    /// <param name="result">Async result whose AsyncState is the TcpListener.</param>
    void Accept(IAsyncResult result)
    {
        // Let the server continue listening
        lock (sync)
        {
            Monitor.Pulse(sync);
        } 

        if (running)
        {
            // Local deliberately read from AsyncState; it shadows the field of the same name.
            TcpListener listener = (TcpListener)result.AsyncState;
            using (TcpClient client = listener.EndAcceptTcpClient(result))
            {
                handler.Handle(client.GetStream());
            }
        }
    }
}

And here is my client code:

/// <summary>
/// Test driver: starts two servers on ports 1000 and 1001, sends seven
/// short messages to them with interleaved ports, then stops both servers.
/// </summary>
class Client
{
    /// <summary>
    /// Handler that reads the stream to its end as ASCII text and prints it
    /// to the console after an "Echo Handler: " prefix.
    /// </summary>
    class EchoHandler : IHandler
    {
        public void Handle(Stream stream)
        {
            System.Console.Out.Write("Echo Handler: ");
            StringBuilder sb = new StringBuilder();
            byte[] buffer = new byte[1024];
            int count = 0;
            // Read returns 0 at end of stream, which terminates the loop.
            while ((count = stream.Read(buffer, 0, buffer.Length)) > 0)
            {
                sb.Append(Encoding.ASCII.GetString(buffer, 0, count));
            }
            System.Console.Out.WriteLine(sb.ToString());
            System.Console.Out.Flush();
        }
    }

    // Same address selection as the Server constructor, so client and server agree.
    static IPAddress localhost = Dns.GetHostEntry(Dns.GetHostName()).AddressList[0];

    /// <summary>
    /// Entry point. Waits for return before sending and again before stopping.
    /// </summary>
    /// <returns>Always 0.</returns>
    public static int Main()
    {
        Server server1 = new Server(new EchoHandler(), 1000);
        Server server2 = new Server(new EchoHandler(), 1001);

        server1.Start();
        server2.Start();

        Console.WriteLine("Press return to test...");
        Console.ReadLine();

        // Note interleaved ports
        SendMsg("Test1", 1000);
        SendMsg("Test2", 1001);
        SendMsg("Test3", 1000);
        SendMsg("Test4", 1001);
        SendMsg("Test5", 1000);
        SendMsg("Test6", 1001);
        SendMsg("Test7", 1000);

        Console.WriteLine("Press return to terminate...");
        Console.ReadLine();

        server1.Stop();
        server2.Stop();

        return 0;
    }

    /// <summary>
    /// Connects to the local address on the given port, sends the message
    /// as ASCII bytes and closes the socket.
    /// </summary>
    /// <param name="msg">Text to send.</param>
    /// <param name="port">Destination TCP port.</param>
    public static void SendMsg(String msg, int port)
    {
        IPEndPoint endPoint = new IPEndPoint(localhost, port);

        byte[] buffer = Encoding.ASCII.GetBytes(msg);
        // Use the endpoint's own address family: AddressList[0] may be an IPv6
        // address, and a socket hard-coded to InterNetwork cannot connect to it.
        using (Socket s = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp))
        {
            s.Connect(endPoint);
            s.Send(buffer);
        }
    }
}

The client sends seven messages, but the server only prints four:

Press return to test...

Press return to terminate...
Echo Handler: Test1
Echo Handler: Test3
Echo Handler: Test2
Echo Handler: Test4

I suspect the monitor is getting confused by allowing the Pulse to occur (in the server's Accept method) before the Wait occurs (in the ThreadStart method), even though the ThreadStart should still have the lock on the sync object until it calls Monitor.Wait(), and then the Accept method can acquire the lock and send its Pulse. If you comment out these two lines in the server's Stop() method:

//listener.Stop();
//running = false;

The remaining messages appear when the server's Stop() method is called (i.e. waking up the server's sync object causes it to dispatch the remaining incoming messages). It seems to me this can only occur in a race condition between the ThreadStart and Accept methods, but the lock around the sync object should prevent this.

Any ideas?

Many thanks, Simon.

ps. Note that I'm aware that the output appears out-of-order etc., I'm specifically asking about a race condition between locks and the Monitor. Cheers, SH.

+5  A: 

The problem is that you are using Pulse/Wait as a signal. A proper signal, such as an AutoResetEvent, has state: it stays signalled until a thread has called WaitOne(). Calling Pulse when no thread is waiting is a no-op.

This is combined with the fact that a lock can be taken many times by the same thread. Since you are using Async programming the Accept callback can be called by the same thread that did the BeginAcceptTcpClient.

Let me illustrate. I commented out the second server, and changed some code on your server.

/// <summary>
/// Instrumented accept loop: logs the managed thread id before each
/// BeginAcceptTcpClient call and before each Monitor.Wait.
/// </summary>
void ThreadStart()
{
    // Already running: nothing to start.
    if (running)
    {
        return;
    }

    listener.Start();
    running = true;

    lock (sync)
    {
        while (running)
        {
            try
            {
                int threadId = Thread.CurrentThread.ManagedThreadId;

                Console.WriteLine("BeginAccept [{0}]", threadId);
                AsyncCallback onAccept = new AsyncCallback(Accept);
                listener.BeginAcceptTcpClient(onAccept, listener);

                Console.WriteLine("Wait [{0}]", threadId);
                // Releases the lock and blocks until another thread pulses sync.
                Monitor.Wait(sync);
            }
            catch (Exception error)
            {
                Console.WriteLine(error.Message);
            }
        }
    }
}

/// <summary>
/// Instrumented accept callback: logs the managed thread id, pulses the
/// monitor, and then services the accepted client if the server is running.
/// </summary>
/// <param name="result">Async result whose AsyncState is the TcpListener.</param>
void Accept(IAsyncResult result)
{
    // Wake the accept loop so it can begin the next accept.
    lock (sync)
    {
        int threadId = Thread.CurrentThread.ManagedThreadId;
        Console.WriteLine("Pulse [{0}]", threadId);
        Monitor.Pulse(sync);
    }

    if (!running)
    {
        return;
    }

    TcpListener acceptor = (TcpListener)result.AsyncState;
    using (TcpClient connection = acceptor.EndAcceptTcpClient(result))
    {
        NetworkStream payload = connection.GetStream();
        handler.Handle(payload);
    }
}

The output from my run is shown below. If you run this code yourself the thread ids will differ, but the overall pattern will be the same.

Press return to test...
BeginAccept [3]
Wait [3]

Press return to terminate...
Pulse [5]
BeginAccept [3]
Pulse [3]
Echo Handler: Test1
Echo Handler: Test3
Wait [3]

As you can see, Pulse is called twice. The first call comes from a separate thread (Pulse [5]) and wakes up the first Wait. Thread 3 then does another BeginAccept, but because there are pending incoming connections, the Accept callback is invoked immediately on that same thread. Since Accept runs on the thread that already holds the lock, lock(sync) doesn't block, and Pulse [3] fires immediately while no thread is waiting, so it has no effect.

Two handlers are invoked and handle the two messages.

Everything looks fine so far, but ThreadStart then resumes and calls Wait, where it blocks indefinitely because the pulse it is waiting for has already been sent.

Now, the underlying issue here is that you are trying to use a monitor as a signal. Since a monitor doesn't remember its state, the second Pulse gets lost.

But there is an easy solution for this. Use an AutoResetEvent, which is a proper signal and remembers its state.

/// <summary>
/// Creates a server for the given handler, bound to the first address
/// listed for the local host name and to the given port. The listener is
/// created but not started, and the wake-up event starts unsignalled.
/// </summary>
/// <param name="handler">Receives the stream of every accepted client.</param>
/// <param name="port">TCP port to listen on.</param>
public Server(IHandler handler, int port)
{
    this.handler = handler;

    // Look up this machine's own host entry and take its first address.
    string hostName = Dns.GetHostName();
    IPHostEntry hostEntry = Dns.GetHostEntry(hostName);
    IPAddress bindAddress = hostEntry.AddressList[0];

    this.listener = new TcpListener(bindAddress, port);
    this.running = false;
    this._event = new AutoResetEvent(false);
}

/// <summary>
/// Launches the accept loop on a newly created thread.
/// </summary>
public void Start()
{
    Thread worker = new Thread(this.ThreadStart);
    worker.Start();
}

/// <summary>
/// Stops the server: clears the running flag, closes the listener and
/// signals the event so the accept loop wakes up and exits.
/// </summary>
public void Stop()
{
    // Clear the flag BEFORE closing the listener. Closing the listener
    // completes the pending accept and invokes the Accept callback; if
    // running were still true at that point, the callback would call
    // EndAcceptTcpClient on the closed listener, which throws.
    running = false;
    listener.Stop();

    // Wake ThreadStart if it is blocked in WaitOne so it re-checks running.
    _event.Set();
}

/// <summary>
/// Accept loop: starts the listener, then repeatedly begins an asynchronous
/// accept and blocks on the event until it is signalled. Returns
/// immediately if the server is already running.
/// </summary>
void ThreadStart()
{
    if (running)
    {
        return;
    }

    listener.Start();
    running = true;

    while (running)
    {
        try
        {
            AsyncCallback onAccept = new AsyncCallback(Accept);
            listener.BeginAcceptTcpClient(onAccept, listener);

            // Blocks until Accept or Stop sets the event; a Set that happens
            // before this call is remembered, so no wake-up is lost.
            _event.WaitOne();
        }
        catch (Exception error)
        {
            Console.WriteLine(error.Message);
        }
    }
}

/// <summary>
/// Callback for BeginAcceptTcpClient: signals the event so the accept loop
/// can continue, then, if the server is still running, completes the accept
/// and hands the client's stream to the handler. The client is disposed
/// once the handler returns.
/// </summary>
/// <param name="result">Async result whose AsyncState is the TcpListener.</param>
void Accept(IAsyncResult result)
{
    // Release the accept loop first so the next accept can be queued.
    _event.Set();

    if (!running)
    {
        return;
    }

    TcpListener acceptor = (TcpListener)result.AsyncState;
    using (TcpClient connection = acceptor.EndAcceptTcpClient(result))
    {
        NetworkStream payload = connection.GetStream();
        handler.Handle(payload);
    }
}
Mats Fredriksson
Thanks Mats. I assumed BeginAcceptTcpClient always ran on a separate thread and thus I could use the sync object as a critical section. You were spot on and signals are the way to go. Thanks again. SH