views:

49

answers:

4

Hi,

I'm having trouble locking on an item within a Collection - specifically a ConcurrentDictionary.

I need to accept a message, look up that message within the Dictionary and then run a lengthy scan on that. As the program takes a lot of memory, after the scan the objects return true if they think its a good time to delete it (which I do by removing it from the Dictionary). However, another thread could come at a similar time and try to access that same object right after the delete. This is my first attempt:

string dictionaryKey = myMessage.someValue;

DictionaryObject currentObject = myConcurrentDictionary.GetOrAdd(dictionaryKey, new DictionaryObject());
// we can be interrupted here
lock (currentObject)
{
    //KeyNotFoundException is possible on line below
    if (myConcurrentDictionary[dictonaryKey].scan(myMessage)) // Scans the message - returns true if the object says its OK to remove it from the dictionary
    {
      DictionaryObject temp;                      //   It's OK to delete it
      if (!queuedMessages.TryRemove(ric, out temp))   // Did delete work?
       throw new Exception("Was unable to delete a DictionaryObject that just reported it was ok to delete it");
    }
}

However, the above doesn't work - it's possible for one thread to remove an object from the Dictionary right before another is going to attempt to access that object within the Dictionary. After reading that lock is shorthand for Monitor.Enter and Monitor.Exit, I tried this:

string dictionaryKey = myMessage.someValue;
Monitor.Enter(GetDictionaryLocker);
DictionaryObject currentObject = myConcurrentDictionary.GetOrAdd(dictionaryKey, new DictionaryObject());
// we can be interrupted here
lock (currentObject)
{
    Monitor.Exit(GetDictionaryLocker);
    //KeyNotFoundException is still possible on line below
    if (myConcurrentDictionary[dictonaryKey].scan(myMessage)) // Scans the message - returns true if the object says its OK to remove it from the dictionary
    {
      DictionaryObject temp;                   //   It's OK to delete it
      if (!queuedMessages.TryRemove(ric, out temp))   // Did delete work?
       throw new Exception("Was unable to delete a DictionaryObject that just reported it was ok to delete it");
    }
}

Both ways can result in a KeyNotFoundException when trying to look the object up within the Dictionary.

Does anyone know how I could find the object I want to lock and then lock it without being interrupted? Sorry - I'm new at concurrency and feel thoroughly confused!

Thanks,

Frederik

A: 

I think there is a fundamental misunderstanding of "locking" an object here.

When you hold a lock on an object (Monitor.Enter), it doesn't actually prevent other threads from using that object. It just prevents other threads from taking a lock on that specific object. What you need to do is add a secondary object, and lock on it - and make sure every thread locks on it as well.

That being said, this is going to introduce synchronization overhead. It might be worth trying to come up with a design where the lock is only used if there is a conflict - something like actually removing the object prior to scan, then locking, and holding the lock during the scan. Other threads can only try to obtain the lock if the object they are requesting is not part of the dictionary...

Reed Copsey
+2  A: 

You should remove the object from the dictionary before you start your scan, to prevent any other thread from trying to use it concurrently. You can always add it back in if you have to later on, after a failure in scan(). Both remove and add are guaranteed thread-safe on this concurrent collection.

This should make what you want possible without any locks or Monitor usage.

string dictionaryKey = myMessage.someValue;

DictionaryObject currentObject = null;
if (myConcurrentDictionary.TryRemove(dictionaryKey, out currentObject))
{
    //KeyNotFoundException is possible on line below
    if (!currentObject.scan(myMessage)) // Scans the message - returns true if the object says its OK to remove it from the dictionary
    {
      if (!myConcurrentDictionary.TryAdd(dictionaryKey, currentObject))
       throw new Exception("Was unable to re-insert a DictionaryObject that is not OK for deletion");
    }
} 

My concern with this, without understanding the rest of your code, is whether some other thread can add back in another message with the same key during your call to scan(). This will cause TryAdd to fail. If this is a possibility, more work is needed.

The problem with your current model is that even though the collection is thread-safe, what you really must do if you wish to leave the 'being scanned' items in the collection is to do the following combination of operations atomically: 1. find a free item and 2. mark it as 'in use'.

  1. can be done by virtue of the collection being thread-safe but
  2. has to be done separately, so you open up a window for multiple scan()s on the same object.
Steve Townsend
Thanks for the reply Steve. You're right - removing an item first sounds very elegant. I'm lost though as how I would tell other threads that the object is being processed and not to just create a new one. Any suggestions?
Frederik
@Frederik you can go @supercat's way
Nick Martyshchenko
@Frederik - I would make this a new question once you have gotten this area sorted out. There's not enough info here to answer that reliably. I noted my concern on this topic (after the code) in an EDIT to my response. It's possible that you could revert to a simple `Dictionary<>` and `lock()` it while you find a candidate object, and update a status field in the object to flag it as "being scanned". In that case, object status would have to be checked (also within `lock()` on `Dictionary<>`) by other possible consumers before handling in `scan()`. Here, you could leave the object in place.
Steve Townsend
@supercat's response also shows a possible way to check object status without abandoning `ConcurrentDictionary`. Note that this might result in you repeatedly trying and failing on an in-use object though.
Steve Townsend
Steve, I'm still a little confused but thanks for the great advice - you've given me a lot of help. Thanks to everyone else too.
Frederik
@Frederik - come back with more qs as you progress.
Steve Townsend
A: 

This is speculation ...

The problem is that the concurrentDictionary returns Unique objects to each thread to prevent them from interfering with each other.

Any add, change or remove methods will make sure that the correct item is accessed but the lock will only lock you local wrapper.

Any solution depends on what you can do with the dictionary.

Could the thread pull the message from the dictionary directly and only replace it after scan, or could you use a master thread that goes over the dictionary and adds all messages to a queue object that the threads pull from and after processing remove documents that should be removed.

That way each thread will never contend for the same message.

David Mårtensson
Thanks for that - good suggestions. The removing the item first strategy you, Reed and supercat mentioned sounds elegant, but I'm unsure how I would deal with another thread looking up that item in the Dictionary and not finding it - it would then want to create a new copy of the object - yet another one has "checked it out" and it should wait for the other thread. Does that make any sense?
Frederik
@Frederik, why other thread comes with similar `message`. If message has to be processed, so how it must be stacked with previous? How you resolve collisions, i.e. when message processing is started but another similar message is arrived? How processing modifies state of `DictionaryObject`
Nick Martyshchenko
Maybe the other option then, use a queue to shuffle messages to the worker threads. That way each thread will get separat messages, no collisions and the dictionary will still contain all messages until they are identified as to be removed.
David Mårtensson
A: 

How about including a field in the object which indicates what thread owns it, and using Threading.Interlocked.CompareExchange to attempt to acquire it? Figure that if an object is in use, code shouldn't lock but simply abandon the operation.

supercat
I wasn't familiar with Threading.Interlocked.CompareExchange before - thanks. Unfortunately abandoning the operations isn't an option - because messages must arrive in their correct order.
Frederik
@Frederik, can you describe in your question what processing logic you'll need to implement? Maybe you'll implement `Queue<>` as `ConcurrentDictionary<>` value and gather processing objects there. Or your `DictionaryObject` has to be shared?
Nick Martyshchenko
If you want to process them in order take a look at `BlockingCollection<T>` - http://msdn.microsoft.com/en-us/library/dd267312.aspx. When serviced with one thread, this should guarantee ordering, at least insofar as they are added to the queue in the correct order.
Steve Townsend
@Frederik: So what you need is to ensure that if an item is being scanned when someone else wants to add to it, either the item will be deleted and a new one created, or the item will be appended to and not deleted. Best would probably be to use a combination of CompareExchange and locking; you need to use lock each item when actually deleting it, and use CompareExchange to keep track of whether an item is being deleted, and any attempt has been or is being made to add to an item during a scan. If an attempt is made to add to an item during deletion, wait on the old item's lock then try again.
supercat