I have the following multi-threaded scenario: requests arrive at a method, and I want to avoid processing duplicate concurrent requests, since several identical requests may be blocked waiting to be processed. I used a Hashtable to keep track of processed requests, but it causes a memory leak. How should I keep track of processed requests so that identical requests that are still blocked are not processed again?

How can I check that a waiting/blocked incoming request is not the same as one already being processed by another thread?

A: 

If the memory leak is the problem, have a look at WeakHashMap to hold your requests during processing.

Another solution would be to use a memory-bound cache...
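
To illustrate what a memory-bound cache could look like, here is a minimal sketch built on `LinkedHashMap.removeEldestEntry`. The class name `BoundedSeenSet` and the `maxEntries` parameter are made up for this example; they are not from any library. It has the same caveat as any bounded structure: an entry can be evicted while a duplicate request is still blocked, so the bound has to be sized generously.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, for illustration only.
final class BoundedSeenSet {
    // Returns a thread-safe set that remembers at most maxEntries keys and
    // silently drops the oldest inserted key once the bound is exceeded.
    static <K> Set<K> create(final int maxEntries) {
        Map<K, Boolean> backing = new LinkedHashMap<K, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Boolean> eldest) {
                // Called by LinkedHashMap after each insertion.
                return size() > maxEntries;
            }
        };
        // LinkedHashMap is not thread-safe, so wrap the resulting set.
        return Collections.synchronizedSet(Collections.newSetFromMap(backing));
    }
}
```

A worker would call `seen.add(requestKey)` and process the request only when `add` returns `true`.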

pgras
I thought of this, but is there any way of doing it without a data structure, by arranging the lock checks and condition checks in some manner?
seawaves
Also, we never know when an entry of the WeakHashMap will be removed. If it is removed before all blocked requests have been processed, duplicate processing will happen.
seawaves
A: 

Okay, I think I kinda understand what you want.

You can use a ConcurrentSkipListSet as a queue. Implement your queued elements like this:

 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;

 class Element implements Comparable<Element> {
       //To FIFOnize
       private static final AtomicLong SEQ = new AtomicLong();
       private final long id = SEQ.incrementAndGet();

       //Can only be executed once.
       private final Semaphore execPermission = new Semaphore(1);

       @Override
       public int compareTo(Element e){
             // If an element e1 already exists on the queue such that
             // e.compareTo(e1) == 0, the new element will not
             // be placed on the queue.
             if(this.equals(e)){
                return 0;
             }else{
                //This will enforce FIFO.
                return Long.compare(this.id, e.id);
             }
       }
       //implement both equals and hashCode
       //(so that duplicate requests compare as equal)

       public boolean tryAcquire(){
           return execPermission.tryAcquire();
       }
 }

Now your worker threads should run a loop like this:

 while(!Thread.currentThread().isInterrupted()){
     //Iterates from head, therefore simulates FIFO
     for(Element e : queue){
          if(e.tryAcquire()){
               execute(e); //synchronous
               queue.remove(e);
          }
     }
 }

You can also use a blocking variant of this solution (have a bounded SortedSet and let worker threads block if there are no elements etc).

Enno Shioji
A: 

There is no inherent reason why keeping track of requests in a HashMap (or any other way you might choose) would lead to memory leaks. All that's needed is a way for entries to be removed once they have been processed.

This could mean having your request processing threads:

  • directly remove the entry;
  • communicate back to the dispatcher; or
  • mark the request as processed, so that the dispatcher can remove the entries.
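
As a rough sketch of the first option (the processing thread removes its own entry), the map below holds only requests that are currently in flight, so it cannot grow without bound. The class name `InFlightRequests` and its methods are hypothetical, and the sketch assumes the request key has sensible `equals` and `hashCode` implementations. Duplicates that arrive while the first request is running wait for its result instead of repeating the work.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical helper, for illustration only.
final class InFlightRequests<K, V> {
    // Contains an entry only while the request for that key is being processed.
    private final ConcurrentMap<K, CompletableFuture<V>> inFlight = new ConcurrentHashMap<>();

    V process(K key, Supplier<V> work) {
        CompletableFuture<V> mine = new CompletableFuture<>();
        CompletableFuture<V> existing = inFlight.putIfAbsent(key, mine);
        if (existing != null) {
            // Duplicate of a running request: block until the first thread
            // finishes and reuse its result (or its failure).
            return existing.join();
        }
        try {
            V result = work.get();   // this thread won the race and does the work
            mine.complete(result);
            return result;
        } catch (RuntimeException | Error e) {
            mine.completeExceptionally(e);
            throw e;
        } finally {
            // Remove only our own entry; waiters already hold a reference to it.
            inFlight.remove(key, mine);
        }
    }

    int inFlightCount() {
        return inFlight.size();
    }
}
```

Note that this only deduplicates requests that overlap in time. A request that arrives after the entry has been removed is processed again, which may or may not be what you want.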
CPerkins
Under what condition should I clean the HashMap? How will I know that no waiting thread has the same request as the one just processed?
seawaves
@seawaves: When there is no waiting thread with the same request. When the dispatcher gets a request for which there's already a thread running, you're going to make the thread which would process it wait, right? So your dispatcher already has a set of requests, a set of running threads, and a set of waiting threads *(lowercase "set": the general math term, not any specific data structure)*. So your dispatcher, when it discovers that the running thread for request X is done, checks to see if any other threads are waiting to work on X, and either removes the entry or starts one of the waiters.
CPerkins
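
One possible way to answer "when is it safe to remove the entry" is to count the threads interested in each request and drop the entry when the count reaches zero. This is only a sketch under that assumption; `RequestTracker`, `enter`, and `leave` are names invented for the example.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical helper, for illustration only.
final class RequestTracker<K> {
    // Number of threads (running or waiting) currently interested in each request.
    private final ConcurrentMap<K, Integer> interested = new ConcurrentHashMap<>();

    // Registers the calling thread. Returns true if no other thread was
    // registered for this key, i.e. the caller should do the processing.
    boolean enter(K key) {
        return interested.merge(key, 1, Integer::sum) == 1;
    }

    // Deregisters the calling thread. Returning null from the remapping
    // function removes the entry, so it disappears with the last thread.
    void leave(K key) {
        interested.computeIfPresent(key, (k, n) -> n == 1 ? null : n - 1);
    }

    boolean isTracked(K key) {
        return interested.containsKey(key);
    }
}
```

Each thread would call `enter` when it receives the request and `leave` in a `finally` block once it has either processed the request or learned that another thread did.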