I would like to store tuple objects in a concurrent Java collection and have an efficient, blocking query method that returns the first element matching a pattern. If no such element is available, the method should block until one is added.

For instance if I have a class:

public class Pair {
  public final String first;
  public final String second;
  public Pair( String first, String second ) {
    this.first = first;
    this.second = second;
  }
}

And a collection like:

public class FunkyCollection {
  public void add( Pair p ) { /* ... */ }
  public Pair get( Pair p ) { /* ... */ }
}

I would like to query it like:

myFunkyCollection.get( new Pair( null, "foo" ) );

which returns the first available pair with the second field equalling "foo" or blocks until such element is added. Another query example:

myFunkyCollection.get( new Pair( null, null ) );

should return the first available pair whatever its values.

Does such a solution already exist? If not, how would you suggest implementing the get( Pair p ) method?

Clarification: The method get( Pair p) must also remove the element. The name choice was not very smart. A better name would be take( ... ).

+2  A: 

In your FunkyCollection add method you could call notifyAll on the collection itself every time you add an element.

In the get method, if the underlying container (any suitable container is fine) doesn't contain the value you need, wait on the FunkyCollection. When the wait is notified, check whether the underlying container now contains the result you need. If it does, return the value; otherwise, wait again.

Robert Christie
+3  A: 

Here's some source code. It is basically the same as what cb160 said, but having the source code might help clear up any questions you may still have. In particular, the methods on the FunkyCollection must be synchronized.

As meriton pointed out, the get method performs an O(n) scan for every blocked get every time a new object is added. It also performs an O(n) operation to remove objects. This could be improved by using a data structure similar to a linked list where you can keep an iterator to the last item checked. I haven't provided source code for this optimization, but it shouldn't be too difficult to implement if you need the extra performance.

import java.util.*;

public class BlockingQueries
{
    public class Pair
    {
        public final String first;
        public final String second;
        public Pair(String first, String second)
        {
            this.first = first;
            this.second = second;
        }
    }

    public class FunkyCollection
    {
        final ArrayList<Pair> pairs = new ArrayList<Pair>();

        public synchronized void add( Pair p )
        {
            pairs.add(p);
            notifyAll();
        }

        public synchronized Pair get( Pair p ) throws InterruptedException
        {
            while (true)
            {
                for (Iterator<Pair> i = pairs.iterator(); i.hasNext(); )
                {
                    Pair pair = i.next();
                    boolean firstOk = p.first == null || p.first.equals(pair.first);
                    boolean secondOk = p.second == null || p.second.equals(pair.second);
                    if (firstOk && secondOk)
                    {
                        i.remove();
                        return pair;                
                    }
                }
                wait();
            }
        }   
    }

    class Producer implements Runnable
    {
        private FunkyCollection funkyCollection;

        public Producer(FunkyCollection funkyCollection)
        {
            this.funkyCollection = funkyCollection;
        }

        public void run()
        {
            try
            {
                for (int i = 0; i < 10; ++i)
                {
                    System.out.println("Adding item " + i);
                    funkyCollection.add(new Pair("foo" + i, "bar" + i));
                    Thread.sleep(1000);
                }
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
            }
        }
    }

    public void go() throws InterruptedException
    {
        FunkyCollection funkyCollection = new FunkyCollection();
        new Thread(new Producer(funkyCollection)).start();
        System.out.println("Fetching bar5.");
        funkyCollection.get(new Pair(null, "bar5"));
        System.out.println("Fetching foo2.");
        funkyCollection.get(new Pair("foo2", null));
        System.out.println("Fetching foo8, bar8");
        funkyCollection.get(new Pair("foo8", "bar8"));
        System.out.println("Finished.");
    }

    public static void main(String[] args) throws InterruptedException
    {
        new BlockingQueries().go();
    }
}

Output:

Fetching bar5.
Adding item 0
Adding item 1
Adding item 2
Adding item 3
Adding item 4
Adding item 5
Fetching foo2.
Fetching foo8, bar8
Adding item 6
Adding item 7
Adding item 8
Finished.
Adding item 9

Note that I put everything into one source file to make it easier to run.
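
The optimization mentioned above (remembering the last item checked so that a woken get does not rescan old entries) could look roughly like the sketch below. This is one possible approach, not the code from this answer: it assumes each pair is given an increasing sequence number and stored in a TreeMap, so a blocked caller only scans entries newer than the ones it has already rejected, and removal costs O(log n). The names SequencedFunkyCollection, take, nextSeq and matches are made up for illustration. It relies on Pair being immutable: an entry that failed to match once can never match later, it can only be removed by another thread.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Same shape as the Pair in the question.
final class Pair {
    final String first;
    final String second;

    Pair(String first, String second) {
        this.first = first;
        this.second = second;
    }
}

// Hypothetical variant of FunkyCollection; all names here are assumptions.
final class SequencedFunkyCollection {
    // Pairs keyed by arrival order; TreeMap offers tail views and O(log n) removal.
    private final TreeMap<Long, Pair> pairs = new TreeMap<>();
    private long nextSeq = 0;

    public synchronized void add(Pair p) {
        pairs.put(nextSeq++, p);
        notifyAll(); // wake every blocked take() so it can inspect the new entry
    }

    public synchronized Pair take(Pair pattern) throws InterruptedException {
        long from = 0; // lowest sequence number this call has not inspected yet
        while (true) {
            Iterator<Map.Entry<Long, Pair>> it =
                    pairs.tailMap(from, true).entrySet().iterator();
            while (it.hasNext()) {
                Pair candidate = it.next().getValue();
                if (matches(pattern, candidate)) {
                    it.remove(); // removes from the backing TreeMap as well
                    return candidate;
                }
            }
            from = nextSeq; // everything older is known not to match
            wait();         // releases the monitor until the next add()
        }
    }

    // A null field in the pattern acts as a wildcard.
    private static boolean matches(Pair pattern, Pair candidate) {
        boolean firstOk = pattern.first == null || pattern.first.equals(candidate.first);
        boolean secondOk = pattern.second == null || pattern.second.equals(candidate.second);
        return firstOk && secondOk;
    }
}
```

Each add still wakes every waiter, but a woken waiter now looks only at the entries that arrived since its last scan instead of the whole collection.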

Mark Byers
I see that you remove an entry matched by `get()`. The specification didn't call for that, but it's an interesting twist. It makes searching the whole collection again after an unsatisfied `get()` necessary (or at least very difficult to avoid), as one never knows then how many items may have been removed by other threads while the current thread was waiting on an addition.
seh
Good point, I'll clarify that with the original poster. The solution could be simpler (and perform better) if get doesn't remove elements.
Mark Byers
I think you need to revisit the decision to catch the interrupt exception (given the signatures on the higher level methods).
@alphazero: The run method must not throw any exceptions. What change do you have in mind?
Mark Byers
Removing elements in the middle of an ArrayList is an O(n) operation. Moreover, whenever an element is inserted, every blocking consumer will scan the entire list, which is yet another O(n) operation. I would not call that efficient.
meriton
I've added an explanation regarding efficiency. There's nothing preventing this method from being made much faster, it's just more code than I'd want to post in this format and the simple solution may be good enough for what he needs. I do think it deserves an explanation note of how to improve the performance though... so I've added one. Thanks for the comment.
Mark Byers
+3  A: 

I know of no existing container that will provide this behavior. One problem you face is the case where no existing entry matches the query. In that case, you'll have to wait for new entries to arrive, and those new entries are supposed to arrive at the tail of the sequence. Given that you're blocking, you don't want to have to examine all the entries that precede the latest addition, because you've already inspected them and determined that they don't match. Hence, you need some way to record your current position, and be able to search forward from there whenever a new entry arrives.

This waiting is a job for a Condition. As suggested in cb160's answer, you should allocate a Condition instance inside your collection, and block on it via Condition#await(). You should also expose a companion overload to your get() method to allow timed waiting:

public Pair get(Pair p) throws InterruptedException;
public Pair get(Pair p, long time, TimeUnit unit) throws InterruptedException;

Upon each call to add(), call Condition#signalAll() to unblock the threads waiting on unsatisfied get() queries, allowing them to scan the recent additions.

You haven't mentioned how or if items are ever removed from this container. If the container only grows, that simplifies how threads can scan its contents without worrying about contention from other threads mutating the container. Each thread can begin its query with confidence as to the minimum number of entries available to inspect. However, if you allow removal of items, there are many more challenges to confront.
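
A minimal sketch of the approach described above for the grow-only case, assuming get() does not remove anything (so it does not cover a removing take()). Each call keeps its own cursor into an append-only list and, after being signalled, resumes scanning from that position. The class name GrowOnlyFunkyCollection, the helper names, and the choice to return null on timeout are my assumptions, not something from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Same shape as the Pair in the question.
final class Pair {
    final String first;
    final String second;

    Pair(String first, String second) {
        this.first = first;
        this.second = second;
    }
}

// Hypothetical grow-only collection: entries are appended, never removed.
final class GrowOnlyFunkyCollection {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition added = lock.newCondition();
    private final List<Pair> pairs = new ArrayList<>();

    public void add(Pair p) {
        lock.lock();
        try {
            pairs.add(p);
            added.signalAll(); // let every waiting get() inspect the new entry
        } finally {
            lock.unlock();
        }
    }

    // Blocks until a matching pair has been added.
    public Pair get(Pair pattern) throws InterruptedException {
        return find(pattern, false, 0L);
    }

    // Assumption: returns null if no match arrives within the timeout.
    public Pair get(Pair pattern, long time, TimeUnit unit) throws InterruptedException {
        return find(pattern, true, unit.toNanos(time));
    }

    private Pair find(Pair pattern, boolean timed, long nanos) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            int cursor = 0; // index of the first entry this call has not inspected
            while (true) {
                for (; cursor < pairs.size(); cursor++) {
                    Pair candidate = pairs.get(cursor);
                    if (matches(pattern, candidate)) {
                        return candidate;
                    }
                }
                if (!timed) {
                    added.await();
                } else if (nanos <= 0L) {
                    return null; // timed out
                } else {
                    nanos = added.awaitNanos(nanos); // remaining wait time
                }
            }
        } finally {
            lock.unlock();
        }
    }

    // A null field in the pattern acts as a wildcard.
    private static boolean matches(Pair pattern, Pair candidate) {
        boolean firstOk = pattern.first == null || pattern.first.equals(candidate.first);
        boolean secondOk = pattern.second == null || pattern.second.equals(candidate.second);
        return firstOk && secondOk;
    }
}
```

Because nothing is ever removed, the cursor stays valid across waits. Once removal is allowed, a plain index is no longer enough and the scan position has to be tracked some other way.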

seh
The poster updated the question. As I guessed, the container does not only grow. The get method should also remove the element.
Mark Byers
+1  A: 

It appears you are looking for an implementation of Tuple Spaces. The Wikipedia article about them lists a few implementations for Java; perhaps you can use one of those. Failing that, you might find an open source implementation to imitate, or relevant research papers.

meriton
I agree. But the libraries I have seen are distributed implementations with several refinements. However I just need the above feature in a shared memory context.
paradigmatic