views: 556
answers: 6

Let's say I have a multithreaded C++ program that handles requests in the form of a function call to handleRequest(string key). Each call to handleRequest occurs in a separate thread, and there are an arbitrarily large number of possible values for key.

I want the following behavior:

  • Simultaneous calls to handleRequest(key) are serialized when they have the same value for key.
  • Global serialization is minimized.

The body of handleRequest might look like this:

void handleRequest(string key) {
    KeyLock lock(key);
    // Handle the request.
}

Question: How would I implement KeyLock to get the required behavior?

A naive implementation might start off like this:

KeyLock::KeyLock(string key) {
    global_lock->Lock();
    internal_lock_ = global_key_map[key];
    if (internal_lock_ == NULL) {
        internal_lock_ = new Lock();
        global_key_map[key] = internal_lock_;
    }
    global_lock->Unlock();
    internal_lock_->Lock();
}

KeyLock::~KeyLock() {
    internal_lock_->Unlock();
    // Remove internal_lock_ from global_key_map iff no other threads are waiting for it.
}

...but that requires a global lock at the beginning and end of each request, and the creation of a separate Lock object for each request. If contention is high between calls to handleRequest, that might not be a problem, but it could impose a lot of overhead if contention is low.

+10  A: 

You could do something similar to what you have in your question, but instead of a single global_key_map have several (probably in an array or vector), with a simple hash of the string determining which one is used.

That way, instead of a single global lock, you spread the contention over several independent ones.

This is a pattern that is often used in memory allocators (I don't know if the pattern has a name - it should). When a request comes in, something determines which pool the allocation will come from (usually the size of the request, but other parameters can factor in as well), then only that pool needs to be locked. If an allocation request comes in from another thread that will use a different pool, there's no lock contention.
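A minimal sketch of this sharded-map idea (the shard count, the names, and the deliberate leak are my own simplifications, not part of the answer):

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <mutex>
#include <string>

// N independent key maps, each guarded by its own lock; a hash of the
// key picks the shard, so keys in different shards never contend.
const std::size_t kNumShards = 16;

struct Shard {
    std::mutex map_mutex;                     // guards this shard's map only
    std::map<std::string, std::mutex*> key_locks;
};
Shard shards[kNumShards];

Shard& ShardFor(const std::string& key) {
    return shards[std::hash<std::string>()(key) % kNumShards];
}

std::mutex* GetKeyLock(const std::string& key) {
    Shard& s = ShardFor(key);
    std::lock_guard<std::mutex> g(s.map_mutex);
    std::mutex*& m = s.key_locks[key];
    if (m == NULL) m = new std::mutex;        // leaked for brevity; the
    return m;                                 // cleanup problem still applies
}
```

Two threads whose keys hash to different shards never touch the same map_mutex, so the single choke point is split sixteen ways; the per-key cleanup question from the original post still has to be solved within each shard.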

Michael Burr
+2  A: 

It will depend on the platform, but the two techniques that I'd try would be:

  • Use named mutex/synchronization objects, where object name = Key
  • Use filesystem-based locking, where you try to create a non-shareable temporary file with the key name. If it exists already (=already locked) this will fail and you'll have to poll to retry

Both techniques depend on the details of your OS. Experiment and see which works.
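A sketch of the second technique on POSIX systems, where O_CREAT | O_EXCL makes file creation atomic (the /tmp path scheme is illustrative, and a real key would need sanitizing before being embedded in a filename):

```cpp
#include <fcntl.h>
#include <string>
#include <unistd.h>

// Creation succeeds only if the lock file does not already exist,
// so a successful create means the lock was acquired.
bool TryLockKey(const std::string& key) {
    std::string path = "/tmp/keylock_" + key;
    int fd = open(path.c_str(), O_CREAT | O_EXCL | O_WRONLY, 0600);
    if (fd < 0)
        return false;   // EEXIST: another thread/process holds the lock
    close(fd);
    return true;
}

void UnlockKey(const std::string& key) {
    // Removing the file releases the lock.
    unlink(("/tmp/keylock_" + key).c_str());
}
```

Callers that fail to acquire the lock must poll and retry, as the answer notes, which makes this far more expensive than an in-process mutex; its advantage is that it also works across processes.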

Roddy
Note that you can normally only create a limited number of named mutexes. On Linux, at least, you can raise that limit, but I would be wary of this method; you would need some way to garbage-collect old mutexes.
+2  A: 

Perhaps a std::map<std::string, MutexType> would be what you want, where MutexType is the type of mutex you want. You will probably have to wrap accesses to the map in another mutex to ensure that no other thread is inserting at the same time (and remember to perform the check again after the mutex is locked, to ensure that another thread didn't add the key while you were waiting on the mutex!).

The same principle could apply to any other synchronization method, such as a critical section.
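A sketch of that wrapping, assuming C++11 (the names are illustrative; std::map's operator[] default-constructs the mutex in place while the wrapper mutex is held, which covers the check-again case):

```cpp
#include <map>
#include <mutex>
#include <string>

std::mutex map_mutex;                          // guards key_mutexes itself
std::map<std::string, std::mutex> key_mutexes;

std::mutex& MutexForKey(const std::string& key) {
    std::lock_guard<std::mutex> guard(map_mutex);
    // operator[] finds the existing mutex or default-constructs one in
    // place; because this happens under map_mutex, a key inserted by a
    // racing thread is simply found here rather than inserted twice.
    return key_mutexes[key];
}
```

std::map never invalidates references to existing elements on insertion, so the returned reference stays valid while other keys are added; erasing entries safely is the harder problem the question raises.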

coppro
This is the same implementation as the OP says he doesn't want.
Greg Rogers
+2  A: 

Raise granularity and lock entire key-ranges

This is a variation on Mike B's answer, where instead of having several fluid lock maps you have a single fixed array of locks that apply to key-ranges instead of single keys.

Simplified example: create array of 256 locks at startup, then use first byte of key to determine index of lock to be acquired (i.e. all keys starting with 'k' will be guarded by locks[107]).

To sustain optimal throughput, you should analyze the distribution of keys and the contention rate. The benefits of this approach are zero dynamic allocations and simple cleanup; you also avoid two-step locking. The downside is potential contention peaks if the key distribution becomes skewed over time.
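The simplified example fits in a few lines (a sketch; std::mutex stands in for whatever lock type is in use):

```cpp
#include <mutex>
#include <string>

// 256 locks created up front, indexed by the first byte of the key.
// No allocation per request, no cleanup, no global lock.
std::mutex range_locks[256];

std::mutex& LockForKey(const std::string& key) {
    unsigned char index = key.empty() ? 0 : (unsigned char)key[0];
    return range_locks[index];
}
```

All keys starting with 'k' map to range_locks[107], matching the example above. The trade-off is that distinct keys sharing a first byte serialize unnecessarily, which is the contention-skew downside just mentioned.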

Constantin
A: 

After thinking about it, another approach might go something like this:

  • In handleRequest, create a Callback that does the actual work.
  • Create a multimap<string, Callback*> global_key_map, protected by a mutex.
  • If a thread sees that key is already being processed, it adds its Callback* to the global_key_map and returns.
  • Otherwise, it calls its callback immediately, and then calls the callbacks that have shown up in the meantime for the same key.

Implemented, it might look something like this:

LockAndCall(string key, Callback* callback) {
    global_lock.Lock();
    if (global_key_map.find(key) == global_key_map.end()) {
        // No handler active for this key: register ourselves, then run
        // callbacks (ours and any queued meanwhile) until none remain.
        iterator iter = global_key_map.insert(make_pair(key, callback));
        while (true) {
            global_lock.Unlock();
            iter->second->Call();
            global_lock.Lock();
            global_key_map.erase(iter);
            iter = global_key_map.find(key);
            if (iter == global_key_map.end()) {
                global_lock.Unlock();
                return;
            }
        }
    } else {
        // Key is busy: queue the callback and return immediately.
        global_key_map.insert(make_pair(key, callback));
        global_lock.Unlock();
    }
}

This has the advantage of freeing up threads that would otherwise be waiting for a key lock, but apart from that it's pretty much the same as the naive solution I posted in the question.

It could be combined with the answers given by Mike B and Constantin, though.

eschercycle
A: 
/**
 * StringLock class for string based locking mechanism
 * e.g. usage
 *     StringLock strLock;
 *     strLock.Lock("row1");
 *     strLock.UnLock("row1");
 */
class StringLock    {
public:
    /**
     * Constructor
     * Initializes the global mutex
     */
    StringLock()    {
        pthread_mutex_init(&mtxGlobal, NULL);
    }
    /**
     * Lock Function
     * The thread will return immediately if the string is not locked
     * The thread will wait if the string is locked until it gets a turn
     * @param lockString the string to lock
     */
    void Lock(string lockString)    {
        pthread_mutex_lock(&mtxGlobal);
        TListIds *listId = NULL;
        TWaiter *wtr = new TWaiter;
        wtr->evPtr = NULL;
        wtr->threadId = pthread_self();
        if (lockMap.find(lockString) == lockMap.end())    {
            // Nobody holds this string yet: create its waiter list;
            // this thread becomes the holder and returns immediately.
            listId = new TListIds();
            listId->insert(listId->end(), wtr);
            lockMap[lockString] = listId;
            pthread_mutex_unlock(&mtxGlobal);
        } else    {
            // The string is held: join the queue and wait to be signalled.
            wtr->evPtr = new Event(false);
            listId = lockMap[lockString];
            listId->insert(listId->end(), wtr);
            pthread_mutex_unlock(&mtxGlobal);
            wtr->evPtr->Wait();
        }
    }
    /**
     * UnLock Function
     * @param lockString the string to unlock
     */
    void UnLock(string lockString)    {
        pthread_mutex_lock(&mtxGlobal);
        TListIds *listID = NULL;
        if (lockMap.find(lockString) != lockMap.end())    {
            listID = lockMap[lockString];
            TWaiter *holder = listID->front();
            listID->pop_front();
            delete holder->evPtr;   // NULL for the first holder; safe to delete
            delete holder;          // the original leaked this TWaiter
            if (!(listID->empty()))    {
                TWaiter *wtr = listID->front();
                Event *thdEvent = wtr->evPtr;
                thdEvent->Signal();
            } else    {
                lockMap.erase(lockString);
                delete listID;
            }
        }
        pthread_mutex_unlock(&mtxGlobal);
    }
protected:
    struct TWaiter    {
        Event *evPtr;
        long threadId;
    };
    StringLock(StringLock &);
    void operator=(StringLock&);
    typedef list<TWaiter*> TListIds;               // template arguments restored
    typedef map<string, TListIds*> TMapLockWaiters;
    // typedef map TMapLockHolders;  // unused; template arguments lost in formatting
private:
    pthread_mutex_t mtxGlobal;
    TMapLockWaiters lockMap;
};