Here's some code that someone wants to use in a production app (not me, honest) - I'd like some independent feedback on it please.

 // In this example, the JobProcessor has only one thread in it.
 // It could be extended to have multiple processors where each one has an
 // observer to work.
 public class JobProcessor<TJob> : IJobProcessor<TJob> where TJob : class
 {
  /// <summary>
  /// the buffer of messages waiting to be processed
  /// </summary>
  private readonly IJobQueue<TJob> theJobQueue =
   new NullQueue<TJob>();

  // private readonly ILogger thelogger;

  /// <summary>
  /// the thread that does the processing
  /// </summary>
  private Thread processorThread;

  /// <summary>
  /// a shutdown has been requested
  /// </summary>
  private bool shutdownRequested;

  private readonly IJobObserver<TJob> theObserver = new NullObserver<TJob>();

  /// <summary>
  /// this is set when the rxprocessor thread is set
  /// </summary>
  public AutoResetEvent threadStartedEvent = new AutoResetEvent(false);

  /// <summary>
  /// the thread id of the rx processor 
  /// </summary>
  private int processorThreadId = 0;

  /// <summary>
  /// the current job
  /// </summary>
  private volatile TJob currentJob = null;

  /// <summary>
  /// Create a new JobProcessor object associated with the specified theJobQueue
  /// </summary>
  public JobProcessor(IJobQueue<TJob> jobQueue, IJobObserver<TJob> observer)
      //ILogger logger)
  {
   if (observer != null)
   {
    theObserver = observer;
   }
   shutdownRequested = false;
   theJobQueue = jobQueue;
   //thelogger = logger;
   CreateAndRunThread();
  }

  private void CreateAndRunThread()
  {
   processorThread = new Thread(Run)
                      {
                       Name = "Tpk Processor Thread", IsBackground = true
                      };
   processorThread.SetApartmentState(ApartmentState.STA);
   processorThread.Priority = ThreadPriority.BelowNormal;
   processorThread.Start();
  }

  /// <summary>
  /// Request shutdown of this processor.
  /// Once shutdown has been requested, the processor continues to run
  /// until its associated theJobQueue is empty. It then stops.
  /// </summary>
  public void Shutdown()
  {
   //make sure the thread is started before we can request
   //a shutdown
   threadStartedEvent.WaitOne();

   shutdownRequested = true;

   //make sure that if the theJobQueue is blocking the JobProcessor
   //thread, that the JobProcessor thread is interrupted
   theJobQueue.Interrupt(processorThreadId);
  }

  public TJob CurrentJob()
  {
   return currentJob;
  }

  /// <summary>
  /// This method forms the JobProcessor's thread of execution.
  /// This method repeatedly retrieves the next available message
  /// from the associated theJobQueue, and publishes it to subscribers
  /// of the OnMessage event. It only stops when it has been asked to
  /// shutdown (via a call to its Shutdown operation) and the theJobQueue
  /// is empty.
  /// Immediately prior to quitting, JobProcessor publishes a "null" message
  /// to notify subscribers.
  /// </summary>
  private void Run()
  {
   //set the id of the rxprocessor thread
   processorThreadId = Thread.CurrentThread.ManagedThreadId;

   //mark the processor thread as being started
   threadStartedEvent.Set();

   while (!BufferClearedAndShutDown())
   {
    try
    {
     ProcessNextMessage();
    }
    catch (ThreadAbortException)
    {
     CreateAndRunThread();
     break;
    }
    catch (Exception e) //policy handled
    {
     //thelogger.HandleException(e, ExceptionPolicy.Log);
    }
   }
  }

  private void ProcessNextMessage()
  {
   currentJob = theJobQueue.RetrieveJob();
   if (currentJob != null)
   {
    theObserver.ProcessMessage(this, currentJob);
   }
   currentJob = null;
  }

  private bool BufferClearedAndShutDown()
  {
   return theJobQueue.IsEmpty && shutdownRequested;
  }
 }
}
+2  A: 

Is your thread trying to catch ThreadAbortException and then recreate itself? I am not sure that is possible, but either way it is not a nice way to play with the OS.

And you will lose jobs if an exception happens after currentJob = theJobQueue.RetrieveJob(); but before theObserver.ProcessMessage(this, currentJob);

And unless your jobQueue is thread-safe, you should add locking around access to it.

Alex Reitbort
+1  A: 

Very hard to give you useful feedback without knowing the semantics of IJobQueue, IJobObserver, or IJobProcessor, but here are a few details that stand out:

  1. processorThread doesn't seem to be really needed; it can (and should) just be a local in CreateAndRunThread
  2. shutdownRequested should be marked volatile; alternatively, call Thread.MemoryBarrier() after setting shutdownRequested to true, or use Thread.VolatileWrite to set the shutdownRequested field
  3. Why wait for the thread to start before asking it to shut down?
  4. I don't know why you need a threadStartedEvent; what is it used for? Making it public is especially scary
  5. Without knowing how IJobQueue.Interrupt is implemented, it is hard to say whether there are issues there or not
  6. Who uses CurrentJob, and why? It feels risky
  7. While catching your own ThreadAbortException will catch most cases, it won't catch everything. You could use a separate monitor thread, which calls Thread.Join and, after double-checking BufferClearedAndShutDown(), invokes CreateAndRunThread()
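
To illustrate item 2, here is a minimal sketch of a shutdown flag marked volatile. The Worker class and its members are hypothetical stand-ins, not part of the code under review, and the loop body is only a placeholder for real work.

```csharp
using System;
using System.Threading;

// Hypothetical, stripped-down worker: only the shutdown flag handling is shown.
public class Worker
{
    // volatile: the write made by the thread calling Shutdown() is observed
    // by the loop in Run() without any further synchronisation.
    private volatile bool shutdownRequested;

    public void Shutdown()
    {
        shutdownRequested = true;
    }

    public void Run()
    {
        while (!shutdownRequested)
        {
            // Placeholder for "process the next job".
            Thread.Sleep(1);
        }
    }
}
```

A lock around both the read and the write would work equally well; volatile is simply the smallest change to the existing field.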
Randy Kern
+2  A: 

Is this just a producer/consumer queue? There are lots of pre-rolled examples - I posted one here myself a few days ago (but I can't find it at the moment)... put it this way - the version I posted was much simpler - i.e. "obviously no bugs" rather than "no obvious bugs". I'll see if I can find it...

(edit: found it)

The point is: with that version, the worker thread just does:

T item;
while(queue.TryDequeue(out item)) {
    // process item
}
// queue has been closed and drained
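
For readers without the linked post to hand, a queue that supports the loop above might look roughly like the following. This is a sketch only, not the implementation referred to in this answer; the ClosableQueue name and its Close method are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical closable producer/consumer queue built on Monitor.
public class ClosableQueue<T>
{
    private readonly Queue<T> items = new Queue<T>();
    private bool closed;

    public void Enqueue(T item)
    {
        lock (items)
        {
            if (closed) throw new InvalidOperationException("Queue is closed.");
            items.Enqueue(item);
            Monitor.Pulse(items); // wake one waiting consumer
        }
    }

    public void Close()
    {
        lock (items)
        {
            closed = true;
            Monitor.PulseAll(items); // wake every waiting consumer
        }
    }

    // Blocks until an item is available.
    // Returns false only once the queue has been closed and drained.
    public bool TryDequeue(out T item)
    {
        lock (items)
        {
            while (items.Count == 0)
            {
                if (closed)
                {
                    item = default(T);
                    return false;
                }
                Monitor.Wait(items); // releases the lock while waiting
            }
            item = items.Dequeue();
            return true;
        }
    }
}
```

Shutdown then becomes a call to Close(): the worker finishes whatever is still queued and leaves its loop on its own, with no interrupt or thread ID bookkeeping.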
Marc Gravell
A: 

There's a lot of context missing from the code, but I'd just like to add my 2 pennies to the above, which I agree with.

On Randy's item 2, I would just use a lock statement rather than a memory barrier; this is the MSDN-recommended approach unless you are running multiple Itanium CPUs in your boxes (which have weak memory ordering).

http://msdn.microsoft.com/en-us/library/system.threading.thread.memorybarrier.aspx

I wholly agree with item 6. A public method exposing a reference to an item in the job queue? That can't be good, can it? Especially when you are discussing thread safety. If you need to expose some information from this class to the outside world, make it immutable and expose that instead. Much better encapsulation.

Matt
A: 

Now that some people have already answered I feel it is safe to add my own comments without biasing anyone :-)

  1. Why would you want to wait for the thread to start before shutting it down?
  2. Recording the thread ID and using it as a way to identify which thread can shut it down?
  3. currentJob is marked volatile but not shutdownRequested (and it's not set within a lock)
  4. Why mix a queue + a worker in one? Why not have a queue separate, and then one or more threads working with it.
  5. I can't think why he'd be expecting a ThreadAbortException so as to ditch the current worker thread and create a new one.
  6. Despite the comment at the top of the code about how the class could simply be extended to utilise multiple worker threads I don't think that this design makes moving to multiple worker threads very easy at all (unlike an approach where the queue / workers are separate).
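
As a rough illustration of points 4 and 6, the queue and the workers can be kept separate. The sketch below assumes .NET 4 or later, where BlockingCollection<T> is available; the WorkerPool class and its Start method are hypothetical names, and swallowing exceptions in the loop is only a placeholder for a real logging policy.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper: the queue is owned by the caller, the workers only consume it.
public static class WorkerPool
{
    // Starts workerCount threads that all drain the same queue.
    public static Thread[] Start<TJob>(
        BlockingCollection<TJob> queue, int workerCount, Action<TJob> process)
    {
        var threads = new Thread[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            threads[i] = new Thread(() =>
            {
                // The enumeration ends once CompleteAdding() has been called
                // and the queue is empty.
                foreach (TJob job in queue.GetConsumingEnumerable())
                {
                    try
                    {
                        process(job);
                    }
                    catch (Exception)
                    {
                        // Placeholder policy: a failed job does not kill the worker.
                    }
                }
            }) { IsBackground = true };
            threads[i].Start();
        }
        return threads;
    }
}
```

With this split, going from one worker to several is a matter of changing workerCount, and shutting down is a call to CompleteAdding() on the queue followed by Join on each thread.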

I'm no multi-thread guru or anything so I wanted to post it just to make sure it wasn't just me that thought this code looked a bit dodgy.

Peter Morris