I think I may need to re-think my design. I'm having a hard time narrowing down a bug that is causing my computer to completely hang, sometimes throwing an HRESULT 0x8007000E from VS 2010.
I have a console application (that I will later convert to a service) that handles transferring files based on a database queue.
I am throttling the threads allowed to transfer. This is because some systems we are connecting to can only contain a certain number of connections from certain accounts.
For example, System A can only accept 3 simultaneous connections (which means 3 separate threads). Each one of these threads has their own unique connection object, so we shouldn't run in to any synchronization problems since they aren't sharing a connection.
We want to process the files from those systems in cycles. So, for example, we will allow 3 connections that can transfer up to 100 files per connection. This means, to move 1000 files from System A, we can only process 300 files per cycle, since 3 threads are allowed with 100 files each. Therefore, over the lifetime of this transfer, we will have 10 threads. We can only run 3 at a time. So, there will be 3 cycles, and the last cycle will only use 1 thread to transfer the last 100 files. (3 threads x 100 files = 300 files per cycle)
The current architecture by example is:
- A System.Threading.Timer checks the queue every 5 seconds for something to do by calling GetScheduledTask()
- If there's nothing to, GetScheduledTask() simply does nothing
- If there is work, create a ThreadPool thread to process the work [Work Thread A]
- Work Thread A sees that there are 1000 files to transfer
- Work Thread A sees that it can only have 3 threads running to the system it is getting files from
- Work Thread A starts three new work threads [B,C,D] and transfers
- Work Thread A waits for B,C,D
[WaitHandle.WaitAll(transfersArray)]
- Work Thread A sees that there are still more files in the queue (should be 700 now)
- Work Thread A creates a new array to wait on
[transfersArray = new TransferArray[3]
which is the max for System A, but could vary on system - Work Thread A starts three new work threads [B,C,D] and waits for them
[WaitHandle.WaitAll(transfersArray)]
- The process repeats until there are no more files to move.
- Work Thread A signals that it is done
I am using ManualResetEvent to handle the signaling.
My questions are:
- Is there any glaring circumstance which would cause a resource leak or problem that I am experiencing?
- Should I loop thru the array after every
WaitHandle.WaitAll(array)
and callarray[index].Dispose()?
- The Handle count under the Task Manager for this process slowly creeps up
- I am calling the initial creation of Worker Thread A from a System.Threading.Timer. Is there going to be any problems with this? The code for that timer is:
(Some class code for scheduling)
private ManualResetEvent _ResetEvent;
private void Start()
{
_IsAlive = true;
ManualResetEvent transferResetEvent = new ManualResetEvent(false);
//Set the scheduler timer to 5 second intervals
_ScheduledTasks = new Timer(new TimerCallback(ScheduledTasks_Tick), transferResetEvent, 200, 5000);
}
private void ScheduledTasks_Tick(object state)
{
ManualResetEvent resetEvent = null;
try
{
resetEvent = (ManualResetEvent)state;
//Block timer until GetScheduledTasks() finishes
_ScheduledTasks.Change(Timeout.Infinite, Timeout.Infinite);
GetScheduledTasks();
}
finally
{
_ScheduledTasks.Change(5000, 5000);
Console.WriteLine("{0} [Main] GetScheduledTasks() finished", DateTime.Now.ToString("MMddyy HH:mm:ss:fff"));
resetEvent.Set();
}
}
private void GetScheduledTask()
{
try
{
//Check to see if the database connection is still up
if (!_IsAlive)
{
//Handle
_ConnectionLostNotification = true;
return;
}
//Get scheduled records from the database
ISchedulerTask task = null;
using (DataTable dt = FastSql.ExecuteDataTable(
_ConnectionString, "hidden for security", System.Data.CommandType.StoredProcedure,
new List<FastSqlParam>() { new FastSqlParam(ParameterDirection.Input, SqlDbType.VarChar, "@ProcessMachineName", Environment.MachineName) })) //call to static class
{
if (dt != null)
{
if (dt.Rows.Count == 1)
{ //Only 1 row is allowed
DataRow dr = dt.Rows[0];
//Get task information
TransferParam.TaskType taskType = (TransferParam.TaskType)Enum.Parse(typeof(TransferParam.TaskType), dr["TaskTypeId"].ToString());
task = ScheduledTaskFactory.CreateScheduledTask(taskType);
task.Description = dr["Description"].ToString();
task.IsEnabled = (bool)dr["IsEnabled"];
task.IsProcessing = (bool)dr["IsProcessing"];
task.IsManualLaunch = (bool)dr["IsManualLaunch"];
task.ProcessMachineName = dr["ProcessMachineName"].ToString();
task.NextRun = (DateTime)dr["NextRun"];
task.PostProcessNotification = (bool)dr["NotifyPostProcess"];
task.PreProcessNotification = (bool)dr["NotifyPreProcess"];
task.Priority = (TransferParam.Priority)Enum.Parse(typeof(TransferParam.SystemType), dr["PriorityId"].ToString());
task.SleepMinutes = (int)dr["SleepMinutes"];
task.ScheduleId = (int)dr["ScheduleId"];
task.CurrentRuns = (int)dr["CurrentRuns"];
task.TotalRuns = (int)dr["TotalRuns"];
SchedulerTask scheduledTask = new SchedulerTask(new ManualResetEvent(false), task);
//Queue up task to worker thread and start
ThreadPool.QueueUserWorkItem(new WaitCallback(this.ThreadProc), scheduledTask);
}
}
}
}
catch (Exception ex)
{
//Handle
}
}
private void ThreadProc(object taskObject)
{
SchedulerTask task = (SchedulerTask)taskObject;
ScheduledTaskEngine engine = null;
try
{
engine = SchedulerTaskEngineFactory.CreateTaskEngine(task.Task, _ConnectionString);
engine.StartTask(task.Task);
}
catch (Exception ex)
{
//Handle
}
finally
{
task.TaskResetEvent.Set();
task.TaskResetEvent.Dispose();
}
}