views:

108

answers:

1

I am working on a java program that is essentially a chat room. This is an assignment for class so no code please, I am just having some issues determining the most feasible way to handle what I need to do. I have a server program already setup for a single client using threads to get the data input stream and a thread to handle sending on the data output stream. What I need to do now is create a new thread for each incoming request.

My thought is to create a linked list to contain either the client sockets, or possibly the thread. Where I am stumbling is figuring out how to handle sending the messages out to all the clients. If I have a thread for each incoming message how can I then turn around and send that out to each client socket.

I'm thinking that if I had a linkedlist of the clientsockets I could then traverse the list and send it out to each one, but then I would have to create a dataoutputstream each time. Could I create a linkedlist of dataoutputstreams? Sorry if it sounds like I'm rambling but I don't want to just start coding this, it could get messy without a good plan. Thanks!

EDIT I decided to post the code I have so far. I haven't had a chance to test it yet so any comments would be great. Thanks!

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.net.ServerSocket;
import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class prog4_server {

    // A Queue of Strings used to hold out bound Messages
    // It blocks till on is available
    static BlockingQueue<String> outboundMessages = new LinkedBlockingQueue<String>();

    // A linked list of data output streams
    // to all the clients
    static LinkedList<DataOutputStream> outputstreams;

    // public variables to track the number of clients
    // and the state of the server
    static Boolean serverstate = true;
    static int clients = 0;

    public static void main(String[] args) throws IOException{

        //create a server socket and a clientSocket
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(6789);
        } catch (IOException e) {
            System.out.println("Could not listen on port: 6789");
            System.exit(-1);
        }// try{...}catch(IOException e){...}

        Socket clientSocket;

        // start the output thread which waits for elements
        // in the message queue
        OutputThread out = new OutputThread();
        out.start();

        while(serverstate){

            try {

                // wait and accept a new client
                // pass the socket to a new Input Thread
                clientSocket = serverSocket.accept();
                DataOutputStream ServerOut = new DataOutputStream(clientSocket.getOutputStream());
                InputThread in = new InputThread(clientSocket, clients);
                in.start();
                outputstreams.add(ServerOut);

            } catch (IOException e) {

                System.out.println("Accept failed: 6789");
                System.exit(-1);
            }// try{...}catch{..}

            // increment the number of clients and report
            clients = clients++;

            System.out.println("Client #" + clients + "Accepted");

        }//while(serverstate){...

    }//public static void main

    public static class OutputThread extends Thread {

        //OutputThread Class Constructor
        OutputThread() {
        }//OutputThread(...){...

        public void run() {

            //string variable to contain the message
            String msg = null;

            while(!this.interrupted()) {

                try {

                    msg = outboundMessages.take();

                    for(int i=0;i<outputstreams.size();i++){

                        outputstreams.get(i).writeBytes(msg + '\n');

                    }// for(...){...

                 } catch (IOException e) {

                    System.out.println(e);

                 } catch (InterruptedException e){

                     System.out.println(e);

                 }//try{...}catch{...}

            }//while(...){

        }//public void run(){...

    }// public OutputThread(){...

    public static class InputThread extends Thread {

        Boolean threadstate = true;
        BufferedReader ServerIn;
        String user;
        int threadID;
        //SocketThread Class Constructor
        InputThread(Socket clientSocket, int ID) {

            threadID = ID;

            try{
                ServerIn = new BufferedReader(
                    new InputStreamReader(clientSocket.getInputStream()));
                    user = ServerIn.readLine();
            }
            catch(IOException e){
                System.out.println(e);
            }

        }// InputThread(...){...

        public void run() {

            String msg = null;

        while (threadstate) {

                try {

                    msg = ServerIn.readLine();

                    if(msg.equals("EXITEXIT")){

                        // if the client is exiting close the thread
                        // close the output stream with the same ID
                        // and decrement the number of clients
            threadstate = false;
                        outputstreams.get(threadID).close();
                        outputstreams.remove(threadID);
                        clients = clients--;
                        if(clients == 0){
                            // if the number of clients has dropped to zero
                            // close the server
                            serverstate = false;
                            ServerIn.close();
                        }// if(clients == 0){...
                    }else{

                        // add a message to the message queue
                        outboundMessages.add(user + ": " + msg);

                    }//if..else...

                } catch (IOException e) {

                    System.out.println(e);

                }// try { ... } catch { ...}

        }// while

        }// public void run() { ...
    }

    public static class ServerThread extends Thread {

        //public variable declaration
        BufferedReader UserIn =
                new BufferedReader(new InputStreamReader(System.in));

        //OutputThread Class Constructor
        ServerThread() {

        }//OutputThread(...){...

        public void run() {

            //string variable to contain the message
            String msg = null;

            try {

                //while loop will continue until
                //exit command is received
                //then send the exit command to all clients

                msg = UserIn.readLine();

                while (!msg.equals("EXITEXIT")) {

                    System.out.println("Enter Message: ");
                    msg = UserIn.readLine();

                }//while(...){

                outboundMessages.add(msg);
                serverstate = false;
                UserIn.close();

            } catch (IOException e) {
                System.out.println(e);

            }//try{...}catch{...}


        }//public void run(){...
    }// public serverThread(){...

}// public class prog4_server
+1  A: 

I have solved this problem in the past by defining a "MessageHandler" class per client connection, responsible for inbound / outbound message traffic. Internally the handler uses a BlockingQueue implementation onto which outbound messages are placed (by internal worker threads). The I/O sender thread continually attempts to read from the queue (blocking if required) and sends each message retrieved to the client.

Here's some skeleton example code (untested):

/**
 * Our Message definition.  A message is capable of writing itself to
 * a DataOutputStream.
 */
public interface Message {
  void writeTo(DataOutputStream daos) throws IOException;
}

/**
 * Handler definition.  The handler contains two threads: One for sending
 * and one for receiving messages.  It is initialised with an open socket.
 */    
public class MessageHandler {
  private final DataOutputStream daos;
  private final DataInputStream dais;
  private final Thread sender;
  private final Thread receiver;
  private final BlockingQueue<Message> outboundMessages = new LinkedBlockingQueue<Message>();

  public MessageHandler(Socket skt) throws IOException {
    this.daos = new DataOutputStream(skt.getOutputStream());
    this.dais = new DataInputStream(skt.getInputStream());

    // Create sender and receiver threads responsible for performing the I/O.
    this.sender = new Thread(new Runnable() {
      public void run() {
        while (!Thread.interrupted()) {
          Message msg = outboundMessages.take(); // Will block until a message is available.

          try {
            msg.writeTo(daos);
          } catch(IOException ex) {
            // TODO: Handle exception
          }
        }
      }
    }, String.format("SenderThread-%s", skt.getRemoteSocketAddress()));

    this.receiver = new Thread(new Runnable() {
      public void run() {
        // TODO: Read from DataInputStream and create inbound message.
      }
    }, String.format("ReceiverThread-%s", skt.getRemoteSocketAddress()));

    sender.start();
    receiver.start();
  }

  /**
   * Submits a message to the outbound queue, ready for sending.
   */
  public void sendOutboundMessage(Message msg) {
    outboundMessages.add(msg);
  }

  public void destroy() {
    // TODO: Interrupt and join with threads.  Close streams and socket.
  }
}

Note that Nikolai is correct in that blocking I/O using 1 (or 2) threads per connection is not a scalable solution and typically applications might be written using Java NIO to get round this. However, in reality unless you're writing an enterprise server which thousands of clients connect to simultaneously then this isn't really an issue. Writing bug-free scalable applications using Java NIO is difficult and certainly not something I'd recommend.

Adamski
Thanks. I have never used a blocking queue before, but I can see how it would fit into my original design. I know it's not scalable but that's not truly in the scope of this assignment.What I'm seeing is that with the two threads I had originally I can use a global message variable and blocking queue in my output threads to send the message out on all the active client ports.I'm going to see if that will work. I am also needing to find a way to close both the threads associated with one socket.
Levi
Yes exactly - You can create one Message and then pass it to each handler. To shut down your threads you'll need to call thread.interrupt() followed by thread.join(). However, for this to work it's important that both threads regularly check their interrupted status via Thread.interrupted().
Adamski
My prior version exited based upon a user input. This is easy enough with the input thread, but I am stuck on seeing how to keep an association between that input thread and it's corresponding output thread.Thanks again for your help on this, I enjoy learning new and better way's to do things.
Levi
Good approach, +1.
Nikolai N Fetissov
@Levi: Re. your last point: There's no need to have a direct association between sender and receiver thread: If the receiver thread reads the string "EXIT" they could always call destroy() on the handler, which it will have a reference to. However, be warned that if you do this you shouldn't join() on the receiver thread within destroy() or you'll block forever; you may simply wish to interrupt the receiver thread causing it to exit from it's "receiving" loop and terminate.
Adamski
@Adamski - So check for interrupted in my while loop and exit when it is. I got rid of the threaded output. Instead now I just have a single thread that sends the message out to a list of clients whenever there is a message in the queue. That way there is only one thread for all outgoing messages. I'm sure I could use something similar for all the incoming messages, but I'm not sure if I want to go through all that trouble.
Levi
@Levi: The problem with blocking I/O is that you have no idea when reading or sending a message how long the thread will block for. Having one sender thread means that a slow client will hold up all other clients from receiving the message, which is why I recommend the 2-threads-per-connection approach for maximum responsiveness.
Adamski