I need to create a thread in Delphi with the following characteristics:

  • Waits until the main thread adds data to a shared queue.
  • Processes all the data in the queue, returning the results to the main thread (for this last part I'll just send messages to the main window). Processing is time-consuming, so new data may be added to the queue while the worker thread is processing previous entries.
  • Resumes waiting, using as few CPU cycles as possible.

I cannot send messages to the thread, since it does not have a window handle.

Should I be using some variant of WaitForObject? If so, what would the wait be for? If not, then how can I keep the thread waiting, then wake it when new data arrives in the queue?

I've read Multithreading - The Delphi Way, which doesn't seem to answer my question. Perhaps OmniThreadLibrary can do what I need; I can't tell since there's little documentation. I don't know enough about threads in general to figure out if the library will help here and how to use it (or even why to use it instead of just working with TThread descendants).

+8  A: 

OmniThreadLibrary can definitely help you here. Test 5 from the OTL distribution should help you get started.

In this demo, the "Start" button creates the thread and sets some parameters and a timer (which you can remove in your code if not needed). "Change message" sends a message to the thread, and this message is processed in the thread's OMChangeMessage method. The thread then sends some information back to the client (OMSendMessage in this demo, but you can do this in the same message handler you'll be doing your work in), and the main thread receives this message via the OmniEventMonitor component. The "Stop" button stops the worker thread.

If more messages arrive while your thread is busy, they will be queued and processed as soon as your worker method has completed its work. When there's nothing to do, the thread will wait for the next message, using zero CPU cycles in the process.

gabr
OmniThreadLibrary has so much potential, if only it were better documented...
Remko
Yes I know, I know ... :( Documentation is the first thing on the list after the 1.04 release.
gabr
+1  A: 

You can definitely send messages to a thread, even though it doesn't have a window handle. Just use PostThreadMessage() instead of SendMessage() or PostMessage(). There will be more information here on StackOverflow if you search for PostThreadMessage() in the [delphi] tag - I don't think it's a good idea to duplicate everything here.

But if you are not knowledgeable about thread programming, then starting with OTL instead of the low level stuff may indeed be a good thing.
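
As a rough illustration of the PostThreadMessage() approach, here is a minimal sketch of a worker thread with its own message loop. The names TMsgWorker, WM_NEW_JOB and ProcessJob are hypothetical and exist only for this example; the job is assumed to fit into the message's wParam.

```delphi
unit MsgWorker;

interface

uses
  Windows, Messages, Classes;

const
  WM_NEW_JOB = WM_APP + 1; // hypothetical message ID chosen for this sketch

type
  TMsgWorker = class(TThread)
  protected
    procedure Execute; override;
    procedure ProcessJob(AJob: Integer); // hypothetical placeholder for the real work
  end;

implementation

procedure TMsgWorker.ProcessJob(AJob: Integer);
begin
  Sleep(1000); // stands in for the time-consuming processing
end;

procedure TMsgWorker.Execute;
var
  Msg: TMsg;
begin
  // Touch the queue once so that Windows creates it for this thread.
  PeekMessage(Msg, 0, WM_USER, WM_USER, PM_NOREMOVE);
  // GetMessage blocks until a message arrives. It returns 0 for WM_QUIT
  // and -1 on error, so both cases end the loop.
  while Integer(GetMessage(Msg, 0, 0, 0)) > 0 do
    if Msg.message = WM_NEW_JOB then
      ProcessJob(Integer(Msg.wParam));
end;

end.
```

From the main thread, something like `PostThreadMessage(Worker.ThreadID, WM_NEW_JOB, 42, 0)` would queue a job, and posting WM_QUIT the same way would end the loop. PostThreadMessage() fails if the target thread has not created its message queue yet, so the return value should be checked.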

mghie
+1  A: 

WaitForSingleObject() can wait on several types of synchronization objects. You can use a Windows "event" synchronization object (which has nothing to do with a Delphi event). You create the event (there's a Delphi TEvent wrapper in SyncObjs, IIRC), and call WaitForSingleObject to wait for that event to become signalled. When you have to wake the thread, you call SetEvent to put the event in the signalled state and WaitForSingleObject returns. You can have a thread wait for one (or all) of multiple objects using WaitForMultipleObjects() - it will also tell you which object became signalled.
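
A minimal sketch of that idea, assuming two events (one for "work available", one for "stop requested") and a hypothetical TEventWorker class invented for this example:

```delphi
program EventWaitSketch;

{$APPTYPE CONSOLE}

uses
  Windows, Classes;

type
  // Hypothetical worker used only for this sketch
  TEventWorker = class(TThread)
  private
    // FHandles[0] = "work available", FHandles[1] = "stop requested"
    FHandles: array[0..1] of THandle;
  protected
    procedure Execute; override;
  public
    constructor Create;
    destructor Destroy; override;
    procedure SignalWork;
    procedure SignalStop;
  end;

constructor TEventWorker.Create;
begin
  // Create the events before the thread can start running.
  FHandles[0] := CreateEvent(nil, False, False, nil); // auto-reset, unnamed
  FHandles[1] := CreateEvent(nil, True, False, nil);  // manual-reset, unnamed
  inherited Create(False);
end;

destructor TEventWorker.Destroy;
begin
  SignalStop;
  WaitFor; // let Execute finish before the handles are closed
  CloseHandle(FHandles[0]);
  CloseHandle(FHandles[1]);
  inherited;
end;

procedure TEventWorker.SignalWork;
begin
  SetEvent(FHandles[0]);
end;

procedure TEventWorker.SignalStop;
begin
  SetEvent(FHandles[1]);
end;

procedure TEventWorker.Execute;
begin
  while True do
    // Blocks until one of the two events becomes signalled;
    // the return value identifies which one it was.
    case WaitForMultipleObjects(2, @FHandles, False, INFINITE) of
      WAIT_OBJECT_0:
        Writeln('work event signalled: drain the whole queue here');
    else
      Break; // stop event signalled, or the wait failed
    end;
end;

var
  Worker: TEventWorker;
begin
  Worker := TEventWorker.Create;
  try
    Worker.SignalWork;
    Sleep(100); // demo only: give the worker a moment to react
  finally
    Worker.Free;
  end;
end.
```

Because several SetEvent calls made while the worker is busy collapse into a single wake-up, the worker should empty the entire queue each time the work event fires, not just take one item.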

ldsandon
A: 

Here's a simple example of how you can do it:

const
  WM_MY_RESULT = WM_USER + $1;

type
  TMyThread = class(TThread)
  private
    FKilled: Boolean;
    FListLock: TRTLCriticalSection;
    FList: TList;
    FJobAdded: TEvent;
  protected
    procedure Execute; override;
    procedure DoJob(AJob: Integer);
  public
    constructor Create(CreateSuspended: Boolean);
    destructor Destroy; override;
    procedure Kill;
    procedure PushJob(AJob: Integer);
    function  JobCount: Integer;
    function  GetJob: Integer;
  end;


  TThreadingForm = class(TForm)
    lstResults: TListBox;
    se: TSpinEdit;
    btn: TButton;
    procedure FormCreate(Sender: TObject);
    procedure FormDestroy(Sender: TObject);
    procedure btnClick(Sender: TObject);
  private
    FThread: TMyThread;
    procedure OnMyResultMessage(var Msg: TMessage); message WM_MY_RESULT;
  public
    { Public declarations }
  end;

var
  ThreadingForm: TThreadingForm;

implementation

{$R *.dfm}

{ TMyThread }

constructor TMyThread.Create(CreateSuspended: Boolean);
begin
  FKilled := False;
  InitializeCriticalSection(FListLock);
  FList := TList.Create;
  FJobAdded := TEvent.Create(nil, True, False, ''); // unnamed: a named event would be shared with every other instance
  inherited;
end;

destructor TMyThread.Destroy;
begin
  FList.Free;
  FJobAdded.Free;
  DeleteCriticalSection(FListLock);
  inherited;
end;

procedure TMyThread.DoJob(AJob: Integer);
var
  res: Integer;
begin
  res := AJob * AJob * AJob * AJob * AJob * AJob;
  Sleep(1000); // so it would take some time
  PostMessage(ThreadingForm.Handle, WM_MY_RESULT, res, 0);
end;

procedure TMyThread.Execute;
begin
  inherited;
  while not (FKilled or Terminated) do // stop as soon as either flag is set
  begin
    EnterCriticalSection(FListLock);
    if JobCount > 0 then
    begin
      LeaveCriticalSection(FListLock);
      DoJob(GetJob)
    end
    else
    begin
      FJobAdded.ResetEvent;
      LeaveCriticalSection(FListLock);
      FJobAdded.WaitFor(10000);
    end;
  end;
end;

function TMyThread.GetJob: Integer;
begin
  EnterCriticalSection(FListLock);
  try
    Result := Integer(FList[0]);
    FList.Delete(0);
  finally
    LeaveCriticalSection(FListLock);
  end;
end;

function TMyThread.JobCount: Integer;
begin
  EnterCriticalSection(FListLock);
  try
    Result := FList.Count;
  finally
    LeaveCriticalSection(FListLock);
  end;
end;

procedure TMyThread.Kill;
begin
  FKilled := True;
  FJobAdded.SetEvent;
  Terminate;
end;

procedure TMyThread.PushJob(AJob: Integer);
begin
  EnterCriticalSection(FListLock);
  try
    FList.Add(Pointer(AJob));
    FJobAdded.SetEvent;
  finally
    LeaveCriticalSection(FListLock);
  end;
end;

{ TThreadingForm }

procedure TThreadingForm.OnMyResultMessage(var Msg: TMessage);
begin
  lstResults.Items.Add(IntToStr(Msg.WParam));
end;

procedure TThreadingForm.FormCreate(Sender: TObject);
begin
  FThread := TMyThread.Create(False);
end;

procedure TThreadingForm.FormDestroy(Sender: TObject);
begin
  FThread.Kill;
  FThread.WaitFor;
  FThread.Free;
end;

procedure TThreadingForm.btnClick(Sender: TObject);
begin
  FThread.PushJob(se.Value);
end;
egon
Your thread class is coded badly. It will raise an AV when it accesses one of the private object fields in the `Execute` method when they have already been freed in the destructor. To prevent that you need to terminate the thread and `WaitFor` it before you free anything.
mghie
Yeah, I agree... I guess I didn't think everything through properly...
egon
IMHO calls to enter and leave a critical section should be wrapped in a try..finally to avoid leaving the CS locked if an exception occurs.
ldsandon
Most of these things in critical sections won't throw an exception.
egon
Well, "most" is not enough in this case. It only has to happen once and you're in a deadlock that you cannot escape from without restarting your program. It's just a good habit to use `try..finally`. It won't be necessary in 99% of cases and will save you a lot of problems in the remaining 1%.
Smasher
Sure... I totally agree... It's a community wiki so feel free to improve on it.
egon
Even if it's CW, I'm not sure that this can be easily changed to be really correct. For example there is also a problem with the use of a manual reset event, as it is possible that between the check for `JobCount > 0` and `ResetEvent` another job is added to the queue, but the event would still be (wrongly) reset. A semaphore wouldn't have that problem.
mghie
@mghie: that one I really missed/forgot. I've even seen `a := somefunc(b);` interrupted in the middle: `somefunc` is evaluated, then some other thread runs, and only then is the result assigned to `a`. Maybe that's why my intuition told me to put in that timeout :S. I would probably lower the timeout. But yeah, semaphores would be better.
egon
Now it's safer... but not as pretty... :S
egon
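
The semaphore variant suggested in the comments above could look roughly like the following. This is only a sketch under stated assumptions: TSemWorker and its members are hypothetical names, and the jobs are stored as integers cast to pointers, as in the answer's code. The semaphore count mirrors the number of queued jobs, so there is no check-then-reset step in which a wake-up could be lost.

```delphi
program SemaphoreQueueSketch;

{$APPTYPE CONSOLE}

uses
  Windows, Classes, SyncObjs;

type
  // Hypothetical worker used only for this sketch
  TSemWorker = class(TThread)
  private
    FLock: TCriticalSection;
    FJobs: TList;
    FJobCount: THandle; // semaphore: one count per queued job
  protected
    procedure Execute; override;
  public
    constructor Create;
    destructor Destroy; override;
    procedure PushJob(AJob: Integer);
    procedure Stop;
  end;

constructor TSemWorker.Create;
begin
  // Set everything up before the thread can start running.
  FLock := TCriticalSection.Create;
  FJobs := TList.Create;
  FJobCount := CreateSemaphore(nil, 0, MaxInt, nil);
  inherited Create(False);
end;

destructor TSemWorker.Destroy;
begin
  Stop;
  WaitFor; // let Execute finish before anything it uses is freed
  CloseHandle(FJobCount);
  FJobs.Free;
  FLock.Free;
  inherited;
end;

procedure TSemWorker.PushJob(AJob: Integer);
begin
  FLock.Acquire;
  try
    FJobs.Add(Pointer(AJob));
  finally
    FLock.Release;
  end;
  ReleaseSemaphore(FJobCount, 1, nil); // announce exactly one new job
end;

procedure TSemWorker.Stop;
begin
  Terminate;
  ReleaseSemaphore(FJobCount, 1, nil); // wake the worker so it sees Terminated
end;

procedure TSemWorker.Execute;
var
  Job: Integer;
begin
  // Each successful wait consumes one count, that is, one queued job.
  while WaitForSingleObject(FJobCount, INFINITE) = WAIT_OBJECT_0 do
  begin
    if Terminated then
      Break;
    FLock.Acquire;
    try
      Job := Integer(FJobs[0]);
      FJobs.Delete(0);
    finally
      FLock.Release;
    end;
    Writeln('processing job ', Job); // placeholder for the real work
  end;
end;

var
  Worker: TSemWorker;
begin
  Worker := TSemWorker.Create;
  try
    Worker.PushJob(2);
    Worker.PushJob(3);
    Sleep(100); // demo only: give the worker time to process
  finally
    Worker.Free;
  end;
end.
```

In this sketch, jobs still in the queue when Stop is called are discarded; a real implementation would have to decide whether to drain them first.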