class ArcFurnace::Pipeline
Constants
- ALLOWABLE_PARAM_TYPES
Public Class Methods
Define a node that filters rows. By default, and whenever this metaprogramming method is passed a block, you get a BlockFilter whose block is passed a hash for each row. The result of the block determines whether a given row flows to a downstream node.
# File lib/arc-furnace/pipeline.rb, line 90
def self.filter(node_id, type: BlockFilter, params: {}, &block)
  if block_given? && type <= BlockFilter
    params[:block] = block
  end
  raise "Filter #{type} is not a Filter!" unless type <= Filter
  define_intermediate(node_id, type: type, params: params)
end
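The guard above relies on Ruby's `Module#<=`, which is truthy only when the left-hand class is the right-hand class or one of its descendants. The following is a minimal sketch of that guard and of a block deciding which rows pass. `SketchFilter`, `SketchBlockFilter`, `build_filter`, and the `keep?` method are hypothetical stand-ins invented for illustration; they are not the ArcFurnace classes.

```ruby
# Hypothetical stand-ins for illustration only; not the ArcFurnace classes.
class SketchFilter; end

class SketchBlockFilter < SketchFilter
  def initialize(block:)
    @block = block
  end

  # A row passes when the block returns a truthy value.
  def keep?(row)
    !!@block.call(row)
  end
end

# Mirrors the shape of the guard: attach the block only for block-based
# types, and reject types that are not filters at all.
def build_filter(type: SketchBlockFilter, params: {}, &block)
  params = params.merge(block: block) if block && type <= SketchBlockFilter
  raise "Filter #{type} is not a Filter!" unless type <= SketchFilter
  type.new(**params)
end

filter = build_filter { |row| row['price'].to_f > 10 }
rows = [{ 'sku' => 'a', 'price' => '5' }, { 'sku' => 'b', 'price' => '25' }]
kept = rows.select { |row| filter.keep?(row) }
```

Here `kept` holds only the row whose price exceeds 10, and calling `build_filter(type: String)` raises because `String <= SketchFilter` is not truthy.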
Define a hash node, processing all rows from its source and caching them in memory.
# File lib/arc-furnace/pipeline.rb, line 33
def self.hash_node(node_id, type: ArcFurnace::Hash, params:)
  define_intermediate(node_id, type: type, params: params)
end
Ensure that subclasses don't overwrite the parent's transform node definitions
# File lib/arc-furnace/pipeline.rb, line 13
def self.inherited(subclass)
  subclass.intermediates_map = intermediates_map.dup
end
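The effect of copying the map in `inherited` can be seen in isolation. This is a sketch under stated assumptions: `SketchPipeline`, `define_node`, and the two subclasses are hypothetical, and the accessor for `intermediates_map` is a guess at how the class-level state might be held.

```ruby
# Hypothetical base class showing the dup-on-inherit pattern in isolation.
class SketchPipeline
  class << self
    attr_writer :intermediates_map

    def intermediates_map
      @intermediates_map ||= {}
    end

    # Each subclass starts with its own shallow copy of the parent's map.
    def inherited(subclass)
      super
      subclass.intermediates_map = intermediates_map.dup
    end

    def define_node(node_id, value)
      intermediates_map[node_id] = value
    end
  end
end

class ParentPipeline < SketchPipeline
  define_node :shared, 'from parent'
end

class ChildPipeline < ParentPipeline
  define_node :extra, 'child only'
end

parent_keys = ParentPipeline.intermediates_map.keys
child_keys = ChildPipeline.intermediates_map.keys
```

The child sees the parent's `:shared` node plus its own `:extra` node, while the parent's map is left unchanged.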
Define an inner join node where rows from the source are dropped if an associated entity is not found in the hash for the join key
# File lib/arc-furnace/pipeline.rb, line 46
def self.inner_join(node_id, type: ArcFurnace::InnerJoin, params:)
  define_intermediate(node_id, type: type, params: params)
end
Create an instance to run a transformation, passing the parameters with which to instantiate the transform instance. The resulting instance has a single public method, #execute, which performs the transformation.
# File lib/arc-furnace/pipeline.rb, line 114
def self.instance(params = {})
  @params = params
  PipelineInstance.new(self, params)
end
Define a merge node where rows from multiple source nodes are merged into a single row
# File lib/arc-furnace/pipeline.rb, line 70
def self.merge(node_id, type: ArcFurnace::Merge, params:)
  define_intermediate(node_id, type: type, params: params)
end
Define a node that observes rows. By default, and whenever this metaprogramming method is passed a block, you get a BlockObserver whose block is passed a hash for each row. The result of the block is ignored; all rows are forwarded to the next node in the line.
# File lib/arc-furnace/pipeline.rb, line 102
def self.observer(node_id, type: BlockObserver, params: {}, &block)
  if block_given? && type <= BlockObserver
    params[:block] = block
  end
  raise "Observer #{type} is not an Observer!" unless type <= Observer
  define_intermediate(node_id, type: type, params: params)
end
Define an outer join node where rows from the source are kept even if an associated entity is not found in the hash for the join key
# File lib/arc-furnace/pipeline.rb, line 52
def self.outer_join(node_id, type: ArcFurnace::OuterJoin, params:)
  define_intermediate(node_id, type: type, params: params)
end
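The difference between the inner and outer join descriptions can be shown with plain Ruby hashes. This is only a sketch of the row semantics described above, not the library's implementation. The helper names (`build_lookup`, `inner_join`, `outer_join`), the `'sku'` join key, and the sample rows are all hypothetical.

```ruby
# Illustration only: plain Ruby stand-ins, not the ArcFurnace node classes.

# Hash node: cache every source row in memory, keyed by the join key.
def build_lookup(rows, key)
  rows.each_with_object({}) { |row, cache| cache[row[key]] = row }
end

# Inner join: drop a source row when the lookup has no entry for its key.
def inner_join(rows, lookup, key)
  rows.select { |row| lookup.key?(row[key]) }
      .map { |row| row.merge(lookup[row[key]]) }
end

# Outer join: keep every source row, merging in the match when there is one.
def outer_join(rows, lookup, key)
  rows.map { |row| row.merge(lookup.fetch(row[key], {})) }
end

products = [
  { 'sku' => 'a', 'name' => 'Anvil' },
  { 'sku' => 'b', 'name' => 'Bellows' }
]
prices = [{ 'sku' => 'a', 'price' => '10' }]

lookup = build_lookup(prices, 'sku')
inner = inner_join(products, lookup, 'sku')
outer = outer_join(products, lookup, 'sku')
```

With this data, `inner` contains only the `'a'` row, enriched with its price, while `outer` also keeps the `'b'` row without a price.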
Define the sink for this transformation. Only a single sink may be specified per transformation. The sink is delivered a hash per row or entity, fed to it by the graph of nodes above it.
# File lib/arc-furnace/pipeline.rb, line 20
def self.sink(type:, source:, params:)
  if sink_node
    raise 'Sink already defined!'
  end
  @sink_node = -> do
    type.new(resolve_parameters(:sink, params))
  end
  @sink_source = source
end
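Two details of the sink definition are worth seeing on their own: the single-sink guard, and the lambda that defers building the sink until it is called. The sketch below assumes hypothetical names throughout (`SketchTransformation`, `SketchSink`, `SketchExport`, and the `filename` parameter) and leaves out parameter resolution, which this page does not show.

```ruby
# Hypothetical stand-in; parameter resolution is deliberately left out.
class SketchTransformation
  class << self
    attr_reader :sink_node, :sink_source

    def sink(type:, source:, params:)
      raise 'Sink already defined!' if sink_node
      # Store a lambda so the sink object is only built when it is called.
      @sink_node = -> { type.new(**params) }
      @sink_source = source
    end
  end
end

# Hypothetical sink type that just records the parameters it was given.
SketchSink = Struct.new(:filename, keyword_init: true)

class SketchExport < SketchTransformation
  sink type: SketchSink, source: :rows, params: { filename: 'out.csv' }
end

built = SketchExport.sink_node.call
```

No `SketchSink` exists until `sink_node.call` runs, and a second `sink` declaration on `SketchExport` would raise.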
A source that has row semantics, delivering a hash per row (or per entity) for the source.
# File lib/arc-furnace/pipeline.rb, line 39
def self.source(node_id, type:, params:)
  raise "Source #{type} is not a Source!" unless type <= Source
  define_intermediate(node_id, type: type, params: params)
end
Define a node that transforms rows. By default, and whenever this metaprogramming method is passed a block, you get a BlockTransform whose block is passed a hash for each row. The result of the block becomes the row for the next downstream node.
# File lib/arc-furnace/pipeline.rb, line 60
def self.transform(node_id, type: BlockTransform, params: {}, &block)
  if block_given? && type <= BlockTransform
    params[:block] = block
  end
  raise "Transform #{type} is not a Transform!" unless type <= Transform
  define_intermediate(node_id, type: type, params: params)
end
Define a node that unfolds rows. By default, and whenever this metaprogramming method is passed a block, you get a BlockUnfold whose block is passed a hash for each row. The result of the block becomes the set of rows for the next downstream node.
# File lib/arc-furnace/pipeline.rb, line 78
def self.unfold(node_id, type: BlockUnfold, params: {}, &block)
  if block_given? && type <= BlockUnfold
    params[:block] = block
  end
  raise "Unfold #{type} is not an Unfold!" unless type <= Unfold
  define_intermediate(node_id, type: type, params: params)
end
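A transform block and an unfold block differ in the shape of what they return: one row in both cases as input, but a single row versus a set of rows as output. A rough sketch of that contrast in plain Ruby, where `upcase_sku` and `split_sizes` are hypothetical block bodies and the row fields are made up:

```ruby
# Illustration only: map and flat_map stand in for the two node types.

# A transform-style body: one row in, one row out.
def upcase_sku(row)
  row.merge('sku' => row['sku'].upcase)
end

# An unfold-style body: one row in, a list of rows out.
def split_sizes(row)
  row['sizes'].split(',').map { |size| { 'sku' => row['sku'], 'size' => size } }
end

rows = [
  { 'sku' => 'a', 'sizes' => 's,m' },
  { 'sku' => 'b', 'sizes' => 'l' }
]

transformed = rows.map { |row| upcase_sku(row) }
unfolded = rows.flat_map { |row| split_sizes(row) }
```

`transformed` has the same number of rows as the input, while `unfolded` has one row per size.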
Private Class Methods
# File lib/arc-furnace/pipeline.rb, line 123
def self.define_intermediate(node_id, type:, params:)
  intermediates_map[node_id] = -> do
    resolved_params = resolve_parameters(node_id, params)
    # Keep only the initializer parameters whose kind is allowed.
    # Without the select, the block would be ignored by Method#parameters.
    key_parameters = type.instance_method(:initialize).parameters.select do |param|
      ALLOWABLE_PARAM_TYPES.include?(param.first)
    end.map(&:second)
    # Allow params to be passed that are not in the initializer
    instance = create_instance_with_error_handling(type, resolved_params, key_parameters)
    instance.params = resolved_params
    instance
  end
end
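The parameter filtering above builds on `Method#parameters`, which returns `[kind, name]` pairs such as `[:keyreq, :source]`. Below is a minimal sketch of using those pairs to pass a node only the keyword arguments its initializer declares. The value of `SKETCH_ALLOWABLE_PARAM_TYPES` is an assumption (the real `ALLOWABLE_PARAM_TYPES` is not shown on this page), and `SketchNode`, `keyword_names`, and `build_node` are hypothetical. It is also a guess that `create_instance_with_error_handling` filters in a similar way.

```ruby
# Assumed value for illustration; the real constant is not shown on this page.
SKETCH_ALLOWABLE_PARAM_TYPES = %i[key keyreq].freeze

# Hypothetical node class; only its initializer signature matters here.
class SketchNode
  attr_reader :source, :key_column

  def initialize(source:, key_column: 'id')
    @source = source
    @key_column = key_column
  end
end

# Names of the keyword parameters the initializer declares.
def keyword_names(type)
  type.instance_method(:initialize).parameters
      .select { |kind, _name| SKETCH_ALLOWABLE_PARAM_TYPES.include?(kind) }
      .map { |_kind, name| name }
end

# Drop any supplied params the initializer does not declare, then build.
def build_node(type, supplied)
  accepted = supplied.select { |name, _value| keyword_names(type).include?(name) }
  type.new(**accepted)
end

node = build_node(SketchNode, { source: :csv, key_column: 'sku', unused: true })
```

The `unused` entry is silently dropped, so `SketchNode.new` does not raise an unknown keyword error. The sketch uses an explicit block in `map` rather than `&:second`, since `Array#second` comes from ActiveSupport and is not part of core Ruby.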