views:

345

answers:

3

What are ways to pass data between threads in .NET? There are two things I can currently think of:

  1. Membervariables, e.g. using the producer-consumer-queue pattern.
  2. Using the ParameterizedThreadStart delegate when starting the the thread. (Only works once, not good for long running background worker threads).

What facitlites does the .NET Framework have to solve this problem. Maybe .NET has a generic producer-consumer-pattern already implemented? Maybe I can use Thread.GetData and Thread.SetData somehow?

+1  A: 

Take a look here, some of these responses may answer your question.

rick schott
+1  A: 

While it's not a built in solution you can create a class containing a private "sync" object. Then, in properties and method calls use the lock statement on the sync object to ensure serialized access.

eg:

class DataClass{
    private object m_syncObject=new object();

    private string m_data;

    public string Data
    {
        get{
            lock(m_syncobject)
            {
                return m_data;
            }
        }
        set{
           lock(m_syncobject)
           {
                m_data=value;
           }

        }
    }
}

Create an instance of DataClass() on one thread, then pass this instance to a second or more threads. When needed access the thread safe Data property to pass/receive data between threads.

Ash
+1  A: 

As an alternative to Ash's solution, consider the following example.

Let's say you have two threads - one for receiving packets from a socket and another for processing those packets. Obviously, the Receiver thread needs to inform the Processor thread when a packet is available for processing, so the packets need to be shared between the threads somehow. I typically do this with a shared data queue.

At the same time, we don't necessarily want to tightly couple the threads together. For example, the Receiver thread shouldn't even know that a Processor thread exists. All the Receiver needs to focus on is receiving packets from the network and then notifying any interested subscribers that packets are available for processing. Events are the perfect way of achieving this in .NET.

So here's some code.

using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
public class Packet
{
    public byte[] Buffer { get; private set; }
    public Packet(byte[] buffer)
    {
        Buffer = buffer;
    }
}
public class PacketEventArgs : EventArgs
{
    public Packet Packet { get; set; }
}
public class UdpState
{
    public UdpClient Client{get;set;}
    public IPEndPoint EndPoint{get;set;}
}
public class Receiver
{
    public event EventHandler<PacketEventArgs> PacketReceived;
    private Thread _thread;
    private ManualResetEvent _shutdownThread = new ManualResetEvent(false);
    public void Start() { _thread.Start(); }
    public void Stop() { _shutdownThread.Set(); }
    public Receiver()
    {
        _thread = new Thread(
            delegate() {
                // Create the client UDP socket.
                IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, 5006);
                UdpClient client = new UdpClient( endPoint );
                // Receive the packets asynchronously.
                client.BeginReceive(
                    new AsyncCallback(OnPacketReceived),
                    new UdpState() { Client = client, EndPoint = endpoint });
                // Wait for the thread to end.
                _shutdownThread.WaitOne();
            }
        );
    }
    private void OnPacketReceived(IAsyncResult ar)
    {
        UdpState state = (UdpState)ar.AsyncState;
        IPEndPoint endPoint = state.EndPoint;
        byte[] bytes = state.Client.EndReceive(ar, ref endPoint);
        // Create the packet. 
        Packet packet = new Packet(bytes);
        // Notify any listeners.
        EventHandler<PacketEventArgs> handler = PacketReceived;
        if (handler != null) {
            handler(this, new PacketEventArgs() { Packet = packet });
        }
        // Read next packet.
        if (!_shutdownThread.WaitOne(0)) {
            state.Client.BeginReceive(
                new AsyncCallback(OnPacketReceived),
                state);
        }
    }
}
public class Processor
{
    private Thread _thread;
    private object _sync = new object();
    private ManualResetEvent _packetReceived = new ManualResetEvent(false);
    private ManualResetEvent _shutdownThread = new ManualResetEvent(false);
    private Queue<Packet> _packetQueue = new Queue<Packet>(); // shared data
    public void Start() { _thread.Start(); }
    public void Stop() { _shutdownThread.Set(); }
    public Processor()
    {
        _thread = new Thread(
            delegate() {
                WaitHandle[] handles = new WaitHandle[] {
                    _shutdownThread,
                    _packetReceived
                };

                while (!_shutdownThread.WaitOne(0)) {
                    switch (WaitHandle.WaitAny(handles)) {
                        case 0: // Shutdown Thread Event
                            break;
                        case 1: // Packet Received Event
                            _packetReceived.Reset();
                            ProcessPackets();
                            break;
                        default:
                            Stop();
                            break;
                    }
                }
            }
        );
    }
    private void ProcessPackets()
    {
        Queue<Packet> localPacketQueue = null;
        Queue<Packet> newPacketQueue = new Queue<Packet>();
        lock (_sync) {
            // Swap out the populated queue with the empty queue.
            localPacketQueue = _packetQueue;
            _packetQueue = newPacketQueue;
        }

        foreach (Packet packet in localPacketQueue) {
            Console.WriteLine(
                "Packet received with {0} bytes",
                packet.Buffer.Length );
        }
    }
    public void OnPacketReceived(object sender, PacketEventArgs e)
    {
        // NOTE:  This function executes on the Receiver thread.
        lock (_sync) {
            // Enqueue the packet.
            _packetQueue.Enqueue(e.Packet);
        }

        // Notify the Processor thread that a packet is available.
        _packetReceived.Set();
    }
}
static void Main()
{
    Receiver receiver = new Receiver();
    Processor processor = new Processor();

    receiver.PacketReceived += processor.OnPacketReceived;

    processor.Start();
    receiver.Start();

    Thread.Sleep(5000);

    receiver.Stop();
    processor.Stop();
}

I know there's a lot to digest there. The program should work in .NET 3.5 providing you have UDP traffic on port 5006.

As far as data sharing between threads, the points of interest are the ProcessPackets() and OnPacketReceived() methods of the Processor class. Notice that the OnPacketReceived() method occurs on the Receiver thread, even though the method is part of the Processor class, and that the queue is synchronized using a sync object.

Matt Davis