views:

46

answers:

1

i'm using the BlockingCollection for a Producer Consumer pattern and i got an excecption i think to write a patent on it- only two results in google! the expection is "CompleteAdding may not be used concurrently with additions to the collection" and it happens when i TryAdd on th BlockingCollection as Follows:

 public void EnqueueTask(T item)
    {
        if (!_cancellationTokenSource.IsCancellationRequested)
        {
            _workerQueue.Add(item);
        }
    }

the CompleteAdding is called on the dispose of the Consumer-Producer wrapper class:

  public void Dispose()
    {
        if (!_IsActive)
            return;
        _IsActive = false;
        _cancellationTokenSource.Cancel();
        _workerQueue.CompleteAdding();
        // Wait for the consumer's thread to finish.
        for (int i = 0; i < _workers.Length; ++i)
        {
            Task t1 = Task.Factory.StartNew(() =>
            {
                try
                {
                    if (!_workers[i].Join(4000))
                        LogWriter.Trace("Failed to join thread", "ThreadFailureOnDispose");
                }
                catch (Exception ex)
                {
                    OnLogged(ex.Message + ex.StackTrace);
                }
            });

        }


        // Release any OS resources.
    }

Anyone from microsoft got an idea? should i sleep after the cancelation and before calling the CompleteAdding?

+5  A: 

Look at this piece of the code:

    for (int i = 0; i < _workers.Length; ++i)
    {
        Task t1 = Task.Factory.StartNew(() =>
        {
            try
            {
                if (!_workers[i].Join(4000))   << == Here
                    LogWriter.Trace("Failed to join thread", "ThreadFailureOnDispose");
            }

In _workers[i].Join(4000), the value of i is not what you think it is. Try again with:

   for (int i = 0; i < _workers.Length; ++i)
    {
        int j = i;  // copy
        Task t1 = Task.Factory.StartNew(() =>
        {
            try
            {
                if (!_workers[j].Join(4000))  // j
                    LogWriter.Trace("Failed to join thread", "ThreadFailureOnDispose");
            }

In your version, the variable 'i' is captured and all Tasks use the same var. All but the first few will see i == _workers.Length because they are executed after the for-loop is completed.

It is a classic lambda + captured var problem.

Henk Holterman
Ah, the classic captured iteration variable. Good spot.
Marc Gravell
it's not realted to the question but if you mean the ++i instead of the i++ - it's a Typo.
@user - take a copy of i inside the foreach; refer only to that copy.
Marc Gravell
Thanks. your'e right.
@user, no it is not about i++ but the use of i inside the lambda.
Henk Holterman