class DaemonKit::RuoteWorkitem

Dual purpose class that is a) responsible for parsing incoming workitems and delegating to the correct RuotePseudoParticipant, and b) wrapping the workitem hash into something a bit more digestable.

Public Class Methods

new( workitem = {} ) click to toggle source
# File lib/daemon_kit/ruote_workitem.rb, line 109
def initialize( workitem = {} )
  @workitem = workitem
end
parse( workitem ) click to toggle source
# File lib/daemon_kit/ruote_workitem.rb, line 99
def parse( workitem )
  begin
    return new( JSON.parse( workitem ) )
  rescue JSON::ParserError => e
    DaemonKit.logger.error "No valid JSON payload found in #{workitem}"
    return nil
  end
end
parse_command( work ) click to toggle source

Extract the class and method name from the workitem, then pick the matching class from the registered list of participants

# File lib/daemon_kit/ruote_workitem.rb, line 72
def parse_command( work )
  return nil if work['params']['command'].nil?

  _, klass, method = work['params']['command'].split('/')

  instance = RuoteParticipants.instance.participants[ klass ]

  if instance.nil?
    msg = "No instance registered for #{klass}"
    DaemonKit.logger.error( msg )
    raise DaemonKit::MissingParticipant, msg
  end

  return instance, method
end
process( transport, from, workitem ) click to toggle source

Process incoming commands via an AMQP queue

Expects a JSON workitem from ruote that has these fields set in fields key:

{
  'reply_queue'    => 'queue to send replies to',
  'params' => {
    'command'  => '/actor/method'
  }
}

Notes on the command key:

It looks like a resource, and will be treated as such. Is should be in the format of /class/method, and it will be passed the complete workitem as a hash.

Notes on replies

Replies are sent back to the queue specified in the reply_queue key.

Notes on errors

Where daemon-kit detects errors in attempting to parse and delegate the workitems, it will reply to the engine and set the following field with the error information:

daemon_kit.error
# File lib/daemon_kit/ruote_workitem.rb, line 39
def process( transport, from, workitem )
  # keep it singleton
  @instance ||= new

  work = parse( workitem )

  # Invalid JSON... mmm
  return if work.nil?

  DaemonKit.logger.warn "Processing workitem that has timed out!" if work.timed_out?

  target, method = parse_command( work )

  if target.nil? || method.nil?
    msg = "Missing target/method in command parameter, or command parameter missing"
    DaemonKit.logger.error( msg )
    work["__error__"] = msg

  elsif target.public_methods.map { |m| m.to_s }.include?( method ) # 1.8.x => [ 'foo' ]
    target.perform( method, work )                                  # 1.9.x => [ :foo ]

  else
    msg = "Workitem cannot be processes: '#{method}' not exposed by #{target.inspect}"
    DaemonKit.logger.error( msg )
    p [ :work, work.inspect ]
    work["__error__"] = msg
  end

  reply_to_engine( transport, from, work )
end
reply_to_engine( transport, from, response ) click to toggle source
# File lib/daemon_kit/ruote_workitem.rb, line 88
def reply_to_engine( transport, from, response )
  send( "reply_via_#{transport}", from, response )
end
reply_via_amqp( destination_queue, response ) click to toggle source
# File lib/daemon_kit/ruote_workitem.rb, line 92
def reply_via_amqp( destination_queue, response )
  DaemonKit.logger.debug("Replying to engine via AMQP with #{response.inspect}")
  ::AMQP::Channel.new.queue( destination_queue, :durable => true ).publish( response.to_json )

  response
end

Public Instance Methods

[]( key ) click to toggle source
# File lib/daemon_kit/ruote_workitem.rb, line 153
def []( key )
  self.fields[ key ]
end
[]=( key, value ) click to toggle source
# File lib/daemon_kit/ruote_workitem.rb, line 157
def []=( key, value )
  self.fields[ key ] = value
end
attributes()

backwards compatible..

Alias for: fields
dispatch_time() click to toggle source
# File lib/daemon_kit/ruote_workitem.rb, line 127
def dispatch_time
  @dispath_time ||= Time.parse( @workitem['dispatch_time'] )
end
fei() click to toggle source
# File lib/daemon_kit/ruote_workitem.rb, line 113
def fei
  @workitem['fei']
end
fields() click to toggle source
# File lib/daemon_kit/ruote_workitem.rb, line 143
def fields
  @workitem['fields'] ||= @workitem['attributes']
end
Also aliased as: attributes
has_attribute?(a)
Alias for: has_field?
has_field?(a) click to toggle source
# File lib/daemon_kit/ruote_workitem.rb, line 139
def has_field?(a)
  self.fields.keys.include?( a )
end
Also aliased as: has_attribute?
last_modified() click to toggle source
# File lib/daemon_kit/ruote_workitem.rb, line 131
def last_modified
  @last_modified ||= Time.parse( @workitem['last_modified'] )
end
method_missing( method_name, *args ) click to toggle source
Calls superclass method
# File lib/daemon_kit/ruote_workitem.rb, line 178
def method_missing( method_name, *args )
  if self.fields.keys.include?( method_name.to_s )
    return self.fields[ method_name.to_s ]
  end

  super
end
participant_name() click to toggle source
# File lib/daemon_kit/ruote_workitem.rb, line 135
def participant_name
  @workitem['participant_name']
end
short_fei() click to toggle source
# File lib/daemon_kit/ruote_workitem.rb, line 117
def short_fei
  @short_fei ||=
    '(' + [
           'fei', self.fei['owfe_version'], self.fei['engine_id'],
           self.fei['workflow_definition_url'], self.fei['workflow_definition_name'],
           self.fei['workflow_definition_revision'], self.fei['wfid'],
           self.fei['expression_name'], self.fei['expid']
          ].join(' ') + ')'
end
timed_out?() click to toggle source

Look at the workitem payload and attempt to determine if this workitem has timed out or not. This method will only ever work if you used the +:timeout: parameter was set for the expression.

# File lib/daemon_kit/ruote_workitem.rb, line 168
def timed_out?
  key = fei['wfid'] + '__' + fei['expid']

  if self.fields["__timeouts__"] && timeout = self.fields["__timeouts__"][ key ]
    return Time.at( timeout.last ) < Time.now
  end

  return false
end
to_json() click to toggle source
# File lib/daemon_kit/ruote_workitem.rb, line 161
def to_json
  @workitem.to_json
end