views:

952

answers:

4

Ok, I'm going crazy here. I have been rewriting the NIO code for my server, and running into some real headaches. The bottom line is that getting NIO "right" is very hard. Some people pointed me to the Rox tutorial at http://rox-xmlrpc.sourceforge.net/niotut/, which seems to be on a good path, but is not as complete as I would like. For example, I need to know how to close a connection on the server side only after queued outgoing ByteBuffers have been sent. SocketChannel.close() is abrupt and can lose data if done prematurely. I also need to send large packets that are larger than the read ByteBuffer. The Rox code (nor any other code I've looked at) deals with this. There are also many places where it seems uncaught exceptions are not properly handled. In my tests there are some errors and it's not clear how to handle them properly given the complexity of NIO.

Anyhow, as I try to solve these problems, more of the tricky subtleties show up, and it's getting quite complex. So I'm considering an entirely different approach. Many people are saying that NIO is highly error-prone and needlessly confusing and complex. They advocate using a "thread-per-connection" model that uses blocking IO where each socket connection is run on its own thread. This seems like a good idea, and would reduce the bottleneck on the front end by having one selector thread for all connections (as in NIO), at the expense of higher overhead (for the threads). This sentiment is echoed by posts such as http://paultyma.blogspot.com/2008/03/writing-java-multithreaded-servers.html and http://mailinator.blogspot.com/2008/02/kill-myth-please-nio-is-not-faster-than.html

The code should be simple compared to NIO, but I really want some example code to look at. I can't seem to find anything. The problem is that I don't think this "thread-per-connection blocking I/O" strategy has a better name that I can actually get good Google results for. Can anyone link me to some tutorials or simple examples to explain using this "older" method of I/O and scaling it up using a thread pool? Or have any other words of wisdom? Thanks very much!

A: 

Try searching for terms like "threaded sockets java".

Dave Jarvis
A: 

I suggest you look in the sample/nio directory in your JDK. This has a number of simple examples including the two you mention.

Peter Lawrey
A: 

You may also want to consider using a higher level framework such as Grizzly instead of using NIO directly. The framework should allow you to concentrate on your use case rather than the subtleties of NIO.

Robert Christie
Jboss also has Netty: http://www.jboss.org/netty
Dave
+1  A: 

If you are working with NIO, I would also suggest to use a Framework. I have been working with Apache Mina and I would recommend it.

As to the blocking IO, essentially you will need a Listener Thread that accepts incoming connections and spawns additional threads that will handle each connection. Here is an example of such a Listener code, as originally contributed to the Apache Felix Project. If you look for the complete but modified version, you can browse the source here.

e.g.

    /*
    * Licensed to the Apache Software Foundation (ASF) under one or more
    * contributor license agreements.  See the NOTICE file distributed with
    * this work for additional information regarding copyright ownership.
    * The ASF licenses this file to You under the Apache License, Version 2.0
    * (the "License"); you may not use this file except in compliance with
    * the License.  You may obtain a copy of the License at
    *
    *      http://www.apache.org/licenses/LICENSE-2.0
    *
        * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
    package org.apache.felix.shell.remote;


    import java.io.IOException;
    import java.io.PrintStream;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.net.SocketException;


    /**
     * Implements a simple listener that will accept a single connection.
     * <p/>
     *
     * @author Dieter Wimberger (wimpi)
     */
    class Listener
    {

        private int m_Port;
        private Thread m_ListenerThread;
        private boolean m_Stop = false;
        private ServerSocket m_ServerSocket;
        private AtomicInteger m_UseCounter;
        private int m_MaxConnections;


        /**
         * Activates this listener on a listener thread (telnetconsole.Listener).
         */
        public void activate()
        {
            //configure from system property
            try
            {
                m_Port = Integer.parseInt( System.getProperty( "osgi.shell.telnet.port", "6666" ) );
            }
            catch ( NumberFormatException ex )
            {
                Activator.getServices().error( "Listener::activate()", ex );
            }
            try
            {
                m_MaxConnections = Integer.parseInt( System.getProperty( "osgi.shell.telnet.maxconn", "2" ) );
            }
            catch ( NumberFormatException ex )
            {
                Activator.getServices().error( "Listener::activate()", ex );
            }
            m_UseCounter = new AtomicInteger( 0 );
            m_ListenerThread = new Thread( new Acceptor(), "telnetconsole.Listener" );
            m_ListenerThread.start();
        }//activate


        /**
         * Deactivates this listener.
         * <p/>
         * The listener's socket will be closed, which should cause an interrupt in the
         * listener thread and allow for it to return. The calling thread joins the listener
         * thread until it returns (to ensure a clean stop).
         */
        public void deactivate()
        {
            try
            {
                m_Stop = true;
                //wait for the listener thread
                m_ServerSocket.close();
                m_ListenerThread.join();
            }
            catch ( Exception ex )
            {
                Activator.getServices().error( "Listener::deactivate()", ex );
            }
        }//deactivate

        /**
         * Class that implements the listener's accept logic as a <tt>Runnable</tt>.
         */
        private class Acceptor implements Runnable
        {

            /**
             * Listens constantly to a server socket and handles incoming connections.
             * One connection will be accepted and routed into the shell, all others will
             * be notified and closed.
             * <p/>
             * The mechanism that should allow the thread to unblock from the ServerSocket.accept() call
             * is currently closing the ServerSocket from another thread. When the stop flag is set,
             * this should cause the thread to return and stop.
             */
            public void run()
            {
                try
                {
                    /*
                        A server socket is opened with a connectivity queue of a size specified
                        in int floodProtection.  Concurrent login handling under normal circumstances
                        should be handled properly, but denial of service attacks via massive parallel
                        program logins should be prevented with this.
                    */
                    m_ServerSocket = new ServerSocket( m_Port, 1 );
                    do
                    {
                        try
                        {
                            Socket s = m_ServerSocket.accept();
                            if ( m_UseCounter.get() >= m_MaxConnections )
                            {
                                //reject with message
                                PrintStream out = new PrintStream( s.getOutputStream() );
                                out.print( INUSE_MESSAGE );
                                out.flush();
                                //close
                                out.close();
                                s.close();
                            }
                            else
                            {
                                m_UseCounter.increment();
                                //run on the connection thread
                                Thread connectionThread = new Thread( new Shell( s, m_UseCounter ) );
                                connectionThread.start();
                            }
                        }
                        catch ( SocketException ex )
                        {
                        }
                    }
                    while ( !m_Stop );

                }
                catch ( IOException e )
                {
                    Activator.getServices().error( "Listener.Acceptor::activate()", e );
                }
            }//run

        }//inner class Acceptor

        private static final String INUSE_MESSAGE = "Connection refused.\r\n"
            + "All possible connections are currently being used.\r\n";

    }//class Listener

You can find other examples here and here.

Note that the advantage of NIO over the blocking model comes into play when you have more load. From a certain point on, the amount of extra work for Thread creation, management and context switching will limit your system performance.

Dieter