Here's some code that someone wants to use in a production app (not me, honest) - I'd like some independent feedback on it please.
// In this example, the JobProcessor has only one thread in it.
// It could be extended to have multiple processors, where each one has an
// observer to do the work.
/// <summary>
/// Retrieves jobs from an <see cref="IJobQueue{TJob}"/> on one dedicated worker
/// thread and passes each non-null job to an <see cref="IJobObserver{TJob}"/>.
/// </summary>
/// <remarks>
/// The worker thread is created and started by the constructor. It keeps running
/// until <see cref="Shutdown"/> has been called AND the queue reports itself empty.
/// </remarks>
/// <typeparam name="TJob">The job type; must be a reference type.</typeparam>
public class JobProcessor<TJob> : IJobProcessor<TJob> where TJob : class
{
    /// <summary>
    /// The queue of jobs waiting to be processed. Defaults to a
    /// <see cref="NullQueue{TJob}"/> when the constructor is given null.
    /// </summary>
    private readonly IJobQueue<TJob> theJobQueue = new NullQueue<TJob>();

    /// <summary>
    /// The receiver of each retrieved job. Defaults to a
    /// <see cref="NullObserver{TJob}"/> when the constructor is given null.
    /// </summary>
    private readonly IJobObserver<TJob> theObserver = new NullObserver<TJob>();

    /// <summary>
    /// The thread that does the processing.
    /// </summary>
    private Thread processorThread;

    /// <summary>
    /// True once a shutdown has been requested. Volatile because it is written by
    /// the caller of <see cref="Shutdown"/> and read by the worker thread.
    /// </summary>
    private volatile bool shutdownRequested;

    /// <summary>
    /// Signalled by the worker thread as soon as it starts running.
    /// </summary>
    public AutoResetEvent threadStartedEvent = new AutoResetEvent(false);

    /// <summary>
    /// The managed thread id of the worker thread. Volatile because a replacement
    /// worker (see <see cref="Run"/>) may rewrite it while other threads read it.
    /// </summary>
    private volatile int processorThreadId = 0;

    /// <summary>
    /// The job currently being handed to the observer, or null when idle.
    /// </summary>
    private volatile TJob currentJob = null;

    /// <summary>
    /// Creates a new JobProcessor bound to the given queue and observer and
    /// immediately starts its worker thread.
    /// </summary>
    /// <param name="jobQueue">
    /// The queue to retrieve jobs from. When null, the default
    /// <see cref="NullQueue{TJob}"/> is kept, mirroring the observer handling.
    /// </param>
    /// <param name="observer">
    /// The observer that receives each job. When null, the default
    /// <see cref="NullObserver{TJob}"/> is kept.
    /// </param>
    public JobProcessor(IJobQueue<TJob> jobQueue, IJobObserver<TJob> observer)
    {
        if (observer != null)
        {
            theObserver = observer;
        }

        // Previously a null queue replaced the NullQueue default and made the
        // worker thread fail with a NullReferenceException outside any try block.
        if (jobQueue != null)
        {
            theJobQueue = jobQueue;
        }

        shutdownRequested = false;

        // Started last so the worker only ever sees fully assigned fields.
        CreateAndRunThread();
    }

    /// <summary>
    /// Creates the worker thread (background, STA, below-normal priority) and
    /// starts it executing <see cref="Run"/>.
    /// </summary>
    private void CreateAndRunThread()
    {
        processorThread = new Thread(Run)
        {
            Name = "Tpk Processor Thread",
            IsBackground = true
        };
        processorThread.SetApartmentState(ApartmentState.STA);
        processorThread.Priority = ThreadPriority.BelowNormal;
        processorThread.Start();
    }

    /// <summary>
    /// Requests shutdown of this processor. Blocks until the worker thread has
    /// started, flags the request, then asks the queue to interrupt the worker.
    /// The worker keeps running until the queue is empty and then stops.
    /// Safe to call more than once.
    /// </summary>
    public void Shutdown()
    {
        // Make sure the thread is started before we request a shutdown.
        threadStartedEvent.WaitOne();

        // WaitOne consumed the auto-reset signal. Re-signal it so that a second
        // Shutdown call (or any other waiter on the public event) does not block
        // forever.
        threadStartedEvent.Set();

        shutdownRequested = true;

        // If the queue is blocking the worker thread, ask it to release that thread.
        theJobQueue.Interrupt(processorThreadId);
    }

    /// <summary>
    /// Returns the job currently being processed.
    /// </summary>
    /// <returns>The current job, or null when no job is being processed.</returns>
    public TJob CurrentJob()
    {
        return currentJob;
    }

    /// <summary>
    /// The worker thread's body. Records the thread id, signals
    /// <see cref="threadStartedEvent"/>, then processes jobs one at a time until a
    /// shutdown has been requested and the queue is empty.
    /// </summary>
    private void Run()
    {
        // Publish the id before signalling so Shutdown can pass it to Interrupt.
        processorThreadId = Thread.CurrentThread.ManagedThreadId;
        threadStartedEvent.Set();

        while (!BufferClearedAndShutDown())
        {
            try
            {
                ProcessNextMessage();
            }
            catch (ThreadAbortException)
            {
                // This thread is being aborted: hand over to a replacement worker.
                // On .NET Framework the abort is re-raised automatically when this
                // catch block ends, so this thread terminates regardless of 'break'.
                CreateAndRunThread();
                break;
            }
            catch (ThreadInterruptedException)
            {
                // NOTE(review): presumably how IJobQueue.Interrupt wakes a blocked
                // RetrieveJob - confirm against the queue implementation. Swallowed
                // as before; the loop condition re-checks the shutdown flag.
            }
            catch (Exception e)
            {
                // A failing job must not kill the worker, but it must not vanish
                // silently either. Replace with the project logger once available.
                System.Diagnostics.Trace.TraceError(
                    "JobProcessor: exception while processing a job: {0}", e);
            }
        }
    }

    /// <summary>
    /// Retrieves one job from the queue and, if it is not null, passes it to the
    /// observer. <see cref="currentJob"/> is always reset to null afterwards, even
    /// when the observer throws.
    /// </summary>
    private void ProcessNextMessage()
    {
        TJob job = theJobQueue.RetrieveJob();
        currentJob = job;
        try
        {
            if (job != null)
            {
                theObserver.ProcessMessage(this, job);
            }
        }
        finally
        {
            // Without this, a throwing observer left a stale job visible through
            // CurrentJob() until the next retrieval.
            currentJob = null;
        }
    }

    /// <summary>
    /// Loop-exit test for the worker thread.
    /// </summary>
    /// <returns>True when the queue is empty and a shutdown has been requested.</returns>
    private bool BufferClearedAndShutDown()
    {
        return theJobQueue.IsEmpty && shutdownRequested;
    }
}
}