views:

1103

answers:

2

I am writing a Java multi-threaded network application and having real difficulty coming up with a way to unit test the object which sends and receives communication from network clients.

The object sends out a message to a number of clients and then waits for responses from the clients.

As each client responds, a dashboard-style GUI is updated.

In more detail...

A Message object represents a text message to be sent and contains an array of Clients which should receive the message.

The Message object is responsible for dispatching itself to all the appropriate clients.

When the dispatch() method is invoked on a Message object, the object spawns a new thread (MessageDispatcher) for each client in the Client array.

Each MessageDispatcher:

  • opens a new TCP socket (Socket) to the client

  • delivers the message to its client... PrintWriter out.println(msg text)

  • creates a 'Status' object which is passed to a Queue in the Message object and then on to the GUI.

Each Status object represents ONE of the following events:

  • Message passed to Socket (via Printwriter out.println() )

  • Display receipt received from client (via BufferedReader/InputStreamReader in.readline()... blocks until network input is received )

  • User acknowledge receipt received from client (via same method as above)

So.. I want to unit test the Message object. (using JUnit)

The unit test is called MessageTest.java (included below).

My first step has been to set up a Message object with a single recipient.

I then used JMockit to create a mock Socket object which can supply a mock OutputStream object (I am using ByteArrayOutputStream which extends OutputStream) to PrintWriter.

Then, when the MessageDispatcher calls (PrintWriter object).out, the message text will be ideally passed to my mock Socket object (via the mock OutputStream) which can check that the message text is OK.

And the sample principle for the InputStreamReader.... The mock Socket object also supplies a mock InputStreamReader object which supplies a mock BufferedReader which is called by the MessageDispatcher (as mentioned previously, MessageDispatcher blocks on in.readLine() ). At this point the mock BufferedReader should supply a fake confirmation to the MessageDispatcher...

// mock Socket
Mockit.redefineMethods(Socket.class, new Object()
{

    ByteArrayOutputStream output = new ByteArrayOutputStream();
    ByteArrayInputStream input = new ByteArrayInputStream();

    public OutputStream getOutputStream()
    {
        return output;
    }

    public InputStream getInputStream()
    {
        return input;
    }

});

If this wasn't multi-threaded, this should all work OK. However I have no idea how to do this with multiple threads. Can anyone give me any advice or tips?

Also if you have any input on the design (eg. Message object responsible for its own delivery rather than a separate delivery object.. "dependency injection"-style / separate thread for each client delivery) then I would be interested to hear that too.

UPDATE: here is the code:

Message.java

public class Message {

    Client[] to;

    String contents;

    String status;

    StatusListener listener;

    BlockingQueue<Status> statusQ;

    public Message(Client[] to, String contents, StatusListener listener) 
    {
     this.to = to;
     this.contents = contents;
     this.listener = listener;
    }

    public void dispatch()
    {
     try {

      // open a new thread for each client

      // keep a linked list of socket references so that all threads can be closed
      List<Socket> sockets = Collections.synchronizedList(new ArrayList<Socket>());

      // initialise the statusQ for threads to report message status
      statusQ = new ArrayBlockingQueue<Status>(to.length*3); // max 3 status objects per thread

      // dispatch to each client individually and wait for confirmation
      for (int i=0; i < to.length; i++) {

      System.out.println("Started new thread");

      (new Thread(new MessageDispatcher(to[i], contents, sockets, statusQ))).start();

      }

      // now, monitor queue and empty the queue as it fills up.. (consumer)
      while (true) {
       listener.updateStatus(statusQ.take());
      }
     }

     catch (Exception e) { e.printStackTrace(); }

    }

    // one MessageDispatcher per client
    private class MessageDispatcher implements Runnable
    {

     private Client client;
     private String contents;
     private List<Socket> sockets;
     private BlockingQueue<Status> statusQ;

     public MessageDispatcher(Client client, String contents, List<Socket> sockets, BlockingQueue<Status> statusQ) {

      this.contents = contents;

      this.client = client;

      this.sockets = sockets;

      this.statusQ = statusQ;

     }

     public void run() {

     try {

      // open socket to client
      Socket sk = new Socket(client.getAddress(), CLIENTPORT);

      // add reference to socket to list
      synchronized(sockets) {
       sockets.add(sk);
      }

      PrintWriter out = new PrintWriter(sk.getOutputStream(), true);

      BufferedReader in = new BufferedReader(new InputStreamReader(sk.getInputStream()));

      // send message
      out.println(contents);

      // confirm dispatch
      statusQ.add(new Status(client, "DISPATCHED"));

      // wait for display receipt
      in.readLine();

      statusQ.add(new Status(client, "DISPLAYED"));

      // wait for read receipt
      in.readLine();

      statusQ.add(new Status(client, "READ"));

      }

      catch (Exception e) { e.printStackTrace(); }
     }

    }

}

.... and the corresponding unit test:

MessageTest.java

public class MessageTest extends TestCase {

    Message msg;

    static final String testContents = "hello there";

    public void setUp() {

     // mock Socket
     Mockit.redefineMethods(Socket.class, new Object()
     {

      ByteArrayOutputStream output = new ByteArrayOutputStream();
      ByteArrayInputStream input = new ByteArrayInputStream();

      public OutputStream getOutputStream()
      {
       return output;
      }

      public InputStream getInputStream()
      {
       return input;
      }


     });

     // NB
     // some code removed here for simplicity
     // which uses JMockit to overrides the Client object and give it a fake hostname and address

     Client[] testClient = { new Client() };

     msg = new Message(testClient, testContents, this);

    }

    public void tearDown() {
    }

    public void testDispatch() {

     // dispatch to client
     msg.dispatch();


    } 
}
+1  A: 

perhaps instead of redefining the methods getOutputStream and getInputStream, you can instead use an AbstractFactory in your Message class which creates output and input streams. In normal operation the factory will use a Socket to do that. However, for testing give it a factory which gives it streams of your choosing. That way you have more control over exactly what is happening.

tster
+1 abstract factory pattern ftw!
David Berger
You're right, some kind of factory could be used in the Message class, but at the cost of making it more complex.Since the Message class can be unit tested cleanly with a mocking tool, I would rather do that than add extra complexity to the production code.
Rogerio
This is my thinking... I would rather use JMockit that pass a factory to Message.... having said that, I am open to both ways and have refactored a copy of Message to this end (and so far it works). However I am searching for a less-involved method..
Imme22009
+1  A: 

Notice that the sending of multiple messages (multicast) can be achieved in a single blocking method through the NIO API (java.nio) as well, without the creation of new threads. NIO is quite complex, though.

I would start by writing the tests first, with a test-defined StatusListener implementation which stores all update events in a list. When the dispatch() method returns, the test can execute asserts on the state of the event list.

Using threads or NIO is an implementation detail for the Message class. So, unless you don't mind coupling the tests to this implementation detail, I would recommend introducing a helper class that would be responsible for sending multiple asynchronous messages and notifying the Message object upon any asynchronous replies. Then, you can mock the helper class in the unit tests, without coupling them to either threads or NIO.

I successfully implemented a test for the case of sending a message to one client. I also made some changes to the original production code, as follows:

public class Message
{
   private static final int CLIENT_PORT = 8000;

   // Externally provided:
   private final Client[] to;
   private final String contents;
   private final StatusListener listener;

   // Internal state:
   private final List<Socket> clientConnections;
   private final BlockingQueue<Status> statusQueue;

   public Message(Client[] to, String contents, StatusListener listener)
   {
      this.to = to;
      this.contents = contents;
      this.listener = listener;

      // Keep a list of socket references so that all threads can be closed:
      clientConnections = Collections.synchronizedList(new ArrayList<Socket>());

      // Initialise the statusQ for threads to report message status:
      statusQueue = new ArrayBlockingQueue<Status>(to.length * 3);
   }

   public void dispatch()
   {
      // Dispatch to each client individually and wait for confirmation:
      sendContentsToEachClientAsynchronously();

      Status statusChangeReceived;

      do {
         try {
            // Now, monitor queue and empty the queue as it fills up (consumer):
            statusChangeReceived = statusQueue.take();
         }
         catch (InterruptedException ignore) {
            break;
         }
      }
      while (listener.updateStatus(statusChangeReceived));

      closeRemainingClientConnections();
   }

   private void closeRemainingClientConnections()
   {
      for (Socket connection : clientConnections) {
         try {
            connection.close();
         }
         catch (IOException ignore) {
            // OK
         }
      }

      clientConnections.clear();
   }

   private void sendContentsToEachClientAsynchronously()
   {
      for (Client client : to) {
         System.out.println("Started new thread");
         new Thread(new MessageDispatcher(client)).start();
      }
   }

   // One MessageDispatcher per client.
   private final class MessageDispatcher implements Runnable
   {
      private final Client client;

      MessageDispatcher(Client client) { this.client = client; }

      public void run()
      {
         try {
            communicateWithClient();
         }
         catch (IOException e) {
            throw new RuntimeException(e);
         }
      }

      private void communicateWithClient() throws IOException
      {
         // Open connection to client:
         Socket connection = new Socket(client.getAddress(), CLIENT_PORT);

         try {
            // Add client connection to synchronized list:
            clientConnections.add(connection);

            sendMessage(connection.getOutputStream());
            readRequiredReceipts(connection.getInputStream());
         }
         finally {
            connection.close();
         }
      }

      // Send message and confirm dispatch.
      private void sendMessage(OutputStream output)
      {
         PrintWriter out = new PrintWriter(output, true);

         out.println(contents);
         statusQueue.add(new Status(client, "DISPATCHED"));
      }

      private void readRequiredReceipts(InputStream input) throws IOException
      {
         BufferedReader in = new BufferedReader(new InputStreamReader(input));

         // Wait for display receipt:
         in.readLine();
         statusQueue.add(new Status(client, "DISPLAYED"));

         // Wait for read receipt:
         in.readLine();
         statusQueue.add(new Status(client, "READ"));
      }
   }
}
public final class MessageTest extends JMockitTest
{
   static final String testContents = "hello there";
   static final String[] expectedEvents = {"DISPATCHED", "DISPLAYED", "READ"};

   @Test
   public void testSendMessageToSingleClient()
   {
      final Client theClient = new Client("client1");
      Client[] testClient = {theClient};

      new MockUp<Socket>()
      {
         @Mock(invocations = 1)
         void $init(String host, int port)
         {
            assertEquals(theClient.getAddress(), host);
            assertTrue(port > 0);
         }

         @Mock(invocations = 1)
         public OutputStream getOutputStream() { return new ByteArrayOutputStream(); }

         @Mock(invocations = 1)
         public InputStream getInputStream()
         {
            return new ByteArrayInputStream("reply1\nreply2\n".getBytes());
         }

         @Mock(minInvocations = 1) void close() {}
      };

      StatusListener listener = new MockUp<StatusListener>()
      {
         int eventIndex;

         @Mock(invocations = 3)
         boolean updateStatus(Status status)
         {
            assertSame(theClient, status.getClient());
            assertEquals(expectedEvents[eventIndex++], status.getEvent());
            return eventIndex < expectedEvents.length;
         }
      }.getMockInstance();

      new Message(testClient, testContents, listener).dispatch();
   }
}

The JMockit test above uses the new MockUp class, not yet available in the latest release. It can be replaced with Mockit.setUpMock(Socket.class, new Object() { ... }), though.

Rogerio
just to clarify... do you mean creating a class in between the threads/NIO and the message object? (ie. removing the resposibility of the 'implementation details' as you say from the message object)... If so, isn't this against the principle of encapsulation (putting all the methods to deal with an objects data inside the object) and dependency injection? (ie. not very good from an OO design point of view)
Imme22009
Yes, IF you consider important to encapsulate those implementation details; it isn't necessary, of course.Notice that creating such a helper class would be following the OO principle of "encapsulating what varies"; in this case the "thing that varies" is the way in which the asynchronous I/O is dealt with (ie, with blocking or non-blocking I/O operations).Dependency injection is not relevant here, the way I see it; why should it be, given that there are no externally configurable services involved?
Rogerio