class Blobsterix::Transformations::TransformationManager

A TransformationManager cares about:

Public Class Methods

new() click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 6
# Builds a manager and immediately registers every transformation that
# auto_load discovers.
def initialize
  auto_load
end

Public Instance Methods

add(trafo) click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 10
# Registers a transformation unless one with a matching name is already known.
#
# @param trafo [String, Object] either the name of a constant inside
#   ::Blobsterix::Transformations::Impl (looked up and instantiated with +new+)
#   or an already built transformation object responding to +name+
# @return [self] the manager itself, so calls can be chained
def add(trafo)
  transformation = trafo.is_a?(String) ? ::Blobsterix::Transformations::Impl.const_get(trafo).new : trafo

  # Names are compared with === exactly as before; the block parameter no
  # longer shadows the method argument.
  already_known = transformations.any? { |known| known.name === transformation.name }
  transformations << transformation unless already_known
  self
end
run(blob_access) click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 16
# Resolves +blob_access+ to blob metadata, running the transformation when needed.
#
# If a transformation with the same identifier is already in progress, the
# caller first waits for it and continues with whatever that wait returns.
# Valid data is returned right away; otherwise the transformation is
# registered and executed. When the outcome is still not valid, a fresh
# Blobsterix::Storage::BlobMetaData is returned instead.
def run(blob_access)
  if transformation_in_progress?(blob_access)
    blob_access = wait_for_transformation(blob_access)
  end

  return blob_access.get if blob_access.get.valid

  cue_transformation(blob_access)
  transformed = run_transformation(blob_access)

  if transformed.get.valid
    transformed.get
  else
    Blobsterix::Storage::BlobMetaData.new
  end
end

Private Instance Methods

auto_load() click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 59
# Registers one transformation for every constant defined in
# Blobsterix::Transformations::Impl, passing each constant's name to #add.
def auto_load
  Blobsterix::Transformations::Impl.constants.each do |constant_name|
    add(constant_name.to_s)
  end
end
cue_transformation(blob_access) click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 44
# Marks the transformation for +blob_access+ as in progress by storing a new
# fiber list, seeded with the current fiber, under the access identifier.
# Any list previously stored for that identifier is replaced.
#
# @return the +blob_access+ that was passed in
def cue_transformation(blob_access)
  running_transformations.store(blob_access.identifier, [Fiber.current])
  blob_access
end
findTransformation(name, input_type) click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 109
# Looks up the first registered transformation whose name matches +name+
# (compared with ===) and whose input type accepts +input_type+ (via +is?+).
#
# @param name the transformation name to look for
# @param input_type the type the transformation must accept
# @return the first matching transformation, or nil when there is none
def findTransformation(name, input_type)
  # find stops at the first hit instead of collecting every match first.
  transformations.find do |candidate|
    candidate.name === name && candidate.input_type.is?(input_type)
  end
end
findTransformation_out(input_type, output_type) click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 114
# Looks up the first registered transformation that accepts +input_type+
# (via +input_type.is?+) and whose output type reports +equal?+ for
# +output_type+.
#
# @param input_type the type the transformation must accept
# @param output_type the type the transformation must produce
# @return the first matching transformation, or nil when there is none
def findTransformation_out(input_type, output_type)
  # find stops at the first hit instead of collecting every match first.
  transformations.find do |candidate|
    candidate.input_type.is?(input_type) && candidate.output_type.equal?(output_type)
  end
end
finish_connection(result, blob_access) click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 101
# Hands +result+ to every fiber registered for +blob_access+ and then removes
# the in-progress entry for it.
#
# @param result the value each registered fiber is resumed with
# @param blob_access the access whose identifier keys the registered fibers
# @return whatever uncue_transformation returns
def finish_connection(result, blob_access)
  logger.debug "Transformation: done #{blob_access} finish connections"

  waiting_fibers = running_transformations[blob_access.identifier]
  # There may be no entry at all: no pending fibers, or the connection was
  # already closed.
  waiting_fibers.each { |fiber| fiber.resume(result) } if waiting_fibers

  uncue_transformation(blob_access)
end
run_transformation(blob_access) click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 65
# Builds the transformation chain for +blob_access+ and executes it, suspending
# the calling fiber until the work has been reported as finished.
#
# Returns +blob_access+ itself (after removing its in-progress entry) when the
# source metadata is not valid, the chain's target blob access when that
# target is already valid, and otherwise the value this fiber is resumed with.
# Raises the resumed value when it is an Exception.
def run_transformation(blob_access)
  logger.debug "Transformation: build #{blob_access}"

  # Use the explicitly supplied source if there is one; otherwise fetch the blob
  # addressed by bucket and id alone.
  metaData = blob_access.source || Blobsterix::BlobAccess.new(:bucket => blob_access.bucket,:id => blob_access.id).get

  # Without valid source data there is nothing to transform: drop the
  # in-progress entry and hand the access back unchanged.
  return uncue_transformation(blob_access) unless metaData.valid

  chain = TransformationChain.new(blob_access, metaData, logger)

  # Each pair's first element is used as the transformation name; the second is
  # passed through to the chain (presumably the transformation's argument —
  # confirm against TransformationChain#add). findTransformation may return nil
  # when nothing matches; how the chain treats nil is not visible here.
  blob_access.trafo.each {|trafo_pair|
    chain.add(findTransformation(trafo_pair[0], chain.last_type), trafo_pair[1])
  }

  # Close the chain with a transformation from its last type to the accepted type, if any.
  chain.finish(blob_access.accept_type, findTransformation_out(chain.last_type, blob_access.accept_type))

  if chain.target_blob_access.get.valid
    # The target already holds valid data, so no work has to be run.
    uncue_transformation(blob_access)
    chain.target_blob_access
  else
    logger.debug "Transformation: run #{blob_access}"
    # NOTE(review): EM.defer is expected to run the first proc on EventMachine's
    # thread pool and pass its return value to the second proc on the reactor
    # thread — confirm against the EventMachine documentation.
    # Exceptions are captured and returned as a value so they can be re-raised
    # below instead of being raised inside the deferred proc.
    EM.defer(Proc.new {
      begin
        chain.do()
      rescue Exception => e
        e
      end
    }, Proc.new {|result|
      finish_connection(result, blob_access)
    })

    # Suspend here; finish_connection resumes the registered fibers with the result.
    result = Fiber.yield
    raise result if result.is_a? Exception
    result
  end
end
running_transformations() click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 30
# Lazily created hash of in-progress transformations, keyed by blob access
# identifier; each value is the list of fibers to resume on completion.
def running_transformations
  @running_transformations = {} unless @running_transformations
  @running_transformations
end
transformation_in_progress?(blob_access) click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 54
# Tells whether a transformation is currently registered for the identifier
# of +blob_access+.
#
# @return [Boolean]
def transformation_in_progress?(blob_access)
  running_transformations.key?(blob_access.identifier)
end
transformations() click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 34
# Lazily created list of the registered transformation objects.
def transformations
  @transformations = [] unless @transformations
  @transformations
end
uncue_transformation(blob_access) click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 49
# Removes the in-progress entry stored for the identifier of +blob_access+,
# if there is one.
#
# @return the +blob_access+ that was passed in
def uncue_transformation(blob_access)
  blob_access.tap do |access|
    running_transformations.delete(access.identifier)
  end
end
wait_for_transformation(blob_access) click to toggle source
# File lib/blobsterix/transformation/transformation_manager.rb, line 38
# Adds the current fiber to the list of fibers waiting for the transformation
# of +blob_access+ and suspends it until it is resumed.
#
# @param blob_access the access whose identifier already has an in-progress entry
# @return the value the fiber is resumed with
# @raise [Exception] the resumed value itself when it is an Exception
def wait_for_transformation(blob_access)
  running_transformations[blob_access.identifier] << Fiber.current
  logger.debug "Transformation: wait for it to finish #{blob_access}"

  result = Fiber.yield
  # finish_connection resumes every registered fiber with the same value, and
  # run_transformation may capture an Exception as that value. Re-raise it here
  # as run_transformation does, instead of returning it as if it were a result.
  raise result if result.is_a? Exception
  result
end