views:

819

answers:

5

HI everyone,

I'm going to be writing some code that has to listen for TCPIP messages coming from GSM mobile phones over GPRS. In the fullness of time, I see this as running on a Virtual Private Server, and it could well be processing multiple messages every second.

I'm a bit of a network programming virgin, so I've done a bit of research on the internet, and read a few tutorials. The approach I am considering at the moment is a windows service using sockets to monitor the port. If my understanding is correct, I need one socket to listen for connections from clients, and each time someone tries to connect with the port I will be passed another socket with which to communicate with them? Does this sound about right to more experienced ears?

I'm planning on using asynchronous communication, but on of the bigger design questions is whether to use threading or not. Threading isn't something I've really played with, and I am aware of a number of pitfalls - race conditions and debugging problems being but two.

If I avoid threads, I know I have to supply an object that acts as an identifier for a particular conversation. I was thinking GUIDs for this - any opinions?

Thanks in advance for any responses...

Martin

+3  A: 

It is surprisingly simple to make a multi-threaded server. Check out this example.

class Server
{
    private Socket socket;
    private List<Socket> connections;
    private volatile Boolean endAccept;

    // glossing over some code.


    /// <summary></summary>
    public void Accept()
    {
        EventHandler<SocketAsyncEventArgs> completed = null;
        SocketAsyncEventArgs args = null;

        completed = new EventHandler<SocketAsyncEventArgs>((s, e) =>
        {
            if (e.SocketError != SocketError.Success)
            {
                // handle
            }
            else
            {
                connections.Add(e.AcceptSocket);
                ThreadPool.QueueUserWorkItem(AcceptNewClient, e.AcceptSocket);
            }

            e.AcceptSocket = null;
            if (endAccept)
            {
                args.Dispose();
            }
            else if (!socket.AcceptAsync(args))
            {
                completed(socket, args);
            }
        });

        args = new SocketAsyncEventArgs();
        args.Completed += completed;

        if (!socket.AcceptAsync(args))
        {
            completed(socket, args);
        }
    }

    public void AcceptNewClient(Object state)
    {
        var socket = (Socket)state;
        // proccess        
    }        
}
ChaosPandion
Thanks for the response ChoasPandion...Unfortunately I am on the way out of the house now, but I will take a look at this later.Lamda functions eh? Still getting my head around the syntax...
Martin Milan
Just let me know if anything doesn't make sense and I'll expand upon it.
ChaosPandion
We can use BeginAcceptSocket instead.
Sergey Teplyakov
There is a distinct advantage to using AcceptAsync. When using BeginAccept you need to create a new IAsyncResult object each time.For high performance servers you want to keep object creation to a minimum. Using this method you can create one object and use it for the duration of the server.
ChaosPandion
Choas/Sergey,Thanks for your comments. How significant is this object creation overhead likely to be though? I'm thinking it might well be simpler to avoid dealing with threads if it is as easy as Sergey suggests...
Martin Milan
Ignore my comment, the odds of the overhead being relevant to you are small. Really there is no reason for you to not use threads. The great thing is each time you use threads they get easier and easier.
ChaosPandion
Cheers Chaos,I'm going to take a look at Threading (it would be useful for other areas of work), but I think for now I'm going to initially investigate the IO Completion Port way of doing things.Cheers for your input though, I'm sincerely grateful.
Martin Milan
@Martin - Good luck, make sure you stop by again if you run in to trouble.
ChaosPandion
Thanks Chaos...
Martin Milan
A: 

Well, the C# syntax is not fresh in my mind now but I don't think it is to much different from the Posix standard.

What you can may do is when you create your listen socket you can stipulate a value for the backlog (maximum number of simultaneous connections for that server) and create a thread pull with the same size. Thread pools are easier to use than traditional ones. The TCP you queue for you all the connections above the backlog parameter.

Andres
Thanks for the input Andres...
Martin Milan
+1  A: 

A bit of advise from the guy who deals mainly with mobile networking: do your homework with regular networking connection, preferably on the localhost. This will save you a lot of time during testing and will keep you sane until you figure out the approach that works for you best.

As for some particular implementation, I always go with synchronized sockets (you will need to configure timeouts to not to get stuck if something will go wrong) and everything runs in separate threads that are synchronized with the help of events. It's much simplier than you think. Here's some useful links to get you started:

Cheers - I'll add to the list of things to look at... Whoever thought this would be so much fun, eh?
Martin Milan
+3  A: 

Starting from .net framework 2.0 SP1 there are some changings in socket libraries related to asyncronous sockets.

All multithreading used under the hood. We have no need to use multithreading manually (we don't need to use even ThreadPool explicitly). All what we do - using BeginAcceptSocket for starting accepting new connections, and using SocketAsyncEventArgs after accepting new connection .

Short implementation:

//In constructor or in method Start
var tcpServer = new TcpListener(IPAddress.Any, port);
tcpServer.Start();
tcpServer.BeginAcceptSocket(EndAcceptSocket, tcpServer);

//In EndAcceptSocket
Socket sock= lister.EndAcceptSocket(asyncResult);
var e = new SocketAsyncEventArgs();
e.Completed += ReceiveCompleted; //some data receive handle
e.SetBuffer(new byte[SocketBufferSize], 0, SocketBufferSize);
if (!sock.ReceiveAsync(e))
{//IO operation finished syncronously
  //handle received data
  ReceiveCompleted(sock, e);
}//IO operation finished syncronously
//Add sock to internal storage

Full implementation:

using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;

namespace Ample
{
    public class IPEndPointEventArgs : EventArgs
    {
        public IPEndPointEventArgs(IPEndPoint ipEndPoint)
        {
            IPEndPoint = ipEndPoint;
        }

        public IPEndPoint IPEndPoint { get; private set; }
    }

    public class DataReceivedEventArgs : EventArgs
    {
        public DataReceivedEventArgs(byte[] data, IPEndPoint ipEndPoint)
        {
            Data = data;
            IPEndPoint = ipEndPoint;
        }

        public byte[] Data { get; private set; }
        public IPEndPoint IPEndPoint { get; private set; }

    }
    /// <summary>
    /// TcpListner wrapper
    /// Encapsulates asyncronous communications using TCP/IP.
    /// </summary>
    public sealed class TcpServer : IDisposable
    {
        //----------------------------------------------------------------------
        //Construction, Destruction
        //----------------------------------------------------------------------
        /// <summary>
        /// Creating server socket
        /// </summary>
        /// <param name="port">Server port number</param>
        public TcpServer(int port)
        {
            connectedSockets = new Dictionary<IPEndPoint, Socket>();
            tcpServer = new TcpListener(IPAddress.Any, port);
            tcpServer.Start();
            tcpServer.BeginAcceptSocket(EndAcceptSocket, tcpServer);
        }
        ~TcpServer()
        {
            DisposeImpl(false);
        }
        public void Dispose()
        {
            DisposeImpl(true);
        }

        //----------------------------------------------------------------------
        //Public Methods
        //----------------------------------------------------------------------

        public void SendData(byte[] data, IPEndPoint endPoint)
        {
            Socket sock;
            lock (syncHandle)
            {
                if (!connectedSockets.ContainsKey(endPoint))
                    return;
                sock = connectedSockets[endPoint];
            }
            sock.Send(data);
        }

        //----------------------------------------------------------------------
        //Events
        //----------------------------------------------------------------------
        public event EventHandler<IPEndPointEventArgs> SocketConnected;
        public event EventHandler<IPEndPointEventArgs> SocketDisconnected;
        public event EventHandler<DataReceivedEventArgs> DataReceived;

        //----------------------------------------------------------------------
        //Private Functions
        //----------------------------------------------------------------------
        #region Private Functions
        //Обработка нового соединения
        private void Connected(Socket socket)
        {
            var endPoint = (IPEndPoint)socket.RemoteEndPoint;

            lock (connectedSocketsSyncHandle)
            {
                if (connectedSockets.ContainsKey(endPoint))
                {
                    theLog.Log.DebugFormat("TcpServer.Connected: Socket already connected! Removing from local storage! EndPoint: {0}", endPoint);
                    connectedSockets[endPoint].Close();
                }

                SetDesiredKeepAlive(socket);
                connectedSockets[endPoint] = socket;
            }

            OnSocketConnected(endPoint);
        }

        private static void SetDesiredKeepAlive(Socket socket)
        {
            socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
            const uint time = 10000;
            const uint interval = 20000;
            SetKeepAlive(socket, true, time, interval);
        }
        static void SetKeepAlive(Socket s, bool on, uint time, uint interval)
        {
            /* the native structure
            struct tcp_keepalive {
            ULONG onoff;
            ULONG keepalivetime;
            ULONG keepaliveinterval;
            };
            */

            // marshal the equivalent of the native structure into a byte array
            uint dummy = 0;
            var inOptionValues = new byte[Marshal.SizeOf(dummy) * 3];
            BitConverter.GetBytes((uint)(on ? 1 : 0)).CopyTo(inOptionValues, 0);
            BitConverter.GetBytes((uint)time).CopyTo(inOptionValues, Marshal.SizeOf(dummy));
            BitConverter.GetBytes((uint)interval).CopyTo(inOptionValues, Marshal.SizeOf(dummy) * 2);
            // of course there are other ways to marshal up this byte array, this is just one way

            // call WSAIoctl via IOControl
            int ignore = s.IOControl(IOControlCode.KeepAliveValues, inOptionValues, null);

        }
        //socket disconnected handler
        private void Disconnect(Socket socket)
        {
            var endPoint = (IPEndPoint)socket.RemoteEndPoint;

            lock (connectedSocketsSyncHandle)
            {
                connectedSockets.Remove(endPoint);
            }

            socket.Close();

            OnSocketDisconnected(endPoint);
        }

        private void ReceiveData(byte[] data, IPEndPoint endPoint)
        {
            OnDataReceived(data, endPoint);
        }

        private void EndAcceptSocket(IAsyncResult asyncResult)
        {
            var lister = (TcpListener)asyncResult.AsyncState;
            theLog.Log.Debug("TcpServer.EndAcceptSocket");
            if (disposed)
            {
                theLog.Log.Debug("TcpServer.EndAcceptSocket: tcp server already disposed!");
                return;
            }

            try
            {
                Socket sock;
                try
                {
                    sock = lister.EndAcceptSocket(asyncResult);
                    theLog.Log.DebugFormat("TcpServer.EndAcceptSocket: remote end point: {0}", sock.RemoteEndPoint);
                    Connected(sock);
                }
                finally
                {
                    //EndAcceptSocket can failes, but in any case we want to accept 
new connections
                    lister.BeginAcceptSocket(EndAcceptSocket, lister);
                }

                //we can use this only from .net framework 2.0 SP1 and higher
                var e = new SocketAsyncEventArgs();
                e.Completed += ReceiveCompleted;
                e.SetBuffer(new byte[SocketBufferSize], 0, SocketBufferSize);
                BeginReceiveAsync(sock, e);

            }
            catch (SocketException ex)
            {
                theLog.Log.Error("TcpServer.EndAcceptSocket: failes!", ex);
            }
            catch (Exception ex)
            {
                theLog.Log.Error("TcpServer.EndAcceptSocket: failes!", ex);
            }
        }

        private void BeginReceiveAsync(Socket sock, SocketAsyncEventArgs e)
        {
            if (!sock.ReceiveAsync(e))
            {//IO operation finished syncronously
                //handle received data
                ReceiveCompleted(sock, e);
            }//IO operation finished syncronously
        }

        void ReceiveCompleted(object sender, SocketAsyncEventArgs e)
        {
            var sock = (Socket)sender;
            if (!sock.Connected)
                Disconnect(sock);
            try
            {

                int size = e.BytesTransferred;
                if (size == 0)
                {
                    //this implementation based on IO Completion ports, and in this case
                    //receiving zero bytes mean socket disconnection
                    Disconnect(sock);
                }
                else
                {
                    var buf = new byte[size];
                    Array.Copy(e.Buffer, buf, size);
                    ReceiveData(buf, (IPEndPoint)sock.RemoteEndPoint);
                    BeginReceiveAsync(sock, e);
                }
            }
            catch (SocketException ex)
            {
                //We can't truly handle this excpetion here, but unhandled
                //exception caused process termination.
                //You can add new event to notify observer
                theLog.Log.Error("TcpServer: receive data error!", ex);
            }
            catch (Exception ex)
            {
                theLog.Log.Error("TcpServer: receive data error!", ex);
            }
        }

        private void DisposeImpl(bool manualDispose)
        {
            if (manualDispose)
            {
                //We should manually close all connected sockets
                Exception error = null;
                try
                {
                    if (tcpServer != null)
                    {
                        disposed = true;
                        tcpServer.Stop();
                    }
                }
                catch (Exception ex)
                {
                    theLog.Log.Error("TcpServer: tcpServer.Stop() failes!", ex);
                    error = ex;
                }

                try
                {
                    foreach (var sock in connectedSockets.Values)
                    {
                        sock.Close();
                    }
                }
                catch (SocketException ex)
                {
                    //During one socket disconnected we can faced exception
                    theLog.Log.Error("TcpServer: close accepted socket failes!", ex);
                    error = ex;
                }
                if ( error != null )
                    throw error;
            }
        }


        private void OnSocketConnected(IPEndPoint ipEndPoint)
        {
            var handler = SocketConnected;
            if (handler != null)
                handler(this, new IPEndPointEventArgs(ipEndPoint));
        }

        private void OnSocketDisconnected(IPEndPoint ipEndPoint)
        {
            var handler = SocketDisconnected;
            if (handler != null)
                handler(this, new IPEndPointEventArgs(ipEndPoint));
        }
        private void OnDataReceived(byte[] data, IPEndPoint ipEndPoint)
        {
            var handler = DataReceived;
            if ( handler != null )
                handler(this, new DataReceivedEventArgs(data, ipEndPoint));
        }

        #endregion Private Functions

        //----------------------------------------------------------------------
        //Private Fields
        //----------------------------------------------------------------------
        #region Private Fields
        private const int SocketBufferSize = 1024;
        private readonly TcpListener tcpServer;
        private bool disposed;
        private readonly Dictionary<IPEndPoint, Socket> connectedSockets;
        private readonly object connectedSocketsSyncHandle = new object();
        #endregion Private Fields
    }
}
Sergey Teplyakov
Thanks for this Sergey - I'm having a read of your source now to get to grips with it. I'm one of those chaps who likes to understand things rather than just copy paste...
Martin Milan
I think this is the way I'm going to go - using the IO Completion port stuff.Thanks for providing full code, but I won't simply cut and paste. As I said before, I like to understand why something works, so I'll implement my own server class using yours for reference.Cheers!
Martin Milan
For more information about Winsocks API (and using IO completion ports) I recommended this book: "Network Programming for Microsoft Windows", Second Edition (one of the best books about network programming in Windows), and Windows via C/C++ by Jeffrey Richter (one of the best books about Win32, mulithreading and IO completion ports). I'm glad I could help!
Sergey Teplyakov
+1  A: 

I'm writing the same application right now and I use solution like this:

http://clutch-inc.com/blog/?p=4

It's been tested right now and works perfectly. It is important to make this service only for receiving and storing messages (somewhere) without other work. I'm using NServiceBus for saving messages. Other service takes messages from queue and do the rest.

dario-g
Thanks Dario - I'll look into that one as well.
Martin Milan