Hi there,

I need a mechanism to implement the following scenario:

  1. two or more threads need to load a given set of values at the same time
  2. only one request must be made per value, so if two threads need overlapping sets, one must wait for the other
  3. I don't want a lock (or mutex, or other primitive) per value, since there can potentially be too many values.

The scenario could be as follows (suppose thread B enters a little earlier):

          thread A           thread B
values    5, 8, 9, 12        7, 8, 9, 13, 14
request   5,       12        7, 8, 9, 13, 14
waits for    8, 9
                             >> data loaded <<
retrieves    8, 9

          >> data loaded <<
returns   5, 8, 9, 12

Which concurrent mechanism should I use for this?

Remember that producer/consumer won't work, since threads A and B are not exactly consumers (each is interested only in certain values).

Thanks

+1  A: 

This sounds much like a lock manager, so why not build one?

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

class LockManager<TKey>
{
    private Dictionary<TKey, List<EventWaitHandle>> locks =
        new Dictionary<TKey, List<EventWaitHandle>>();
    private Object syncRoot = new Object();

    public void Lock(TKey key)
    {
        do
        {
            Monitor.Enter(syncRoot);
            List<EventWaitHandle> waiters = null;
            if (locks.TryGetValue(key, out waiters))
            {
                // Key is locked; add ourselves to the waiting list.
                // Note that this code is not safe under OOM conditions.
                AutoResetEvent eventLockFree = new AutoResetEvent(false);
                waiters.Add(eventLockFree);
                Monitor.Exit(syncRoot);
                // Now wait for a notification
                eventLockFree.WaitOne();
            }
            else
            {
                // Key is free; claim it by installing an empty waiter list
                waiters = new List<EventWaitHandle>();
                locks.Add(key, waiters);
                Monitor.Exit(syncRoot);
                // we're done
                break;
            }
        } while (true);

    }

    public void Release(TKey key)
    {
        List<EventWaitHandle> waiters = null;
        lock (syncRoot)
        {
            if (!locks.TryGetValue(key, out waiters))
            {
                Debug.Assert(false, "Releasing a bad lock!");
                return; // nothing to wake; avoids a null dereference below
            }
            locks.Remove(key);
        }
        // Notify ALL waiters. Unfair wakeups are better than a
        // FIFO handoff because they avoid lock convoys.
        foreach (EventWaitHandle waitHandle in waiters)
        {
            waitHandle.Set();
        }
    }
}

You must lock each value before you use it:

using System;
using System.Diagnostics;
using System.Threading;

class Program
{
    class ThreadData
    {
        public LockManager<int> LockManager { get; set; }
        public int[] Work { get; set; }
        public AutoResetEvent Done { get; set; }
    }

    static void Main(string[] args)
    {
        int[] forA = new int[] {5, 8, 9, 12};
        int[] forB = new int[] {7, 8, 9, 13, 14 };

        LockManager<int> lockManager = new LockManager<int>();

        ThreadData tdA = new ThreadData
        {
            LockManager = lockManager,
            Work = forA,
            Done = new AutoResetEvent(false),
        };
        ThreadData tdB = new ThreadData
        {
            LockManager = lockManager,
            Work = forB,
            Done = new AutoResetEvent(false),
        };

        ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), tdA);
        ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), tdB);

        WaitHandle.WaitAll(new WaitHandle[] { tdA.Done, tdB.Done });
    }

    static void Worker(object args)
    {
        Debug.Assert(args is ThreadData);
        ThreadData data = (ThreadData) args;
        try
        {
            foreach (int key in data.Work)
            {
                data.LockManager.Lock(key);
                Console.WriteLine("~{0}: {1}",
                    Thread.CurrentThread.ManagedThreadId, key);
                // simulate loading the data set for this key
                Thread.Sleep(1000);
            }
            foreach (int key in data.Work)
            {
                // Now free the locked keys
                data.LockManager.Release(key);
            }
        }
        catch (Exception e)
        {
            Debug.Write(e);
        }
        finally
        {
            data.Done.Set();
        }
    }
}

The biggest problem you'll face will be deadlocks. Change the two arrays to {5, 8, 9, 7} and {7, 8, 9, 5} and you'll see my point immediately: each thread ends up holding a key the other one is waiting for.
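
One common mitigation, assuming every thread knows its complete key set before it starts locking, is to acquire the keys in a single canonical order (sorted, say): a thread then only ever waits while holding keys smaller than the one it wants, so no cyclic wait can form. A minimal sketch, usable as a drop-in for the first loop in Worker above (the helper name is illustrative):

static void LockInOrder(ThreadData data)
{
    // Sort a private copy so every thread acquires its keys in
    // ascending order; with one global order, no cycle can form.
    int[] ordered = (int[]) data.Work.Clone();
    Array.Sort(ordered);
    foreach (int key in ordered)
    {
        data.LockManager.Lock(key);
    }
}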

Remus Rusanu
Unless I'm missing the point, you're creating a list of locks, one per item, and that's exactly what I want to avoid, since it would be overkill for the system, right?
pablo
One lock per *waiter* for a *locked* item. A waiter (= a thread) can only wait for one item at a time, so in the worst case you have as many events as threads (hopefully fewer; an exact match means you're deadlocked, but deadlocks are inherent in your requirements and can only be avoided by properly distributing the sets).
Remus Rusanu
Well, in order to avoid deadlocks the policy would be: first fetch the values that are not on the wait list, then wait for the ones you're missing. That way deadlocks would be avoided, wouldn't they?
pablo
Another option I'm thinking about: just use a Monitor. You enter the exclusion, check what's on the "pending list", remove from your request whatever is already there, continue, fetch the remaining elements, and wait on the monitor. Each time a thread delivers something it wakes all the waiting threads; each one checks whether any of its values arrived and goes back to sleep otherwise (or until its list is complete). It's not as efficient, since threads can be woken just to do nothing, but it only uses one monitor for everything. What do you think?
pablo
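
A minimal sketch of that single-monitor idea, assuming a HashSet of in-flight keys guarded by one gate with Monitor.Wait/PulseAll (type and member names are illustrative):

using System.Collections.Generic;
using System.Threading;

class PendingSetLoader<TKey>
{
    private readonly object gate = new object();
    private readonly HashSet<TKey> pending = new HashSet<TKey>();

    // Split 'wanted' into keys this thread must load (claimed here)
    // and keys some other thread is already loading.
    public void Claim(IEnumerable<TKey> wanted,
        out List<TKey> mine, out List<TKey> theirs)
    {
        mine = new List<TKey>();
        theirs = new List<TKey>();
        lock (gate)
        {
            foreach (TKey key in wanted)
            {
                if (pending.Add(key)) mine.Add(key);  // we own this load
                else theirs.Add(key);                 // already in flight
            }
        }
    }

    // Mark keys as loaded and wake every waiter so it can re-check.
    public void Deliver(IEnumerable<TKey> keys)
    {
        lock (gate)
        {
            foreach (TKey key in keys) pending.Remove(key);
            Monitor.PulseAll(gate);  // all waiters wake; most sleep again
        }
    }

    // Block until none of 'keys' is still being loaded elsewhere.
    public void WaitFor(List<TKey> keys)
    {
        lock (gate)
        {
            while (keys.Exists(k => pending.Contains(k)))
                Monitor.Wait(gate);  // releases the gate while sleeping
        }
    }
}

A caller would Claim its set, load and Deliver the keys it owns, then WaitFor the rest, which matches the sequence in the question's diagram.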
That is similar, the difference being that there is one global list of waiters vs. one list per 'lock'. My solution is inspired by the typical transaction-processing lock manager implementation and should perform acceptably under frequent acquire/release calls. In your case you have a similar list of 'locks' (it can be a HashSet instead of a Dictionary), one single sync primitive vs. one per waiting thread, and you don't need a list of waiters (neither global nor per 'lock'). So you have fewer 'moving pieces', at the cost of more frequent wakeups without a successful acquire.
Remus Rusanu
I would go with whatever you think is simpler and easier to understand, since you really only require correctness, not performance and scale (i.e. the locking/exclusion just has to work correctly for the few times it will be invoked during the process lifetime; it is not a core piece of functionality called thousands of times per second where performance would be critical).
Remus Rusanu
I would also revisit the requirement to 'complete' the work in a batch (i.e. thread A resumes only after B completes the entire load, rather than as soon as B loads '8'), since this requirement introduces a fairly high deadlock probability.
Remus Rusanu
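
Following up on that last point, a minimal sketch of the relaxed scheme: reuse the LockManager above, but hold each key only for the duration of its own load and publish the result through a shared cache (the cache and backend names are illustrative). A waiter then resumes as soon as that single value is available, only one lock is ever held at a time, and the deadlock risk disappears:

using System.Collections.Generic;

class CachedLoader
{
    private readonly LockManager<int> lockManager = new LockManager<int>();
    private readonly Dictionary<int, string> cache = new Dictionary<int, string>();
    private readonly object cacheGuard = new object();

    // Lock one key, load it if nobody has yet, release immediately.
    public string Get(int key)
    {
        lockManager.Lock(key);
        try
        {
            lock (cacheGuard)
            {
                string value;
                if (cache.TryGetValue(key, out value))
                    return value;  // another thread already loaded it
            }
            string loaded = LoadFromBackend(key);  // hypothetical per-value request
            lock (cacheGuard)
            {
                cache[key] = loaded;
            }
            return loaded;
        }
        finally
        {
            lockManager.Release(key);  // wakes anyone waiting on this key
        }
    }

    // Stand-in for the real expensive request.
    private string LoadFromBackend(int key)
    {
        return "value-" + key;
    }
}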