class ArcFurnace::Pipeline::PipelineInstance
Attributes
dsl_class[R]
error_handler[R]
intermediates_map[R]
params[R]
sink_node[R]
sink_source[R]
Public Class Methods
new(dsl_class, error_handler: ErrorHandler.new, **params)
click to toggle source
# File lib/arc-furnace/pipeline.rb, line 139
# Sets up an empty pipeline instance around a pipeline definition.
#
# @param dsl_class the pipeline definition object, stored for later use
# @param error_handler the handler stored on this instance; a fresh
#   ErrorHandler is created when none is given
# @param params any remaining keyword arguments, stored as a hash
def initialize(dsl_class, error_handler: ErrorHandler.new, **params)
  # Starts empty; nothing is instantiated at construction time.
  @intermediates_map = {}
  @error_handler = error_handler
  @params = params
  @dsl_class = dsl_class
end
Public Instance Methods
execute()
click to toggle source
# File lib/arc-furnace/pipeline.rb, line 146
# Runs the whole pipeline: the three phases are invoked strictly in the
# order build, prepare, run.
#
# @return whatever the final +run+ phase returns
def execute
  build
  prepare
  run
end
Private Instance Methods
build()
click to toggle source
# File lib/arc-furnace/pipeline.rb, line 171
# Instantiates the nodes described by +dsl_class+.
#
# Every non-nil entry of the definition's intermediates map is evaluated in
# the context of this instance (via +instance_exec+) and stored under the
# same key in +intermediates_map+. The sink node is built the same way, and
# the sink source is looked up among the freshly built intermediates.
def build
  definition = dsl_class

  definition.intermediates_map.each do |node_id, factory|
    # Entries without a factory are skipped entirely.
    next unless factory

    intermediates_map[node_id] = instance_exec(&factory)
  end

  @sink_node = instance_exec(&definition.sink_node)
  @sink_source = intermediates_map[definition.sink_source]
end
create_instance_with_error_handling(type, resolved_params, key_parameters)
click to toggle source
# File lib/arc-furnace/pipeline.rb, line 199
# Instantiates +type+ using only the entries of +resolved_params+ whose keys
# are listed in +key_parameters+.
#
# @param type the class to instantiate
# @param resolved_params [Hash] all resolved parameters for the node
# @param key_parameters [Array] keys of +resolved_params+ to hand to +type.new+
# @return the new instance of +type+
# @raise [RuntimeError] when construction raises CSV::MalformedCSVError; the
#   message names +resolved_params[:filename]+
def create_instance_with_error_handling(type, resolved_params, key_parameters)
  # Splat the selected entries so they arrive as keyword arguments. Since
  # Ruby 3.0 a positional hash is no longer implicitly converted to keywords,
  # so passing the hash itself fails for initializers with keyword parameters.
  # NOTE(review): assumes key_parameters names keyword parameters of
  # type#initialize — confirm against the callers.
  type.new(**resolved_params.slice(*key_parameters))
rescue CSV::MalformedCSVError
  raise "File #{resolved_params[:filename]} cannot be processed."
end
prepare()
click to toggle source
# File lib/arc-furnace/pipeline.rb, line 162
# Readies every built node for execution.
#
# Each intermediate node is given this pipeline's error handler and its own
# node id, then has +prepare+ called on it. The sink node is prepared last
# and receives neither an error handler nor a node id here.
#
# @return whatever the sink node's +prepare+ returns
def prepare
  handler = error_handler

  intermediates_map.each do |id, node|
    node.error_handler = handler
    node.node_id = id
    node.prepare
  end

  sink_node.prepare
end
resolve_parameter(node_id, key)
click to toggle source
# File lib/arc-furnace/pipeline.rb, line 195
# Looks up +key+, first among the pipeline parameters and then among the
# built intermediates.
#
# Only +nil+ counts as "not found": a parameter explicitly set to +false+ is
# a valid value and is returned as-is instead of being reported as unknown.
#
# @param node_id identifier of the node being processed (used in the error)
# @param key the parameter or intermediate name to look up
# @return the value found in +params+, otherwise in +intermediates_map+
# @raise [RuntimeError] when neither collection holds a non-nil value
def resolve_parameter(node_id, key)
  value = params[key]
  value = intermediates_map[key] if value.nil?
  raise "When processing node #{node_id}: Unknown key #{key}!" if value.nil?

  value
end
resolve_parameters(node_id, params_to_resolve)
click to toggle source
# File lib/arc-furnace/pipeline.rb, line 179
# Produces a copy of +params_to_resolve+ with references replaced by the
# values they point at.
#
# Resolution rules, applied per entry in this order:
# * key +:sources+ — every element of the value is resolved by name;
# * Symbol value   — the symbol itself is resolved (this is how a parameter
#   refers to an intermediate);
# * nil value      — the entry's own key is resolved;
# * anything else  — kept unchanged.
#
# @param node_id identifier of the node whose parameters are resolved
# @param params_to_resolve key/value pairs as declared for the node
# @return [Hash] the resolved parameters, keyed like the input
def resolve_parameters(node_id, params_to_resolve)
  lookup = ->(name) { resolve_parameter(node_id, name) }

  resolved_pairs = params_to_resolve.map do |name, raw|
    resolved =
      if name == :sources
        raw.map { |source| lookup.call(source) }
      elsif raw.is_a?(Symbol)
        lookup.call(raw)
      elsif raw.nil?
        lookup.call(name)
      else
        raw
      end

    [name, resolved]
  end

  resolved_pairs.to_h
end
run()
click to toggle source
# File lib/arc-furnace/pipeline.rb, line 154
# Drives rows from the sink source into the sink node, then finalizes.
#
# Rows are pulled one at a time until the source returns a falsy value
# (+nil+ or +false+). Afterwards every intermediate node is finalized,
# followed by the sink node.
#
# @return whatever the sink node's +finalize+ returns
def run
  source = sink_source
  sink = sink_node

  while (current = source.row)
    sink.row(current)
  end

  intermediates_map.each_value(&:finalize)
  sink.finalize
end