I hope you will bear with me; I wanted to provide as much information as I can. The main problem is how to create a structure (like a stack) that will be used by multiple threads: each thread pops a value, uses it to process a chunk of one big flat file, and cycles back for more until the whole file is processed. If a file has 100,000 records that can be processed by 5 threads using 2,000-row chunks, then each thread will get 10 chunks to process.

My goal is to move data from a flat file (with a Header...Subheader...Detail, Detail, Detail, ...Detail, SubFooter, Subheader...Detail, Detail, Detail, ...Detail, SubFooter, Subheader...Detail, Detail, Detail, ...Detail, SubFooter, Footer structure) into an OLTP DB whose recovery model is Simple (possibly Full), into 3 tables: the 1st holding the Subheader's unique key present in the Subheader row; the 2nd an intermediate table, SubheaderGroup, representing the grouping of detail rows in chunks of 2,000 records (it needs the Subheader's identity PK as its FK); and the 3rd holding the Detail rows with an FK pointing to the Subheader PK.

I am doing manual transaction management since I can have tens of thousands of Detail rows. I am using a special field that is set to 0 in the destination tables during the load; at the end of file processing I do a transactional update changing this value to 1, which signals another application that the loading has finished.

I want to chop this flat file into multiple equal pieces (the same number of rows) that can be processed by multiple threads and imported using SqlBulkCopy with an IDataReader created from the destination table metadata.

I want to use the producer/consumer pattern (as explained in the link below - PDF analysis and code sample) to use SqlBulkCopy with the SqlBulkCopyOptions.TableLock option. http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspx This pattern allows creating multiple producers, and an equal number of consumers must subscribe to the producers to consume the rows.
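To make the producer/consumer hand-off concrete, here is a minimal bounded blocking queue built on Monitor.Wait/Pulse. This is a generic sketch of the pattern, not the code from the linked article; the class name is made up:

```csharp
using System.Collections.Generic;
using System.Threading;

// A simple bounded blocking queue: producers block when the buffer is full,
// consumers block when it is empty. A producer thread would Enqueue filled
// DataTable buffers and a consumer thread would Dequeue them for SqlBulkCopy.
public class BlockingQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly int capacity;

    public BlockingQueue(int capacity) { this.capacity = capacity; }

    public void Enqueue(T item)
    {
        lock (queue)
        {
            while (queue.Count >= capacity)
                Monitor.Wait(queue);   // wait for a consumer to make room
            queue.Enqueue(item);
            Monitor.PulseAll(queue);   // wake any waiting consumers
        }
    }

    public T Dequeue()
    {
        lock (queue)
        {
            while (queue.Count == 0)
                Monitor.Wait(queue);   // wait for a producer to add work
            T item = queue.Dequeue();
            Monitor.PulseAll(queue);   // wake any waiting producers
            return item;
        }
    }
}
```

The bounded capacity keeps a fast producer from building up an unbounded backlog of DataTables in memory while the consumers are busy inside SqlBulkCopy.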

In the TestSqlBulkCopy project, the DataProducer.cs file has a method that simulates production of thousands of records.

public void Produce(DataConsumer consumer, int numberOfRows) {
    int bufferSize = 100000;
    int numberOfBuffers = numberOfRows / bufferSize;

    for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
        DataTable buffer = consumer.GetBufferDataTable();

        for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
            object[] values = GetRandomRow(consumer);
            buffer.Rows.Add(values);
        }
        consumer.AddBufferDataTable(buffer);
    }
}

This method will be executed in the context of a new thread. I want this new thread to read only a unique chunk of the original flat file while another thread starts processing the next chunk. The consumers would then move the data (that is pumped to them) into the SQL Server DB using the SqlBulkCopy ADO.NET class.

So the question here is about the main program dictating what lineFrom/lineTo range should be processed by each thread, and I think that should happen during thread creation. A second solution is for the threads to share some structure and use something unique to them (like a thread number or sequence number) to look up a shared structure - possibly a stack - and pop a value (locking the stack while doing it), so the next thread will then pick up the next value. The main program would peek into the flat file, determine the size of the chunks, and create the stack.
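The shared-stack idea from the second solution could be sketched like this; the Chunk and ChunkPlanner names are made up for illustration, and the lock-per-pop matches the question's description:

```csharp
using System;
using System.Collections.Generic;

// Describes one chunk of the flat file: first line and number of lines.
public struct Chunk
{
    public int LineFrom;
    public int LineCount;
}

public static class ChunkPlanner
{
    // The main program scans the file once to count rows, then builds the
    // stack of chunks. The last chunk may be shorter than chunkSize.
    public static Stack<Chunk> BuildChunkStack(int totalRows, int chunkSize)
    {
        var stack = new Stack<Chunk>();
        for (int start = 0; start < totalRows; start += chunkSize)
        {
            int count = Math.Min(chunkSize, totalRows - start);
            stack.Push(new Chunk { LineFrom = start, LineCount = count });
        }
        return stack;
    }

    // Each worker thread calls this in a loop until it returns false;
    // the lock makes concurrent pops safe.
    public static bool TryPop(Stack<Chunk> stack, object gate, out Chunk chunk)
    {
        lock (gate)
        {
            if (stack.Count > 0)
            {
                chunk = stack.Pop();
                return true;
            }
            chunk = default(Chunk);
            return false;
        }
    }
}
```

Each thread then opens its own reader on the file, skips to chunk.LineFrom, reads chunk.LineCount lines, feeds them to its consumer, and loops back for the next chunk until TryPop returns false.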

So can somebody provide some code snippets or pseudo code on how multiple threads would process one file and each get only a unique portion of that file?

Thanks, Rad

+1  A: 

What's worked well for me is to use a queue to hold unprocessed work and a dictionary to keep track of work in-flight:

  1. Create a worker class that takes the filename, start line, and line count, and has an update method that does the database inserts. Pass in a callback method that the worker uses to signal when it's done.
  2. Load a Queue with instances of the worker class, one for each chunk.
  3. Spawn a dispatcher thread that dequeues a worker instance, launches its update method, and adds the worker instance to a Dictionary keyed by its thread's ManagedThreadId. Do this until your maximum allowable thread count is reached, as indicated by Dictionary.Count. The dispatcher waits until a thread finishes and then launches another. There are several ways for it to wait.
  4. As each thread finishes, its callback removes its ManagedThreadId from the Dictionary. If the thread quits because of an error (such as a connection timeout), the callback can reinsert the worker into the Queue. This is a good place to update your UI.
  5. Your UI can show active threads, total progress, and time per chunk. It can let the user adjust the number of active threads, pause processing, show errors, or stop early.
  6. When the Queue and Dictionary are empty, you're done.

Demo code as a console app:

using System;
using System.Collections.Generic;
using System.Threading;

namespace threadtest
{
    public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);

    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supv = new Supervisor();
            supv.LoadQueue();
            supv.Dispatch();
        }
    }

    public class Supervisor
    {
        public Queue<Worker> pendingWork = new Queue<Worker>();
        public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();

        private object pendingLock = new object();
        private object activeLock = new object();

        private int maxThreads = 200;

        public void LoadQueue()
        {
            for (int i = 0; i < 1000; i++)
            {
                Worker worker = new Worker();
                worker.Callback = new DoneCallbackDelegate(WorkerFinished);
                lock (pendingLock)
                {
                    pendingWork.Enqueue(worker);
                }
            }
        }

        public void Dispatch()
        {
            int activeThreadCount;

            while (true)
            {
                lock (activeLock) { activeThreadCount = activeWork.Count; }
                while (true)
                {
                    lock (activeLock)
                    {
                        if (activeWork.Count == maxThreads) break;
                    }
                    lock (pendingLock)
                    {
                        if (pendingWork.Count > 0)
                        {
                            Worker worker = pendingWork.Dequeue();
                            Thread thread = new Thread(new ThreadStart(worker.DoWork));
                            thread.IsBackground = true;
                            worker.ThreadId = thread.ManagedThreadId;
                            lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
                            thread.Start();
                        }
                        else
                        {
                            break;
                        }
                    }
                }
                Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)

                lock (pendingLock)
                    lock (activeLock)
                    {
                        if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
                    }
            }
        }

        // remove finished threads from activeWork, resubmit if necessary, and update UI
        public void WorkerFinished(int idArg, bool successArg, string messageArg)
        {
            lock (pendingLock)
                lock (activeLock)
                {
                    Worker worker = activeWork[idArg];
                    activeWork.Remove(idArg);
                    if (!successArg)
                    {
                        // check the message or something to see if you should resubmit thread
                        pendingWork.Enqueue(worker);
                    }
                    // update UI
                    int left = Console.CursorLeft;
                    int top = Console.CursorTop;
                    Console.WriteLine(string.Format("pending:{0} active:{1}        ", pendingWork.Count, activeWork.Count));
                    Console.SetCursorPosition(left, top);
                }
        }
    }

    public class Worker
    {
        // this is where you put in your problem-unique stuff
        public int ThreadId { get; set; }

        DoneCallbackDelegate callback;
        public DoneCallbackDelegate Callback { set { callback = value; } }

        public void DoWork()
        {
            try
            {
                Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
                callback(ThreadId, true, null);
            }
            catch (Exception ex)
            {
                callback(ThreadId, false, ex.ToString());
            }
        }
    }
}
ebpower
ebpower, thanks for the detailed pseudo code. Would you point me to some code that does something similar? Where would the callback method definition and the references to the Dictionary and Queue live? I suppose the callback would be a method on the main thread. The main thread creates the Worker instances and a Queue, then spawns a Dispatcher thread which, I believe, is given the Queue (created in a class on the main thread) as a constructor argument. I believe the Dictionary is also created in a class on the main thread and passed to the Dispatcher in its constructor as well.
Rad
I suppose there would be a need for locking the Queue and Dictionary because they are accessed by both the main thread (when the callback is called) and the Dispatcher. I just don't know how this works: "the dispatcher waits until a thread finishes and then launches another", if the callback is on the main thread. How will the dispatcher know what is happening in the main thread's callback method? Would it poll the Dictionary for an empty slot and realize it can dequeue a new value from the Queue? Can you please provide some implementation details? Thanks a lot, Rad
Rad
I've added a sample app to demonstrate it. You're correct in that you need to lock around the collections. The code is meant to be simple and functional, not the last word in multithreaded design. Hopefully others will chime in with some embellishments and [gentle] critiques.
ebpower
That is a great code sample, thanks. I noticed that the activeThreadCount variable is not being used - I guess because it can change between the time it is set and when we need to read activeWork.Count again in the line "if (activeWork.Count == maxThreads) break;". Is the "Thread.Sleep(200);" line the only unfinished line of code? I am trying to understand what to do on this line.
Rad
Do I need to widen the scope of, and keep, a collection of worker instances (they are scoped to the inner while loop) in order to check whether any worker threads are finished? Do I have to add something (e.g. a ManualResetEvent via ObjectState) to the WorkerFinished method, as is done here: http://74.125.95.132/search?q=cache:http://bytes.com/topic/c-sharp/answers/226317-working-example-waithandle-waitany-please , in order to signal the main thread to continue to dequeue Workers and create new Threads (up to maxThreads)?
Rad
@Rad - Good catch on the unused activeThreadCount - I missed that when I refactored the example. The Thread.Sleep(200) isn't unfinished; it's just one way for Dispatch to pause until some threads finish. The WorkerFinished method removes a thread from the activeWork collection, so you don't have to check elsewhere. You can use reset events in WorkerFinished to replace Thread.Sleep; I just took the simpler route, though it will let a lot of threads finish at the same time before Dispatch launches more.
ebpower
You should try both approaches and see what works best. Measure things like threads per second and average thread duration for starters. You can also put LoadQueue in a separate thread (with locking) if you want to start processing workers while the queue is being filled.
ebpower
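The reset-event variant discussed in these comments could look roughly like the toy demo below. It is not a drop-in edit of the sample above; the class name is made up, and it only shows the signaling mechanics: the dispatcher blocks on WaitOne() instead of sleeping, and each finishing worker calls Set():

```csharp
using System.Threading;

// Demonstrates replacing the Thread.Sleep(200) poll with an AutoResetEvent:
// the dispatcher blocks until some worker signals that it has finished.
public class EventDispatchDemo
{
    private readonly AutoResetEvent workerDone = new AutoResetEvent(false);
    private int finished;

    private void Worker()
    {
        Interlocked.Increment(ref finished); // pretend the chunk was processed
        workerDone.Set();                    // wake the dispatcher immediately
    }

    public int Run(int workerCount)
    {
        for (int i = 0; i < workerCount; i++)
        {
            new Thread(Worker) { IsBackground = true }.Start();
            workerDone.WaitOne();            // instead of Thread.Sleep(200)
        }
        return finished;
    }
}
```

In the real Supervisor, WorkerFinished would call Set() after updating the collections, and Dispatch would WaitOne() at the bottom of its loop, so new threads are launched as soon as a slot frees up rather than on the next 200 ms tick.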
@ebpower. You've got some threading POWER, thank you. I would increase your vote, but I haven't built up enough points. Sorry to bug you, but I had never heard of the WorkerFinished method. Would you mind (whenever you find the time - I am simply so new to threading that my head is spinning) editing the above code to add your idea, along with LoadQueue being called on a separate thread, including my observations about using the activeThreadCount variable properly
Rad
(I don't know how to guarantee that it stays the same until the end of the method) and widening the scope of the collection of worker instances, and how to use a ManualResetEvent properly. I have a bunch of files being copied into a folder, and if I start loading a Queue with new Worker threads I may finish the program prematurely. My console program is run externally; it should be active for 15 minutes and process as many files as arrive.
Rad
So whatever number of files it collects within those 15 minutes should be finished by this instance of the console application. If a big file is encountered then it can run much longer (even for hours). I want to know how to handle the situation when the external scheduling program runs my console app again: how does it grab only new files not already queued by the first instance? Can I somehow run this application in a loop forever (as a service, I suppose), or is there a way that 2 instances of my console application can communicate using inter-process techniques to
Rad
figure out what the first instance will process, so the second instance only gets unprocessed files (those the first one didn't pick up in the 15-minute period)? I have one last question: is there a way that, once multiple threads have finished processing and been joined at the end by the main thread, it can still peek into each thread's state and discover what they have collected (I believe this is called internal thread state)? I want to use about 4-8 threads to process a big file with 100,000 rows and use SqlBulkCopy.
Rad
The input file has different row types: (1) representing the file header (customerID), (3) start of batch (unique OrderID), (5) order details, (7) end of batch (which contains the number of rows in the batch), and (9) end of file. The only link between an order and its details is that the detail rows follow the (3) record type, sandwiched between it and the (7) record. I want each thread to read X number of rows to process (let's say 2,000). A thread may choose to go over or under the limit by +/- 10% to end on a whole batch: (3),(5)...(5),(7)
Rad
and then use the publisher/subscriber pattern to insert parent/detail rows into staging tables (I will probably have to add a column, OrderID, to the details to establish the relationship between parent and child). StartOfFile_1---FirstBatch---3,5,5,5,5, 3,5,5,5,5, 3,5,5,5,7, 3,5,7---SecondBatch---3,5,5, 5,5, 5,5, 5,5, 5,5, 5,5, 5,5, 5,5, 5,5, 5,5, 5,5---ThirdBatch---5,5, 5,5, 5,5,7, 3,5,5, 5,5, 5,5, 5,5, 5,5, 5,5, 5,5,7---9_EndOfFile. So my problem here is that the ThirdBatch wouldn't know where the previous one started, and the SecondBatch can start with a row type other than (3).
Rad
So the threads somehow need to cooperate in order to process all the rows in a file. Thanks
Rad
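One way to get that cooperation without the threads talking to each other: have the main program (or each thread, deterministically) slide every nominal chunk boundary forward to the line just after the next end-of-batch (7) record. Thread i then processes from the aligned end of chunk i-1 to the aligned end of chunk i, so every batch lands wholly inside one chunk. A sketch over an in-memory array of record-type codes (BatchSplitter is a hypothetical helper, not from this thread):

```csharp
using System;

public static class BatchSplitter
{
    // recordTypes[i] is the leading type code of line i: 1, 3, 5, 7, or 9.
    // Given a nominal end line, slide forward to the line just after the
    // next end-of-batch (7) record so no batch is split between threads.
    public static int AlignToBatchEnd(int[] recordTypes, int nominalEnd)
    {
        int i = nominalEnd;
        while (i < recordTypes.Length && recordTypes[i] != 7)
            i++;
        return Math.Min(i + 1, recordTypes.Length); // first line after the (7)
    }
}
```

Because every thread applies the same deterministic rule to the same boundaries, chunk i always starts exactly where chunk i-1 ended; no thread needs to know what the others are doing at run time. The same idea works on a file by scanning forward from the nominal byte/line offset instead of indexing an array.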