views:

432

answers:

3

I have a queue in which I can enqueue different threads, so I can assure two things:

  1. Request are processed one by one.
  2. Request are processed in the arriving order

Second point is important. Otherwise a simple critical section would be enough. I have different groups of requests and only inside a single group these points must be fulfilled. Requests from different groups can run concurrent.

It looks like this:

FTaskQueue.Enqueu('MyGroup');
try
  Do Something (running in context of some thread)
finally
  FTaskQueue.Dequeu('MyGroup');
end;

EDIT: I have removed the actual implementation because it hides the problem I want to solve

I need this because I have an Indy based web server that accepts http requests. First I find a coresponding session for the request. Then the request (code) is executed for that session. I can get multiple requests for the same session (read I can get new requests while the first is still processing) and they must execute one by one in correct order of arrival. So I seek a generic synchronization queue that can be use in such situations so requests can be queued. I have no control over the threads and each request may be executed in a different thread.

What is best (ususal) approach to this sort of problem? The problem is that Enqueue and Dequeue must be atomic opeations so that correct order is preserverd. My current implementation has a substantial bottleneck, but it works.

EDIT: Bellow is the problem of atomic Enqueue / Dequeue operations

You wold normaly do something like this:

procedure Enqueue;
begin
  EnterCriticalSection(FCritSec);
  try
    DoEnqueue;
  finally 
    LeaveCriticalSection(FCritSec);
  end;

  BlockTheCurrentThread; // here the thread blocks itself
end;

procedure Dequeue;
begin
  EnterCriticalSection(FCritSec);
  try
    DoDequeue;
    UnblockTheNextThread; // here the thread unblocks another thread
  finally 
    LeaveCriticalSection(FCritSec);
  end;
end;

Now the problem here is that this is not atomic. If you have one thread already in the queue and another one comes and calls Enqueue, it can happen, that the second thread will just leave the critical section and try to block itself. Now the thread scheduler will resume the first thread, which will try to unblock the next (second) thread. But second thread is not blocked yet, so nothing happens. Now the second thread continues and blocks itself, but that is not correct because it will not be unblocked. If blocking is inside critical section, that the critical section is never leaved and we have a deadlock.

+2  A: 

I'll answer with the additional information from your comment taken into consideration.

If you have a number of threads that need to be serialized then you could make use of the serialization mechanism Windows provides for free. Let each queue be a thread with its own window and a standard message loop. Use SendMessage() instead of PostThreadMessage(), and Windows will take care of blocking the sending threads until the message has been processed, and of making sure that the correct execution order is maintained. By using a thread with its own window for each request group you make sure that multiple groups are still processed concurrently.

This is a simple solution that will work only if the request itself can be handled in a different thread context than it originated in, which shouldn't be a problem in many cases.

mghie
Note that this is the same principle that is used to serialize methods calls to a STA COM object.
mghie
True, this could be a good solution and yes it is not a problem to introduce a new thread, that is listening for messages. The only downside (apart from dummy windows) is, that this way, I need another thread pool. I already have perfectly good threads from Indy server. But it may be the only viable solution. Lets wait if somebody has another approach
Runner
Note that there is no new thread, since you already had `TTaskQueueThread`. You'd just let it have a window and use blocking message processing.
mghie
+1 for good ideas ;)
Runner
+6  A: 

Another approach:

Let each request thread have a manual reset event that is initially unset. The queue manager is a simple object which maintains a thread-safe list of such events. The Enqueue() and Dequeue() methods both take the event of the request thread as a parameter.

type
  TRequestManager = class(TObject)
  strict private
    fCritSect: TCriticalSection;
    fEvents: TList<TEvent>;
  public
    constructor Create;
    destructor Destroy; override;

    procedure Enqueue(ARequestEvent: TEvent);
    procedure Dequeue(ARequestEvent: TEvent);
  end;

{ TRequestManager }

constructor TRequestManager.Create;
begin
  inherited Create;
  fCritSect := TCriticalSection.Create;
  fEvents := TList<TEvent>.Create;
end;

destructor TRequestManager.Destroy;
begin
  Assert((fEvents = nil) or (fEvents.Count = 0));
  FreeAndNil(fEvents);
  FreeAndNil(fCritSect);
  inherited;
end;

procedure TRequestManager.Dequeue(ARequestEvent: TEvent);
begin
  fCritSect.Enter;
  try
    Assert(fEvents.Count > 0);
    Assert(fEvents[0] = ARequestEvent);
    fEvents.Delete(0);
    if fEvents.Count > 0 then
      fEvents[0].SetEvent;
  finally
    fCritSect.Release;
  end;
end;

procedure TRequestManager.Enqueue(ARequestEvent: TEvent);
begin
  fCritSect.Enter;
  try
    Assert(ARequestEvent <> nil);
    if fEvents.Count = 0 then
      ARequestEvent.SetEvent
    else
      ARequestEvent.ResetEvent;
    fEvents.Add(ARequestEvent);
  finally
    fCritSect.Release;
  end;
end;

Each request thread calls Enqueue() on the queue manager and afterwards waits for its own event to become signalled. Then it processes the request and calls Dequeue():

{ TRequestThread }

type
  TRequestThread = class(TThread)
  strict private
    fEvent: TEvent;
    fManager: TRequestManager;
  protected
    procedure Execute; override;
  public
    constructor Create(AManager: TRequestManager);
  end;

constructor TRequestThread.Create(AManager: TRequestManager);
begin
  Assert(AManager <> nil);
  inherited Create(TRUE);
  fEvent := TEvent.Create(nil, TRUE, FALSE, '');
  fManager := AManager;
  Resume;
end;

procedure TRequestThread.Execute;
begin
  fManager.Enqueue(fEvent);
  try
    fEvent.WaitFor(INFINITE);
    OutputDebugString('Processing request');
    Sleep(1000);
    OutputDebugString('Request processed');
  finally
    fManager.Dequeue(fEvent);
  end;
end;

{ TForm1 }

procedure TForm1.Button1Click(Sender: TObject);
var
  i: integer;
begin
  for i := 1 to 10 do
    TRequestThread.Create(fRequestManager);
end;

The queue manager locks the list of events both in Enqueue() and in Dequeue(). If the list is empty in Enqueue() it sets the event in the parameter, otherwise it resets the event. Then it appends the event to the list. Thus the first thread can continue with the request, all others will block. In Dequeue() the event is removed from the top of the list, and the next event is set (if there is any).

That way the last request thread will cause the next request thread to unblock, completely without suspending or resuming threads. This solution does also not need any additional threads or windows, a single event object per request thread is all that is needed.

mghie
That is exactly the approach I used at first. The only difference was that I used Suspend / Resume instead if events, which is inferior (my approach is).But look at the problems you run into (I updated the question). The operations must be atomic. That is why I introduced aditional thread so all operations were issued from another thread and they were completely atomic without any critical sections. But then a substantial bottlenect is introduced. If you can solve this problem then I will accept your answer because everything else is already optimal in your answer.
Runner
Now there are two options here. Either I am not seeing something and the solution is simple, or I am barking at the wrong tree completely :)
Runner
Ah you posted the code. Yes I think this should work. I knew it was simple and I missed something :) I will try to implement it in this manner. If it holds the water, which I am sure it will, the I will accept your answer. I don't think it can be done better that this.
Runner
I must be missing something. There are indeed times when more than one request thread can run, but the important sections (request processing) are serialized. There is also no risk of deadlock that I can see?
mghie
Yes, the difference is in the Event versus Suspend / Resume approach. When calling Suspend you are suspended imediatelly and thus the problems I described. But with your approach using events You can signal it first and be sure it is atomic and then later call WaitFor. This is what I have been missing. I was so fixated on my problem that I did not see this solution.
Runner
Works like a charm. It is basically what I was trying to do in the first place with the exception that it works correctly. Thanks for all the help.
Runner
A: 

Did you try the TThreadList object provided by Delphi ?

It is thread safe and it manage the locks for you. You manage the list "outside" the thread, within your main thread.

As requests ask for a new task, you add it to the list. When a thread finishes, with the OnTerminate event you can call the next thread in the list.

Pmax
Yes I know TThreadList. The problem was not the thread safe list, I had that covered. The problem was how to correctly synchronize the FIFO queue. The accepted answers from mghie solved that perfectly. My problem was that I took the wrong approach with suspending / resuming threads instead of just using events.
Runner