Motivation

I'd like a second pair of eyes to confirm that XMPPConnection.sendPacket( Packet ) can be called concurrently. My current code invokes a List of Callables (at most 3) serially. Each Callable sends and receives XMPP packets over a single shared XMPPConnection. I plan to parallelize these Callables by running them on multiple threads, so each Callable will invoke sendPacket on the shared XMPPConnection without synchronization.

XMPPConnection

class XMPPConnection
{
    private boolean connected = false;

    public boolean isConnected() 
    {
        return connected;
    }

    PacketWriter packetWriter;

    public void sendPacket( Packet packet ) 
    {
        if (!isConnected())
            throw new IllegalStateException("Not connected to server.");

        if (packet == null) 
            throw new NullPointerException("Packet is null.");

        packetWriter.sendPacket(packet);
    }
}

PacketWriter

class PacketWriter
{
    public void sendPacket(Packet packet) 
    {
        if (!done) {
            // Invoke interceptors for the new packet 
            // that is about to be sent. Interceptors
            // may modify the content of the packet.
            processInterceptors(packet);

            try {
                queue.put(packet);
            }
            catch (InterruptedException ie) {
                ie.printStackTrace();
                return;
            }
            synchronized (queue) {
                queue.notifyAll();
            }

            // Process packet writer listeners. Note that we're 
            // using the sending thread so it's expected that 
            // listeners are fast.
            processListeners(packet);
        }
    }

    protected PacketWriter( XMPPConnection connection ) 
    {
        this.queue = new ArrayBlockingQueue<Packet>(500, true);
        this.connection = connection;
        init();
    }
}

What I conclude

Since PacketWriter uses a BlockingQueue, I believe it is safe to invoke sendPacket from multiple threads. Am I correct?

A: 

You haven't provided enough information here.

We don't know how the following are implemented:

  • processInterceptors
  • processListeners

Who reads and writes the 'done' variable? If one thread sets it to true, sendPacket will silently do nothing for every other thread.

From a quick glance, this doesn't look thread safe, but there's no way to tell for sure from what you've posted.
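To make the concern about 'done' concrete, here is a minimal sketch, assuming a simplified writer. SketchWriter, shutdown() and pending() are hypothetical names for illustration, and packets are plain Strings; this is not how the posted PacketWriter is implemented. It declares the flag volatile so a write from one thread is visible to the others, and it returns a boolean so a caller can tell when a packet was not queued.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for PacketWriter; packets are plain Strings here.
class SketchWriter {
    // volatile: a write by the thread that shuts down is visible to all senders.
    private volatile boolean done = false;
    private final BlockingQueue<String> queue =
            new ArrayBlockingQueue<String>(500, true);

    /** Returns true if the packet was queued, false if it was not. */
    boolean sendPacket(String packet) {
        if (done) {
            return false; // report the failure instead of dropping silently
        }
        try {
            queue.put(packet); // BlockingQueue.put is safe to call concurrently
            return true;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // restore the interrupt status
            return false;
        }
    }

    void shutdown() {
        done = true;
    }

    int pending() {
        return queue.size();
    }

    public static void main(String[] args) {
        SketchWriter writer = new SketchWriter();
        System.out.println(writer.sendPacket("hello")); // queued before shutdown
        writer.shutdown();
        System.out.println(writer.sendPacket("late"));  // rejected after shutdown
    }
}
```

Note that a packet can still slip into the queue between the check of done and the call to put; whether that matters depends on how the consuming thread shuts down, which the question does not show.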

Other issues:

  • Why is PacketWriter a class member of XMPPConnection when it's only used in one method?
  • Why does PacketWriter have an XMPPConnection member variable and not use it?
Glen
A: 

You might consider using a BlockingQueue if you can require Java 5 or later.

From the Java API docs, with a minor change to use ArrayBlockingQueue:

class Producer implements Runnable {
   private final BlockingQueue queue;
   Producer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while(true) { queue.put(produce()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   Object produce() { ... }
 }

 class Consumer implements Runnable {
   private final BlockingQueue queue;
   Consumer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while(true) { consume(queue.take()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   void consume(Object x) { ... }
 }

 class Setup {
   void main() {
     BlockingQueue q = new ArrayBlockingQueue(500); // a capacity is required; 500 is arbitrary
     Producer p = new Producer(q);
     Consumer c1 = new Consumer(q);
     Consumer c2 = new Consumer(q);
     new Thread(p).start();
     new Thread(c1).start();
     new Thread(c2).start();
   }
 }

For your usage you'd have your real sender (holder of the actual connection) be the Consumer, and packet preparers/senders be the producers.
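As a rough illustration of that arrangement, here is a sketch in which several producer threads queue packets and a single consumer thread plays the role of the real sender. SharedSenderSketch, the STOP sentinel and the String packets are all assumptions made for the example, and the lambdas need Java 8 rather than Java 5.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class SharedSenderSketch {
    // Hypothetical sentinel telling the consumer to stop; producers never queue it.
    private static final String STOP = "__STOP__";

    /** Runs the producers and one consumer; returns what the consumer "sent". */
    static List<String> run(final int producers, final int perProducer) {
        final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(16);
        // Only the consumer thread writes to this list.
        final List<String> sent = new ArrayList<String>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String packet = queue.take();
                    if (STOP.equals(packet)) {
                        return;
                    }
                    sent.add(packet); // a real sender would write to the connection here
                }
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        List<Thread> threads = new ArrayList<Thread>();
        for (int p = 0; p < producers; p++) {
            final int id = p;
            Thread t = new Thread(() -> {
                try {
                    for (int i = 0; i < perProducer; i++) {
                        queue.put("p" + id + "-" + i); // blocks while the queue is full
                    }
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }

        try {
            for (Thread t : threads) {
                t.join();
            }
            queue.put(STOP);  // queued after every producer has finished
            consumer.join();  // after this, reading 'sent' from this thread is safe
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(run(3, 100).size() + " packets sent");
    }
}
```

Because only the consumer touches the connection, the producers need no synchronization beyond the queue itself.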

Another thought: a PriorityBlockingQueue would let you support flash-override XMPP packets, which are sent ahead of any other waiting packets.
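A minimal sketch of the priority idea, assuming a hypothetical PrioritizedPacket wrapper in which a lower number means more urgent. PriorityBlockingQueue does not promise any order among equal elements, so the sketch adds a sequence number as a tie-breaker to keep packets of the same priority in submission order.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical wrapper; lower priority value = more urgent (an assumption).
class PrioritizedPacket implements Comparable<PrioritizedPacket> {
    private static final AtomicLong SEQ = new AtomicLong();

    final int priority;
    final long seq = SEQ.getAndIncrement(); // tie-breaker for equal priorities
    final String payload;

    PrioritizedPacket(int priority, String payload) {
        this.priority = priority;
        this.payload = payload;
    }

    @Override
    public int compareTo(PrioritizedPacket other) {
        int byPriority = Integer.compare(priority, other.priority);
        return byPriority != 0 ? byPriority : Long.compare(seq, other.seq);
    }
}

class PriorityQueueSketch {
    /** Queues three packets and returns their payloads in the order they come out. */
    static String drain() {
        PriorityBlockingQueue<PrioritizedPacket> queue =
                new PriorityBlockingQueue<PrioritizedPacket>();
        queue.put(new PrioritizedPacket(5, "presence"));
        queue.put(new PrioritizedPacket(5, "message"));
        queue.put(new PrioritizedPacket(0, "flash-override")); // queued last, most urgent

        StringBuilder order = new StringBuilder();
        PrioritizedPacket next;
        while ((next = queue.poll()) != null) {
            if (order.length() > 0) {
                order.append(',');
            }
            order.append(next.payload);
        }
        return order.toString();
    }

    public static void main(String[] args) {
        System.out.println(drain()); // prints "flash-override,presence,message"
    }
}
```

One trade-off to keep in mind: PriorityBlockingQueue is unbounded, so put never blocks and fast producers get no back-pressure, unlike the bounded ArrayBlockingQueue.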

Also, Glen's points on the design are good ones. You might want to take a look at the Smack API (http://www.igniterealtime.org/projects/smack/) rather than creating your own.

Bill Barnhill