This is a wrapper class around Hash that allows concurrent readers, but locks things down for all other types of access (including iterated reads).
class LockedHash
def initialize
@hash = Hash.new
@lock = ThreadAwareLock.new()
@reader_count = 0
end
def [](key)
@lock.lock_read
ret = @hash[key]
@lock.unlock_read
ret
end
def []=(key, value)
@lock.lock_write
@hash[key] = value
@lock.unlock_write
end
def method_missing(method_sym, *arguments, &block)
if @hash.respond_to? method_sym
@lock.lock_block
val = lambda{@hash.send(method_sym,*arguments, &block)}.call
@lock.unlock_block
return val
end
super
end
end
Here is the locking code it uses:
class RWLock
def initialize
@outer = Mutex.new
@inner = Mutex.new
@reader_count = 0
end
def lock_read
@outer.synchronize{@inner.synchronize{@reader_count += 1}}
end
def unlock_read
@inner.synchronize{@reader_count -= 1}
end
def lock_write
@outer.lock
while @reader_count > 0 ;end
end
def unlock_write
@outer.unlock
end
end
class ThreadAwareLock < RWLock
def initialize
@owner = nil
super
end
def lock_block
lock_write
@owner = Thread.current.object_id
end
def unlock_block
@owner = nil
unlock_write
end
def lock_read
super unless my_block?
end
def unlock_read
super unless my_block?
end
def lock_write
super unless my_block?
end
def unlock_write
super unless my_block?
end
def my_block?
@owner == Thread.current.object_id
end
end
The thread-aware lock is to allow you to lock the class once, and then call methods that would normally lock, and have them not lock. You need this because you yield into blocks inside some methods, and those blocks can call locking methods on the object, and you don't want a deadlock or a double-lock error. You could use a counting lock instead for this.
Here's an attempt to implement bucket-level read-write locks:
class SafeBucket
def initialize
@lock = RWLock.new()
@value_pairs = []
end
def get(key)
@lock.lock_read
pair = @value_pairs.select{|p| p[0] == key}
unless pair && pair.size > 0
@lock.unlock_read
return nil
end
ret = pair[0][1]
@lock.unlock_read
ret
end
def set(key, value)
@lock.lock_write
pair = @value_pairs.select{|p| p[0] == key}
if pair && pair.size > 0
pair[0][1] = value
@lock.unlock_write
return
end
@value_pairs.push [key, value]
@lock.unlock_write
value
end
def each
@value_pairs.each{|p| yield p[0],p[1]}
end
end
class MikeConcurrentHash
def initialize
@buckets = []
100.times {@buckets.push SafeBucket.new}
end
def [](key)
bucket(key).get(key)
end
def []=(key, value)
bucket(key).set(key, value)
end
def each
@buckets.each{|b| b.each{|key, value| yield key, value}}
end
def bucket(key)
@buckets[key.hash % 100]
end
end
I stopped working on this because it's too slow, so the each method is unsafe (allows mutations by other threads during an iteration) and it doesn't support most hash methods.
And here's a test harness for concurrent hashes:
require 'thread'
class HashHarness
Keys = [:a, :basic, :test, :harness, :for, :concurrent, :testing, :of, :hashes,
:that, :tries, :to, :provide, :a, :framework, :for, :designing, :a, :good, :ConcurrentHash,
:for, :all, :ruby, :implementations]
def self.go
h = new
r = h.writiness_range(20, 10000, 0, 0)
r.each{|k, v| p k + ' ' + v.map{|p| p[1]}.join(' ')}
return
end
def initialize(classes = [MikeConcurrentHash, JoshConcurrentHash, JoshConcurrentHash2, PaulConcurrentHash, LockedHash, Hash])
@classes = classes
end
def writiness_range(basic_threads, ops, each_threads, loops)
result = {}
@classes.each do |hash_class|
res = []
0.upto 10 do |i|
writiness = i.to_f / 10
res.push [writiness,test_one(hash_class, basic_threads, ops, each_threads, loops, writiness)]
end
result[hash_class.name] = res
end
result
end
def test_one(hash_class, basic_threads, ops, each_threads, loops, writiness)
time = Time.now
threads = []
hash = hash_class.new
populate_hash(hash)
begin
basic_threads.times do
threads.push Thread.new{run_basic_test(hash, writiness, ops)}
end
each_threads.times do
threads.push Thread.new{run_each_test(hash, writiness, loops)}
end
threads.each{|t| t.join}
rescue ThreadError => e
p [e.message, hash_class.name, basic_threads, ops, each_threads, loops, writiness].join(' ')
return -1
end
p [hash_class.name, basic_threads, ops, each_threads, loops, writiness, Time.now - time].join(' ')
return Time.now - time
end
def run_basic_test(hash, writiness, ops)
ops.times do
rand < writiness ? hash[choose_key]= rand : hash[choose_key]
end
end
def run_each_test(hash, writiness, loops)
loops.times do
hash.each do |k, v|
if rand < writiness
each_write_work(hash, k, v)
else
each_read_work(k, v)
end
end
end
end
def each_write_work(hash, key, value)
hash[key] = rand
end
def each_read_work(key, value)
key.to_s + ": " + value.to_s
end
def choose_key
Keys[rand(Keys.size)]
end
def populate_hash(hash)
Keys.each{|key| hash[key]=rand}
end
end
Numbers:
Jruby
Writiness 0.0 0.1 0.2 0.3 0.4 0.5 0.6 0.7 0.8 0.9 1.0
ConcurrentHash 2.098 3.179 2.971 3.083 2.731 2.941 2.564 2.480 2.369 1.862 1.881
LockedHash 1.873 1.896 2.085 2.058 2.001 2.055 1.904 1.921 1.873 1.841 1.630
Hash 0.530 0.672 0.685 0.822 0.719 0.877 0.901 0.931 0.942 0.950 1.001
And MRI
Writiness 0.0 0.1 0.2 0.3 0.4 0.5 0.6 0.7 0.8 0.9 1.0
ConcurrentHash 9.214 9.913 9.064 10.112 10.240 10.574 10.566 11.027 11.323 11.837 13.036
LockedHash 19.593 17.712 16.998 17.045 16.687 16.609 16.647 15.307 14.464 13.931 14.146
Hash 0.535 0.537 0.534 0.599 0.594 0.676 0.635 0.650 0.654 0.661 0.692
MRI numbers are pretty striking. Locking in MRI really sucks.