Hello
I have a program which has multiple objects of type PublisherTask and SubscriberTask. A given subscriber can be subscribed to one or more Publishers.
To describe my problem I will post some code....

abstract class Publication {
    // some published information
}

class ConcretePublicationA extends Publication {

}

class ConcretePublicationB extends Publication {

}

abstract class Subscription {
    private final long id;
    private final Subscriber s;
    // PLUS some other members relating to the subscription

    protected Subscription(long id, Subscriber s){
        this.id = id;
        this.s =s;
    }

    public Subscriber getSubscriber() {
        return this.s;
    }


}

class ConcreteSubscriptionA extends Subscription {

    protected ConcreteSubscriptionA(long id, Subscriber s) {
        super(id, s);
        // TODO Auto-generated constructor stub
    }

}

class ConcreteSubscriptionB extends Subscription {

    protected ConcreteSubscriptionB(long id, Subscriber s) {
        super(id, s);
        // TODO Auto-generated constructor stub
    }

}

interface Subscriber {
    public void update(Publication pub);
}

interface Publisher {
    public Subscription subscribe(Subscriber subscriber);
}

abstract class PublisherTask implements Runnable, Publisher {
    private final ConcurrentHashMap<Long, Subscription> subscribers =
        new ConcurrentHashMap<Long, Subscription>();
    Long subscriptionId = 0L;

    @Override
    public void run() {
        /*obviously this is a different variable in a real program*/
        boolean some_condition = true;

        while(some_condition) {
            // do some work
            Publication pub = /* new ConcretePublication(....) */ null;

            for (Subscription s : subscribers.values()) {
                s.getSubscriber().update(pub);
            }
        }

    }

    @Override
    public Subscription subscribe(Subscriber subscriber) {
        Subscription sub;

        synchronized(subscriptionId) {
            /* the lines below are in a function in the sub-class,
             *  but for brevity I'm showing them here
             */
            sub = new ConcreteSubscriptionA(++subscriptionId, subscriber);
                    subscribers.put(subscriptionId, sub);
        }
        return sub ;
    }

}


abstract class SubscriberTask implements Runnable, Subscriber {

    protected ConcurrentLinkedQueue<Publication> newPublications =
        new ConcurrentLinkedQueue<Publication>();

    @Override
    public void run() {
        /*obviously this is a different variable in a real program*/
        boolean some_condition = true;

        while(some_condition) {
            // do some work
            Publication pub = newPublications.peek();

        /* the lines below are in a function in the sub-class,
         *  but for brevity I'm showing them here
         */
        {
            if (pub instanceof ConcretePublicationA) {
                // Do something with the published data
            } else if (pub instanceof ConcretePublicationB) {
                // Do something with the published data
            }
        }
        }
    }

    @Override
    public void update(Publication pub) {

        /* My question relates to this method:
             * Basically, to avoid memory issues I would like existing
             * unprocessed publications **of the same type as the new one**
             * to be discarded
             */
        Publication existing = null;

        do {
            //This won't work coz peek() only looks at the head of the queue
                    existing = newPublications.peek();

            if ((existing != null) && (existing.getClass().equals(pub.getClass()))) {
                newPublications.remove(existing);
            }
        } while (existing != null);
        newPublications.add(pub);
    }
}

OK, so now that you've had a chance to scrutinize my code, I would like to ask the following question:
In the update method shown above, is it possible to peek at all the elements in ConcurrentLinkedQueue and remove those of a given type?
Also, please feel free to let me know if you think improvements can be made to the classes and how they interact with each other.
Thanks

A: 

I've solved this problem (or a similar one) before. The way I solved it was to embed a counter in every new Publication object (you would need one counter per class type for what you are doing). When you read a publication off the queue, if the counter within the consumed object is less than the current value for its class, discard it, since there is a later one in the queue.

However, you must be careful not to starve your publications of processing: if you keep adding new publication objects fast enough, you will never actually process any of them, since they will all be marked as stale. (In my case I knew I was adding very slowly, about one every 10 minutes, and my tasks took a couple of minutes each to run, with old tasks being of no use if a new one was queued up behind them.)

With your solution, it looks like if you add A1, A2, B1, A3, A4, B2, then when you add B2 you will not remove B1, since the head of the queue is not of the same type. That doesn't seem to be what your comment implies you want.

You could implement my idea by putting some helper code in like:

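import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

class ValidCounter {
    private final ConcurrentMap<Class<?>, AtomicLong> counters =
        new ConcurrentHashMap<Class<?>, AtomicLong>();

    // Returns the shared counter for clz, creating it atomically on first use.
    public AtomicLong getAtomicLongFor(Class<?> clz) {
        AtomicLong ans = counters.get(clz);
        if ( ans == null ) {
            AtomicLong fresh = new AtomicLong(0);
            ans = counters.putIfAbsent(clz, fresh);
            if ( ans == null ) {
                ans = fresh; // we won the race, so our counter is the shared one
            }
        }
        return ans;
    }
}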

To use it when adding, you would do this in the Publication constructor:

long myValidCounter = validCounterInstance.getAtomicLongFor(getClass()).incrementAndGet();

To check whether a publication should still be processed, you could use:

long currValidCounter = validCounterInstance.getAtomicLongFor(pub.getClass()).get();
if ( pub.getValidCounter() >= currValidCounter ) {
    // still valid so process
} else {
    // superseded, so ignore
}
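
Putting the two snippets together, a minimal self-contained sketch of the counter idea might look like the following. The names `StampedPublication`, `PubA`, `PubB`, `isStale` and `drainFresh` are hypothetical and not from the question's code, and the counter map is static only to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical base class: each instance is stamped with a per-class sequence number.
abstract class StampedPublication {
    private static final ConcurrentMap<Class<?>, AtomicLong> COUNTERS =
        new ConcurrentHashMap<Class<?>, AtomicLong>();

    private final long stamp;

    protected StampedPublication() {
        // getClass() returns the concrete subclass, so each type gets its own counter.
        this.stamp = counterFor(getClass()).incrementAndGet();
    }

    private static AtomicLong counterFor(Class<?> clz) {
        AtomicLong existing = COUNTERS.get(clz);
        if (existing == null) {
            AtomicLong fresh = new AtomicLong(0);
            existing = COUNTERS.putIfAbsent(clz, fresh);
            if (existing == null) {
                existing = fresh;
            }
        }
        return existing;
    }

    // True if a newer publication of the same concrete class has been created since.
    boolean isStale() {
        return stamp < counterFor(getClass()).get();
    }
}

class PubA extends StampedPublication { }

class PubB extends StampedPublication { }

class StampDemo {
    // Drains the queue with poll() and keeps only publications that are still current.
    static List<StampedPublication> drainFresh(Queue<StampedPublication> queue) {
        List<StampedPublication> fresh = new ArrayList<StampedPublication>();
        StampedPublication pub;
        while ((pub = queue.poll()) != null) {
            if (!pub.isStale()) {
                fresh.add(pub);
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        Queue<StampedPublication> queue = new ConcurrentLinkedQueue<StampedPublication>();
        queue.add(new PubA());
        queue.add(new PubA());
        queue.add(new PubB());
        System.out.println(drainFresh(queue).size());
    }
}
```

Note that nothing is removed from the queue at insertion time in this sketch; stale items stay queued until a consumer polls and skips them, so it bounds wasted work rather than memory.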

Remember the starvation issue though - you might want to add something that tracks how many items, or for how long, you've been discarding, and just processes an old one anyway.
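
One way to guard against starvation, as a rough sketch only: count consecutive discards and let a stale item through once a limit is reached. `DiscardLimiter` and its parameter are hypothetical names, and the right limit (or a time-based rule instead) depends on your workload.

```java
// Hypothetical helper: lets a stale item through after too many consecutive discards.
// Not thread-safe; intended to be owned by a single consumer thread.
class DiscardLimiter {
    private final int maxConsecutiveDiscards;
    private int consecutiveDiscards = 0;

    DiscardLimiter(int maxConsecutiveDiscards) {
        this.maxConsecutiveDiscards = maxConsecutiveDiscards;
    }

    // Returns true if the item should be processed, false if it should be discarded.
    boolean shouldProcess(boolean stale) {
        if (!stale || consecutiveDiscards >= maxConsecutiveDiscards) {
            consecutiveDiscards = 0; // processing anything resets the streak
            return true;
        }
        consecutiveDiscards++;
        return false;
    }
}
```

The consumer would call `shouldProcess(...)` with the result of its staleness check for each polled item.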

I would also suggest you don't use peek at all and just use poll or you are very likely to get threading issues (items processed multiple times) if you have multiple consumers.
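
To illustrate the point about poll, here is a small sketch with several consumer threads draining one queue. `PollDemo` and `drainWithConsumers` are made-up names for this example. Because poll() removes the head atomically, each element is handed to exactly one thread, whereas peek() would let every thread see the same head.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

class PollDemo {
    // Starts the given number of consumer threads and returns how many items they handled in total.
    static int drainWithConsumers(final Queue<Integer> queue, int consumers) {
        final AtomicInteger handled = new AtomicInteger();
        Thread[] threads = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    // poll() returns null once the queue is empty, which ends the loop.
                    while (queue.poll() != null) {
                        handled.incrementAndGet();
                    }
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return handled.get();
    }

    public static void main(String[] args) {
        Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
        for (int i = 0; i < 1000; i++) {
            queue.add(i);
        }
        System.out.println(drainWithConsumers(queue, 4));
    }
}
```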

DaveC
A: 

Yes, it's possible to peek at all the elements of a ConcurrentLinkedQueue using an iterator.

Iterator<Publication> itr = newPublications.iterator();
while (itr.hasNext()) {
   Publication existing = itr.next();
   if (existing.getClass().equals(pub.getClass())) {
      itr.remove();
   }
}

Because the iterator returned by ConcurrentLinkedQueue is only weakly consistent (it is guaranteed to traverse elements as they existed upon construction of the iterator, and may, but is not guaranteed to, reflect any modifications subsequent to construction), you may want to lock externally around newPublications.
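
As a sketch of what that external locking could look like, assuming every producer goes through the same `update` method: the class `PruningInbox` and its `lock` field are hypothetical, and the `Publication` types are stubbed out to keep the example self-contained.

```java
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

abstract class Publication { }

class ConcretePublicationA extends Publication { }

class ConcretePublicationB extends Publication { }

class PruningInbox {
    private final Queue<Publication> newPublications = new ConcurrentLinkedQueue<Publication>();
    private final Object lock = new Object();

    // Removes queued publications of the same concrete type as pub, then appends pub.
    void update(Publication pub) {
        // The lock keeps two producers from interleaving their prune-and-add steps.
        synchronized (lock) {
            Iterator<Publication> itr = newPublications.iterator();
            while (itr.hasNext()) {
                if (itr.next().getClass().equals(pub.getClass())) {
                    itr.remove();
                }
            }
            newPublications.add(pub);
        }
    }

    // Consumers take items with poll(), which is safe without the lock.
    Publication poll() {
        return newPublications.poll();
    }

    int size() {
        return newPublications.size();
    }
}
```

The lock only serializes callers of `update`. A consumer polling concurrently is still safe, but it may take an item just before a producer would have pruned it.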

In general, though, this doesn't seem very efficient, so an alternative way to prune duplicate Publications should be investigated.
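
One possible alternative, offered only as a sketch and not necessarily what you need: if only the newest publication of each type matters, keep a map from type to latest publication instead of a queue. `LatestByType` and `take` are hypothetical names, and this approach gives up arrival order across types.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

abstract class Publication { }

class ConcretePublicationA extends Publication { }

class ConcretePublicationB extends Publication { }

class LatestByType {
    private final ConcurrentMap<Class<?>, Publication> latest =
        new ConcurrentHashMap<Class<?>, Publication>();

    // Overwrites any unprocessed publication of the same concrete type.
    void update(Publication pub) {
        latest.put(pub.getClass(), pub);
    }

    // Atomically takes the pending publication of the given type, or returns null if none is pending.
    Publication take(Class<? extends Publication> type) {
        return latest.remove(type);
    }
}
```

With this layout, memory is bounded by the number of publication types, and no scan of pending items is needed when a new one arrives.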

jenglert