The way I am doing it right now:
- Try to open a file in exclusive-lock mode.
- Once the lock is acquired, check for a message in Starling.
- If a message exists, another process has already scheduled the job.
- In that case, put the message back on the queue and exit.
- If no message is found, schedule the job, set the message, and exit.
Here is the code that does it:
# Client for the Starling queue, addressed as "host:port" from the app settings.
starling_settings = Settings[:starling]
starling_address = "#{starling_settings[:host]}:#{starling_settings[:port]}"
starling = MemCache.new(starling_address)

# File used as a cross-process mutex while deciding who schedules the job.
mutex_filename = "#{RAILS_ROOT}/config/file.lock"

# Scheduler instance that the digest job is registered on further down.
scheduler = Rufus::Scheduler.start_new
# Lock helper (adapted from the Ruby Cookbook).
#
# Requests a lock of the given +mode+ on +file+ and, when the request
# succeeds, yields +file+ to the block. The lock is released with
# File::LOCK_UN once the block finishes, even if the block raises.
#
# file - object responding to #flock (normally an open File).
# mode - lock constant(s), e.g. File::LOCK_EX or File::LOCK_SH.
#
# Returns whatever file.flock(mode) returned. When that value is falsy
# (File#flock documents +false+ for a non-blocking request that would
# block) the block is not run and no unlock is attempted.
def flock(file, mode)
  locked = file.flock(mode)
  return locked unless locked

  begin
    yield file
  ensure
    # Always give the lock back, whatever happened inside the block.
    file.flock(File::LOCK_UN)
  end
  locked
end
# Opens +filename+, holds a lock on it while the block runs, and returns the
# block's value (adapted from the Ruby Cookbook).
#
# filename - path of the file to open and lock.
# openmode - mode string handed to File.open (default "r").
# lockmode - lock constant; when nil it defaults to a shared lock
#            (File::LOCK_SH) for the read-only modes "r"/"rb" and to an
#            exclusive lock (File::LOCK_EX) for every other mode.
#
# Returns the value of the block, or nil when the lock was not acquired
# (the block is not run in that case). Errors from File.open or from the
# block propagate to the caller; the file is closed either way.
def open_lock(filename, openmode = "r", lockmode = nil)
  read_only = openmode == 'r' || openmode == 'rb'
  lockmode ||= read_only ? File::LOCK_SH : File::LOCK_EX

  value = nil
  # File.open, not Kernel#open: the latter treats a name starting with "|"
  # as a command to spawn instead of a path.
  File.open(filename, openmode) do |f|
    # The flock helper releases the lock in its own ensure, so no second
    # unlock is needed here.
    flock(f, lockmode) { value = yield f }
  end
  value
end
# Decide, under the file lock, whether this process schedules the digest job.
# A "digest_scheduler" message in Starling means another process already did.
open_lock(mutex_filename, 'r+') do |lock_file|
  puts lock_file.read
  marker = starling.get("digest_scheduler")

  if marker
    puts "Found digest message in Starling. Releasing lock. '#{Time.now}'"
    puts "Message: #{marker.inspect}"
    # Reading took the message off the queue; put it back so the remaining
    # processes can see it too.
    starling.set("digest_scheduler", marker)
    next
  end

  # No marker found: this process registers the job.
  # NOTE(review): the cron expression fires every day at 09:00 while the job
  # class is named WeeklyDigest - confirm the intended cadence.
  puts "Scheduling digest emails now. '#{Time.now}'"
  scheduler.cron("0 9 * * *") do
    puts "Begin sending digests..."
    WeeklyDigest.new.send_digest!
    puts "Done sending digests."
  end

  # Leave the marker for the other processes.
  puts "Done Scheduling. Sending the message to Starling. '#{Time.now}'"
  starling.set("digest_scheduler", :date => Date.today)
end
# The sleep gives every instance time to go through its
# wait / acquire lock / schedule-or-skip cycle while the marker is still in
# Starling; an instance that found no marker would schedule the job again.
# Reading the marker afterwards takes it off the queue, so the next reboot
# does not start with a stale message.
puts "Waiting to clear digest messages from Starling."
sleep(20)
# Remove the marker first and only then report it as cleared, so the log line
# is never printed for a read that has not happened (or that raised).
starling.get("digest_scheduler")
puts "All digest messages cleared, proceeding with boot."