Hello
I have a program which has multiple objects of type PublisherTask and SubscriberTask.
A given subscriber can be subscribed to one or more publishers.
To describe my problem, here is some code:
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

abstract class Publication {
    // some published information
}

class ConcretePublicationA extends Publication {
}

class ConcretePublicationB extends Publication {
}

abstract class Subscription {
    private final long id;
    private final Subscriber s;
    // PLUS some other members relating to the subscription

    protected Subscription(long id, Subscriber s) {
        this.id = id;
        this.s = s;
    }

    public Subscriber getSubscriber() {
        return this.s;
    }
}

class ConcreteSubscriptionA extends Subscription {
    protected ConcreteSubscriptionA(long id, Subscriber s) {
        super(id, s);
        // TODO Auto-generated constructor stub
    }
}

class ConcreteSubscriptionB extends Subscription {
    protected ConcreteSubscriptionB(long id, Subscriber s) {
        super(id, s);
        // TODO Auto-generated constructor stub
    }
}

interface Subscriber {
    public void update(Publication pub);
}

interface Publisher {
    public Subscription subscribe(Subscriber subscriber);
}

abstract class PublisherTask implements Runnable, Publisher {
    private final ConcurrentHashMap<Long, Subscription> subscribers =
            new ConcurrentHashMap<Long, Subscription>();
    Long subscriptionId = 0L;

    @Override
    public void run() {
        /* obviously this is a different variable in a real program */
        boolean some_condition = true;
        while (some_condition) {
            // do some work
            Publication pub = /* new ConcretePublication(....) */ null;
            for (Subscription s : subscribers.values()) {
                s.getSubscriber().update(pub);
            }
        }
    }

    @Override
    public Subscription subscribe(Subscriber subscriber) {
        Subscription sub;
        // Lock on the final map rather than on subscriptionId: ++ rebinds that
        // field to a new Long object, so it is not a stable monitor.
        synchronized (subscribers) {
            /* the lines below are in a function in the sub-class,
             * but for brevity I'm showing them here
             */
            sub = new ConcreteSubscriptionA(++subscriptionId, subscriber);
            subscribers.put(subscriptionId, sub);
        }
        return sub;
    }
}

abstract class SubscriberTask implements Runnable, Subscriber {
    protected ConcurrentLinkedQueue<Publication> newPublications =
            new ConcurrentLinkedQueue<Publication>();

    @Override
    public void run() {
        /* obviously this is a different variable in a real program */
        boolean some_condition = true;
        while (some_condition) {
            // do some work
            Publication pub = newPublications.peek();
            /* the lines below are in a function in the sub-class,
             * but for brevity I'm showing them here
             */
            {
                if (pub instanceof ConcretePublicationA) {
                    // Do something with the published data
                } else if (pub instanceof ConcretePublicationB) {
                    // Do something with the published data
                }
            }
        }
    }

    @Override
    public void update(Publication pub) {
        /* My question relates to this method:
         * Basically, to avoid memory issues I would like existing
         * unprocessed publications **of the same type as the new one**
         * to be discarded
         */
        Publication existing = null;
        do {
            // This won't work because peek() only looks at the head of the queue
            existing = newPublications.peek();
            if ((existing != null) && (existing.getClass().equals(pub.getClass()))) {
                newPublications.remove(existing);
            }
        } while (existing != null);
        newPublications.add(pub);
    }
}

OK, now that you've had a chance to look over my code, I would like to ask the following question:
In the update method shown above, is it possible to look at all the elements in a ConcurrentLinkedQueue (not just the head) and remove those of a given type?
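To make the intent concrete, here is a minimal sketch of the behaviour I am after. It assumes that walking the queue with `Collection.removeIf` (which uses the queue's weakly consistent iterator) is acceptable here. The names `Pub`, `PubA`, `PubB` and `LatestOnlyInbox` are hypothetical stand-ins for my real classes, not part of the code above.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-ins for Publication and its concrete subclasses.
abstract class Pub {}
class PubA extends Pub {}
class PubB extends Pub {}

// Hypothetical holder for the subscriber-side queue.
class LatestOnlyInbox {
    private final Queue<Pub> pending = new ConcurrentLinkedQueue<>();

    // Discard queued publications of the same concrete class, then enqueue the new one.
    void update(Pub pub) {
        // removeIf visits every element, not only the head that peek() returns.
        pending.removeIf(p -> p.getClass() == pub.getClass());
        pending.add(pub);
    }

    int size() {
        return pending.size();
    }

    Pub poll() {
        return pending.poll();
    }
}
```

One caveat I can see with this sketch: the remove-then-add pair is not atomic, so if two publishers call `update` at the same time with the same type, both publications could end up in the queue.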
Also, please feel free to let me know if you think improvements can be made to the classes and how they interact with each other.
Thanks