class SimpleJob::Worker
Public Instance Methods
handle(key, &proc)
click to toggle source
# File lib/simplejob/worker.rb, line 26 def handle(key, &proc) # bind the message to the queue bind(@queue, key) # register the handler install_handler(key, proc) end
start(opts, &proc)
click to toggle source
Calls superclass method
SimpleJob::Client::start
# File lib/simplejob/worker.rb, line 3 def start(opts, &proc) super do # only fetch one message at a time instead of being greedy @channel.prefetch(1) # create the queue @queue = queue(opts[:queue_name] || "job_queue") # defer to instance to set up handlers instance_exec(&proc) # start processing jobs start_handler_loop log.info "[worker] Started" end end
stop()
click to toggle source
Calls superclass method
SimpleJob::Client#stop
# File lib/simplejob/worker.rb, line 21 def stop super log.info "[worker] Stopped" end
Private Instance Methods
find_handler(topic)
click to toggle source
# File lib/simplejob/worker.rb, line 75 def find_handler(topic) return nil unless @handlers # check for direct hit if handler = @handlers[topic] handler[:block] # otherwise, test regexes else if key = @handler_keys.detect {|k| @handlers[k][:re].match(topic) } @handlers[key][:block] end end end
install_handler(key, proc)
click to toggle source
# File lib/simplejob/worker.rb, line 67 def install_handler(key, proc) @handlers ||= {} @handlers[key] = { :block => proc, :re => key_to_regex(key) } @handler_keys ||= [] @handler_keys << key end
key_to_regex(key)
click to toggle source
# File lib/simplejob/worker.rb, line 90 def key_to_regex(key) Regexp.compile("^" + key.gsub('*', '[^\.]+').gsub('#', '.*') + "$") end
start_handler_loop()
click to toggle source
# File lib/simplejob/worker.rb, line 36 def start_handler_loop subscribe(@queue) do |headers, body| unless AMQP.closing? topic = headers.properties[:routing_key] begin start_time = Time.now log.info "\n#{"="*80}\n#{topic.upcase} BEGIN -- #{start_time}\n#{"="*80}" props = JSON.parse(body).with_indifferent_access props[:topic] = topic log.info "[worker] Props: #{props.truncated_inspect}" if handler = find_handler(topic) handler.call(props) else log.error "[worker] no handler installed for #{topic}" end end_time = Time.now elapsed = ((end_time - start_time) * 10).round / 10.0 log.info "\n#{"="*80}\n#{topic.upcase} END -- #{end_time} (elapsed: #{elapsed}s)\n#{"="*80}\n" rescue Exception => error log.error "[worker] handle blew up for #{topic} with body:\n#{body}\n#{error}\n#{error.backtrace.join("\n")}" end headers.ack end end end