class Arsenicum::Core::Worker
Constants
- CONTROL_PING
- CONTROL_STOP
- RESULT_FAILURE
- RESULT_SUCCESS
Attributes
active[R]
active?[R]
broker[R]
ctrl_in_child[R]
ctrl_in_parent[R]
ctrl_out_child[R]
ctrl_out_parent[R]
formatter[R]
in_child[R]
in_parent[R]
index[R]
out_child[R]
out_parent[R]
pid[R]
serializer[R]
state[R]
thread[R]
work_at[R]
Public Class Methods
new(broker, index, worker_configuration)
click to toggle source
# File lib/arsenicum/core/worker.rb, line 22
# Builds a worker for +broker+ and immediately calls #run.
#
# broker               - owning broker; only a weak reference is kept.
# index                - value stored as this worker's index.
# worker_configuration - object answering [] for :serializer and :formatter.
def initialize(broker, index, worker_configuration)
  # Hold the broker weakly, avoiding circular references.
  @broker = WeakRef.new(broker)
  @index = index
  @serializer, @formatter =
    worker_configuration[:serializer], worker_configuration[:formatter]
  @thread = InvokerThread.new(self)
  # Both markers start out as :parent.
  @work_at = @state = :parent
  run
end
Public Instance Methods
==(another)
click to toggle source
# File lib/arsenicum/core/worker.rb, line 34
# Two workers are equal when both are Worker instances with the same pid.
# Anything that is not a Worker compares as false.
def ==(another)
  another.is_a?(::Arsenicum::Core::Worker) && another.pid == pid
end
ask(task_id, *parameters)
click to toggle source
# File lib/arsenicum/core/worker.rb, line 40
# Sends a task request over out_parent and blocks until a reply can be read
# from in_parent.
#
# task_id    - identifier written as the first message element.
# parameters - task arguments; serialized before being written.
#
# Returns nil when the reply is RESULT_SUCCESS.
# Raises the object obtained by Marshal.load-ing the second reply element
# otherwise.
def ask(task_id, *parameters)
  write_message(out_parent, task_id, serialize(parameters))

  # Wait in 5 second slices; pause half a second between unsuccessful polls.
  sleep(0.5) until select([in_parent], [], [], 5)

  status, dumped_error = read_message(in_parent)
  raise Marshal.load(dumped_error) unless status == RESULT_SUCCESS

  nil
end
ask_async(success_handler, failure_handler, task_id, *parameters)
click to toggle source
# File lib/arsenicum/core/worker.rb, line 53
# Hands a task request to the invoker thread instead of running it inline.
# All arguments are forwarded unchanged to the thread's #ask.
def ask_async(success_handler, failure_handler, task_id, *parameters)
  thread.ask(success_handler, failure_handler, task_id, *parameters)
end
return_to_broker()
click to toggle source
# File lib/arsenicum/core/worker.rb, line 69
# Gives this worker back to its broker via broker.get_back_worker.
def return_to_broker
  broker.get_back_worker(self)
end
stop()
click to toggle source
# File lib/arsenicum/core/worker.rb, line 57
# Stops this worker: terminates the invoker thread and, when the forked
# child is still running, sends it the stop control message and waits for
# it to exit.
#
# Returns nil when the child is no longer alive; otherwise the result of
# Process.waitpid for the child.
def stop
  thread.terminate
  return unless child_process_alive?

  # The child's handle_control only reacts to CONTROL_STOP, and that is the
  # constant this class declares (there is no COMMAND_STOP).
  write_message ctrl_out_parent, CONTROL_STOP
  Process.waitpid pid
end
Private Instance Methods
child_process_alive?()
click to toggle source
# File lib/arsenicum/core/worker.rb, line 78
# Non-blocking liveness check for the forked child.
#
# Process.waitpid with WNOHANG returns nil while the child is still running
# and the pid once it has exited (reaping it as a side effect).
#
# Returns true while the child runs, false once it has exited or has
# already been reaped.
def child_process_alive?
  !Process.waitpid(pid, Process::WNOHANG)
rescue Errno::ECHILD
  # No such child any more (already waited for): it is not alive.
  false
end
deserialize(string)
click to toggle source
# File lib/arsenicum/core/worker.rb, line 184
# Turns a received string back into parameters: the serializer decodes it
# first, then the formatter parses the decoded value.
def deserialize(string)
  decoded = serializer.deserialize(string)
  formatter.parse(decoded)
end
handle_control()
click to toggle source
# File lib/arsenicum/core/worker.rb, line 168
# Reads one control message from ctrl_in_child and acts on it.
# Only CONTROL_STOP is handled: it is logged, the invoker thread is
# terminated and the state becomes :stopped. Any other message is ignored.
def handle_control
  command, = read_message(ctrl_in_child)

  case command
  when CONTROL_STOP
    info(message: '[Control]Received stop command.')
    thread.terminate
    switch_state(:stopped)
  end
end
handle_request()
click to toggle source
# File lib/arsenicum/core/worker.rb, line 140
# Reads one task request from in_child, runs the task looked up on the
# broker and writes the outcome to out_child.
#
# The state is :busy while working and is always switched back to :waiting
# on the way out, including on the early return and on errors.
def handle_request
  switch_state(:busy)

  begin
    raw_task_id, payload = read_message(in_child, encoding: Encoding::UTF_8)
  rescue Arsenicum::IO::EOFException
    # Interrupted request: No required GC.
    return
  end

  task_id = raw_task_id.to_sym
  task = broker[task_id]
  # An empty payload means the task takes no parameters.
  parameters =
    if payload.length.zero?
      []
    else
      deserialize(payload)
    end

  begin
    info(message: "Task[#{task_id}] start")
    info(message: "Parameters: #{parameters.inspect}")
    task.run(*parameters)
    info(message: 'Task success')
    write_message(out_child, RESULT_SUCCESS)
  rescue Exception => failure
    # Every failure is reported back, with the exception marshaled alongside.
    error(message: "Task #{task_id} Failed", exception: failure)
    write_message(out_child, RESULT_FAILURE, Marshal.dump(failure))
  end
ensure
  switch_state(:waiting)
end
hook_signal()
click to toggle source
# File lib/arsenicum/core/worker.rb, line 215
# Installs handlers for USR1 and USR2 that exit the process with status 1.
# Returns the array of trapped signal names.
def hook_signal
  exit_with_failure = proc { exit 1 }
  %i[USR1 USR2].each do |signal_name|
    Signal.trap(signal_name, &exit_with_failure)
  end
end
log_message_for(message)
click to toggle source
# File lib/arsenicum/core/worker.rb, line 211
# Prefixes +message+ with a tag carrying this object's object_id.
def log_message_for(message)
  format('[Worker #%s]%s', object_id, message)
end
open_binary_pipes()
click to toggle source
# File lib/arsenicum/core/worker.rb, line 96
# Creates a pipe whose two ends use the 'BINARY' encoding and whose write
# end has sync enabled.
#
# Returns the two-element array produced by IO.pipe (read end, write end).
def open_binary_pipes
  pipes = IO.pipe
  pipes.each { |pipe_end| pipe_end.set_encoding('BINARY') }
  pipes.last.sync = true
  pipes
end
process_name()
click to toggle source
# File lib/arsenicum/core/worker.rb, line 196
# Builds the process title from the worker index and its current state.
def process_name
  format('arsenicum[Worker #%s] - %s', index, state)
end
run()
click to toggle source
# File lib/arsenicum/core/worker.rb, line 82
# Opens the four pipes (data and control, one per direction) and forks the
# child process, which executes #run_in_child.
#
# In the parent the worker is marked active and the child-side pipe ends
# are closed. Returns the child's pid, or nil when fork yields no pid.
def run
  @in_parent, @out_child = open_binary_pipes
  @in_child, @out_parent = open_binary_pipes
  @ctrl_in_parent, @ctrl_out_child = open_binary_pipes
  @ctrl_in_child, @ctrl_out_parent = open_binary_pipes

  @pid = fork { run_in_child }
  return unless @pid

  @active = true
  child_side_ends = [in_child, out_child, ctrl_in_child, ctrl_out_child]
  child_side_ends.each(&:close)
  pid
end
run_in_child()
click to toggle source
# File lib/arsenicum/core/worker.rb, line 104
# Body executed in the forked child: closes the parent-side pipe ends,
# marks the worker as running in the child, installs the USR1/USR2 handlers
# and serves requests until the state stops being :waiting.
def run_in_child
  switch_state :waiting
  [in_parent, out_parent, ctrl_in_parent, ctrl_out_parent].each(&:close)
  @work_at = :child
  hook_signal
  begin
    loop do
      server_loop
      # Any state other than :waiting (e.g. :stopped, :interrupted) ends the loop.
      break unless state == :waiting
    end
  ensure
    # Best-effort cleanup: errors while closing the child-side ends are ignored.
    [in_child, out_child, ctrl_in_child, ctrl_out_child].each do |io|
      begin
        io.close
      rescue
        nil
      end
    end
  end
end

# Records the new state and refreshes the program name ($0) so that it
# shows the worker index and state.
def switch_state(state)
  @state = state
  $0 = process_name
end

# One iteration of the child's loop: waits up to 0.5 seconds for the data
# or control pipe to become readable and dispatches to the matching handler.
# An Interrupt while waiting switches the state to :interrupted.
def server_loop
  begin
    rs, = select [in_child, ctrl_in_child], [], [], 0.5
    return unless rs
  rescue Interrupt
    switch_state :interrupted
    return
  end
  # Only the first readable IO is served in this iteration.
  rs.first == in_child ? handle_request : handle_control
end

# Reads one task request from in_child, runs the task found on the broker
# and writes RESULT_SUCCESS, or RESULT_FAILURE plus the marshaled exception,
# to out_child. The state is :busy meanwhile and always returns to :waiting.
def handle_request
  switch_state :busy
  begin
    task_id_string, content = read_message in_child, encoding: Encoding::UTF_8
  rescue Arsenicum::IO::EOFException
    # Interrupted request: No required GC.
    return
  end
  task_id = task_id_string.to_sym
  task = broker[task_id]
  # Empty content means the task is invoked without parameters.
  parameters = content.length == 0 ? [] : deserialize(content)
  begin
    info message: "Task[#{task_id}] start"
    info message: "Parameters: #{parameters.inspect}"
    task.run *parameters
    info message: 'Task success'
    write_message out_child, RESULT_SUCCESS
  rescue Exception => e
    error message: "Task #{task_id} Failed", exception: e
    write_message out_child, RESULT_FAILURE, Marshal.dump(e)
  end
ensure
  switch_state :waiting
end

# Reads one control message from ctrl_in_child. Only CONTROL_STOP is acted
# upon: the thread is terminated and the state becomes :stopped.
def handle_control
  begin
    control, = read_message ctrl_in_child
    case control
    when CONTROL_STOP
      info message: '[Control]Received stop command.'
      thread.terminate
      switch_state :stopped
    end
  end
end

# Formats the parameter with the formatter, then serializes the result.
def serialize(parameter)
  serializer.serialize(formatter.format(parameter))
end

# Inverse of #serialize: deserializes the string, then parses it with the
# formatter.
def deserialize(string)
  formatter.parse(serializer.deserialize(string))
end

# Installs TERM and INT handlers that exit with status 5.
# NOTE(review): no caller of this method is visible in this file section.
def trap_signal
  %w(TERM INT).each do |sig|
    Signal.trap sig do
      exit 5
    end
  end
end

# Program name built from the worker index and current state.
def process_name
  "arsenicum[Worker ##{index}] - #{state}"
end

# Defines debug/info/warn/error/fatal helpers that delegate to the method of
# the same name on Arsenicum::Logger, prefixing a given message with the
# worker index and the side (work_at) it is running on.
[:debug, :info, :warn, :error, :fatal].each do |level|
  eval <<-SCRIPT, binding, __FILE__, __LINE__ + 1
    def #{level}(message: nil, exception: nil)
      Arsenicum::Logger.#{level} do
        message = "[Worker #\#{index}][\#{work_at}]\#{message}" if message
        [message, exception]
      end
    end
  SCRIPT
end

# Prefixes the message with a tag carrying this object's object_id.
def log_message_for(message)
  "[Worker ##{object_id}]#{message}"
end

# Installs USR1 and USR2 handlers that exit with status 1.
def hook_signal
  [:USR1, :USR2, ].each do |sig|
    Signal.trap sig do
      exit 1
    end
  end
end

# Thread that holds at most one pending task request and executes it through
# the worker's #ask, then hands the worker back to the broker.
class InvokerThread < Thread
  attr_accessor :task_request
  private :task_request, :task_request=

  # Stores a request for the thread loop to pick up. A request that is still
  # pending is overwritten.
  def ask(success_handler, failure_handler, task_id, *parameters)
    self.task_request = [success_handler, failure_handler, task_id, parameters]
  end

  # Starts the thread. Its loop polls for a request every 0.5 seconds, runs
  # it via worker.ask and calls the success or failure handler accordingly.
  def initialize(worker)
    super do
      loop do
        begin
          # Nothing queued: sleep, then start the next iteration.
          next sleep(0.5) unless task_request
        rescue Interrupt
          break
        end

        (success_handler, failure_handler, task_id, parameter) = task_request
        self.task_request = nil
        begin
          worker.ask task_id, *parameter
          info worker, message: "Completed processing: #{task_id}"
          success_handler.call
        rescue Interrupt => e
          # An Interrupt is reported as a failure and also ends the loop.
          error worker, exception: e
          failure_handler.call e
          break
        rescue Exception => e
          error worker, exception: e
          failure_handler.call e
        ensure
          # Runs after success and after either failure path.
          worker.return_to_broker
        end
      end
    end
  end

  # Same logging helpers as on the worker, but taking the worker explicitly
  # and adding a "[thread]" marker to the prefix.
  [:debug, :info, :warn, :error, :fatal].each do |level|
    eval <<-SCRIPT, binding, __FILE__, __LINE__ + 1
      def #{level}(worker, message: nil, exception: nil)
        Arsenicum::Logger.#{level} do
          message = "[Worker #\#{worker.index}][\#{worker.work_at}][thread]\#{message}" if message
          [message, exception]
        end
      end
    SCRIPT
  end
end
# Closes the enclosing Arsenicum::Core::Worker class, opened outside this span.
end
serialize(parameter)
click to toggle source
# File lib/arsenicum/core/worker.rb, line 180
# Prepares parameters for transport: the formatter formats them first, then
# the serializer encodes the formatted value.
def serialize(parameter)
  formatted = formatter.format(parameter)
  serializer.serialize(formatted)
end
server_loop()
click to toggle source
# File lib/arsenicum/core/worker.rb, line 128
# One iteration of the child's loop: waits up to 0.5 seconds for the data
# or control pipe to become readable, then dispatches to #handle_request or
# #handle_control. Returns without dispatching on timeout. An Interrupt
# while waiting switches the state to :interrupted.
def server_loop
  begin
    readable, = select([in_child, ctrl_in_child], [], [], 0.5)
    return unless readable
  rescue Interrupt
    switch_state(:interrupted)
    return
  end

  # Only the first readable IO is served in this iteration.
  if readable.first == in_child
    handle_request
  else
    handle_control
  end
end
switch_state(state)
click to toggle source
# File lib/arsenicum/core/worker.rb, line 123
# Records the new state and refreshes the program name so that it reflects
# the worker index and state. Returns the new program name.
def switch_state(state)
  @state = state
  # $PROGRAM_NAME is Ruby's built-in alias for $0.
  $PROGRAM_NAME = process_name
end
trap_signal()
click to toggle source
# File lib/arsenicum/core/worker.rb, line 188
# Installs handlers for TERM and INT that exit the process with status 5.
# Returns the array of trapped signal names.
def trap_signal
  exit_on_signal = proc { exit 5 }
  %w[TERM INT].each do |signal_name|
    Signal.trap(signal_name, &exit_on_signal)
  end
end
worker_thread_alive?()
click to toggle source
# File lib/arsenicum/core/worker.rb, line 74
# Reports whether the invoker thread held by this worker is alive,
# as answered by the thread's own #alive?.
def worker_thread_alive?
  thread.alive?
end