views:

47

answers:

2

I am starting to write my first parallel applications. This partitioner enumerates over an IDataReader, pulling chunkSize records at a time from the data source.

TL;DR version

// Instance-level lock used by every enumerator this object hands out.
private object _Lock = new object();
// Iterates myInforSource (defined elsewhere, not shown) while holding _Lock.
// NOTE(review): because the lock wraps the `yield return`, it is entered on the first
// MoveNext() and stays held while the iterator is suspended between items. It is
// released only when the loop ends or the enumerator is disposed (foreach disposes
// automatically). Until then, another thread enumerating this object blocks at the
// lock. That is serialization rather than deadlock, as long as every enumerator is
// run to completion or disposed.
public IEnumerator GetEnumerator()
{
    var infoSource = myInforSource.GetEnumerator();
                   //Will this cause a deadlock if two threads 
    lock (_Lock)   //use the enumerator at the same time?
    {
        while (infoSource.MoveNext())
        {
            yield return infoSource.Current;
        }
    }
}

full code

// NOTE(review): this header does not compile as written. A type parameter must be a
// plain identifier, so `<object[]>` after the class name is invalid. The intent appears
// to be a non-generic class deriving from Partitioner<object[]>. `protected` is also
// only legal if this class is nested inside another type (not visible here).
/// <summary>
/// Partitioner that hands out rows from a single shared IDataReader. Each partition
/// reads up to <c>chunkSize</c> rows per trip to the reader.
/// </summary>
protected class DataSourcePartitioner<object[]> : System.Collections.Concurrent.Partitioner<object[]>
{
    private readonly System.Data.IDataReader _Input;
    private readonly int _ChunkSize;
    /// <param name="input">Reader shared by every partition. It is not null-checked here.</param>
    /// <param name="chunkSize">Rows pulled from the reader per lock acquisition; must be at least 1 (default 10000).</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="chunkSize"/> is less than 1.</exception>
    public DataSourcePartitioner(System.Data.IDataReader input, int chunkSize = 10000)
        : base()
    {
        if (chunkSize < 1)
            throw new ArgumentOutOfRangeException("chunkSize");
        _Input = input;
        _ChunkSize = chunkSize;
    }

    public override bool SupportsDynamicPartitions { get { return true; } }

    /// <summary>
    /// Returns <paramref name="partitionCount"/> enumerators over the shared reader.
    /// </summary>
    /// <remarks>
    /// All enumerators come from the same ListDynamicPartitions instance, so they share
    /// its <c>_ChunkLock</c> as well as the reader.
    /// NOTE(review): partitionCount is not validated; a negative value fails at the
    /// array allocation instead of raising ArgumentOutOfRangeException.
    /// </remarks>
    public override IList<IEnumerator<object[]>> GetPartitions(int partitionCount)
    {

        var dynamicPartitions = GetDynamicPartitions();
        var partitions =
            new IEnumerator<object[]>[partitionCount];

        for (int i = 0; i < partitionCount; i++)
        {
            partitions[i] = dynamicPartitions.GetEnumerator();
        }
        return partitions;


    }

    /// <summary>
    /// Returns a new ListDynamicPartitions over the shared reader on every call.
    /// </summary>
    public override IEnumerable<object[]> GetDynamicPartitions()
    {
        return new ListDynamicPartitions(_Input, _ChunkSize);
    }
    /// <summary>
    /// Enumerable whose enumerators each pull chunks of rows from the shared reader.
    /// </summary>
    private class ListDynamicPartitions : IEnumerable<object[]>
    {
        private System.Data.IDataReader _Input;
        int _ChunkSize;
        // Per-instance lock; see the review note in GetEnumerator.
        private object _ChunkLock = new object();
        public ListDynamicPartitions(System.Data.IDataReader input, int chunkSize)
        {
            _Input = input;
            _ChunkSize = chunkSize;
        }

        /// <summary>
        /// Repeatedly reads up to _ChunkSize rows (each copied into a new object[]) and
        /// yields them. The sequence ends when a read pass produces no rows.
        /// </summary>
        public IEnumerator<object[]> GetEnumerator()
        {

            while (true)
            {
                List<object[]> chunk = new List<object[]>(_ChunkSize);
                // Locking on the reader itself serializes reads across every enumerator,
                // including those from other ListDynamicPartitions instances over the same reader.
                lock(_Input)
                {
                    for (int i = 0; i < _ChunkSize; ++i)
                    {
                        if (!_Input.Read())
                            break;
                        var values = new object[_Input.FieldCount];
                        _Input.GetValues(values);
                        chunk.Add(values);
                    }
                    // No rows read: the reader is exhausted, so end this partition.
                    if (chunk.Count == 0)
                        yield break;
                }
                var chunkEnumerator = chunk.GetEnumerator();
                // NOTE(review): the lock is NOT released at `yield return`. It stays held
                // while this iterator is suspended, until the chunk is finished or the
                // enumerator is disposed. Other enumerators of this instance (every
                // partition from one GetPartitions call) block here meanwhile, which
                // serializes consumption. chunkEnumerator is local to this iterator, so
                // the lock protects nothing. If MoveNext() resumes on a different thread
                // than the one that entered the lock, the exit throws
                // SynchronizationLockException.
                lock(_ChunkLock) //Will this cause a deadlock?
                {
                    while (chunkEnumerator.MoveNext())
                    {
                        yield return chunkEnumerator.Current;
                    }
                }
            }
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return ((IEnumerable<object[]>)this).GetEnumerator();
        }
    }
}

I want the IEnumerable object it passes back to be thread-safe; the MSDN example was, so I assume PLINQ and the TPL may need it. Will the lock on _ChunkLock near the bottom help provide thread safety, or will it cause a deadlock? From the documentation I could not tell whether the lock is released on the yield return.

Also, if there is built-in functionality in .NET that does what I am trying to do, I would much rather use that. And if you find any other problems with the code, I would appreciate hearing about them.

+1  A: 

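I wrote a small test harness. It does not deadlock, but the second thread never gets any data. Whichever enumerator enters the lock first holds it until it has produced every value. By the time the second enumerator acquires the lock, the shared counter has already reached 10.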

// Test harness: two tasks enumerate the same En instance concurrently and print what they receive.
static void Main()
{
    En en = new En();
    // Consumer A: sleeps 100 ms per item.
    Task.Factory.StartNew(() =>
        {
            foreach (int i in en)
            {
                Thread.Sleep(100);
                Console.WriteLine("A:" + i.ToString());
            }
        });
    // Consumer B: sleeps 10 ms per item, but it enumerates the same `en`,
    // so it contends for the same _lock as A.
    Task.Factory.StartNew(() =>
    {
        foreach (int i in en)
        {
            Thread.Sleep(10);
            Console.WriteLine("B:" +i.ToString());
        }
    });
    // Blocks the main thread so the tasks get a chance to run before the process exits.
    Console.ReadLine();
}

/// <summary>
/// Test enumerable that yields values from a shared counter (0 through 9) while
/// holding <c>_lock</c> for the whole iteration.
/// </summary>
public class En : IEnumerable
{
    object _lock = new object();
    // Static: one counter for every enumerator and every En instance, so 0..9 is
    // produced only once in total.
    static int i = 0;
    public IEnumerator GetEnumerator()
    {
        // Entered on the first MoveNext(). Because it wraps the yield return, it is held
        // until the loop reaches `yield break` (or the enumerator is disposed). A second
        // enumerator blocks here until then, and by that point i == 10, so it yields nothing.
        lock (_lock)
        {
            while (true)
            {
                if (i < 10)
                    yield return i++;
                else
                    yield break;
            }
        }
    }
}

It prints:

A:0
A:1
A:2
A:3
A:4
A:5
A:6
A:7
A:8
A:9

Here is an updated version of GetEnumerator that should behave correctly. It never holds a lock across a yield return.

/// <summary>
/// Lazily enumerates rows from the shared <c>_Input</c> reader. Each pass reads up to
/// <c>_ChunkSize</c> rows under a lock, copying each row into a new <c>object[]</c>,
/// then yields them without holding any lock. The sequence ends when a pass reads
/// no rows.
/// </summary>
/// <remarks>
/// Only the reads from the shared reader need synchronization. The chunk list is a
/// local of this iterator instance, so no other enumerator can reach it and yielding
/// its rows needs no lock. No lock is held across a <c>yield return</c>, so a consumer
/// that stops early, or resumes on another thread, cannot leave a monitor held.
/// </remarks>
public IEnumerator<object[]> GetEnumerator()
{
    while (true)
    {
        List<object[]> chunk = new List<object[]>(_ChunkSize);
        lock (_Input)
        {
            for (int i = 0; i < _ChunkSize; ++i)
            {
                if (!_Input.Read())
                {
                    break;
                }
                var values = new object[_Input.FieldCount];
                _Input.GetValues(values);
                chunk.Add(values);
            }
        }

        // An empty pass means the reader is exhausted.
        if (chunk.Count == 0)
        {
            yield break;
        }

        foreach (object[] row in chunk)
        {
            yield return row;
        }
    }
}
Scott Chamberlain
+1  A: 

In a word: maybe*.

If you're always using this code in the context of a foreach loop, then you're not likely to hit deadlock (unless it's possible that your myInfoSource is infinite, or that your foreach loop has some code in it that will never terminate), though you might see slowdowns.

A more likely cause of potential (actually, guaranteed) deadlock would be this:

// Assumes YourObject.GetEnumerator() is the lock-around-yield iterator from the
// question's first snippet.
var myObject = new YourObject();
var enumerator = myObject.GetEnumerator();

// if you do this, and then forget about it...
// (the first MoveNext() enters lock(_Lock) and suspends at `yield return` still holding it)
enumerator.MoveNext();

// ...your lock will never be released
// (unless the enumerator is disposed, as foreach does automatically; any other
// thread that then tries to enumerate the object blocks forever)

*I am basing this answer on your initial block of code.

Dan Tao