views:

219

answers:

6

I have an archive object which manages various byte arrays and hands out InputStreams and OutputStreams for reading and writing them. Each byte array has an associated ReentrantReadWriteLock.

The constructor of the subclass of InputStream which the archive produces acquires the lock for the relevant data, while close() releases the lock. Now, the problem:

Suppose that I have a task which will run on another thread which needs input from the archive, is given the InputStream as an argument before it starts, and is responsible for closing the stream when it's finished. These requirements appear to be incompatible with the locking mechanism used by my data archive, because

  1. The InputStream is created on a different thread from the one on which it is closed.
  2. A ReentrantReadWriteLock must be released by the thread which owns it.

Rewriting the parts of the program which expect an InputStream as input in order to avoid #1 is inconvenient and makes those parts less flexible. Using a kind of lock which permits change of ownership would let me avoid #2, but I don't see any locks in the Java API which can handle that and I'm not terribly keen on knocking together a custom lock if I don't have to.

What solutions are there to this problem?

+2  A: 

The cleanest way to do this is to give the clients something other than a subclass of InputStream, even if this means slightly more refactoring. In your current design, you can't count on your clients releasing the lock, what if one of them does not? This is not good encapsulation.

Have the archive object manage the locks, and provide an easy API for your clients to get the data. Even if this means changing some APIs.

The problem of different threads locking and releasing the same object is a clear sign that you got something slightly wrong in your current design. Work on the refactoring aforementioned, it will do you a great deal further on.

Yuval A
I'm not worried about streams not being closed. The archive is wholly internal, not part of an outward-facing API. The number of places we create these streams is relatively small, and they're always closed inside finally blocks---but those places might be on a thread other than the one where the stream was created, and hence different from where the lock was acquired.
uckelman
Well, you still have a motivation as to why you should handle the locking in the archive object... I doubt there is a cleaner way to achieve this.
Yuval A
Reluctantly, this is the solution I implemented. I still feel that the most elegant and straightforward way to handle this would be to have a reentrant lock for which the owner may be set to null, which would indicate that any thread may release it. Unfortunately, the method for setting the lock owner is buried in a private inner class of `ReentrantReadWriteLock`.
uckelman
A: 

Perhaps you could explain a little more about how your locking works. For the InputStream, why does the byte array need to have a lock associated with it and why must the constructor instantiate it?

Kevin
The reason for the locks is the normal one, to permit unlimited concurrent reads and to make writing exclusive. The reason for having the InputStream subclass' ctor acquire the lock is that the locking should be invisible to stuff outside the archive and its helper classes, and the lock has to be acquired someplace---either there or in the method which returns the InputStream. Either way, it's still the same problem, because both that method and the ctor are being called on one thread and the stream is being closed on another.
uckelman
Is your "archive object" handing out references to the same InputStream/OutputStream to different threads?
matt b
@matt: Each stream returned is unique.
uckelman
+2  A: 

Based on your response to my question. You could simply postpone construction (and locking) of the InputStream by passing a custom class used to manufacture the InputStream to the ThreadExecutorService. Then the thread in the service requests the InputStream from this object which first constructs it (since it doesn't exist) and then you're all set.

For instance:

public interface InputStreamMaker {
    public InputStream getOrCreateInputStream();
}

Then in your main class that starts everything:

static class InputStreamMakerImpl implements InputStreamMaker {
    private final Manager manager; // Your manager class (or use a normal Inner class on manager)
    private InputStream is;

    // other variables needed to define how to create input stream for the particular task here

    InputStreamMakerImpl(Manager manager) {
        this.manager = manager;
    }

    public InputStream getOrCreateInputStream() {
         if (is == null) {
             // pass this object - so manager will have all the details to create the right stream
             is = manager.createNewStream(this);
         }

         return is;
    }
}

and then on the main thread...

...some method...
InputStreamMakerImpl maker = new InputStreamMakerImpl(manager /* or this */);
// set values needed in maker for your manager class to create the right input stream
// start the worker and pass the InputStreamMaker (maker) as the parameter instead of the InputStream

This inner class can have additional information as necessary to create the right stream for the worker. Now the InputStream is created/opened on the right thread and that thread also can close it and unlock the lock.

Edit: On rereading your question I see that you may not want to change any of the Worker objects that already take an InputStream. Then you can still do this by:

  1. Creating a custom sub-class of InputStream that holds all the values of what I define above as InputStreamMakerImpl.

  2. The custom InputStream has an internal method getInputStream() which does what I define for getOrCreateInputStream() above.

  3. Then all the methods for InputStream simply delegate the calls, for example:

    public int read() throws IOException {
        return getInputStream().read();
    }
    
Kevin Brock
The problem I see here is that once you've used a stream on some thread, then that thread owns it.
uckelman
+1  A: 

java.io.InputStream has total nine methods. You have already overridden one i.e. close() method.

I would recommend you to have lazy locking, i.e. don't aggressively acquire lock in constructor. For this make ReentrantReadWriteLock available in your InputStream sub class and have a private method called doAcquireLock() that attempts to acquire the lock. Then override all the other eight methods of InputStream such that in all the overridden methods first make a call to doAcquireLock() and then delegate the operation to super.

example:

public int read() 
    doAcquireLock(); // This method get's the lock
    return super.read();
}

With this lock will be acquired by the thread that first calls any of the above eight methods and will be responsible to close as well.

Just to make sure you don't end up in deadlock situation you need to look out is to make sure client makes call to close method at the end so as to release the lock. Also none of the methods should be called after close method is called by having a boolean indicator to know if stream is already closed (stale).

Gladwin Burboz
What happens if client thread does not invoke close? You really got to be careful about that.
Gladwin Burboz
If your client thread doesn't call close(), it's just broken. What if your client thread dereferences a null, or calls System.exit()?
uckelman
Calling close() is client responsibility. If client has acquired resource then it is client's responsibility to release it. System.exit()? You whole program closes so no locks at all.
Gladwin Burboz
You may want to use some ideas from JDBC connection pooling for returning unused streams back. http://download-uk.oracle.com/docs/cd/B28359_01/java.111/e10788/optimize.htm#CFHHCICE
Gladwin Burboz
This looks like a reasonable solution to me, so long as checking whether we have the lock is cheap enough.
uckelman
A: 

Solution#2:

If you don't need to keep holding on to lock until end of thread, you can implement your custom InputStream as below which you can give to your clients. Now whenever any thread needs to read, it will acquire lock, then read and finally automatically relinquish the lock as soon as read is complete. Your custom OutputStream should be similarly implemented with write lock.

.

public class LockingInputStream extends InputStream {
    private final InputStream byteArrayInputStream;
    private final Lock r;

    public LockingInputStream(
                    InputStream byteArrayInputStream, 
                    ReentrantReadWriteLock rwl) {
        this.byteArrayInputStream = byteArrayInputStream;
        this.r = rwl.readLock();
    }

    @Override
    public int read() throws IOException {
        r.lock();
        try {
            return byteArrayInputStream.read();
        } finally { r.unlock(); }
    }
    @Override
    public int available() throws IOException {
        r.lock();
        try {
            return byteArrayInputStream.available();
        } finally { r.unlock(); }
    }
    ....
    @Override
    public void close() throws IOException {
        r.lock();
        try {
            byteArrayInputStream.close();
        } finally { r.unlock(); }
    }
}
Gladwin Burboz
The downside of this is that another thread could get between two of your reads and modify the data you're reading.
uckelman
A: 

Another solution I stumbled upon while mulling this over: One way of viewing the problem is that it's caused by the reentrancy of the locks. ReentrantReadWriteLock keeps track of lock ownership in order to handle reentrancy. If I want to acquire locks on stream creation and release locks on stream closure, and close streams on threads other than the ones which on which they were created, then a reentrant lock won't work because the wrong thread will own the stream lock.

One way around this is to use a counting lock, but force all the locking outwards. First, counting lock: States are integers -1,0,1,...., where -1 means writing, 0 means unlocked, and positive n count the number of concurrent reads. This way, writes are still exclusive, but locks aren't thread specific. Forcing all locking outwards: We have methods like exists() where we need to acquire a read lock, but exists() is called by some methods which will already have a write lock. So, we split out the guts of exists() into a protected existsImpl(), which should be called only when a lock is already held (that is, internally, by other *Impl() methods. This way, once a lock is acquired, only the nonlocking Impl methods are called, and we avoid the need for reentrancy.

This is all well and good, except that it ends up being wickedly complex to ensure that you're not screwing up in implementing it---calling a public locked method when you should be calling a protected non-locked one---and thereby creating a deadlock. Secondly, forcing all of your locks to the outside means that you're holding onto the locks longer, so this is (probably) not good for concurrency.

So, this is something I tried, but decided against using.

uckelman