I am writing a C# Windows Forms application that processes market quotes through an algorithm (a strategy) to create orders for a brokerage firm. It all seemed to test fairly well until I tried to build in the capacity to run multiple strategies simultaneously, with each strategy on its own thread. At that point everything started running incorrectly. I believe some of my classes are not thread safe, which is driving the erratic behavior. Any insight on how I can thread this in a thread-safe manner is deeply appreciated!

The quotes are fed into the algorithms as follows: market data events are fired from the broker's software to a client class in my software called ConnectionStatus. When a market data event is triggered, a Quote object is built from the current values of the static variables that represent bid, ask, etc. Once the quote is built, I try to send it into each of the strategy algorithms that are running. Here is the code I am using to do that:

 foreach (StrategyAssembler assembler in StrategyAssembleList.GetStrategies())
 {                  
     BackgroundWorker thread = strategyThreadPool.GetFreeThread();
     if (thread != null)
     {
        thread.DoWork += new DoWorkEventHandler(assembler.NewIncomingQuote);
        thread.RunWorkerAsync(quote);
     }   
 }

StrategyAssembler is a class that creates an instance of the Class StrategyManager which in turn creates an instance of the strategy that contains the actual algorithms. There may be 4 or 6 different instances of StrategyAssembler, each of which has been added to a Singleton instance of StrategyAssembleList which is a BindingList.

The incoming quote object is passed into the NewIncomingQuote method of the StrategyAssembler class. That code is as follows:

public void NewIncomingQuote(object sender, DoWorkEventArgs e)
    {
        Quote QUOTE = e.Argument as Quote;            

        lock (QuoteLocker)
        {
            manager.LiveQuote(QUOTE);

            priorQuote = QUOTE;
        }
    }

I was thinking that by taking a lock before passing the quote into the manager.LiveQuote(Quote quote) method, all the objects that use the quote "downstream" of this point would be able to consume the quote in a thread-safe fashion, but testing is showing otherwise. Is there a way I could put each instance of StrategyAssembler on its own thread so that all the objects created by that StrategyAssembler are thread safe, and then feed the quote into StrategyAssembler? Is this line of thinking an appropriate way to deal with this situation?

Thanks in advance for any feedback or help,

Learning1

+1  A: 

The locking should occur at both the reading and writing to any shared state. Without locks on the read, the code can still read and write concurrently.

You could wrap the read and write locking into the manager.

spender
Thanks for the feedback spender. Are you saying that at every "downstream" point in the code that is reading the quote object I need a lock? (There are a lot of places and objects downstream that uses values from each quote).
Learning1
Wrapping locks into the manager is dangerous. For example, a locked read and a locked write each feel safe on their own, but if you do a locked read(), an increment, and then a locked write(), you still violate your critical section. The lock should surround the whole use of the data, so that the modification is atomic.
Adriaan
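The point in the comment above can be sketched as follows. This is only an illustration with a hypothetical Counter class (nothing here comes from the original application): every accessor takes the lock, yet UnsafeIncrement can still lose updates because the lock is released between the read and the write, whereas SafeIncrement holds one lock around the whole read-modify-write.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical class for illustration; not part of the original application.
public class Counter
{
    private readonly object locker = new object();
    private int value;

    public int Read() { lock (locker) { return value; } }
    public void Write(int v) { lock (locker) { value = v; } }

    // Each call is locked, but another thread can run between Read and Write,
    // so two threads may read the same value and one increment is lost.
    public void UnsafeIncrement() { Write(Read() + 1); }

    // The whole read-modify-write is a single critical section.
    public void SafeIncrement() { lock (locker) { value = value + 1; } }
}

public static class CounterDemo
{
    // Runs SafeIncrement from many threads and returns the final count.
    public static int RunSafe(int iterations)
    {
        var counter = new Counter();
        Parallel.For(0, iterations, i => counter.SafeIncrement());
        return counter.Read();
    }
}
```

Swapping SafeIncrement for UnsafeIncrement in RunSafe may return fewer than the requested number of increments on a multi-core machine, although a race like this is not guaranteed to show up on every run.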
@spender In this case it's better to copy the quote and avoid locking period... as a matter of fact proper locking would make his strategies run sequentially, it will render the multithreading effort completely useless AND more computationally expensive because of the overhead of locking + context switching.
Lirik
A: 

If:

1) Strategies are invoked via the LiveQuote method and can modify Quote instances.

2) Changes to Quote instances should not be shared among strategies.

You need to create a copy of the provided Quote before calling LiveQuote() and send the copy in to the strategy method rather than the original quote. Depending on other requirements, you may not need any locking at all.

Jeff Sternal
A: 

There are two things that are happening in your code:
1. You received a quote from one thread (the producer AKA the market data feed).
2. You send the quote to another thread (the consumer AKA StrategyAssembler).

At this point there is contention on the quote: the producer thread and each consumer thread (i.e. each instance of a strategy) can modify the quote you just provided. To remove the contention you must do one of three things:

  1. Synchronize between all of the threads with access to the quote.
    OR
  2. Make the quote immutable (and ensure that the producer does not replace it).
    OR
  3. Give each consumer a copy of the quote.

For your case I would suggest you take the third option because locking is more expensive than copying a quote (I hope your quotes are not really big)... option two is also good, but your strategy should not modify the quote.

By giving each consumer a copy of the quote you're ensuring that they don't share any data, therefore no other thread will modify the quote and you will eliminate contention. If your strategies are not creating any other threads, then you're done.
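For comparison, option 2 (an immutable quote) might look roughly like the sketch below. ImmutableQuote is a hypothetical name, the property names are borrowed from the copy constructor the asker posts later in this thread, and the float/int types are assumptions. Get-only auto-properties need C# 6 or later; on an older compiler, readonly fields with getters give the same effect.

```csharp
using System;

// Hypothetical immutable variant of Quote. Types are assumed, not taken
// from the original class.
public sealed class ImmutableQuote
{
    public DateTime DateTime { get; }
    public float Bid { get; }
    public int BidSize { get; }
    public float Ask { get; }
    public int AskSize { get; }
    public double Middle { get; }

    public ImmutableQuote(DateTime dateTime, float bid, int bidSize, float ask, int askSize)
    {
        DateTime = dateTime;
        Bid = bid;
        BidSize = bidSize;
        Ask = ask;
        AskSize = askSize;
        // Computed once; nothing can change after construction.
        Middle = ((double)ask + (double)bid) / 2.0;
    }
}
```

Because no member can be assigned after construction, the same instance can be handed to every strategy thread without copying or locking, provided the strategies only need to read it.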

In general you should avoid locking and you should try to minimize data sharing, but if you HAVE TO share data between threads then you should do it properly:
In order for your strategies to synchronize correctly they must synchronize on the same QuoteLocker object, i.e. QuoteLocker must be visible to each thread. Even if you do it properly and you make your strategies synchronize (lock on the QuoteLocker) then you might as well not have threads... you will be running the overhead of context switching + locking and your strategies will be executed sequentially for the same quote.

Update per comments: If you leave the code as is (meaning that you provide a copy of the quote for each thread), then I don't see why your other strategies will not get the quote until the first strategy completes... your first strategy will most likely begin working while the threads for the other strategies are being created. The whole point of making your strategies run in a separate thread is to avoid precisely that problem... you start a new thread so your other strategies are not waiting on each other to complete.

This part of the code will most likely complete even before all your threads start working...

foreach (StrategyAssembler assembler in StrategyAssembleList.GetStrategies())
 {                  
     BackgroundWorker thread = strategyThreadPool.GetFreeThread();
     if (thread != null)
     {
        thread.DoWork += new DoWorkEventHandler(assembler.NewIncomingQuote);
        Quote copy = CopyTheQuote(quote);// make an exact copy of the quote
        thread.RunWorkerAsync(copy);
     }   
 }

Is your market feed changing the actual quote while you're creating the threads? The market feeds generally provide snapshots, so unless something is changing your quote while you're making the threads, then the design above should be just fine. If there is a problem with the design, then I can give you a producer and multiple consumer design based on a blocking queue which is also very efficient (you can check out this discussion for an idea on how it works and I can tell you how to modify it for your specific example).
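A producer and multiple consumer design based on a blocking queue could look roughly like the sketch below. It is not necessarily the design from the linked discussion: StrategyWorker is a hypothetical class, it assumes .NET 4.0 or later for BlockingCollection<T>, and it is generic only so the example compiles on its own. Each strategy gets its own queue and its own long-lived thread, so a given strategy handles its quotes one at a time in arrival order while different strategies run in parallel.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical worker: one per strategy, each with its own queue and thread.
public sealed class StrategyWorker<TQuote>
{
    // The default BlockingCollection is backed by a FIFO ConcurrentQueue.
    private readonly BlockingCollection<TQuote> queue = new BlockingCollection<TQuote>();
    private readonly Thread thread;

    public StrategyWorker(Action<TQuote> handler)
    {
        thread = new Thread(() =>
        {
            // Blocks while the queue is empty; ends after CompleteAdding().
            foreach (TQuote quote in queue.GetConsumingEnumerable())
                handler(quote);
        });
        thread.IsBackground = true;
        thread.Start();
    }

    // Called by the producer (the market data thread); returns immediately.
    public void Enqueue(TQuote quote) { queue.Add(quote); }

    // Stops accepting quotes and waits for the queued ones to be processed.
    public void Shutdown()
    {
        queue.CompleteAdding();
        thread.Join();
    }
}

public static class WorkerDemo
{
    // Feeds 0..count-1 through one worker and returns the order processed.
    public static List<int> Run(int count)
    {
        var seen = new List<int>();
        var worker = new StrategyWorker<int>(q => seen.Add(q));
        for (int i = 0; i < count; i++) worker.Enqueue(i);
        worker.Shutdown();
        return seen;
    }
}
```

In the code from the question, the producer loop would then call something like worker.Enqueue(copy) once per strategy instead of grabbing a BackgroundWorker for every quote, and the handler passed to each worker would forward the quote to that strategy's manager.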

Lirik
Yes, preventing is better than fixing. If your threads can run independently (option 3) you'll have the safest and best performing option. Since you can not guarantee that the first quote is completely handled first, you may need to add a (high resolution) timestamp in your copy.
Adriaan
Thanks for the feedback Lirik. That all makes a bunch of sense. Option 3 makes the most sense to me for the reasons you mentioned and others as well. In terms of making a copy and passing it to each instance of assembler, I think this would be best done with an event, so all assemblers get their copy of the quote simultaneously and can then process their logic. Otherwise, if it is left in the loop, the following strategies won't get the quote until the first one completes its logic (about 100 ms, and quotes can arrive in as little as 50 ms).
Learning1
If I put the quote in an event that each assembler subscribes to, then what is the best way of getting each assembler running on a thread so I can take advantage of more cores on the CPU? At the moment, each of these assemblers and the strategy algorithm that each has created all run on the main thread.
Learning1
I'll update my answer based on your comments.
Lirik
Thanks for the insight Lirik, I'll try this version and see how it works with the copy of the Quote. In terms of the market changing, the code snippet above is inside the BuildQuote() method. This method is called each time there is a change to BidPrice, BidVolume, AskPrice or AskVolume. Each time it is called, a new Quote is created using static variables that were changed by the broker data event. I don't know if a producer and multiple consumer design would be a better way to do this. I'll take a look at the link you posted - thanks!
Learning1
You create a new Quote, but are you replacing the one that's currently being used to initialize the threads?
Lirik
A: 

Thanks to everyone who has contributed to help me solve this. I appreciate the feedback and expertise you have shared. It seems like this is now working close to the way I expect, however I am seeing some behavior I don't understand.

First, here is what I have done: In the ConnectionStatus Class that is building the new quotes (from the raw price and volume data events from the broker) I am building a quote in this fashion and sending it on a thread to each instance of StrategyAssembler ... here is the code:

public void BuildQuote()
    {
        // Only build a quote once every field has received a value.
        if (Bid != 0 && BidVolume != 0 && Ask != 0 && AskVolume != 0)
        {
            try
            {
                float BidPrice = (float)Math.Round(Bid, 5, MidpointRounding.ToEven);
                float AskPrice = (float)Math.Round(Ask, 5, MidpointRounding.ToEven);
                Quote quote = new Quote(DateTime.Now, BidPrice, BidVolume, AskPrice, AskVolume);

                foreach (StrategyAssembler assembler in StrategyAssembleList.GetStrategies())
                {
                    BackgroundWorker thread = strategyThreadPool.GetFreeThread();
                    if (thread != null)
                    {
                        thread.DoWork += new DoWorkEventHandler(assembler.NewIncomingQuote);
                        // Each strategy gets its own copy of the quote.
                        Quote copy = new Quote(quote);
                        thread.RunWorkerAsync(copy);
                    }
                }
            }
            catch (Exception err)
            {
                Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffff") + " Firing Quote from ConnectionStatus.BuildQuote failed: " + Environment.NewLine + err.Message);
            }
        }
    }

Bid, Ask, BidVolume and AskVolume are all static variables that are set when the MarketVolume and MarketPrice events are fired by the Broker's software.
In order to copy the quote, I put a new constructor in the Quote class. Here is the code for that:

public Quote(Quote _quote)
    {
        DateTime = _quote.DateTime;
        Bid = _quote.Bid;
        BidSize = _quote.BidSize;
        Ask = _quote.Ask;
        AskSize = _quote.AskSize;
        Middle = (((double)Ask + (double)Bid) / 2.0);
    }

In each StrategyAssembler instance that receives the copy of the quote from the ConnectionStatus class, I have had to lock the quote in order to get consistent behavior through the rest of the algorithm and the classes that simulate market behavior (order fills, cancellations, etc.).

public void NewIncomingQuote(object sender, DoWorkEventArgs e)
    {
        lock (QuoteLocker)
        {
            Quote QUOTE = e.Argument as Quote;                
            if (priorQuote != null)
            {
                if (priorQuote.DateTime > QUOTE.DateTime)
                    Console.WriteLine("QuoteTime out of Sync, " + manager.Strategy.StrategyName.ToString() + " ,Quote " + QUOTE.DateTime.ToString("HH:mm:ss.fffff") + " ,PriorQuote " + priorQuote.DateTime.ToString("HH:mm:ss.fffff"));
            }
            manager.LiveQuote(QUOTE);
            priorQuote = QUOTE;
        }
    }

QuoteLocker is a static object: static object QuoteLocker = new object();

priorQuote is a private instance of Quote (not static). Quote priorQuote;

So those are the suggestions I have implemented and it seems to work fairly well (At least that is what the testing thus far suggests).

However, there is one puzzling behavior that I am now seeing and do not understand. It seems that any time a new quote is fired into the strategy from the thread in the ConnectionStatus class before the prior quote has completed its processing (e.g. another quote fires within 2 - 10 ms), I get the Console.WriteLine message that QuoteTime is out of sync. I do not understand how the console message could be triggered if the code holding QuoteLocker has not yet completed. Would changing priorQuote to a static Quote shift this behavior, or is something likely happening back in ConnectionStatus.BuildQuote() that creates quotes non-chronologically? This seems unlikely, as the Quote is newly created using DateTime.Now as the timestamp and immediately fired on the threads. Does anyone have any insight?

Thanks in advance,

Learning1

Learning1
You might consider asking another question to debug this new problem, but I think I see what's going on. When you start multiple threads, you don't know the order in which they'll execute. The way your program is designed right now, it will launch several threads for the same strategy at nearly the same time, but you haven't provided a way to guarantee that it processes quote 1 before quote 2.
Jeff Sternal
If you only want strategies to process the most recent quote, I recommend looking at this SO question: http://stackoverflow.com/questions/2364260/how-do-i-handle-messages-asynchronously-throwing-away-any-new-messages-while-pro/2364367#2364367. If you want strategies to process every quote that comes in, take a look at this article for ideas: http://msdn.microsoft.com/en-us/magazine/dd419664.aspx (it discusses the ThreadPool, but the principles are the same). One other thing - if QuoteLocker is static, your strategies cannot run in parallel.
Jeff Sternal
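The remark about the static QuoteLocker can be illustrated with a small sketch. The two classes below are hypothetical stand-ins for StrategyAssembler, and the lock object is exposed only so the difference can be observed: a static field is one object shared by every instance, so all instances queue up behind the same lock, while an instance field gives each instance its own.

```csharp
using System;

// Hypothetical stand-ins for StrategyAssembler, for illustration only.
public class SharedLockAssembler
{
    // static: a single lock object for the whole type, so every
    // instance contends for it and strategies run one at a time.
    private static readonly object QuoteLocker = new object();
    public object LockObject { get { return QuoteLocker; } }
}

public class PerInstanceLockAssembler
{
    // instance field: each assembler serializes only its own work.
    private readonly object quoteLocker = new object();
    public object LockObject { get { return quoteLocker; } }
}

public static class LockScopeDemo
{
    public static bool StaticLockIsShared()
    {
        return ReferenceEquals(new SharedLockAssembler().LockObject,
                               new SharedLockAssembler().LockObject);
    }

    public static bool InstanceLockIsShared()
    {
        return ReferenceEquals(new PerInstanceLockAssembler().LockObject,
                               new PerInstanceLockAssembler().LockObject);
    }
}
```

An instance-level lock still does not order the quotes arriving at a single assembler; it only stops different assemblers from blocking each other.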
Thanks for the Feedback Jeff. Accordingly I have changed the static object QuoteLocker to simply be private. In terms of the behavior I need, it is the 2nd behavior. Each strategy needs to process every quote. I'll take a look at the article you posted. I am still getting the puzzling behavior I mentioned above, even after changing the QuoteLocker to private instead of static. Any ideas why this might be?
Learning1
Threads don't necessarily execute in the order you start them. I'm pretty sure the new behavior you're seeing is caused by the fact that you're starting multiple Strategy1 threads (for example) for several quotes, and the thread processing quote 2 executes before the thread processing quote 1.
Jeff Sternal
Thanks Jeff, that sounds likely. Given the code above, what would be a good way to ensure that a given instance of StrategyAssembler cannot process the Quote passed in by a second thread until the prior one has completed?
Learning1
The link you suggested (msdn.microsoft.com/en-us/magazine/dd419664.aspx) seems to provide a high level architecture for the threadpool that would ensure this. I may take a stab at implementing something along those lines if the testing confirms mis-sequencing through the rest of the API. Thanks for suggesting the article
Learning1