I use the producer/consumer pattern with the FileHelpers library to import data from one file (which can be huge) using multiple threads. Each thread is supposed to import a chunk of that file, and I would like to use the LineNumber property of the FileHelperAsyncEngine instance that is reading the file as the primary key for the imported rows. FileHelperAsyncEngine internally has an IEnumerator IEnumerable.GetEnumerator(), which is iterated over using the engine.ReadNext() method. That internally sets the LineNumber property (which does not seem to be thread safe).

Consumers will have Producers associated with them that supply DataTables. Each Consumer consumes them via the SqlBulkLoad class, which uses an IDataReader implementation that iterates over a collection of DataTables internal to the Consumer instance. Each Consumer instance will have one SqlBulkCopy instance associated with it.

I have a thread locking issue. Below is how I create multiple Producer threads; I start each thread afterwards. The Produce method on a producer instance is called with arguments that determine which chunk of the input file will be processed. It seems that engine.LineNumber is not thread safe, and the proper LineNumber is not imported into the database. It seems that by the time engine.LineNumber is read, some other thread has called engine.ReadNext() and changed the engine.LineNumber property. I don't want to lock the loop that processes a chunk of the input file, because I would lose parallelism. How can I reorganize the code to solve this threading issue?

Thanks Rad

            for (int i = 0; i < numberOfProducerThreads; i++)
            {
                DataConsumer consumer = dataConsumers[i];

                //create a new producer
                DataProducer producer = new DataProducer();

                //consumer has already been created
                consumer.Subscribe(producer);

                //each producer thread gets its own engine instance
                FileHelperAsyncEngine orderDetailEngine = new FileHelperAsyncEngine(recordType);
                orderDetailEngine.Options.RecordCondition.Condition = RecordCondition.ExcludeIfBegins;
                orderDetailEngine.Options.RecordCondition.Selector = STR_ORDR;

                //number of lines to skip so that this thread starts at its own chunk
                int skipLines = i * numberOfBufferTablesToProcess * DataBuffer.MaxBufferRowCount;

                Thread newThread = new Thread(() =>
                {
                    producer.Produce(consumer, inputFilePath, lineNumberFieldName, dict, orderDetailEngine, skipLines, numberOfBufferTablesToProcess);
                    consumer.SetEndOfData(producer);
                });
                producerThreads.Add(newThread);
                newThread.Start();
            }

    public void Produce(DataConsumer consumer, string inputFilePath, string lineNumberFieldName, Dictionary<string, object> dict, FileHelperAsyncEngine engine, int skipLines, int numberOfBufferTablesToProcess)
    {
        lock (this)
        {
            engine.Options.IgnoreFirstLines = skipLines;
            engine.BeginReadFile(inputFilePath);
        }

        int rowCount = 1;

        DataTable buffer = consumer.BufferDataTable;
        while (engine.ReadNext() != null)
        {
            lock (this)
            {
                dict[lineNumberFieldName] = engine.LineNumber;
                buffer.Rows.Add(ObjectFieldsDataRowMapper.MapObjectFieldsToDataRow(engine.LastRecord, dict, buffer));
                if (rowCount % DataBuffer.MaxBufferRowCount == 0)
                {
                    consumer.AddBufferDataTable(buffer);
                    buffer = consumer.BufferDataTable;
                }
                if (rowCount % (numberOfBufferTablesToProcess * DataBuffer.MaxBufferRowCount) == 0)
                {
                    break;
                }
                rowCount++;
            }
        }
        if (buffer.Rows.Count > 0)
        {
            consumer.AddBufferDataTable(buffer);
        }
        engine.Close();
    }
A: 

Dictionary<> is not thread safe. Is the dictionary in the above code being properly locked or is it only used in your lock(this)?

As an aside, I would avoid the lock(this) paradigm and use separate, dedicated objects to lock your code. You may be running into other locking issues not related to specific resources. I detail that issue on my blog (Smart Resource Locking in C# .Net for Thread Safe Code). HTH
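To illustrate the point about lock(this), here is a minimal sketch. The SharedDictWriter class and its member names are hypothetical and not taken from the question's code. It assumes that several instances write to one shared Dictionary<>. In that situation lock(this) gives every instance its own lock, so the instances never exclude each other, while a single private static lock object covers all of them.

```csharp
using System.Collections.Generic;

// Hypothetical helper, for illustration only.
public class SharedDictWriter
{
    // One lock object for ALL instances, because the dictionary they
    // write to is shared as well. It is private, so no outside code
    // can accidentally take the same lock.
    private static readonly object SharedLock = new object();

    private readonly Dictionary<string, object> _shared;

    public SharedDictWriter(Dictionary<string, object> shared)
    {
        _shared = shared;
    }

    // Writes a value and reads it back while holding the lock, so no
    // other writer can change the entry between the two steps.
    public object WriteAndRead(string key, object value)
    {
        lock (SharedLock)
        {
            _shared[key] = value;
            return _shared[key];
        }
    }
}
```

With lock(this) in place of lock(SharedLock), two writers created over the same dictionary could interleave between the write and the read.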

OmegaMan
I also tried declaring a class variable, object _lock = new object(), and used lock (_lock), with no success. I tried locking only these with no success. When you say that Dictionary<> is not thread safe, will lock make it thread safe? Obviously I don't understand threading completely. It seems that records are inserted correctly (in the input file I changed the values of a field to reflect the line number they are on; detail rows start from line 1), and I checked the DB.
Rad
However, I would like to inject the current line number that the engine provides into a field I append at the end of each row. My DB will need to sort the table in LineNumber order for later processing.

    dict[lineNumberFieldName] = engine.LineNumber;
    buffer.Rows.Add(ObjectFieldsDataRowMapper.MapObjectFieldsToDataRow(engine.LastRecord, dict, buffer));

This is the sequence of rows into the DB (1-15, 31-45 and 16-30) with 3 threads running:

1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30
Rad
This is what I get for the appended LineNumber field (it doesn't exist in the input file):

23,24,25,26,27,28,29,30,9,37,38,39,40,41,42,17,18,19,20,21,9,11,12,13,14,15,42,43,44,45,31,32,33,34,35,36,1,2,3,4,5,6,7,8,9

Basically, the code line

    buffer.Rows.Add(ObjectFieldsDataRowMapper.MapObjectFieldsToDataRow(engine.LastRecord, dict, buffer));

uses a DataTable as a buffer and maps the LastRecord object, which is of the engine's record type, to a DataRow that has an additional LineNumber field, which I am interested in getting into the DB.
Rad
A: 

I think I corrected the issue. It was the Dictionary<> that needed the lock:

    lock (dict)
    {
        dict[lineNumberFieldName] = engine.LineNumber;
        buffer.Rows.Add(ObjectFieldsDataRowMapper.MapObjectFieldsToDataRow(engine.LastRecord, dict, buffer));
    }

Thank you, OmegaMan, for a good clue.
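A possible alternative to locking, sketched under one assumption: as the snippet in the question suggests, the same dict instance is handed to every producer thread. If each thread only needs dict as a scratch area for its own row, giving each thread a private copy removes the shared state, so no lock is needed around the mapping call. ProducerDefaults and CopyFor are hypothetical names, not part of FileHelpers or the original code.

```csharp
using System.Collections.Generic;

// Hypothetical helper, for illustration only.
public static class ProducerDefaults
{
    // Returns a private (shallow) copy of the shared default values.
    // Call it on the main thread, before the producer threads start,
    // so that nothing writes to the shared dictionary during the copy.
    public static Dictionary<string, object> CopyFor(Dictionary<string, object> shared)
    {
        return new Dictionary<string, object>(shared);
    }
}
```

In the thread-creation loop this would mean creating the copy once per iteration and passing it to Produce instead of dict. The copy is shallow, so any mutable objects stored as values are still shared between threads.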

Rad
A: 

Hi there Rad

You are right, the LineNumber is not thread safe :(

I just investigated the code and found that we read the line from our internal reader and only later update LineNumber, so there is no thread safety at all.

The problem is that if we add synchronization code inside, we can make things really slow; maybe we need to create a thread-safe version of the internal code to avoid that overhead.

Anyway, I think that from the performance perspective the slowest part of the code is the file operations, so you get no speedup from using multiple threads to read. Maybe you need only one thread that reads the file into a working queue and multiple threads that read from the queue and work with each record; in that case you get the thread safety that you need.
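The single-reader idea can be sketched roughly as follows. This is an illustration under stated assumptions, not FileHelpers code: it needs .NET 4.0 or later for BlockingCollection, it takes any sequence of lines instead of an engine, and SingleReaderPipeline, Run and processLine are hypothetical names. The line number is assigned on the one reading thread before the item is queued, so the workers never touch shared reader state.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper, for illustration only.
public static class SingleReaderPipeline
{
    // Reads `lines` on the calling thread, numbers them from 1, and lets
    // `workerCount` threads process the (lineNumber, line) pairs.
    public static void Run(IEnumerable<string> lines, int workerCount,
                           Action<int, string> processLine)
    {
        // Bounded queue: the reader blocks when the workers fall behind.
        using (var queue = new BlockingCollection<KeyValuePair<int, string>>(1000))
        {
            var workers = new List<Thread>();
            for (int i = 0; i < workerCount; i++)
            {
                var worker = new Thread(() =>
                {
                    // Blocks while the queue is empty; the loop ends once
                    // CompleteAdding() was called and the queue is drained.
                    foreach (var item in queue.GetConsumingEnumerable())
                    {
                        processLine(item.Key, item.Value);
                    }
                });
                workers.Add(worker);
                worker.Start();
            }

            int lineNumber = 0;
            try
            {
                foreach (string line in lines)
                {
                    lineNumber++; // only this thread ever changes it
                    queue.Add(new KeyValuePair<int, string>(lineNumber, line));
                }
            }
            finally
            {
                queue.CompleteAdding();
                foreach (Thread worker in workers) worker.Join();
            }
        }
    }
}
```

In the question's setting, the reading loop would presumably call engine.ReadNext() and capture engine.LineNumber on that single thread, then queue the record together with its number. Exceptions thrown inside processLine are not handled in this sketch.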

Cheers

MarcosMeli
A: 

Hi Rad - Could you please post the complete source code of the sample when you get a chance? In particular, I could not understand the purpose of the 'LineNumberFieldName' argument that is passed to the Produce class constructor.

Also, it would be helpful if you could explain the method 'ObjectFieldsDataRowMapper.MapObjectFieldsToDataRow(engine.LastRecord, dict, buffer)'.

I kind of implemented the sample on my own and it works, but it seems to take longer to execute. Do you really see a benefit with this approach?

Appreciate your response.

Thanks Chandra

Chandra