
I couldn't find a decent ThreadPool implementation for Ruby, so I wrote my own (based partly on code from here: http://snippets.dzone.com/posts/show/3276, but changed to use wait/signal and a different implementation of ThreadPool shutdown). However, after running for some time (with 100 threads and about 1300 tasks handled), it dies with a deadlock on line 25 (`@mutex.synchronize {@cv.wait(@mutex)}` in the worker thread), where it waits for a new job. Any ideas why this might happen?

require 'thread'
begin
  require 'fastthread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end 

class ThreadPool
  class Worker
    def initialize(callback)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @callback = callback
      @mutex.synchronize {@running = true}
      @thread = Thread.new do
        while @mutex.synchronize {@running}
          block = get_block
          if block
            block.call
            reset_block
            # Signal the ThreadPool that this worker is ready for another job
            @callback.signal
          else
            # Wait for a new job
            @mutex.synchronize {@cv.wait(@mutex)}
          end
        end
      end
    end

    def name
      @thread.inspect
    end

    def get_block
      @mutex.synchronize {@block}
    end

    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end

    def reset_block
      @mutex.synchronize {@block = nil}
    end

    def busy?
      @mutex.synchronize {!@block.nil?}
    end

    def stop
      @mutex.synchronize {@running = false}
      # Signal the thread not to wait for a new job
      @cv.signal
      @thread.join
    end
  end

  attr_accessor :max_size

  def initialize(max_size = 10)
    @max_size = max_size
    @workers = []
    @mutex = Mutex.new
    @cv = ConditionVariable.new
  end

  def size
    @mutex.synchronize {@workers.size}
  end

  def busy?
    @mutex.synchronize {@workers.any? {|w| w.busy?}}
  end

  def shutdown
    @mutex.synchronize {@workers.each {|w| w.stop}}
  end
  alias :join :shutdown

  def process(block=nil,&blk)
    block = blk if block_given?
    while true
      @mutex.synchronize do
        worker = get_worker
        if worker
          return worker.set_block(block)
        else
          # Wait for a free worker
          @cv.wait(@mutex)
        end
      end
    end
  end

  # Used by workers to report ready status
  def signal
    @cv.signal
  end

  private
  def get_worker
    free_worker || create_worker
  end

  def free_worker
    @workers.each {|w| return w unless w.busy?}; nil
  end

  def create_worker
    return nil if @workers.size >= @max_size
    worker = Worker.new(self)
    @workers << worker
    worker
  end
end
A: 

I'm slightly biased here, but I would suggest modelling this in a process language and model-checking it. Freely available tools include, for example, the mCRL2 toolset (using an ACP-based language), the Mobility Workbench (pi-calculus) and Spin (PROMELA).

Otherwise, I would suggest removing every bit of code that is not essential to the problem and finding a minimal case where the deadlock occurs. I doubt that the 100 threads and 1300 tasks are essential to getting a deadlock. With a smaller case you can probably just add some debug prints that provide enough information to solve the problem.
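If you go the debug-print route, one cheap aid is to dump every thread's status and the top of its backtrace when the pool appears stuck (for example from a watchdog thread). This is a minimal sketch; `dump_threads` is a hypothetical helper name, and it relies only on the core `Thread.list`, `Thread#status` and `Thread#backtrace` methods. A thread parked in `@cv.wait` shows up with status `"sleep"`, and its backtrace tells you which wait it is stuck in.

```ruby
# Hypothetical debugging helper: print each live thread's status and the
# top of its backtrace, e.g. to see which threads are parked in cv.wait.
def dump_threads(io = $stderr, depth = 5)
  Thread.list.each do |t|
    io.puts "#{t.inspect} status=#{t.status.inspect}"
    # backtrace can be nil if the thread finished in the meantime
    (t.backtrace || []).first(depth).each { |line| io.puts "    #{line}" }
  end
end
```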

mweerden
The code in question failed after processing only 1300 of the 180,000 tasks; unfortunately, I couldn't reproduce it with a smaller set...
Roman
+1  A: 

OK, the problem seems to be in your ThreadPool#signal method. What may happen is:

1 - All your workers are busy and you try to process a new job

2 - line 90 (`worker = get_worker` in `process`) gets a nil worker

3 - a worker gets freed and signals it, but the signal is lost because the ThreadPool is not waiting for it yet

4 - you reach line 95 (`@cv.wait(@mutex)` in `process`) and wait, even though there is a free worker.

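The error here is that a worker can signal that it is free even when nobody is listening: `@callback.signal` runs without holding the pool's mutex, so it can fire between the failed `get_worker` and the `@cv.wait`. The ThreadPool#signal method should be:

def signal
  @mutex.synchronize { @cv.signal }
end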
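The general rule behind this fix is that a ConditionVariable has no memory: `signal` only wakes a thread that is already waiting. The usual cure is to record the event in shared state under the same mutex and have the waiter re-check that state in a loop before sleeping. Here is a minimal sketch of that pattern, essentially a small counting semaphore; the class `FreeSlots` and its methods are hypothetical names, not part of the code above.

```ruby
require 'thread'

# Hypothetical counter of free workers. @free is changed and the condition
# variable is signalled while holding the same mutex, and the waiter checks
# @free before sleeping, so a release that happens "too early" is recorded
# in @free instead of being lost.
class FreeSlots
  def initialize(count)
    @free = count
    @mutex = Mutex.new
    @cv = ConditionVariable.new
  end

  # Block until a slot is free, then claim it.
  def acquire
    @mutex.synchronize do
      # The loop also guards against spurious wakeups.
      @cv.wait(@mutex) while @free.zero?
      @free -= 1
    end
  end

  # Give a slot back and wake one waiting thread, if any.
  def release
    @mutex.synchronize do
      @free += 1
      @cv.signal
    end
  end

  def free
    @mutex.synchronize { @free }
  end
end
```

With this shape, calling `release` when nobody is waiting is harmless: the next `acquire` sees `@free > 0` and returns without waiting.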

And the problem is the same in the Worker object. What might happen is:

1 - The worker has just completed a job

2 - It checks (line 17, `block = get_block`) whether there is a job waiting: there isn't

3 - The thread pool sends a new job and signals it... but the signal is lost

4 - The worker waits for a signal, even though it is marked as busy

You should write your initialize method as:

def initialize(callback)
  @mutex = Mutex.new
  @cv = ConditionVariable.new
  @callback = callback
  @mutex.synchronize {@running = true}
  @thread = Thread.new do
    @mutex.synchronize do
      while @running
        block = get_block
        if block
          @mutex.unlock
          block.call
          @mutex.lock
          reset_block
          # Signal the ThreadPool that this worker is ready for another job
          @callback.signal
        else
          # Wait for a new job
          @cv.wait(@mutex)
        end
      end
    end
  end
end

Next, Worker#get_block and Worker#reset_block should no longer be synchronized: the worker thread now calls them while already holding @mutex, and Ruby's Mutex is not reentrant. Because the mutex is held from the test for a block until the wait, a block can no longer be assigned to the worker in between.
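To see why the synchronized helpers have to go: Ruby's `Mutex` is not reentrant, so a thread that already holds it and tries to lock it again gets a `ThreadError` instead of re-entering. This small demonstration uses hypothetical helper names; `read_block_locked` mimics an unsynchronized getter in the style of the revised `Worker#get_block` and uses `Mutex#owned?` to check that its caller holds the lock.

```ruby
require 'thread'

# Locking a Mutex the current thread already owns raises ThreadError
# rather than re-entering. Returns the error, or nil if none was raised.
def nested_lock_error(m)
  m.synchronize { m.synchronize { } }
  nil
rescue ThreadError => e
  e
end

# Hypothetical unsynchronized helper: it expects the caller to hold the
# mutex already, and fails loudly if that assumption is violated.
def read_block_locked(m, state)
  raise ThreadError, "caller must hold the mutex" unless m.owned?
  state[:block]
end
```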

PierreBdR
I think that you're right! I'll test this right away, thanks!
Roman
Hmm... now there's a deadlock when I'm waiting for the threads to complete (e.g. when calling join on the ThreadPool). I'm trying to figure out why.
Roman
+5  A: 

OK, so the main problem with the implementation is: how do you make sure no signal is lost while also avoiding deadlocks?

In my experience, this is REALLY hard to achieve with condition variables and mutexes, but easy with semaphores. It so happens that Ruby implements a class called Queue (and SizedQueue) that should solve the problem. Here is my suggested implementation:

require 'thread'
begin
  require 'fastthread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end

class ThreadPool
  class Worker
    def initialize(thread_queue)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @queue = thread_queue
      @running = true
      @thread = Thread.new do
        @mutex.synchronize do
          while @running
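            # Re-check the state under the mutex before sleeping: if set_block
            # signalled before this thread started waiting, that signal is
            # gone, but @block is already set, so there is no need to wait
            @cv.wait(@mutex) while @running && @block.nil?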
            block = get_block
            if block
              @mutex.unlock
              block.call
              @mutex.lock
              reset_block
            end
            @queue << self
          end
        end
      end
    end

    def name
      @thread.inspect
    end

    def get_block
      @block
    end

    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end

    def reset_block
      @block = nil
    end

    def busy?
      @mutex.synchronize { !@block.nil? }
    end

    def stop
      @mutex.synchronize do
        @running = false
        @cv.signal
      end
      @thread.join
    end
  end

  attr_accessor :max_size

  def initialize(max_size = 10)
    @max_size = max_size
    @queue = Queue.new
    @workers = []
  end

  def size
    @workers.size
  end

  def busy?
    @queue.size < @workers.size
  end

  def shutdown
    @workers.each { |w| w.stop }
    @workers = []
  end

  alias :join :shutdown

  def process(block=nil,&blk)
    block = blk if block_given?
    worker = get_worker
    worker.set_block(block)
  end

  private

  def get_worker
    if !@queue.empty? or @workers.size == @max_size
      return @queue.pop
    else
      worker = Worker.new(@queue)
      @workers << worker
      worker
    end
  end

end

And here is some simple test code:

tp = ThreadPool.new 500
(1..1000).each do |i|
  tp.process do
    # Simulate work: compute 10! with a 0.1 s sleep per multiplication
    (2..10).inject(1) { |memo, val| sleep(0.1); memo * val }
    print "Computation #{i} done. Nb of tasks: #{tp.size}\n"
  end
end
tp.shutdown
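For comparison, here is a minimal sketch of a more common way to build a pool on top of Queue. Instead of handing jobs to individual workers, a fixed set of threads all pop jobs from one shared queue, and shutdown pushes one `nil` sentinel per thread. This is a different technique from the implementation above, and the class name `SimplePool` is hypothetical. Using a SizedQueue for the jobs makes `process` block when too many jobs are pending.

```ruby
require 'thread'

# Hypothetical fixed-size pool: workers share one job queue, so there is no
# per-worker handshake and no condition-variable signal that can be lost.
class SimplePool
  def initialize(size = 10, max_pending = 100)
    @jobs = SizedQueue.new(max_pending)  # push blocks when the queue is full
    @threads = Array.new(size) do
      Thread.new do
        # Queue#pop blocks until a job arrives; nil is the stop sentinel.
        while (job = @jobs.pop)
          begin
            job.call
          rescue StandardError => e
            warn "job failed: #{e.message}"  # keep the worker alive
          end
        end
      end
    end
  end

  def process(&block)
    @jobs << block
  end

  # Queued jobs come before the sentinels (FIFO), so they all run first;
  # then every worker pops a nil, exits, and is joined.
  def shutdown
    @threads.size.times { @jobs << nil }
    @threads.each(&:join)
  end
end
```

Because idle workers simply block in `Queue#pop`, there is no separate "free worker" bookkeeping to keep in sync with the threads.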
PierreBdR
1. Shouldn't the access to @workers be synchronized? 2. Why is there still a need to lock and unlock in the worker thread?
Roman
Access to @workers is always done from the same thread, so synchronization is not needed. As for the locking in the worker thread, you need it to wake the thread up safely.
PierreBdR
There's still a problem with this: there's a chance of a deadlock. When the worker thread adds itself to the queue, the ThreadPool can take it from the queue and assign it a task, which sends a signal. However, if the worker thread is not yet waiting on the cv, the signal will be lost.
Roman
Roman, no, it can't! To send the signal, the thread pool needs to acquire the worker's mutex, and that mutex stays locked until the worker is waiting for the signal.
PierreBdR
+2  A: 

You can try the work_queue gem.

Miguel Fonseca