class Arsenicum::Core::Worker

Constants

CONTROL_PING
CONTROL_STOP
RESULT_FAILURE
RESULT_SUCCESS

Attributes

active[R]
active?[R]
broker[R]
ctrl_in_child[R]
ctrl_in_parent[R]
ctrl_out_child[R]
ctrl_out_parent[R]
formatter[R]
in_child[R]
in_parent[R]
index[R]
out_child[R]
out_parent[R]
pid[R]
serializer[R]
state[R]
thread[R]
work_at[R]

Public Class Methods

new(broker, index, worker_configuration) click to toggle source
# File lib/arsenicum/core/worker.rb, line 22
# Builds a worker for +broker+ and starts it right away through #run.
#
# broker               - the owning broker (kept behind a WeakRef).
# index                - value exposed through #index (used in names/logs).
# worker_configuration - read with [] for :serializer and :formatter.
def initialize(broker, index, worker_configuration)
  # Hold the broker weakly so worker and broker do not form a reference cycle.
  @broker = WeakRef.new(broker)
  @index = index

  @serializer = worker_configuration[:serializer]
  @formatter = worker_configuration[:formatter]

  @thread = InvokerThread.new(self)

  # Both markers start out as :parent.
  @work_at = :parent
  @state = :parent

  run
end

Public Instance Methods

==(another) click to toggle source
# File lib/arsenicum/core/worker.rb, line 34
# Two workers are equal when +another+ is a Worker with the same pid.
# Anything that is not a Worker compares as false.
def ==(another)
  another.is_a?(::Arsenicum::Core::Worker) && another.pid == pid
end
ask(task_id, *parameters) click to toggle source
# File lib/arsenicum/core/worker.rb, line 40
# Sends a task request through +out_parent+ and blocks until a reply can be
# read from +in_parent+.
#
# task_id    - identifier written as the first message field.
# parameters - arguments, serialized with #serialize before being written.
#
# Returns nil when the reply is RESULT_SUCCESS.
# Raises the exception object unmarshaled from the reply otherwise.
def ask(task_id, *parameters)
  write_message(out_parent, task_id, serialize(parameters))

  # Wait for the reply pipe to become readable: each attempt waits up to
  # 5 seconds in select, then pauses half a second before retrying.
  until select([in_parent], [], [], 5)
    sleep 0.5
  end

  outcome, dumped_error = read_message(in_parent)
  # The second field is only loaded when the outcome is not a success.
  raise Marshal.load(dumped_error) unless outcome == RESULT_SUCCESS
end
ask_async(success_handler, failure_handler, task_id, *parameters) click to toggle source
# File lib/arsenicum/core/worker.rb, line 53
# Hands the request over to the worker's thread by delegating to its #ask,
# passing the handlers, task id and parameters through unchanged.
def ask_async(success_handler, failure_handler, task_id, *parameters)
  thread.ask(success_handler, failure_handler, task_id, *parameters)
end
return_to_broker() click to toggle source
# File lib/arsenicum/core/worker.rb, line 69
# Gives this worker back to its broker by calling broker.get_back_worker.
def return_to_broker
  broker.get_back_worker(self)
end
stop() click to toggle source
# File lib/arsenicum/core/worker.rb, line 57
# Shuts the worker down from the parent side.
#
# Terminates the invoker thread first. If the child process is still
# running, writes the stop control message to +ctrl_out_parent+ and waits
# for the child to exit. Returns early (nil) when the child is already gone.
def stop
  thread.terminate
  return unless child_process_alive?

  # CONTROL_STOP is the value handle_control matches on in the child.
  # The earlier COMMAND_STOP is not among this class's constants and
  # raised NameError here.
  write_message   ctrl_out_parent, CONTROL_STOP
  Process.waitpid pid
end

Private Instance Methods

child_process_alive?() click to toggle source
# File lib/arsenicum/core/worker.rb, line 78
# Reports whether the forked child identified by +pid+ is still running.
#
# Uses a non-blocking wait: Process.waitpid with WNOHANG returns nil while
# the child is running and the pid once it has exited (which also reaps it).
# A child that was already reaped makes waitpid raise Errno::ECHILD; that is
# reported as "not alive" instead of propagating, so repeated checks (for
# example a second #stop) do not crash.
#
# Returns true or false.
def child_process_alive?
  Process.waitpid(pid, Process::WNOHANG).nil?
rescue Errno::ECHILD
  false
end
deserialize(string) click to toggle source
# File lib/arsenicum/core/worker.rb, line 184
# Turns a received payload back into a value: the serializer decodes
# +string+ first, then the formatter parses the decoded result.
def deserialize(string)
  decoded = serializer.deserialize(string)
  formatter.parse(decoded)
end
handle_control() click to toggle source
# File lib/arsenicum/core/worker.rb, line 168
# Reads one control message from +ctrl_in_child+ and acts on it.
#
# Only CONTROL_STOP is handled: it is logged, the invoker thread is
# terminated and the state becomes :stopped. Any other control value is
# ignored and the method returns nil.
def handle_control
  command, = read_message(ctrl_in_child)

  case command
  when CONTROL_STOP
    info message: '[Control]Received stop command.'
    thread.terminate
    switch_state :stopped
  end
end
handle_request() click to toggle source
# File lib/arsenicum/core/worker.rb, line 140
# Processes one task request read from +in_child+.
#
# The state is :busy for the duration and is always set back to :waiting on
# the way out. The message carries a task id (converted to a Symbol and
# looked up in the broker) and a payload; an empty payload means no
# parameters, otherwise it is decoded with #deserialize.
#
# On success RESULT_SUCCESS is written to +out_child+; if running the task
# raises, RESULT_FAILURE plus the marshaled exception is written instead.
def handle_request
  switch_state :busy

  begin
    raw_task_id, payload = read_message(in_child, encoding: Encoding::UTF_8)
  rescue Arsenicum::IO::EOFException
    # The request was cut off before it could be read; nothing to do.
    return
  end

  task_id = raw_task_id.to_sym
  task = broker[task_id]
  parameters =
    if payload.length == 0
      []
    else
      deserialize(payload)
    end

  begin
    info message: "Task[#{task_id}] start"
    info message: "Parameters: #{parameters.inspect}"

    task.run(*parameters)
    info message: 'Task success'
    write_message(out_child, RESULT_SUCCESS)
  rescue Exception => failure
    # NOTE(review): rescuing Exception also catches SystemExit and Interrupt
    # raised while the task runs - confirm this is intended.
    error message: "Task #{task_id} Failed", exception: failure
    write_message(out_child, RESULT_FAILURE, Marshal.dump(failure))
  end
ensure
  switch_state :waiting
end
hook_signal() click to toggle source
# File lib/arsenicum/core/worker.rb, line 215
# Installs handlers for USR1 and USR2 that end the process with exit status 1.
def hook_signal
  %i[USR1 USR2].each do |signal_name|
    Signal.trap(signal_name) { exit 1 }
  end
end
log_message_for(message) click to toggle source
# File lib/arsenicum/core/worker.rb, line 211
# Prefixes +message+ with a "[Worker #<object_id>]" tag.
def log_message_for(message)
  format('[Worker #%s]%s', object_id, message)
end
open_binary_pipes() click to toggle source
# File lib/arsenicum/core/worker.rb, line 96
# Opens a pipe whose two ends use BINARY encoding and whose write end is
# unbuffered (sync).
#
# Returns [read_io, write_io].
def open_binary_pipes
  reader, writer = IO.pipe

  reader.set_encoding('BINARY')
  writer.set_encoding('BINARY')
  writer.sync = true

  [reader, writer]
end
process_name() click to toggle source
# File lib/arsenicum/core/worker.rb, line 196
# Process title built from the worker index and the current state.
def process_name
  format('arsenicum[Worker #%s] - %s', index, state)
end
run() click to toggle source
# File lib/arsenicum/core/worker.rb, line 82
# Creates the four pipes shared with the child, forks, and runs
# #run_in_child inside the forked process.
#
# In the parent the worker is marked active, the child-side pipe ends are
# closed, and the child's pid is returned.
def run
  # Data channels: child -> parent and parent -> child.
  @in_parent, @out_child = open_binary_pipes
  @in_child, @out_parent = open_binary_pipes
  # Control channels, same layout.
  @ctrl_in_parent, @ctrl_out_child = open_binary_pipes
  @ctrl_in_child, @ctrl_out_parent = open_binary_pipes

  @pid = fork { run_in_child }
  return unless @pid

  @active = true
  # The parent never uses the child's ends of the pipes.
  [in_child, out_child, ctrl_in_child, ctrl_out_child].each(&:close)
  pid
end
run_in_child() click to toggle source
# File lib/arsenicum/core/worker.rb, line 104
  # Body of the forked child process.
  #
  # Marks the worker as :waiting, closes the parent-side pipe ends, records
  # that work now happens in the child, installs the USR1/USR2 handlers and
  # then keeps calling #server_loop for as long as the state is :waiting
  # after each pass. The child-side pipes are closed on the way out.
  def run_in_child
    switch_state :waiting
    [in_parent, out_parent, ctrl_in_parent, ctrl_out_parent].each(&:close)
    @work_at = :child

    hook_signal

    begin
      loop do
        server_loop
        break if state != :waiting
      end
    ensure
      [in_child, out_child, ctrl_in_child, ctrl_out_child].each do |io|
        # Best-effort close: errors while closing are deliberately ignored.
        begin
          io.close
        rescue StandardError
          nil
        end
      end
    end
  end

  # Records the new +state+ and refreshes the process title ($0) so that it
  # reflects it.
  def switch_state(state)
    @state = state
    $PROGRAM_NAME = process_name
  end

  # One pass of the child's loop: waits up to half a second for either
  # the request pipe or the control pipe to become readable and dispatches
  # to #handle_request or #handle_control.
  #
  # Returns nil when nothing became readable, or when the wait was
  # interrupted (in which case the state becomes :interrupted).
  def server_loop
    begin
      ready, = select([in_child, ctrl_in_child], [], [], 0.5)
    rescue Interrupt
      switch_state :interrupted
      return
    end
    return unless ready

    # Only the first readable IO is served per pass.
    if ready.first == in_child
      handle_request
    else
      handle_control
    end
  end

  # Processes one task request read from +in_child+.
  #
  # The state is :busy for the duration and is always set back to :waiting
  # on the way out. The message carries a task id (converted to a Symbol and
  # looked up in the broker) and a payload; an empty payload means no
  # parameters, otherwise it is decoded with #deserialize.
  #
  # On success RESULT_SUCCESS is written to +out_child+; if running the task
  # raises, RESULT_FAILURE plus the marshaled exception is written instead.
  def handle_request
    switch_state :busy

    begin
      raw_task_id, payload = read_message(in_child, encoding: Encoding::UTF_8)
    rescue Arsenicum::IO::EOFException
      # The request was cut off before it could be read; nothing to do.
      return
    end

    task_id = raw_task_id.to_sym
    task = broker[task_id]
    parameters =
      if payload.length == 0
        []
      else
        deserialize(payload)
      end

    begin
      info message: "Task[#{task_id}] start"
      info message: "Parameters: #{parameters.inspect}"

      task.run(*parameters)
      info message: 'Task success'
      write_message(out_child, RESULT_SUCCESS)
    rescue Exception => failure
      # NOTE(review): rescuing Exception also catches SystemExit and
      # Interrupt raised while the task runs - confirm this is intended.
      error message: "Task #{task_id} Failed", exception: failure
      write_message(out_child, RESULT_FAILURE, Marshal.dump(failure))
    end
  ensure
    switch_state :waiting
  end

  # Reads one control message from +ctrl_in_child+ and acts on it.
  #
  # Only CONTROL_STOP is handled: it is logged, the invoker thread is
  # terminated and the state becomes :stopped. Any other control value is
  # ignored and the method returns nil.
  def handle_control
    command, = read_message(ctrl_in_child)

    case command
    when CONTROL_STOP
      info message: '[Control]Received stop command.'
      thread.terminate
      switch_state :stopped
    end
  end

  # Prepares a value for the pipe: the formatter formats +parameter+ first,
  # then the serializer encodes the formatted result.
  def serialize(parameter)
    formatted = formatter.format(parameter)
    serializer.serialize(formatted)
  end

  # Turns a received payload back into a value: the serializer decodes
  # +string+ first, then the formatter parses the decoded result.
  def deserialize(string)
    decoded = serializer.deserialize(string)
    formatter.parse(decoded)
  end

  # Installs handlers for TERM and INT that end the process with exit
  # status 5.
  def trap_signal
    %w[TERM INT].each do |signal_name|
      Signal.trap(signal_name) { exit 5 }
    end
  end

  # Process title built from the worker index and the current state.
  def process_name
    format('arsenicum[Worker #%s] - %s', index, state)
  end

  # Defines one logging method per level (debug, info, warn, error, fatal).
  #
  # Each generated method takes the keywords +message+ and +exception+ and
  # calls the same-named method on Arsenicum::Logger with a block. When a
  # message is given it is prefixed with "[Worker #<index>][<work_at>]";
  # the block yields the pair [message, exception].
  #
  # The methods are created with eval on a heredoc; __FILE__ and
  # __LINE__ + 1 are passed so the generated code is attributed to this
  # file at the heredoc's position.
  [:debug,  :info,  :warn,  :error, :fatal].each do |level|
    eval <<-SCRIPT, binding, __FILE__, __LINE__ + 1
    def #{level}(message: nil, exception: nil)
      Arsenicum::Logger.#{level} do
        message = "[Worker #\#{index}][\#{work_at}]\#{message}" if message
        [message, exception]
      end
    end
    SCRIPT
  end

  # Prefixes +message+ with a "[Worker #<object_id>]" tag.
  def log_message_for(message)
    format('[Worker #%s]%s', object_id, message)
  end

  # Installs handlers for USR1 and USR2 that end the process with exit
  # status 1.
  def hook_signal
    %i[USR1 USR2].each do |signal_name|
      Signal.trap(signal_name) { exit 1 }
    end
  end

  # Thread that runs requests against a worker, one at a time.
  #
  # A request is handed over with #ask and stored in +task_request+ (a
  # private accessor). The thread body polls that slot, runs the request
  # through worker.ask and reports the outcome to the supplied handlers.
  class InvokerThread < Thread
    attr_accessor :task_request
    private       :task_request,  :task_request=

    # Starts the polling loop for +worker+.
    def initialize(worker)
      super do
        loop do
          # Idle: nothing queued, so nap for half a second and check again.
          # An Interrupt while idling ends the thread.
          begin
            unless task_request
              sleep(0.5)
              next
            end
          rescue Interrupt
            break
          end

          on_success, on_failure, requested_task, arguments = task_request
          self.task_request = nil

          begin
            worker.ask(requested_task, *arguments)
            info worker, message: "Completed processing: #{requested_task}"
            on_success.call
          rescue Exception => failure
            # Every failure is logged and passed to the failure handler;
            # an Interrupt additionally ends the loop.
            error worker, exception: failure
            on_failure.call(failure)
            break if failure.is_a?(Interrupt)
          ensure
            # The worker goes back to the broker whatever the outcome.
            worker.return_to_broker
          end
        end
      end
    end

    # Queues a request for the loop to pick up. The slot holds a single
    # request: [success_handler, failure_handler, task_id, parameters].
    def ask(success_handler, failure_handler, task_id, *parameters)
      self.task_request = [success_handler, failure_handler, task_id, parameters]
    end

    # Generates debug/info/warn/error/fatal helpers that forward to
    # Arsenicum::Logger, prefixing a given message with the worker's index
    # and work_at plus a "[thread]" tag.
    [:debug,  :info,  :warn,  :error, :fatal].each do |level|
      eval <<-SCRIPT, binding, __FILE__, __LINE__ + 1
        def #{level}(worker, message: nil, exception: nil)
          Arsenicum::Logger.#{level} do
            message = "[Worker #\#{worker.index}][\#{worker.work_at}][thread]\#{message}" if message
            [message, exception]
          end
        end
      SCRIPT
    end
  end
end
serialize(parameter) click to toggle source
# File lib/arsenicum/core/worker.rb, line 180
# Prepares a value for the pipe: the formatter formats +parameter+ first,
# then the serializer encodes the formatted result.
def serialize(parameter)
  formatted = formatter.format(parameter)
  serializer.serialize(formatted)
end
server_loop() click to toggle source
# File lib/arsenicum/core/worker.rb, line 128
# One pass of the child's loop: waits up to half a second for either the
# request pipe or the control pipe to become readable and dispatches to
# #handle_request or #handle_control.
#
# Returns nil when nothing became readable, or when the wait was
# interrupted (in which case the state becomes :interrupted).
def server_loop
  begin
    ready, = select([in_child, ctrl_in_child], [], [], 0.5)
  rescue Interrupt
    switch_state :interrupted
    return
  end
  return unless ready

  # Only the first readable IO is served per pass.
  if ready.first == in_child
    handle_request
  else
    handle_control
  end
end
switch_state(state) click to toggle source
# File lib/arsenicum/core/worker.rb, line 123
# Records the new +state+ and refreshes the process title ($0) so that it
# reflects it.
def switch_state(state)
  @state = state
  $PROGRAM_NAME = process_name
end
trap_signal() click to toggle source
# File lib/arsenicum/core/worker.rb, line 188
# Installs handlers for TERM and INT that end the process with exit status 5.
def trap_signal
  %w[TERM INT].each do |signal_name|
    Signal.trap(signal_name) { exit 5 }
  end
end
worker_thread_alive?() click to toggle source
# File lib/arsenicum/core/worker.rb, line 74
# Reports whether the worker's thread is still alive, as answered by
# thread.alive?.
def worker_thread_alive?
  thread.alive?
end