I am tasked with rewriting our internal conversion software from VB6 to C# on .NET 4. The software takes a data source (a flat file, MySQL, MS SQL, or Access) and puts it into our MS SQL format, so yes, as the client I need to touch all 4,000,000 rows. Right now I am writing the MS SQL -> MS SQL module. Because I need to run many per-row functions to get the data into our format (for example, the source database puts a person's full name in one field, while we use one field for the first name and one for the last), I thought it would be a good time to use Parallel.ForEach. Also, the SqlDataReader only holds the current row in memory, so I do not need to worry about out-of-memory exceptions on large datasets.
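To make the per-row work concrete, here is a minimal sketch of the name-splitting step. `NameSplitter` and `SplitFullName` are hypothetical names, not part of my real code, and the rule (split at the last space) is an assumption that will not handle suffixes, "Last, First" input, or multi-word surnames.

```csharp
using System;

public static class NameSplitter
{
    // Hypothetical helper: splits "First Last" at the last space.
    // Returns (first, last); a single token is treated as a first name.
    public static Tuple<string, string> SplitFullName(string fullName)
    {
        if (string.IsNullOrWhiteSpace(fullName))
            return Tuple.Create(string.Empty, string.Empty);

        string trimmed = fullName.Trim();
        int lastSpace = trimmed.LastIndexOf(' ');
        if (lastSpace < 0)
            return Tuple.Create(trimmed, string.Empty);

        // Everything before the last space is the first name (may include middle names).
        string first = trimmed.Substring(0, lastSpace).TrimEnd();
        string last = trimmed.Substring(lastSpace + 1);
        return Tuple.Create(first, last);
    }
}
```

Each call depends only on its input, so work of this kind can run on several threads without shared state.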

My pipeline looks like this:

MS SQL Database --> SqlDataReader --> DataRow (multithreaded step) --> BlockingCollection --> SqlBulkCopy feeding off of a GetConsumingEnumerable (multithreaded step)
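Stripped of the database parts, the shape of the pipeline above can be sketched as follows. This is a simplified stand-in under stated assumptions, not my real code: `PipelineSketch` and `Run` are hypothetical names, strings stand in for rows, upper-casing stands in for the row conversion, and a plain list stands in for SqlBulkCopy.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PipelineSketch
{
    // Runs a bounded producer/consumer pipeline and returns the converted items.
    // The output order is not guaranteed to match the input order.
    public static List<string> Run(IEnumerable<string> source, int capacity)
    {
        var results = new List<string>();
        using (var queue = new BlockingCollection<string>(capacity))
        {
            // Consumer: stands in for the bulk-insert stage.
            Task consumer = Task.Factory.StartNew(() =>
            {
                foreach (string item in queue.GetConsumingEnumerable())
                    results.Add(item); // single consumer, so no lock is needed
            }, TaskCreationOptions.LongRunning);

            try
            {
                // Producers: stand in for the per-row conversion.
                // Add blocks when the queue is full, which throttles the producers.
                Parallel.ForEach(source, item => queue.Add(item.ToUpperInvariant()));
            }
            finally
            {
                // Signal completion even if a producer throws,
                // so the consumer does not wait forever.
                queue.CompleteAdding();
            }
            consumer.Wait();
        }
        return results;
    }
}
```

The `finally` around `CompleteAdding` is a design choice in this sketch: without it, an exception in the parallel loop would leave the consumer blocked on `GetConsumingEnumerable`.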

Here is a snippet of what I have so far. Is there anything I am doing wrong in the parallel processing?

protected BlockingCollection<DataRow> _DestRowCollection; // Has a capacity of 100,000 rows

protected override void Conversion()
{
    _CancelationToken.ThrowIfCancellationRequested(); // check to see if we are cancelled before we start.

    ParallelOptions opt = new ParallelOptions();
    opt.CancellationToken = _CancelationToken;
    // Consumer to the ForEach producer to the bulk insert.
    BulkCopyManager bcm = new BulkCopyManager(opt, _DestRowCollection, new CVT.IDAT_CLIENTSDataTable(), _CancelationToken, FrameworkGlobals.CVTConnectionString, "IDAT_CLIENTS");
    using (SqlConnection connection = new SqlConnection(FrameworkGlobals.CVTConnectionString))
    {
        connection.Open();
        using (SqlCommand cmd = new SqlCommand("truncate table IDAT_CLIENTS", connection))
        {
            cmd.ExecuteNonQuery();
        }
    }
    using (SqlConnection connection = new SqlConnection(ConversionGlobals.DataSourceConnectionString))
    {
        connection.Open();
        using (SqlCommand command = new SqlCommand("Select count(*) from client", connection))
        {
            var temp = command.ExecuteScalar();
            if(temp.GetType() == typeof(int))
                _rowCount = (long)(int)temp;
            else
                _rowCount = (long)temp;
        }
        using (SqlCommand command = new SqlCommand("Select * from client", connection))
        using (var reader = command.ExecuteReader())
        {
            Parallel.ForEach<IDataRecord, CVT.IDAT_CLIENTSDataTable>(
                reader.AsEnumerable(), // Source
                opt, // Parallel options
                new Func<CVT.IDAT_CLIENTSDataTable>(InitlizeConvertRowThread), // Thread initializer
                new Func<IDataRecord, ParallelLoopState, CVT.IDAT_CLIENTSDataTable, CVT.IDAT_CLIENTSDataTable>(ConvertRow), // Loop body
                new Action<CVT.IDAT_CLIENTSDataTable>(FinilizeConvertRowThread)); // Thread finalizer
            _DestRowCollection.CompleteAdding();
        }
    }
    BuildDiscounts();
    bcm.WaitForCompleation();
    bcm.Dispose();
    RaiseComplete();
}

private CVT.IDAT_CLIENTSDataTable InitlizeConvertRowThread()
{
    CVT.IDAT_CLIENTSDataTable dt = new CVT.IDAT_CLIENTSDataTable();
    dt.BeginLoadData();
    return dt;
}

protected virtual CVT.IDAT_CLIENTSDataTable ConvertRow(IDataRecord sourceRecord, ParallelLoopState loopState, CVT.IDAT_CLIENTSDataTable initlizedTable)
{
    // If the task has been canceled, tell all of the threads to stop as soon as possible.
    if (_CancelationToken.IsCancellationRequested)
    {
        loopState.Stop();
        return initlizedTable;
    }
    CVT.IDAT_CLIENTSRow row = initlizedTable.NewIDAT_CLIENTSRow();

    if (sourceRecord[0] == DBNull.Value)
    {
        IncrementProgress();
        return initlizedTable;
    }

    row.CleanAndAdd(_DestRowCollection); // Does the data validation and adds the row to the collection if it is valid.

    return initlizedTable;
}
private void FinilizeConvertRowThread(CVT.IDAT_CLIENTSDataTable dt)
{ 
    dt.Dispose();
}
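The snippet calls `reader.AsEnumerable()`, whose implementation is not shown. Because a data reader exposes only its current row, one way to hand rows to several threads is to copy each row's values before yielding it. The sketch below illustrates that idea; `SnapshotRows` is a hypothetical extension method, not the one my code uses, and the copy per row is an assumption about what is acceptable for memory use.

```csharp
using System.Collections.Generic;
using System.Data;

public static class DataReaderExtensions
{
    // Hypothetical extension: copies each row's values into a fresh array,
    // so code that receives a row never touches the reader itself.
    public static IEnumerable<object[]> SnapshotRows(this IDataReader reader)
    {
        while (reader.Read())
        {
            var values = new object[reader.FieldCount];
            reader.GetValues(values); // fills the array with the current row's column values
            yield return values;
        }
    }
}
```

With this shape, only the enumeration advances the reader, and each worker gets its own array to read from.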

One thing I did that was "clever" (which is almost always bad): the BulkCopyManager keeps a ratio of producers to consumers. If _DestRowCollection is less than 1/3 full, it increases opt.MaxDegreeOfParallelism by one, up to the number of cores in the system minus 1 (on a single-core system it is set to 1), and kills off one thread doing bulk inserts. It does the opposite if the collection is over 2/3 full.
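The producer side of the ratio rule described above can be sketched as a pure function. `ProducerThrottle` and `Adjust` are hypothetical names, and the lower bound of one producer when the collection is over 2/3 full is an assumption. The sketch only computes the target value; whether a Parallel.ForEach that is already running picks up a later change to MaxDegreeOfParallelism is something I have not confirmed.

```csharp
using System;

public static class ProducerThrottle
{
    // Returns the adjusted producer count for the current queue fill level.
    public static int Adjust(int currentProducers, int queued, int capacity, int coreCount)
    {
        // Cap at cores - 1, but never below 1 (the single-core case).
        int maxProducers = Math.Max(1, coreCount - 1);

        if (queued * 3 < capacity)        // under 1/3 full: add a producer
            return Math.Min(currentProducers + 1, maxProducers);
        if (queued * 3 > capacity * 2)    // over 2/3 full: remove a producer
            return Math.Max(currentProducers - 1, 1);
        return currentProducers;          // between 1/3 and 2/3: leave unchanged
    }
}
```

Keeping the decision separate from the threads that act on it makes the thresholds easy to check in isolation.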

So my question, as with any time someone writes code they have never written before: where did I screw up?