views: 1808
answers: 8

What's the best way to implement a Hash that can be modified across multiple threads, but with the smallest number of locks. For the purposes of this question, you can assume that the Hash will be read-heavy. It must be thread-safe in all Ruby implementations, including ones that operate in a truly simultaneous fashion, such as JRuby, and it must be written in pure-Ruby (no C or Java allowed).

Feel free to submit a naïve solution that always locks, but that isn't likely to be the best solution. Points for elegance, but a smaller likelihood of locking wins over smaller code.

+2  A: 

this (video, pdf) is about a lock-free hash table implemented in Java.

spoiler: it uses atomic Compare-And-Swap (CAS) operations; if those aren't available in Ruby, you could emulate them with locks. not sure if that would give any advantage over a simple lock-guarded hashtable, though.
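A mutex-emulated CAS cell, as the comment suggests, might look like this in pure Ruby (this is an illustrative sketch; the class and method names are made up for the example):

```ruby
require 'thread'

# Illustrative sketch: a compare-and-swap cell emulated with a Mutex.
class AtomicCell
  def initialize(value = nil)
    @mutex = Mutex.new
    @value = value
  end

  def get
    @mutex.synchronize { @value }
  end

  # Atomically replace the value with new_value only if the current
  # value is still the expected object; returns true on success.
  def compare_and_swap(expected, new_value)
    @mutex.synchronize do
      return false unless @value.equal?(expected)
      @value = new_value
      true
    end
  end
end
```

As the comment notes, since the emulation takes a lock anyway, it is unclear this buys anything over a plain lock-guarded hash.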

Javier
The question specifically said that a Java solution would not be accepted.
Yehuda Katz
it's not an implementation, it's an algorithm presentation.
Javier
This is a good discussion of optimizing hash tables at the processor-core level, dealing with cache lines, cache misses, and cache-coherence primitives. It's not helpful for solving the problem in pure Ruby, though.
Michael Sofaer
it's all specific on lock-free algorithms
Javier
+1  A: 

Not tested, and a naive stab at optimizing for reads. It assumes that most of the time, the value won't be locked. If it is locked, the tight loop will retry until it isn't. I put Thread.critical in there to help ensure that the read threads won't be run until the write is completed. I'm not sure the critical part is needed; it really depends on how read-heavy you mean, so some benchmarking is in order.

class ConcurrentHash < Hash

  def initialize(*args)
    @semaphore = Mutex.new
    super
  end

  def []=(k,v)
    begin
      old_crit = Thread.critical
      Thread.critical = true unless old_crit
      @semaphore.synchronize { super }
    ensure
      Thread.critical = old_crit
    end
  end

  def [](k)
    # Spin until no write is in progress, then read without taking the lock.
    while true
      return super unless @semaphore.locked?
    end
  end

end

There may be a few other read methods that would need to check the @semaphore lock; I don't know whether everything else is implemented in terms of #[].
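One hypothetical way to extend the same spin-check to other reader methods (the method list and the metaprogramming here are my illustration, not part of the answer):

```ruby
require 'thread'

# Illustrative sketch: guard other common reader methods with the same
# "spin until no write is in progress" check. The method list is not
# exhaustive.
class ConcurrentHash < Hash
  def initialize(*args)
    @semaphore = Mutex.new
    super
  end

  %i[fetch key? keys values size].each do |name|
    define_method(name) do |*args, &block|
      # Busy-wait while a write holds the semaphore.
      Thread.pass while @semaphore.locked?
      super(*args, &block)
    end
  end

  def []=(k, v)
    @semaphore.synchronize { super }
  end
end
```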

Paul
I think this will have race conditions under JRuby.
finnw
Your reader isn't safe, since super isn't atomic. We need to ensure that no writes start while people are reading.
Michael Sofaer
+2  A: 

Posting base/naive solution, just to boost my Stack Overflow cred:

require 'thread'

class ConcurrentHash < Hash
  def initialize
    super
    @mutex = Mutex.new
  end

  def [](*args)
    @mutex.synchronize { super }
  end

  def []=(*args)
    @mutex.synchronize { super }
  end
end
josh
The fact that you posted that *just* to get reputation makes me reluctant to upvote. :P
musicfreak
Isn't the goal of reputation to dangle a carrot that compels users to give good answers?
jshen
Wouldn't you, at the very least, want to mutex all mutating operations on Hash?
Yehuda Katz
+3  A: 

Yehuda, I think you mentioned ivar setting was atomic? What about a simple copy and swap then?

require 'thread'

class ConcurrentHash
  def initialize
    @reader, @writer = {}, {}
    @lock = Mutex.new
  end

  def [](key)
    @reader[key]
  end

  def []=(key, value)
    @lock.synchronize {
      @writer[key] = value
      @reader, @writer = @writer, @reader
      @writer[key] = value
    }
  end
end
josh
What if two write operations dup the hash at the same time before either writes? Won't one of the writes be lost?
Yehuda Katz
Updated. It shouldn't really hurt too much either.
josh
Wouldn't clone be better than dup? IIRC the difference is that clone gives you a frozen object if the original was frozen - which is what you want.
finnw
One of the differences. `clone` will also copy singleton methods whereas `dup` will not.
Logan Capaldo
clone would not work. The hash would still be frozen and `h[key] = value` would blow up.
josh
wouldn't this make write super expensive when the hash is big?
Sam Saffron
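For reference, the dup-based variant these comments discuss (an earlier revision of the answer) might have looked roughly like this sketch; the class name is illustrative. As Sam points out, each write copies the whole hash, which is O(n) per write:

```ruby
require 'thread'

# Illustrative sketch of a dup-based copy-on-write hash: each write
# duplicates the hash, mutates the copy, then swaps the ivar.
class CopyOnWriteHash
  def initialize
    @hash = {}
    @lock = Mutex.new
  end

  def [](key)
    @hash[key]   # readers see an immutable-in-practice snapshot, no lock
  end

  def []=(key, value)
    @lock.synchronize do
      copy = @hash.dup      # O(n): why big hashes make writes expensive
      copy[key] = value
      @hash = copy          # ivar assignment swaps the snapshot in
    end
  end
end
```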
A: 

i'm pretty unclear on what is meant by this. i think the simplest implementation is simply

Hash

that is to say the built-in ruby hash is threadsafe, if by threadsafe you mean it will not blow up if more than one thread tries to access it. this code will run safely forever

n = 4242
hash = {}

loop do
  a =
    Thread.new do
      n.times do
        hash[:key] = :val
      end
    end

  b =
    Thread.new do
      n.times do
        hash.delete(:key)
      end
    end

  c =
    Thread.new do
      n.times do
        val = hash[:key]
        raise val.inspect unless [nil, :val].include?(val)
      end
    end

  a.join
  b.join
  c.join
  p :THREADSAFE
end

i suspect by thread safe you really mean ACID - for instance that a write like hash[:key]=:val followed by a read of hash[:key] would return :val. but no amount of trickery with locking can provide that - the last writer in would always win. for example, say you have 42 threads all updating a threadsafe hash - which value should be read by the 43rd? surely by threadsafe you don't mean some sort of total ordering on writes - therefore if 42 threads were actively writing, the 'correct' value is any of them, right? but ruby's built-in Hash works in just this way...

perhaps you mean something like

hash.each do ...

in one thread and

hash.delete(key)

would not interfere with one another? i can imagine wanting that to be threadsafe, but that's not even safe in a single thread with the MRI ruby (obviously you cannot modify a hash while iterating over it)

so can you be more specific about what you mean by 'threadsafe'?

the only way to give ACID semantics would be a gross lock (sure this could be a method that took a block - but still an external lock).

ruby's thread scheduler isn't just going to schedule a thread smack in the middle of some arbitrary c function (like the built-in hash aref/aset methods), so those are effectively threadsafe.

ara t howard
At the very least, I require that if two threads attempt to set a key at the same time, both are set, and that if multiple threads do a write + read in sequence to the same key, the only two possible read values would be the ones that were set (i.e. no nil). I'm not content to observe MRI behavior, because I'd like the solution to work in cases of true simultaneous reads and/or writes, such as in JRuby.
Yehuda Katz
i must say it seems like a bug in jruby that hash writes and reads are not atomic - but that is indeed the case. i will point out that ruby's Hash does give you this for free in 1.8 and 1.9, so really you want a factory that simply returns Hash unless you're in jruby.
ara t howard
I'm not convinced that "threads don't operate simultaneously" can be considered a feature of Ruby. I consider the thread scheduler in MRI (with its 10ms timeslices) to be an implementation detail -- and I *want* both MRI and other implementations to be free to improve threading semantics without such improvements being considered a bug.
Yehuda Katz
I think you missed this part of the question: "must be thread-safe in all Ruby implementations, including ones that operate in a truly simultaneous fashion, such as JRuby"
finnw
+1  A: 

Since you mention the Hash would be read-heavy, having one mutex lock both reads and writes would mean writers usually lose the race for the lock to the far more frequent reads (writer starvation). If that's OK with you, then ignore this answer.

If you want to give writes priority, a read-write lock would help. The following code is based on an old C++ assignment from an Operating Systems class, so it might not be the best quality, but it gives the general idea.

require 'thread'

class ReadWriteLock
  def initialize
    @critical_section = Mutex.new
    @are_writers_finished = ConditionVariable.new
    @are_readers_finished = ConditionVariable.new
    @readers = 0
    @writers = 0
    @writer_locked = false
  end

  def read
    begin
      start_read
      yield
    ensure
      end_read
    end
  end

  def start_read
    @critical_section.lock
    while (@writers != 0 || @writer_locked)
      @are_writers_finished.wait(@critical_section)
    end
    @readers += 1
    @critical_section.unlock
  end

  def end_read
    @critical_section.lock
    if (@readers -= 1) == 0
      @are_readers_finished.broadcast
    end
    @critical_section.unlock
  end

  def write
    begin
      start_write
      yield
    ensure
      end_write
    end
  end

  def start_write
    @critical_section.lock
    @writers += 1
    while @readers > 0
      @are_readers_finished.wait(@critical_section)
    end
    while @writer_locked
      @are_writers_finished.wait(@critical_section)
    end
    @writers -= 1
    @writer_locked = true
    @critical_section.unlock
  end

  def end_write
    @critical_section.lock
    @writer_locked = false
    @are_writers_finished.broadcast
    @critical_section.unlock
  end
end

Then just wrap []= and [] in lock.write and lock.read. Might have a performance impact, but will guarantee that writes will 'get through' the reads. Usefulness of this depends on how read heavy it actually is.
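A minimal sketch of that wrapping might look like the following. To keep the snippet self-contained it uses a compact stand-in lock rather than the ReadWriteLock above; both class names here are illustrative:

```ruby
require 'thread'

# Compact stand-in read-write lock (reader-preferring), used only so
# this sketch runs on its own. Not the same code as the answer's
# ReadWriteLock.
class SimpleRWLock
  def initialize
    @mutex = Mutex.new
    @readers_done = ConditionVariable.new
    @readers = 0
  end

  def read
    @mutex.synchronize { @readers += 1 }
    begin
      yield
    ensure
      @mutex.synchronize do
        @readers -= 1
        @readers_done.signal if @readers.zero?
      end
    end
  end

  def write
    @mutex.synchronize do
      # Wait for in-flight readers to drain, then write exclusively.
      @readers_done.wait(@mutex) while @readers > 0
      yield
    end
  end
end

# The wrapping the answer describes: [] under the read lock, []= under
# the write lock.
class RWLockedHash
  def initialize
    @hash = {}
    @lock = SimpleRWLock.new
  end

  def [](key)
    @lock.read { @hash[key] }
  end

  def []=(key, value)
    @lock.write { @hash[key] = value }
  end
end
```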

grk
+2  A: 

This is a wrapper class around Hash that allows concurrent readers, but locks things down for all other types of access (including iterated reads).

class LockedHash
  def initialize
    @hash = Hash.new
    @lock = ThreadAwareLock.new()
    @reader_count = 0
  end

  def [](key)
    @lock.lock_read
    ret = @hash[key]
    @lock.unlock_read
    ret
  end

  def []=(key, value)
    @lock.lock_write
    @hash[key] = value
    @lock.unlock_write
  end

  def method_missing(method_sym, *arguments, &block)
    if @hash.respond_to? method_sym
      @lock.lock_block
      begin
        return @hash.send(method_sym, *arguments, &block)
      ensure
        # Release even if the delegated method raises.
        @lock.unlock_block
      end
    end
    super
  end
end

Here is the locking code it uses:

class RWLock
  def initialize
    @outer = Mutex.new
    @inner = Mutex.new
    @reader_count = 0
  end
  def lock_read
    @outer.synchronize{@inner.synchronize{@reader_count += 1}}
  end
  def unlock_read
    @inner.synchronize{@reader_count -= 1}
  end
  def lock_write
    @outer.lock
    # Busy-wait until all in-flight readers have finished.
    while @reader_count > 0; end
  end
  def unlock_write
    @outer.unlock
  end
end

class ThreadAwareLock < RWLock
  def initialize
    @owner = nil
    super
  end
  def lock_block
    lock_write
    @owner = Thread.current.object_id
  end
  def unlock_block
    @owner = nil
    unlock_write
  end
  def lock_read
    super unless my_block?
  end
  def unlock_read
    super unless my_block?
  end
  def lock_write
    super unless my_block?
  end
  def unlock_write
    super unless my_block?
  end
  def my_block?
    @owner == Thread.current.object_id
  end
end

The thread-aware lock is to allow you to lock the class once, and then call methods that would normally lock, and have them not lock. You need this because you yield into blocks inside some methods, and those blocks can call locking methods on the object, and you don't want a deadlock or a double-lock error. You could use a counting lock instead for this.
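The "counting lock" alternative mentioned above could be sketched as a reentrant mutex (this is my illustration, not the answer's code): the owning thread tracks a nesting count instead of deadlocking on re-entry.

```ruby
require 'thread'

# Illustrative sketch of a reentrant (counting) lock: the same thread
# may call synchronize while already holding the lock.
class ReentrantLock
  def initialize
    @mutex = Mutex.new
    @owner = nil
    @count = 0
  end

  def synchronize
    if @owner == Thread.current
      # Re-entrant call: just bump the nesting count.
      @count += 1
      begin
        yield
      ensure
        @count -= 1
      end
    else
      @mutex.synchronize do
        @owner = Thread.current
        @count = 1
        begin
          yield
        ensure
          @count = 0
          @owner = nil
        end
      end
    end
  end
end
```

With a plain Mutex, the nested `synchronize` below would raise a recursive-locking error; here it succeeds.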

Here's an attempt to implement bucket-level read-write locks:

class SafeBucket
  def initialize
    @lock = RWLock.new()
    @value_pairs = []
  end

  def get(key)
    @lock.lock_read
    pair = @value_pairs.select{|p| p[0] == key}
    unless pair && pair.size > 0
      @lock.unlock_read
      return nil
    end
    ret = pair[0][1]
    @lock.unlock_read
    ret
  end

  def set(key, value)
    @lock.lock_write
    pair = @value_pairs.select{|p| p[0] == key}
    if pair && pair.size > 0
      pair[0][1] = value
      @lock.unlock_write
      return
    end
    @value_pairs.push [key, value]
    @lock.unlock_write
    value
  end

  def each
    @value_pairs.each{|p| yield p[0],p[1]}
  end

end

class MikeConcurrentHash
  def initialize
    @buckets = []
    100.times {@buckets.push SafeBucket.new}
  end

  def [](key)
    bucket(key).get(key)
  end

  def []=(key, value)
    bucket(key).set(key, value)
  end

  def each
    @buckets.each{|b| b.each{|key, value| yield key, value}}
  end

  def bucket(key)
    @buckets[key.hash % 100]
  end
end

I stopped working on this because it's too slow; as it stands, the each method is unsafe (it allows mutations by other threads during an iteration) and most Hash methods are unsupported.

And here's a test harness for concurrent hashes:

require 'thread'
class HashHarness
  Keys = [:a, :basic, :test, :harness, :for, :concurrent, :testing, :of, :hashes,
          :that, :tries, :to, :provide, :a, :framework, :for, :designing, :a, :good, :ConcurrentHash,
          :for, :all, :ruby, :implementations]

  def self.go
    h = new
    r = h.writiness_range(20, 10000, 0, 0)
    r.each{|k, v| p k + ' ' + v.map{|p| p[1]}.join(' ')}
    return
  end
  def initialize(classes = [MikeConcurrentHash, JoshConcurrentHash, JoshConcurrentHash2, PaulConcurrentHash, LockedHash, Hash])
    @classes = classes
  end
  def writiness_range(basic_threads, ops, each_threads, loops)
    result = {}
    @classes.each do |hash_class|
      res = []
      0.upto 10 do |i|
        writiness = i.to_f / 10
        res.push [writiness,test_one(hash_class, basic_threads, ops, each_threads, loops, writiness)]
      end
      result[hash_class.name] = res
    end
    result
  end
  def test_one(hash_class, basic_threads, ops, each_threads, loops, writiness)
    time = Time.now
    threads = []
    hash = hash_class.new
    populate_hash(hash)
    begin
      basic_threads.times do
        threads.push Thread.new{run_basic_test(hash, writiness, ops)}
      end
      each_threads.times do
        threads.push Thread.new{run_each_test(hash, writiness, loops)}
      end
      threads.each{|t| t.join}
    rescue ThreadError => e
      p [e.message, hash_class.name, basic_threads, ops, each_threads, loops, writiness].join(' ')
      return -1
    end
    p [hash_class.name, basic_threads, ops, each_threads, loops, writiness, Time.now - time].join(' ')
    return Time.now - time
  end
  def run_basic_test(hash, writiness, ops)
    ops.times do
      rand < writiness ? hash[choose_key]= rand : hash[choose_key]
    end
  end
  def run_each_test(hash, writiness, loops)
    loops.times do
      hash.each do |k, v|
        if rand < writiness
          each_write_work(hash, k, v)
        else
          each_read_work(k, v)
        end
      end
    end
  end
  def each_write_work(hash, key, value)
    hash[key] = rand
  end
  def each_read_work(key, value)
    key.to_s + ": " + value.to_s
  end
  def choose_key
    Keys[rand(Keys.size)]
  end
  def populate_hash(hash)
    Keys.each{|key| hash[key]=rand}  
  end
end

Numbers: Jruby

Writiness      0.0   0.1   0.2   0.3   0.4   0.5   0.6   0.7   0.8   0.9   1.0
ConcurrentHash 2.098 3.179 2.971 3.083 2.731 2.941 2.564 2.480 2.369 1.862 1.881
LockedHash     1.873 1.896 2.085 2.058 2.001 2.055 1.904 1.921 1.873 1.841 1.630
Hash           0.530 0.672 0.685 0.822 0.719 0.877 0.901 0.931 0.942 0.950 1.001

And MRI

Writiness      0.0    0.1    0.2    0.3    0.4    0.5    0.6    0.7    0.8    0.9    1.0
ConcurrentHash  9.214  9.913  9.064 10.112 10.240 10.574 10.566 11.027 11.323 11.837 13.036
LockedHash     19.593 17.712 16.998 17.045 16.687 16.609 16.647 15.307 14.464 13.931 14.146
Hash            0.535  0.537  0.534  0.599  0.594  0.676  0.635  0.650  0.654  0.661  0.692

MRI numbers are pretty striking. Locking in MRI really sucks.

Michael Sofaer
Nice. Yes, I'd like the solution to support every Hash method. I'm concerned that implementing the bucket semantics in pure-Ruby would actually be slower than just biting the bullet and taking the perf hit of locking every time. Care to bench it?
Yehuda Katz
+5  A: 

okay, now that you specified the actual meaning of 'threadsafe', here are two impls. the following code will run forever in mri and jruby. the lockless impl follows an eventual-consistency model where each thread uses its own view of the hash if the master is in flux. there is a little trickery required to make sure storing all the info in the thread doesn't leak memory, but that is handled and tested - process size does not grow running this code. both impls would need more work to be 'complete' - meaning delete, update, etc would need some thinking - but either of the two concepts below will meet your requirements.

it's very important for people reading this thread to realize the whole issue is exclusive to jruby - in MRI the built-in Hash is sufficient.

module Cash
  def Cash.new(*args, &block)
    env = ENV['CASH_IMPL']
    impl = env ? Cash.const_get(env) : LocklessImpl
    klass = defined?(JRUBY_VERSION) ? impl : ::Hash
    klass.new(*args)
  end

  class LocklessImpl
    def initialize
      @hash = {}
    end

    def thread_hash
      thread = Thread.current
      thread[:cash] ||= {}
      hash = thread[:cash][thread_key]
      if hash
        hash
      else
        hash = thread[:cash][thread_key] = {}
        ObjectSpace.define_finalizer(self){ thread[:cash].delete(thread_key) }
        hash
      end
    end

    def thread_key
      [Thread.current.object_id, object_id]
    end

    def []=(key, val)
      time = Time.now.to_f
      tuple = [time, val]
      @hash[key] = tuple
      thread_hash[key] = tuple
      val
    end

    def [](key)
    # check the master value
    #
      val = @hash[key]

    # someone else is either writing the key or it has never been set.  we
    # need to invalidate our own copy in either case
    #
      if val.nil?
        thread_val = thread_hash.delete(key)
        return(thread_val ? thread_val.last : nil)
      end

    # check our own thread local value
    #
      thread_val = thread_hash[key]

    # in this case someone else has written a value that we have never seen so
    # simply return it
    #
      if thread_val.nil?
        return(val.last)
      end

    # in this case there is a master *and* a thread local value, if the master
    # is newer juke our own cached copy
    #
      if val.first > thread_val.first
        thread_hash.delete(key)
        return val.last
      else
        return thread_val.last
      end
    end
  end

  class LockingImpl < ::Hash
    require 'sync'

    def initialize(*args, &block)
      super
    ensure
      extend Sync_m
    end

    def sync(*args, &block)
      sync_synchronize(*args, &block)
    end

    def [](key)
      sync(:SH){ super }
    end

    def []=(key, val)
      sync(:EX){ super }
    end
  end
end



if $0 == __FILE__
  iteration = 0

  loop do
    n = 42
    hash = Cash.new

    threads =
      Array.new(10) {
        Thread.new do
          Thread.current.abort_on_exception = true
          n.times do |key|
            hash[key] = key
            raise "#{ key }=nil" if hash[key].nil?
          end
        end
      }

    threads.map{|thread| thread.join}

    puts "THREADSAFE: #{ iteration += 1 }"
  end
end
ara t howard
Sorry it took me so long to get back to reading this. I actually really like this, but I'm a bit uncomfortable by your assertion that "this is only a problem in JRuby." That may well be true today (or it might not be, once implementations like macruby and maglev are taken into consideration), but it can hardly be taken as a given into the future. In fact, I wouldn't be all that surprised if Ruby 2.0 managed to avoid the GIL altogether in favor of finer grained locks. And it will likely be an issue in Maglev, MacRuby, and possibly Rubinius.
Yehuda Katz
I'm accepting this solution; it would be nice to see some benchmarks against the naïvely implemented locking solution.
Yehuda Katz
I'd also like to see benchmarks against Josh's copy-and-swap solution (which we use in Rails, and was my original stab at this problem)
Yehuda Katz
I tried to benchmark this with the other implementations, but I didn't see an easy way to add iterators without adding locks and still preserve getting all the keys exactly once. I can bench it without iterators if you like.
Michael Sofaer