class Blobsterix::Transformations::TransformationManager
A TransformationManager cares about:
Public Class Methods
new()
click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 6
# Builds a manager and immediately registers the transformations that
# auto_load discovers.
def initialize
  auto_load
end
Public Instance Methods
add(trafo)
click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 10
# Registers a transformation unless one with a matching name is already
# registered.
#
# @param trafo [String, Object] either the name of a constant inside
#   Blobsterix::Transformations::Impl (looked up and instantiated with +new+)
#   or an already constructed transformation object
# @return [self] the manager itself, so calls can be chained
def add(trafo)
  transformation =
    if trafo.is_a?(String)
      ::Blobsterix::Transformations::Impl::const_get(trafo).new
    else
      trafo
    end

  # Names are compared with === exactly as registered transformations expose them.
  name_is_free = transformations.none? { |known| known.name === transformation.name }
  transformations << transformation if name_is_free
  self
end
run(blob_access)
click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 16
# Resolves the blob described by +blob_access+, running a transformation
# when its current data is not valid.
#
# When a transformation for the same identifier is already in progress the
# calling fiber first waits for it and continues with whatever the wait
# handed back.
#
# @param blob_access [Object] access descriptor responding to +get+
# @return [Object] the valid result of +get+, or a fresh
#   Blobsterix::Storage::BlobMetaData when the transformation did not
#   produce valid data
def run(blob_access)
  if transformation_in_progress?(blob_access)
    blob_access = wait_for_transformation(blob_access)
  end
  return blob_access.get if blob_access.get.valid

  cue_transformation(blob_access)
  blob_access = run_transformation(blob_access)
  if blob_access.get.valid
    blob_access.get
  else
    Blobsterix::Storage::BlobMetaData.new
  end
end
Private Instance Methods
auto_load()
click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 59
# Registers every constant found in Blobsterix::Transformations::Impl by
# passing its name (as a String) to add.
def auto_load
  Blobsterix::Transformations::Impl.constants.each do |constant_name|
    add(constant_name.to_s)
  end
end
cue_transformation(blob_access)
click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 44
# Marks the transformation for +blob_access+ as running, with the current
# fiber as the first entry of its fiber list.
#
# @param blob_access [Object] access descriptor responding to +identifier+
# @return [Object] the +blob_access+ that was passed in
def cue_transformation(blob_access)
  owner = Fiber.current
  running_transformations[blob_access.identifier] = [owner]
  blob_access
end
findTransformation(name, input_type)
click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 109
# Looks up the first registered transformation with the given name that
# accepts the given input type.
#
# @param name [Object] matched against each transformation's +name+ via ===
# @param input_type [Object] passed to +input_type.is?+ of each candidate
# @return [Object, nil] the first matching transformation in registration
#   order, or nil when none matches
def findTransformation(name, input_type)
  # find stops at the first hit instead of collecting every match first.
  transformations.find do |trafo|
    trafo.name === name && trafo.input_type.is?(input_type)
  end
end
findTransformation_out(input_type, output_type)
click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 114
# Looks up the first registered transformation that accepts +input_type+
# and produces +output_type+.
#
# @param input_type [Object] passed to +input_type.is?+ of each candidate
# @param output_type [Object] passed to +output_type.equal?+ of each candidate
# @return [Object, nil] the first matching transformation in registration
#   order, or nil when none matches
def findTransformation_out(input_type, output_type)
  # NOTE(review): equal? is object identity unless the output type's class
  # overrides it - confirm that is the intended comparison.
  transformations.find do |trafo|
    trafo.input_type.is?(input_type) && trafo.output_type.equal?(output_type)
  end
end
finish_connection(result, blob_access)
click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 101
# Hands the outcome of a finished transformation to every fiber registered
# for +blob_access+ and clears its in-progress marker.
#
# The marker is removed BEFORE the fibers are resumed. That way the
# identifier is released even when a resume raises, and a resumed fiber that
# cues the same identifier again keeps its fresh entry instead of having it
# deleted once the loop ends.
#
# @param result [Object] value (or Exception) passed to each fiber's resume
# @param blob_access [Object] access descriptor responding to +identifier+
# @return [Object] the +blob_access+ that was passed in
def finish_connection(result, blob_access)
  logger.debug "Transformation: done #{blob_access} finish connections"
  pending_fibers = running_transformations.delete(blob_access.identifier)
  # No entry means there are no pending fibers, e.g. the connection was
  # already closed.
  pending_fibers.each { |fiber| fiber.resume(result) } if pending_fibers
  blob_access
end
run_transformation(blob_access)
click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 65
# Builds the transformation chain for +blob_access+ and executes it unless
# the target already holds valid data.
#
# The chain itself runs through EM.defer; the calling fiber yields and is
# resumed by finish_connection with the outcome.
#
# @param blob_access [Object] access descriptor of the requested blob
# @return [Object] +blob_access+ when the source is not valid, the chain's
#   target blob access when it is already valid, otherwise whatever the
#   fiber is resumed with
# @raise [Exception] the exception raised by the chain, re-raised in the
#   calling fiber
def run_transformation(blob_access)
  logger.debug "Transformation: build #{blob_access}"

  source_meta = blob_access.source ||
                Blobsterix::BlobAccess.new(:bucket => blob_access.bucket, :id => blob_access.id).get
  return uncue_transformation(blob_access) unless source_meta.valid

  chain = TransformationChain.new(blob_access, source_meta, logger)
  blob_access.trafo.each do |trafo_pair|
    chain.add(findTransformation(trafo_pair[0], chain.last_type), trafo_pair[1])
  end
  chain.finish(blob_access.accept_type, findTransformation_out(chain.last_type, blob_access.accept_type))

  if chain.target_blob_access.get.valid
    uncue_transformation(blob_access)
    return chain.target_blob_access
  end

  logger.debug "Transformation: run #{blob_access}"

  # Runs via EM.defer; any exception is captured as a value so it can be
  # handed back to the waiting fiber instead of being lost.
  operation = Proc.new do
    begin
      chain.do()
    rescue Exception => e
      e
    end
  end
  on_done = Proc.new { |outcome| finish_connection(outcome, blob_access) }
  EM.defer(operation, on_done)

  result = Fiber.yield
  raise result if result.is_a? Exception
  result
end
running_transformations()
click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 30
# Lazily created hash of the transformations currently running, keyed by
# blob access identifier; each value is the list of fibers to resume.
#
# @return [Hash] the same hash instance on every call
def running_transformations
  @running_transformations = {} unless @running_transformations
  @running_transformations
end
transformation_in_progress?(blob_access)
click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 54
# Tells whether a transformation is currently registered as running for
# the identifier of +blob_access+.
#
# @param blob_access [Object] access descriptor responding to +identifier+
# @return [Boolean]
def transformation_in_progress?(blob_access)
  running_transformations.key?(blob_access.identifier)
end
transformations()
click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 34
# Lazily created list of the registered transformation objects.
#
# @return [Array] the same array instance on every call
def transformations
  @transformations = [] unless @transformations
  @transformations
end
uncue_transformation(blob_access)
click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 49
# Removes the in-progress marker for the identifier of +blob_access+.
#
# @param blob_access [Object] access descriptor responding to +identifier+
# @return [Object] the +blob_access+ that was passed in
def uncue_transformation(blob_access)
  in_progress = running_transformations
  in_progress.delete(blob_access.identifier)
  blob_access
end
wait_for_transformation(blob_access)
click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 38
# Suspends the current fiber until the transformation already running for
# +blob_access+ has finished.
#
# The fiber is appended to the list registered under the blob's identifier
# and is resumed by finish_connection with the transformation's outcome.
# That outcome may be an exception captured while the chain ran; it is
# re-raised here, mirroring run_transformation, rather than being returned
# as if it were a blob access.
#
# @param blob_access [Object] access descriptor responding to +identifier+
# @return [Object] the value the fiber is resumed with
# @raise [Exception] when the fiber is resumed with an exception
def wait_for_transformation(blob_access)
  running_transformations[blob_access.identifier] << Fiber.current
  logger.debug "Transformation: wait for it to finish #{blob_access}"
  result = Fiber.yield
  raise result if result.is_a?(Exception)
  result
end