class MongoAgent::Agent
@author Darin London Copyright 2014
Attributes
This holds the Moped::Session object that can be used to query information from the MongoDB
hosted by the MONGO_HOST environment variable
@return [Moped::Session]
This holds the log while work! is running
log will be a Hash with the following keys: tasks_processed: int number of tasks processed (success of failure) failed_tasks: int number of tasks that have failed The log is passed to the block that is assigned to process_while (see below)
@return [Hash]
The name of the agent for which tasks will be taken from the queue @return [String]
This holds a block that will be passed the log as an argument and return true
as long as the agent should continue to process tasks when work! is called, and false when work! should stop and return. If not set, the agent will continue to process tasks until it is killed when work! is called
@return [Block]
The name of the task queue that contains the tasks on which this agent will work. @return [String]
number of seconds to sleep between each call to process! when running agent.work! or agent.process_while default 5
Public Class Methods
create a new MongoAgent::Agent
@param attributes [Hash] with name, queue, and optional sleep_between
@option attributes [String] name REQUIRED @option attributes [String] queue REQUIRED @option attributes [Int] sleep_between
OPTIONAL @raise [MongoAgent::Error] name and queue are missing
# File lib/mongo_agent/agent.rb, line 74 def initialize(attributes = nil) if attributes.nil? raise MongoAgent::Error, "attributes Hash required with name and queue keys required" end @name = attributes[:name] @queue = attributes[:queue] unless @name && @queue raise MongoAgent::Error, "attributes[:name] and attributes[:queue] are required!" end build_db() if attributes[:sleep_between] @sleep_between = attributes[:sleep_between] else @sleep_between = 5 end @log = { tasks_processed: 0, failed_tasks: 0 } @process_while = ->(log) { true } end
Public Instance Methods
get A MONGO_DB Moped::Query, either for the specified query Hash, or, when query is nil, all that are currently ready for the @name. This can be used to scan through the tasks on the @queue to perform aggregation tasks: @example collecting information
@agent->get_tasks({ agent_name: @agent->name, error_encountered: true }).each do |task| $stderr.puts "ERROR:\n#{ task.inspect }\n" end
@example update ready to true for tasks that need intervention before they can run
@agent->get_tasks({ agent_name: @agent->name, waiting_for_information: true }).each do |task| task.update('$set' => {ready: true, waiting_form_information: false}) end
@param query [Hash] (optional) any query to find tasks @return [Moped::Query]
# File lib/mongo_agent/agent.rb, line 223 def get_tasks(query = nil) if query.nil? return @db[@queue].find({agent_name: @name, ready: true}) else return @db[@queue].find(query) end end
If a task for the agent is found that is ready, process! registers itself with the task by setting ready to false, and setting its hostname on the :agent_host field, and then passes the task to the supplied block. This block must return a required boolean field indicating success or failure, and an optional hash of key - value fields that will be updated on the task Document. Note, the updates are made regardless of the value of success. In fact, the agent can be configured to update different fields based on success or failure. Also, note that any key, value supported by JSON can be stored in the hash. This allows the agent to communicate any useful information to the task for other agents (MongoAgent::Agent
or human) to use. The block must try at all costs to avoid terminating. If an error is encountered, block should return false for the success field to signal that the process failed. If no errors are encountered block should return true for the success field.
@example Exit successfully and sets :complete to true on the task
@agent->process! do |task_hash| foo = task_hash[:foo] # do something with foo to perform a task true end
@example Same, but also sets the ‘files_processed’ field
@agent->process! { |task_hash| # ... operation using task_hash for information [true, {:files_processed => 30}] }
@example Fails, sets :complete to true, and :error_encountered to true
@failure = ->(task_hash){ begin # ... failing operation using task_hash for information return true rescue return false end } @agent->process!(&@failure)
@example Same, but also sets the ‘notice’ field
@agent->process! do |task_hash| ... [false, {:notice => 'There were 10 files left to process!' }] end
@example This agent passes different parameters based on success or failure
$agent->process! { |task_hash| # ... process and set $success true or false if $success [ $success, {:files_processed => 100} ] else [ $success, {:files_remaining => 10}] end }
@param agent_code [Block, Lambda, or Method] Code to process a task @yieldparam Task [Hash] @yieldreturn [Boolean, Hash] success, (optional) hash of fields to update and values to update on the task
# File lib/mongo_agent/agent.rb, line 153 def process!(&agent_code) (runnable, task) = register() return unless runnable (success, update) = agent_code.call(task) @log[:tasks_processed] += 1 if success complete_task(task, update) else fail_task(task, update) end return end
Iteratively runs process! on the supplied Block, then sleeps :sleep_between between each attempt. Block should match the specifications of what can be passed to process! (see above).
If @process_while is set to a Block, Lambda, or Method, then it is called after
each task is processed, and passed the current @log. As long as the Block returns true, work! will continue to process. work! will stop processing tasks when the Block returns false.
@example process 3 entries and then exit
@agent.process_while = ->(log) { (log[:tasks_processed] < 3) } @agent.work! { |task_hash| #... do something with task_hash and return true of false just as in process! }
@example process until errors are encountered and then exit
@agent.process_while = ->(log) { not(log[:errors_encountered]) } @agent.work! { |task_hash| #... do something with task_hash and return true of false just as in process! } $stderr.puts " #{ @agent.log[:errors_encountered ] } errors were encountered during work."
@param agent_code [Block, Lambda, or Method] Code to process a task @yieldparam Task Hash @yieldreturn [Boolean, Hash] success, (optional) hash of fields to update and values to update on the task
# File lib/mongo_agent/agent.rb, line 194 def work!(&agent_code) while (@process_while.call(@log)) process!(&agent_code) sleep @sleep_between end end
Private Instance Methods
# File lib/mongo_agent/agent.rb, line 266 def build_db @db = Moped::Session.new([ ENV['MONGO_HOST'] ]) @db.use ENV['MONGO_DB'] end
# File lib/mongo_agent/agent.rb, line 245 def complete_task(task, update = nil) if update.nil? update = {} end update[:complete] = true update[:error_encountered] = false update[:completed_at] = Time.now get_tasks({ _id: task[:_id] }).update('$set' => update) end
# File lib/mongo_agent/agent.rb, line 255 def fail_task(task, update = nil) @log[:failed_tasks] += 1 if update.nil? update = {} end update[:complete] = true update[:completed_at] = Time.now update[:error_encountered] = true get_tasks({ _id: task[:_id] }).update('$set' => update) end
# File lib/mongo_agent/agent.rb, line 233 def register task = get_tasks().first unless task $stderr.puts "there are no ready tasks for #{@name} in queue #{@queue}" return false end hostname = Socket.gethostname get_tasks({ _id: task[:_id] }).update('$set' => {ready: false, agent_host: "#{hostname}", started_at: Time.now }) return true, task end