class Gotta::Run::DynamicRunner
This class is responsible for all dynamic runners. It is designed to be inherited from. For example: `class RubyRunner < Gotta::Run::DynamicRunner; end`
Attributes
Public Class Methods
The initializer takes an input queue from which it fetches the payloads to send to runners. The default queue is `Gotta::Queue::Memory::Driver`. It also takes a `runtime_path` keyword argument (default `nil`), the path to the runtime script for the language you want to run. You can optionally pass a block to set extra configuration without overriding the initializer. It raises `RuntimeScriptNotFound` if `runtime_path` doesn't exist or is not a file.
# File lib/gotta/run.rb, line 46
def initialize(input_queue: Gotta::Queue.new('input'), runtime_path: nil, &block)
  @input_queue = input_queue
  @runtime_path = runtime_path
  # Check for nil first: File.file?(nil) raises TypeError.
  raise RuntimeScriptNotFound, runtime_path.to_s unless runtime_path && File.file?(runtime_path)
  @parent_socket, @child_socket = UNIXSocket.pair
  # IO.pipe returns the [read, write] ends of a pipe.
  @child_output_r, @child_output_w = IO.pipe
  instance_eval(&block) if block_given?
end
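The initializer's contract (validate the script path, then run an optional configuration block) can be tried in isolation. The following is a minimal sketch, not the library's code: `SketchRunner`, its `options` hash, and the two `demo_*` helpers are hypothetical names introduced only for illustration, and the exception class is redefined locally so the example runs without the gem.

```ruby
require 'tempfile'

# Hypothetical stand-in that mirrors only the argument checking and the
# block hook described above; it is not the Gotta::Run::DynamicRunner source.
class RuntimeScriptNotFound < StandardError; end

class SketchRunner
  attr_reader :runtime_path, :options

  def initialize(runtime_path: nil, &block)
    @runtime_path = runtime_path
    @options = {}
    # File.file?(nil) would raise TypeError, so check for nil first.
    unless runtime_path && File.file?(runtime_path)
      raise RuntimeScriptNotFound, runtime_path.inspect
    end
    # The block runs in the context of the new instance, so it can set
    # instance variables without a subclass overriding initialize.
    instance_eval(&block) if block_given?
  end
end

# Creates a temporary runtime script and configures the runner via a block.
def demo_block_configuration
  Tempfile.create(['runtime', '.rb']) do |script|
    runner = SketchRunner.new(runtime_path: script.path) do
      @options[:verbose] = true
    end
    runner.options
  end
end

# Reports what happens when the runtime script is missing.
def demo_missing_script
  SketchRunner.new(runtime_path: '/no/such/runtime.rb')
  :created
rescue RuntimeScriptNotFound
  :not_found
end
```

Because the block is evaluated with `instance_eval`, it sees the new object's instance variables directly, which is what lets callers add configuration without subclassing.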
Public Instance Methods
# File lib/gotta/run.rb, line 58
def start
  # start_runtime requires the path of the runtime script.
  start_runtime(@runtime_path)
  @child_stdout_thread = connect_to_child_stdout
  start_processing
end
Private Instance Methods
# File lib/gotta/run/dynamic_runner.rb, line 72
def cmd
  ['ruby', '-E', 'UTF-8:UTF-8', @runtime_path].join(' ')
end
# File lib/gotta/run.rb, line 66
def connect_to_child_stdout
  Thread.new do
    loop do
      puts @child_output_r.gets
    end
  end
end
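Builds the environment variables for the runtime process: the parent's environment plus `SOCKET_FD`, the file descriptor number of the child's end of the socket pair.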
# File lib/gotta/run.rb, line 75
def env
  ENV.to_h.merge(
    # Environment values passed to Process.spawn must be strings.
    'SOCKET_FD' => @child_socket.fileno.to_s
  )
end
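To illustrate how a child process can pick up the socket from `SOCKET_FD`, here is a minimal sketch under stated assumptions. The child script, the `round_trip` helper, and the upcasing reply are hypothetical; only `UNIXSocket.pair`, `Process.spawn`, and the `SOCKET_FD` variable name come from the listing above. The sketch assumes a Unix-like system.

```ruby
require 'socket'
require 'rbconfig'

# Child program: rebuilds the socket from the inherited descriptor named in
# SOCKET_FD, reads one message, and replies with it upcased.
CHILD_SCRIPT = <<~'RUBY'
  require 'socket'
  sock = UNIXSocket.for_fd(Integer(ENV.fetch('SOCKET_FD')))
  sock.send(sock.recv(1024).upcase, 0)
RUBY

# Sends one message to a freshly spawned child and returns its reply.
def round_trip(message)
  parent_socket, child_socket = UNIXSocket.pair
  # Environment values handed to Process.spawn must be strings.
  env = { 'SOCKET_FD' => child_socket.fileno.to_s }
  pid = Process.spawn(
    env, RbConfig.ruby, '-e', CHILD_SCRIPT,
    # Ruby marks the descriptors it opens close-on-exec, so map the socket
    # onto itself to keep it open in the child at the same number.
    child_socket => child_socket
  )
  child_socket.close # the parent only needs its own end
  parent_socket.send(message, 0)
  reply = parent_socket.recv(1024)
  Process.wait(pid)
  reply
ensure
  parent_socket&.close
end
```

Calling `round_trip('ping')` spawns the child, sends the message over the parent's end of the pair, and returns whatever the child wrote back.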
Initializes the response queue from the name that comes with the payload.
# File lib/gotta/run.rb, line 133
def response_queue_for(payload)
  object = MessagePack.unpack(payload)
  Gotta::Queue.new(object['response_queue_name'])
end
Sends the response to the response queue. A timeout is used here, but ideally this should be done using socket timeouts. TODO: use socket timeouts.
# File lib/gotta/run.rb, line 123
def send_response_to(response_queue)
  Timeout.timeout(MAX_EXECUTION_TIME) do
    response_queue.push(parent_socket.recv(MAX_RESPONSE_MSG_SIZE))
  end
rescue Timeout::Error
  response_queue.push(Response.timeout.to_msgpack)
end
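One possible way to approach the socket-timeout TODO is to wait for readability with `IO.select` before calling `recv`. This is only a sketch of that general technique, not the author's planned change: `recv_with_timeout`, `demo_recv_with_timeout`, and the `SKETCH_*` constants are hypothetical, and the real `MAX_EXECUTION_TIME` and `MAX_RESPONSE_MSG_SIZE` values are not shown in this document.

```ruby
require 'socket'

# Hypothetical limits chosen for the example only.
SKETCH_MAX_WAIT = 0.2
SKETCH_MAX_SIZE = 4096

# Waits up to `timeout` seconds for the socket to become readable.
# Returns the received bytes, or nil when the deadline passes first.
def recv_with_timeout(socket, timeout: SKETCH_MAX_WAIT, max_size: SKETCH_MAX_SIZE)
  ready, = IO.select([socket], nil, nil, timeout)
  return nil unless ready

  socket.recv(max_size)
end

# Exercises both outcomes: a silent peer and a peer that answers.
def demo_recv_with_timeout
  parent, child = UNIXSocket.pair
  silent = recv_with_timeout(parent) # nothing has been sent yet
  child.send('done', 0)
  answered = recv_with_timeout(parent)
  [silent, answered]
ensure
  parent&.close
  child&.close
end
```

A caller could push a timeout response to the queue when `recv_with_timeout` returns `nil`, which avoids interrupting the thread the way `Timeout.timeout` does.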
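# File lib/gotta/run.rb, line 96
def spawn_process(cmd)
  Process.spawn(
    env,
    *Array(cmd), # accepts a command string or an argv array
    close_others: false,
    # Keep the child's socket open across exec at the same descriptor number.
    @child_socket => @child_socket,
    out: @child_output_w,
    err: %i[child out]
  )
end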
This method loops indefinitely: it pops an element from the queue and passes the payload to the child process. While the child process performs its task, the method sets up the response queue, then waits for the response and pushes it there.
# File lib/gotta/run.rb, line 112
def start_processing
  input_queue.process do |payload|
    @parent_socket.send(payload, 0)
    send_response_to(response_queue_for(payload))
  end
end
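The send-then-wait cycle of the processing loop can be sketched without the gem. Here a thread stands in for the runtime process and a plain `Queue` stands in for the response queue; `start_fake_runtime`, `process_all`, and the payload-reversing behavior are hypothetical and exist only to make the example observable.

```ruby
require 'socket'

# Stand-in for the runtime process: replies to each payload with its reverse
# and stops when the other end of the socket is closed.
def start_fake_runtime(child_socket)
  Thread.new do
    while (payload = child_socket.recv(1024)) && !payload.empty?
      child_socket.send(payload.reverse, 0)
    end
  end
end

# Sends each payload to the fake runtime and collects one response per payload.
def process_all(payloads)
  parent_socket, child_socket = UNIXSocket.pair
  runtime = start_fake_runtime(child_socket)
  responses = Queue.new
  payloads.each do |payload|
    parent_socket.send(payload, 0)
    # Block until the runtime answers before taking the next payload.
    responses << parent_socket.recv(1024)
  end
  parent_socket.close # signals end of input to the fake runtime
  runtime.join
  Array.new(responses.size) { responses.pop }
ensure
  child_socket&.close
end
```

Since each payload is answered before the next one is sent, requests and responses stay paired even over a stream socket.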
Start the runtime from `path`. It runs in a loop to make sure the runner process gets restarted in case it dies.
# File lib/gotta/run.rb, line 84
def start_runtime(path)
  cmd = ['ruby', '-E', 'UTF-8:UTF-8', path]
  loop do
    pid = spawn_process(cmd)
    Process.wait(pid)
    sleep 1
  end
end
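The restart behavior can be observed with a bounded variant of the loop. This is a minimal sketch that assumes a Unix-like system: `supervise`, `demo_supervise`, and the `max_runs` and `delay` parameters are hypothetical additions so the example terminates, whereas the method above loops forever.

```ruby
require 'rbconfig'

# Respawns `cmd` each time it exits, like the loop above, but stops after
# `max_runs` runs and returns the exit status of every run.
def supervise(cmd, max_runs:, delay: 0.01)
  statuses = []
  max_runs.times do
    pid = Process.spawn(*cmd)
    # wait2 returns the pid together with the Process::Status of the child.
    _, status = Process.wait2(pid)
    statuses << status.exitstatus
    sleep delay # brief pause before the next restart
  end
  statuses
end

# Runs a child that always exits with status 3, twice.
def demo_supervise
  supervise([RbConfig.ruby, '-e', 'exit 3'], max_runs: 2)
end
```

Recording the exit status on each pass is one way to see why the child died before it is started again.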