I've followed this tutorial to create a priority queue and wrapped it in a BlockingCollection. I have a DataGrid bound to the underlying priority queue, which raises change events. I can add items to the collection from the UI thread without a hitch, and it blocks when the buffer is full, as it's supposed to.

Now how do I consume the items? Here's what I've got:

public DownloadViewModel()
{
    Queue = new ConcurrentPriorityQueue<DownloadItem>(10);
    Buffer = new BlockingCollection<KeyValuePair<int, DownloadItem>>(Queue, 10000);

    Task.Factory.StartNew(() =>
    {
        KeyValuePair<int, DownloadItem> item;
        while(!Buffer.IsCompleted)
        {
            if(Buffer.TryTake(out item))
            {
                // do something with the item
            }

            Thread.SpinWait(100000);
        }
    });
}

But as soon as I added the Task.Factory.StartNew block, my app started taking 30 seconds before the window appears (before, it was instant), and when I add an item I get this exception:

This type of CollectionView does not support changes to its SourceCollection from a thread different from the Dispatcher thread.

Which I understand, but is it really necessary to take the items using the UI thread? Doesn't that defeat the whole purpose of using this BlockingCollection? I want to create 4 or 8 consumers and have them run in parallel.

How is this supposed to be done?

A: 

Wrapping the CollectionChanged event with a dispatcher call seems to work pretty well:

public bool TryAdd(KeyValuePair<int, T> item)
{
    int pos = _queues.Take(item.Key + 1).Sum(q => q.Count);
    _queues[item.Key].Enqueue(item.Value);
    Interlocked.Increment(ref _count);
    Dispatcher.BeginInvoke(
        new Action(
            () =>
            NotifyCollectionChanged(
                new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, item, pos))
        ));
    return true;
}

Just had to derive my ConcurrentPriorityQueue from DispatcherObject. I think this is how it's supposed to be done.


Easier yet, just write the NotifyCollectionChanged method like this:

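private void NotifyCollectionChanged(NotifyCollectionChangedEventArgs e)
{
    // Copy the delegate to a local so a concurrent unsubscribe cannot null it
    // between the check and the call. Locking on CollectionChanged itself
    // would throw ArgumentNullException when there are no subscribers.
    var handler = CollectionChanged;
    if (handler != null)
        Dispatcher.BeginInvoke(new Action(() => handler(this, e)));
}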

And then you don't have to litter your other methods with BeginInvoke.

Mark
A: 

[After commenting on the question, then]

You don't need to "take the items using the UI thread". However, any updates to the UI as a result of processing the item in the consuming task need to be dispatched to the UI thread. Separate your concerns!
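
One way to picture that separation is sketched below. It is only an illustration, not code from the question or from either answer: ConsumerSketch, StartConsumers, process, postToUi and onCompleted are hypothetical names, and the sketch uses a plain BlockingCollection<T> rather than the custom priority queue. Each consumer blocks in GetConsumingEnumerable instead of polling with TryTake and SpinWait, does its work on a background thread, and hands only the UI update to the postToUi delegate.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

static class ConsumerSketch
{
    // Starts consumerCount long-running consumers. Each one blocks on the
    // buffer, processes items off the UI thread, and passes the UI update
    // to postToUi (hypothetical hook; in WPF this would wrap the Dispatcher).
    public static Task[] StartConsumers<T>(
        BlockingCollection<T> buffer, int consumerCount,
        Action<T> process, Action<Action> postToUi, Action<T> onCompleted)
    {
        return Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Factory.StartNew(() =>
            {
                // Blocks while the buffer is empty; the loop ends once
                // CompleteAdding() has been called and the buffer is drained.
                foreach (var item in buffer.GetConsumingEnumerable())
                {
                    process(item);                          // background work
                    var captured = item;
                    postToUi(() => onCompleted(captured));  // UI-side update
                }
            }, TaskCreationOptions.LongRunning))
            .ToArray();
    }
}
```

In a WPF app, postToUi could be something like a => Application.Current.Dispatcher.BeginInvoke(a), and consumerCount would be the 4 or 8 mentioned in the question. If the collection behind the BlockingCollection raises CollectionChanged itself, that event still has to be marshalled to the Dispatcher thread, as in the other answer.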

Giraffe