I need to retrieve multiple objects from an external system. The external system supports multiple simultaneous requests (i.e. threads), but it is possible to flood it, so I want to retrieve the objects asynchronously while throttling the number of simultaneous async requests. For example, I need to retrieve 100 items, but I don't want more than 25 retrievals in flight at once. When each of the 25 requests completes, I want to trigger another retrieval, and once they are all complete I want to return all of the results in the order they were requested (i.e. there is no point returning any results until the entire call has completed). Are there any recommended patterns for this sort of thing?

Would something like this be appropriate (pseudocode, obviously)?

  private readonly object resultsLock = new object();
  private List<externalSystemObjects> returnedObjects = new List<externalSystemObjects>();

  public List<externalSystemObjects> GetObjects(List<string> ids)
  {
      const int maxCallCount = 25;
      List<WaitHandle> handles = new List<WaitHandle>();

      foreach (string id in ids)
      {
          // Once 25 calls are in flight, wait for one of them to
          // complete before issuing the next request.
          if (handles.Count >= maxCallCount)
          {
              int finished = WaitHandle.WaitAny(handles.ToArray());
              handles.RemoveAt(finished);
          }

          // executeCall issues the request asynchronously and returns a
          // WaitHandle that is signalled when the call completes.
          handles.Add(executeCall(id, callback));
      }

      // Wait for the remaining in-flight calls before returning.
      WaitHandle.WaitAll(handles.ToArray());

      return returnedObjects;
  }

  public void callback(object result)
  {
      // Callbacks arrive on worker threads, so guard the shared list.
      lock (resultsLock)
      {
          returnedObjects.Add((externalSystemObjects)result);
      }
  }
A: 

Consider the list of items to process as a queue from which 25 processing threads each dequeue a task, process it, add the result, and repeat until the queue is empty:

 using System;
 using System.Collections.Generic;
 using System.Diagnostics;
 using System.Threading;

 class Program
  {
    class State
    {
      public EventWaitHandle Done;
      public int runningThreads;
      public List<string> itemsToProcess;
      public List<string> itemsResponses;
    }

    static void Main(string[] args)
    {
      State state = new State();

      state.itemsResponses = new List<string>(1000);
      state.itemsToProcess = new List<string>(1000);
      for (int i = 0; i < 1000; ++i)
      {
        state.itemsToProcess.Add(String.Format("Request {0}", i));
      }

      state.runningThreads = 25;
      state.Done = new AutoResetEvent(false);

      for (int i = 0; i < 25; ++i)
      {
        Thread t = new Thread(new ParameterizedThreadStart(Processing));
        t.Start(state);
      }

      state.Done.WaitOne();

      foreach (string s in state.itemsResponses)
      {
        Console.WriteLine("{0}", s);
      }
    }

    private static void Processing(object param)
    {
      Debug.Assert(param is State);
      State state = param as State;

      try
      {
        do
        {
          string item = null;
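          // Take the next item under the lock; null means the queue is drained.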
          lock (state.itemsToProcess)
          {
            if (state.itemsToProcess.Count > 0)
            {
              item = state.itemsToProcess[0];
              state.itemsToProcess.RemoveAt(0);
            }
          }
          if (null == item)
          {
            break;
          }
          // Simulate some processing
          Thread.Sleep(10);
          string response = String.Format("Response for {0} on thread: {1}", item, Thread.CurrentThread.ManagedThreadId);
          lock (state.itemsResponses)
          {
            state.itemsResponses.Add(response);
          }
        } while (true);

      }
      catch (Exception)
      {
        // ...
      }
      finally
      {
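        // The last thread to exit signals completion to the main thread.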
        int threadsLeft = Interlocked.Decrement(ref state.runningThreads);
        if (0 == threadsLeft)
        {
          state.Done.Set();
        }
      }
    }
  }

You can do the same using asynchronous callbacks; there is no need to create the threads explicitly.
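For example, here is a minimal sketch of the same throttling built on the thread pool and a Semaphore rather than dedicated threads (the item count, the simulated work, and all names are assumptions for illustration, not part of the answer above):

 using System;
 using System.Threading;

 class ThrottledFetch
 {
   // Allow at most 25 work items in flight at any one time.
   static readonly Semaphore throttle = new Semaphore(25, 25);

   static void Main()
   {
     string[] results = new string[100];  // one slot per request keeps order
     int pending = results.Length;
     ManualResetEvent allDone = new ManualResetEvent(false);

     for (int i = 0; i < results.Length; ++i)
     {
       int index = i;                     // capture a copy for the closure
       throttle.WaitOne();                // blocks while 25 calls are out
       ThreadPool.QueueUserWorkItem(delegate
       {
         try
         {
           Thread.Sleep(10);              // simulate the external call
           results[index] = "Response " + index;
         }
         finally
         {
           throttle.Release();
           if (Interlocked.Decrement(ref pending) == 0)
             allDone.Set();               // last completion wakes the caller
         }
       });
     }

     allDone.WaitOne();
     foreach (string r in results)
       Console.WriteLine(r);
   }
 }

Because each request writes into its own slot of the results array, the output is already in request order; no sorting is needed once everything completes.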

Remus Rusanu
A: 

Having some queue-like structure to hold the pending requests is a pretty common pattern. In Web apps, where there may be several layers of processing, you see a "funnel"-style approach, with the early parts of the processing chain having larger queues. There may also be some kind of prioritisation applied to the queues, with higher-priority requests being shuffled to the top of the queue.

One important thing to consider in your solution is that if the request arrival rate is higher than your processing rate (this might be due to a Denial of Service attack, or just that some part of the processing is unusually slow today) then your queues will grow without bound. You need to have some policy, such as refusing new requests immediately once the queue depth exceeds some value.
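As an illustration only (the depth limit, the two priority levels, and all names below are assumptions), a bounded queue combining both ideas might look like this:

 using System.Collections.Generic;

 class BoundedPriorityQueue<T>
 {
   private readonly object sync = new object();
   private readonly Queue<T> highPriority = new Queue<T>();
   private readonly Queue<T> normalPriority = new Queue<T>();
   private readonly int maxDepth;

   public BoundedPriorityQueue(int maxDepth)
   {
     this.maxDepth = maxDepth;
   }

   // Refuses the request (returns false) rather than growing without bound.
   public bool TryEnqueue(T item, bool urgent)
   {
     lock (sync)
     {
       if (highPriority.Count + normalPriority.Count >= maxDepth)
         return false;
       (urgent ? highPriority : normalPriority).Enqueue(item);
       return true;
     }
   }

   // Higher-priority items are always served first.
   public bool TryDequeue(out T item)
   {
     lock (sync)
     {
       if (highPriority.Count > 0) { item = highPriority.Dequeue(); return true; }
       if (normalPriority.Count > 0) { item = normalPriority.Dequeue(); return true; }
       item = default(T);
       return false;
     }
   }
 }

Callers that get false back from TryEnqueue can refuse the request immediately instead of letting the backlog grow.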

djna