The following code does not work: inside each thread, resources.next_document returns nil, while the same call made without threads works as expected.

Any MongoDB experts out there? :P

  threads = []
  resources = db[Resource::COLLECTION].find

  number_of_threads.times do
    threads << Thread.new do
      while (resource = resources.next_document)
        puts 'one more doc'
      end
    end
  end

  # Wait for every worker before the main thread exits
  threads.each(&:join)
A: 

Although the driver itself is thread-safe, individual cursors aren't, so you can't reliably process the data in the way you're describing.

One possibility would be to have a single thread that iterates over the documents, handing them off to any number of worker threads for the actual processing.
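A minimal sketch of that single-reader approach, using Ruby's built-in SizedQueue: one thread is the only code that ever calls next_document, and the workers only see documents popped from the queue. FakeCursor and process_in_parallel are hypothetical names for illustration; FakeCursor stands in for the driver cursor and assumes only that next_document returns nil once the results are exhausted, as in the question.

```ruby
require 'thread' # Queue and SizedQueue (loaded by default in modern Ruby)

# Hypothetical stand-in for the driver cursor: #next_document returns the
# next document, or nil once the result set is exhausted.
FakeCursor = Struct.new(:docs) do
  def next_document
    docs.shift
  end
end

# Reads every document on ONE thread and hands them to `worker_count`
# worker threads through a bounded queue. Returns the block's results.
def process_in_parallel(cursor, worker_count, &block)
  queue = SizedQueue.new(100) # bounded: the reader blocks if workers fall behind
  results = Queue.new

  # The only thread that ever touches the cursor.
  reader = Thread.new do
    while (doc = cursor.next_document)
      queue << doc
    end
    worker_count.times { queue << :done } # one stop marker per worker
  end

  workers = Array.new(worker_count) do
    Thread.new do
      while (doc = queue.pop) != :done
        results << block.call(doc)
      end
    end
  end

  reader.join
  workers.each(&:join)
  Array.new(results.size) { results.pop } # completion order, not cursor order
end

cursor = FakeCursor.new((1..10).map { |i| { 'n' => i } })
doubled = process_in_parallel(cursor, 4) { |doc| doc['n'] * 2 }
p doubled.sort # => [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
```

Bounding the queue keeps the reader from pulling the whole result set into memory ahead of the workers, and sending one :done marker per worker lets each worker stop cleanly.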

Kyle Banker
Interesting... Why would starting the query before or after spawning the threads affect the cursor differently, if the cursor is shared among all threads?
Alexandre
kb: have a look at my answer and tell me what you think.
Alexandre
A: 

This is the solution I ended up using:

Feedback welcome.

pool = DocumentPool.new(db)
threads = 5.times.map do
  Thread.new do
    while (doc = pool.next_document)
      # something cool
    end
  end
end
# Wait for all workers to finish
threads.each(&:join)


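class DocumentPool
  COLLECTION = 'some_collection'

  def initialize(db)
    @db = db
    # Fetch the first document eagerly, so the query is issued on the
    # thread that creates the pool, before any workers start
    @first_doc = cursor.next_document
  end

  def collection
    @db[COLLECTION]
  end

  # Lazily create a single cursor shared by every caller
  def cursor
    @cursor ||= collection.find
  end

  # Return the prefetched first document once, then read from the cursor
  def shift
    doc = nil
    if @first_doc
      doc = @first_doc
      @first_doc = nil
    else
      doc = cursor.next_document
    end
    doc
  end

  def count
    collection.count
  end
end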
Alexandre
This may work in some situations, but I don't believe it's foolproof. For that, you'd need to synchronize around the cursor's next_document method.
Kyle Banker
Why would the next_document method need to be synchronized, but my own "shift" method wouldn't?
Alexandre
I'm not sure this @first_doc stuff makes sense. All that shift has to do is synchronize around the call to next_document. By the way, in your example code at the top, you call pool.next_document. I think you mean pool.shift.
Kyle Banker
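A sketch of the suggestion in these comments: a pool whose shift just wraps next_document in a Mutex, with no @first_doc. To keep it runnable without a database, this version takes any object with a next_document method; ArrayCursor is a hypothetical stand-in. With the driver from the question you would presumably pass db['some_collection'].find instead, which is an assumption and not tested here.

```ruby
require 'thread'

# Hypothetical stand-in for a driver cursor: #next_document returns
# the next document, or nil once the results are exhausted.
class ArrayCursor
  def initialize(docs)
    @docs = docs.dup
  end

  def next_document
    @docs.shift
  end
end

# Lets several threads share one cursor by serializing every call to
# next_document through a Mutex.
class DocumentPool
  def initialize(cursor)
    @cursor = cursor
    @lock = Mutex.new
  end

  # Returns the next document, or nil when the cursor is exhausted.
  def shift
    @lock.synchronize { @cursor.next_document }
  end
end

pool = DocumentPool.new(ArrayCursor.new((1..100).to_a))
seen = Queue.new # thread-safe collector for the demo

threads = Array.new(5) do
  Thread.new do
    while (doc = pool.shift)
      seen << doc # "something cool" would go here
    end
  end
end
threads.each(&:join)

puts seen.size # => 100
```

Only the cursor read is inside the lock, so the actual per-document work still runs in parallel across the threads.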