I am tasked with rewriting our internal conversion software from VB6 to C# .NET 4. The software takes a data source (it could be a flat file, MySQL, MS SQL, or Access) and converts it into our MS SQL format, so yes, I really do need to touch all 4,000,000 rows. Right now I am writing the MS SQL -> MS SQL module. Because I need to run many per-row functions to get the data into our format (one example: the source database puts a person's full name in a single field, while we use one field for the first name and one for the last), I thought this would be a good place to use Parallel.ForEach.
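To give an idea of the kind of per-row transform I mean, the name split looks roughly like this (just a sketch; real names are messier, and SplitFullName is illustrative, not our actual code):

private static void SplitFullName(string fullName, out string firstName, out string lastName)
{
    // Naive split at the last space: "John Q Public" -> first "John Q", last "Public".
    // Real-world names (suffixes, single-word names, etc.) need more care than this.
    firstName = fullName ?? string.Empty;
    lastName = string.Empty;
    int lastSpace = firstName.LastIndexOf(' ');
    if (lastSpace >= 0)
    {
        lastName = firstName.Substring(lastSpace + 1);
        firstName = firstName.Substring(0, lastSpace);
    }
}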
Also, the SqlDataReader only holds the current row in memory, so I do not need to worry about out-of-memory exceptions on large datasets.
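A note for the snippet below: reader.AsEnumerable() is not something .NET 4 provides; it is a small extension method along these lines (a sketch; the important part is that it hands out the live reader as the current record):

public static class DataReaderExtensions
{
    // Streams one record at a time. The reader itself is yielded as the
    // current IDataRecord, so each record is only valid until Read() moves on.
    public static IEnumerable<IDataRecord> AsEnumerable(this IDataReader reader)
    {
        while (reader.Read())
            yield return reader;
    }
}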
My pipeline looks like this:
MS SQL database --> SqlDataReader --> DataRow conversion (multithreaded step) --> BlockingCollection --> SqlBulkCopy feeding off of a GetConsumingEnumerable() (multithreaded step)
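To show the shape of the consumer end, here is roughly what one bulk-insert worker inside BulkCopyManager does (a simplified sketch, not the real class; the 10,000 batch size and the lack of error handling are just for illustration):

private static void ConsumeRows(BlockingCollection<DataRow> rows, DataTable buffer, string connectionString, string tableName)
{
    using (SqlBulkCopy bulk = new SqlBulkCopy(connectionString))
    {
        bulk.DestinationTableName = tableName;
        // GetConsumingEnumerable() blocks until rows arrive and ends once
        // CompleteAdding() has been called and the collection is drained.
        foreach (DataRow row in rows.GetConsumingEnumerable())
        {
            buffer.Rows.Add(row.ItemArray);
            if (buffer.Rows.Count >= 10000) // flush in batches
            {
                bulk.WriteToServer(buffer);
                buffer.Clear();
            }
        }
        if (buffer.Rows.Count > 0) // flush whatever is left
            bulk.WriteToServer(buffer);
    }
}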
Here is a snippet of what I have so far. Is there anything I am doing wrong in the parallel processing?
protected BlockingCollection<DataRow> _DestRowCollection; // Has a capacity of 100,000 rows
protected override void Conversion()
{
    _CancelationToken.ThrowIfCancellationRequested(); // Check whether we were cancelled before we start.

    ParallelOptions opt = new ParallelOptions();
    opt.CancellationToken = _CancelationToken;

    // The consumer side: takes the rows the ForEach produces and feeds them to the bulk insert.
    BulkCopyManager bcm = new BulkCopyManager(opt, _DestRowCollection, new CVT.IDAT_CLIENTSDataTable(), _CancelationToken, FrameworkGlobals.CVTConnectionString, "IDAT_CLIENTS");

    using (SqlConnection connection = new SqlConnection(FrameworkGlobals.CVTConnectionString))
    {
        connection.Open();
        using (SqlCommand cmd = new SqlCommand("truncate table IDAT_CLIENTS", connection))
        {
            cmd.ExecuteNonQuery();
        }
    }
    using (SqlConnection connection = new SqlConnection(ConversionGlobals.DataSourceConnectionString))
    {
        connection.Open();
        using (SqlCommand command = new SqlCommand("Select count(*) from client", connection))
        {
            var temp = command.ExecuteScalar();
            if (temp.GetType() == typeof(int))
                _rowCount = (long)(int)temp;
            else
                _rowCount = (long)temp;
        }

        using (SqlCommand command = new SqlCommand("Select * from client", connection))
        using (var reader = command.ExecuteReader())
        {
            Parallel.ForEach<IDataRecord, CVT.IDAT_CLIENTSDataTable>(
                reader.AsEnumerable(), // source
                opt,                   // parallel options
                new Func<CVT.IDAT_CLIENTSDataTable>(InitlizeConvertRowThread), // thread initializer
                new Func<IDataRecord, ParallelLoopState, CVT.IDAT_CLIENTSDataTable, CVT.IDAT_CLIENTSDataTable>(ConvertRow), // per-row action
                new Action<CVT.IDAT_CLIENTSDataTable>(FinilizeConvertRowThread)); // thread finalizer
            _DestRowCollection.CompleteAdding();
        }
    }

    BuildDiscounts();
    bcm.WaitForCompleation();
    bcm.Dispose();
    RaiseComplete();
}
private CVT.IDAT_CLIENTSDataTable InitlizeConvertRowThread()
{
    CVT.IDAT_CLIENTSDataTable dt = new CVT.IDAT_CLIENTSDataTable();
    dt.BeginLoadData();
    return dt;
}
protected virtual CVT.IDAT_CLIENTSDataTable ConvertRow(IDataRecord sourceRecord, ParallelLoopState loopState, CVT.IDAT_CLIENTSDataTable initlizedTable)
{
    // If the task has been cancelled, tell all of the threads to stop as soon as possible.
    if (_CancelationToken.IsCancellationRequested)
    {
        loopState.Stop();
        return initlizedTable;
    }

    CVT.IDAT_CLIENTSRow row = initlizedTable.NewIDAT_CLIENTSRow();
    if (sourceRecord[0] == DBNull.Value)
    {
        IncrementProgress();
        return initlizedTable;
    }
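    // (The per-row field mapping from sourceRecord into row, e.g. the name split, is elided from this snippet.)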
    row.CleanAndAdd(_DestRowCollection); // Does the data validation and adds it to the collection if valid.
    return initlizedTable;
}
private void FinilizeConvertRowThread(CVT.IDAT_CLIENTSDataTable dt)
{
    dt.Dispose();
}
One thing I did that was "clever" (which is almost always bad) is that the BulkCopyManager keeps a ratio of producers to consumers: if _DestRowCollection is less than 1/3 full, it increases opt.MaxDegreeOfParallelism by one (up to the number of cores in the system minus one; on a single-core system it sets it to 1) and kills off a thread doing bulk inserts. If the collection is more than 2/3 full, it does the opposite. A sketch of that check follows below.
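In code, the balancing check is roughly this (a sketch: AddBulkInsertWorker and RemoveBulkInsertWorker are placeholders for whatever the manager actually does, and opt.MaxDegreeOfParallelism is assumed to have been initialized to 1 rather than the default -1):

private void RebalanceWorkers(ParallelOptions opt, BlockingCollection<DataRow> rows)
{
    int maxProducers = Math.Max(1, Environment.ProcessorCount - 1);
    double fillRatio = (double)rows.Count / rows.BoundedCapacity; // bounded at 100,000 above

    if (fillRatio < 1.0 / 3.0 && opt.MaxDegreeOfParallelism < maxProducers)
    {
        opt.MaxDegreeOfParallelism++; // bump the ConvertRow side
        RemoveBulkInsertWorker();     // and retire one bulk-insert thread
    }
    else if (fillRatio > 2.0 / 3.0 && opt.MaxDegreeOfParallelism > 1)
    {
        opt.MaxDegreeOfParallelism--; // throttle the ConvertRow side
        AddBulkInsertWorker();        // and add a bulk-insert thread
    }
}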
So my question is the one anyone asks the first time they code something they have never done before: where did I screw up?