class Dataflow::Executor
Public Class Methods
await_execution_completion(node, completion_queue, expected_completion_count)
click to toggle source
# File lib/dataflow/executor.rb, line 79 def await_execution_completion(node, completion_queue, expected_completion_count) completed_message_indexes = [] unblock = Queue.new consumer = completion_queue.subscribe do |_delivery_info, _properties, payload| data = JSON.parse(payload) unblock.enq(data['error']) if data['error'].present? # Support adding the data to the compute's data_node is the # remote process returns anything. node.data_node&.add(records: data['data']) if data['data'].present? completed_message_indexes << data['msg_id'] if completed_message_indexes.count == expected_completion_count unblock.enq(false) end end error_data = unblock.deq consumer.cancel error_data end
execute(node)
click to toggle source
# File lib/dataflow/executor.rb, line 9 def execute(node) case node.execution_model when :remote execute_remote_computation(node: node, is_batch_execution: false) when :remote_batch execute_remote_computation(node: node, is_batch_execution: true) when :local node.execute_local_computation else raise ArgumentError, "Unknown execution model #{execution_model}" end end
execute_remote_computation(node:, is_batch_execution:)
click to toggle source
# File lib/dataflow/executor.rb, line 22 def execute_remote_computation(node:, is_batch_execution:) execution_uuid = node.execution_uuid raise ArgumentError, "Expected execution uuid to be set on '#{node.name}' (##{node._id})" unless execution_uuid.present? logger.log("Started processing '#{node.name}'") conn, channel, completion_queue = open_communication_channel logger.log("Opened a completion queue for '#{node.name}': #{completion_queue.name}") messages = send_execution_messages(channel, node, is_batch_execution, completion_queue.name) error_data = await_execution_completion(node, completion_queue, messages.count) logger.log("Finished processing '#{node.name}'") raise Errors::RemoteExecutionError.new(error_data['message'], error_data['backtrace']) if error_data ensure conn&.close end
logger()
click to toggle source
# File lib/dataflow/executor.rb, line 103 def logger @logger ||= Dataflow::Logger.new(prefix: 'Executor') end
make_execution_params(node, is_batch_execution, completion_queue_name)
click to toggle source
# File lib/dataflow/executor.rb, line 60 def make_execution_params(node, is_batch_execution, completion_queue_name) execution_params = if is_batch_execution node.make_batch_params else [{}] end execution_params.each_with_index.map do |params, idx| { msg_id: idx, node_id: node._id.to_s, is_batch: is_batch_execution, params: params, execution_uuid: node.execution_uuid.to_s, completion_queue_name: completion_queue_name } end end
open_communication_channel()
click to toggle source
# File lib/dataflow/executor.rb, line 39 def open_communication_channel conn = Bunny.new(ENV['MOJACO_RABBITMQ_URI']) conn.start ch = conn.create_channel completion_queue = ch.queue('', exclusive: true) [conn, ch, completion_queue] end
send_execution_messages(channel, node, is_batch_execution, completion_queue_name)
click to toggle source
# File lib/dataflow/executor.rb, line 49 def send_execution_messages(channel, node, is_batch_execution, completion_queue_name) execution_params = make_execution_params(node, is_batch_execution, completion_queue_name) execution_queue = channel.queue(node.execution_queue) execution_params.each do |exec_params| execution_queue.publish(exec_params.to_json) end execution_params end