class BitBroker::ManagerImpl

Public Class Methods

new(opts) click to toggle source
# File lib/bitbroker/manager_impl.rb, line 3
# Builds a manager for one directory.
#
# opts - Hash read with the keys :mqconfig, :name and :path. The hash is
#        passed to validate first; :path is normalized with form_dirpath
#        before being stored as @config[:dirpath].
def initialize(opts)
  # validate user-supplied arguments
  validate(opts)

  ### prepare brokers
  @config = {
    :mqconfig => opts[:mqconfig],
    :label => opts[:name],
    :dirpath => form_dirpath(opts[:path]),
  }

  @metadata = Metadata.new(@config[:dirpath])

  @publisher = Publisher.new(@config)

  # Two distinct arrays on purpose: a chained assignment (a = b = []) makes
  # both variables reference the same object, so an in-place change to one
  # list (e.g. Array#delete) would silently alter the other as well.
  @deficients = []
  @suggestions = []
  @semaphore = Mutex.new

  # internal bookkeeping used to know who modified/created a file
  @file_activities = []
end

Public Instance Methods

do_start_collector() click to toggle source
# File lib/bitbroker/manager_impl.rb, line 87
# Starts a background thread that loops forever, matching the first entry
# of @deficients against the collected @suggestions.
#
# When at least one suggestion carries the same 'path', one of them is picked
# at random and requested from the node stored under its 'from' key; after
# that every suggestion for the path and the deficient entry itself are
# dropped while holding @semaphore.
#
# Returns the Thread.
def do_start_collector
  Thread.new do
    loop do
      wanted = @deficients.first
      unless wanted.nil?
        offers = @suggestions.select { |offer| offer['path'] == wanted['path'] }
        unless offers.empty?
          chosen = offers[rand(offers.size)]

          @metadata.request(@publisher, [chosen], chosen['from'])

          @semaphore.synchronize do
            @suggestions = @suggestions.reject { |offer| offer['path'] == wanted['path'] }
            @deficients.delete(wanted)
          end
        end
      end
      # give other threads a chance to run before polling again
      Thread.pass
    end
  end
end
do_start_data_receiver() click to toggle source
# File lib/bitbroker/manager_impl.rb, line 137
# Starts a background thread that subscribes via Subscriber#recv_data.
#
# For every received payload the 'path' field is read from the unpacked
# message, a FileActivity.create entry for it is appended to
# @file_activities, and the raw payload is handed to Solvant.load_binary
# together with the configured directory.
#
# Returns the Thread.
def do_start_data_receiver
  Thread.new do
    Subscriber.new(@config).recv_data do |payload, _from|
      target = MessagePack.unpack(payload)['path']

      Log.debug("[ManagerImpl] (data_receiver) path: #{target}")

      # recorded before the payload is loaded
      # NOTE(review): presumably lets the observer skip the resulting local change - confirm
      @file_activities.push(FileActivity.create(target))

      Solvant.load_binary(@config[:dirpath], payload)
    end
  end
end
do_start_metadata_receiver() click to toggle source
# File lib/bitbroker/manager_impl.rb, line 109
# Starts a background thread that subscribes via Subscriber#recv_metadata
# and dispatches each message on its 'type' field:
#
#   Metadata::TYPE_ADVERTISE   -> receive_advertise
#   Metadata::TYPE_REQUEST_ALL -> receive_request_all
#
# Messages of any other type are ignored. Returns the Thread.
def do_start_metadata_receiver
  Thread.new do
    subscriber = Subscriber.new(@config)
    subscriber.recv_metadata do |msg, from|
      kind = msg['type']
      case kind
      when Metadata::TYPE_ADVERTISE
        receive_advertise(msg['data'], from)
      when Metadata::TYPE_REQUEST_ALL
        receive_request_all(msg['data'], from)
      end
    end
  end
end
do_start_observer() click to toggle source
# File lib/bitbroker/manager_impl.rb, line 32
# Starts a background thread that creates an Observer on the configured
# directory and forwards every reported change to one of three handlers.
#
# The handlers are declared with nested `def`s, which means they are
# (re)defined as ordinary instance methods each time this method runs.
#
# Returns the Thread.
def do_start_observer
  # Handles a path reported as added. If @file_activities holds an entry for
  # the same relative path, only that entry is consumed; otherwise metadata
  # is created and the file is uploaded through the publisher.
  def handle_add(path)
    Log.debug("[ManagerImpl] (handle_add) path:#{path}")

    rpath = @metadata.get_rpath(path)
    if obj = @file_activities.find {|x| x.path == rpath}
      @file_activities.delete(obj)
    else
      # create metadata info
      @metadata.create(rpath)

      # upload target file
      Solvant.new(@metadata.dir, rpath).upload(@publisher)
    end
  end

  # Handles a path reported as modified. A matching @file_activities entry is
  # consumed without further action; otherwise the file is uploaded, its
  # metadata entry is updated and the metadata is advertised.
  def handle_mod(path)
    Log.debug("[ManagerImpl] (handle_mod) path:#{path}")

    rpath = @metadata.get_rpath(path)
    if obj = @file_activities.find {|x| x.path == rpath}
      @file_activities.delete(obj)
    else
      # upload target file
      Solvant.new(@metadata.dir, rpath).upload(@publisher)

      # update fileinfo; get_with_path may return nil (handle_rem checks for
      # that too), and an unguarded call would raise inside the observer
      # thread and stop all further observation.
      info = @metadata.get_with_path(rpath)
      info.update unless info.nil?

      @metadata.advertise(@publisher)
    end
  end

  # Handles a path reported as removed. Nothing happens when no metadata
  # entry exists for it; otherwise `remove` is called on the entry and the
  # metadata is advertised.
  def handle_rem(path)
    rpath = @metadata.get_rpath(path)

    #@metadata.remove_with_path(rpath)
    file = @metadata.get_with_path(rpath)
    if file != nil
      Log.debug("[ManagerImpl] (handle_rem) path:#{path}")

      file.remove
      @metadata.advertise(@publisher)
    end
  end

  Thread.new do
    # the block receives three collections, dispatched in the order
    # modified, added, removed
    Observer.new(@config[:dirpath]) do |mod, add, rem|
      mod.each {|x| handle_mod(x)}
      add.each {|x| handle_add(x)}
      rem.each {|x| handle_rem(x)}
    end
  end
end
do_start_p_data_receiver() click to toggle source
# File lib/bitbroker/manager_impl.rb, line 151
# Starts a background thread that subscribes via Subscriber#recv_p_data.
#
# Each received payload is processed like this: the 'path' field is read
# from the unpacked message, a FileActivity.create entry for it is appended
# to @file_activities, and the raw payload is passed to Solvant.load_binary
# along with the configured directory.
#
# Returns the Thread.
def do_start_p_data_receiver
  Thread.new do
    Subscriber.new(@config).recv_p_data do |payload, _from|
      target = MessagePack.unpack(payload)['path']

      Log.debug("[ManagerImpl] (p_data_receiver) path: #{target}")

      # recorded before the payload is loaded
      # NOTE(review): presumably lets the observer skip the resulting local change - confirm
      @file_activities.push(FileActivity.create(target))

      Solvant.load_binary(@config[:dirpath], payload)
    end
  end
end
do_start_p_metadata_receiver() click to toggle source
# File lib/bitbroker/manager_impl.rb, line 123
# Starts a background thread that subscribes via Subscriber#recv_p_metadata
# and dispatches each message on its 'type' field:
#
#   Metadata::TYPE_SUGGESTION -> receive_suggestion
#   Metadata::TYPE_REQUEST    -> receive_request
#
# Messages of any other type are ignored. Returns the Thread.
def do_start_p_metadata_receiver
  Thread.new do
    subscriber = Subscriber.new(@config)
    subscriber.recv_p_metadata do |msg, from|
      kind = msg['type']
      case kind
      when Metadata::TYPE_SUGGESTION
        receive_suggestion(msg['data'], from)
      when Metadata::TYPE_REQUEST
        receive_request(msg['data'], from)
      end
    end
  end
end
form_dirpath(path) click to toggle source
# File lib/bitbroker/manager_impl.rb, line 25
# Strips every trailing '/' from path and returns the result.
# A path without a trailing slash is returned as the very same object;
# a path made only of slashes becomes the empty string.
def form_dirpath(path)
  trimmed = path
  trimmed = trimmed.chop while trimmed[-1] == '/'
  trimmed
end
handle_add(path) click to toggle source
# File lib/bitbroker/manager_impl.rb, line 33
# Handles a path reported as added.
#
# When @file_activities already holds an entry for the same relative path
# (the data receivers push one for every payload they load), that entry is
# consumed and nothing else happens. Otherwise metadata is created for the
# path and the file is uploaded through the publisher.
def handle_add(path)
  Log.debug("[ManagerImpl] (handle_add) path:#{path}")

  relative = @metadata.get_rpath(path)
  recorded = @file_activities.find { |activity| activity.path == relative }

  # already accounted for: drop the bookkeeping entry only
  return @file_activities.delete(recorded) if recorded

  # create metadata info, then upload the target file
  @metadata.create(relative)
  Solvant.new(@metadata.dir, relative).upload(@publisher)
end
handle_mod(path) click to toggle source
# File lib/bitbroker/manager_impl.rb, line 48
# Handles a path reported as modified.
#
# When @file_activities already holds an entry for the same relative path,
# that entry is consumed and nothing else happens. Otherwise the file is
# uploaded through the publisher, its metadata entry (if any) is updated and
# the metadata is advertised.
def handle_mod(path)
  Log.debug("[ManagerImpl] (handle_mod) path:#{path}")

  rpath = @metadata.get_rpath(path)
  if obj = @file_activities.find {|x| x.path == rpath}
    @file_activities.delete(obj)
  else
    # upload target file
    Solvant.new(@metadata.dir, rpath).upload(@publisher)

    # update fileinfo; get_with_path may return nil (handle_rem checks for
    # that too), and an unguarded call would raise NoMethodError in the
    # calling thread.
    info = @metadata.get_with_path(rpath)
    info.update unless info.nil?

    @metadata.advertise(@publisher)
  end
end
handle_rem(path) click to toggle source
# File lib/bitbroker/manager_impl.rb, line 65
# Handles a path reported as removed.
#
# Returns nil without doing anything when no metadata entry exists for the
# relative path. Otherwise `remove` is called on the entry and the metadata
# is advertised through the publisher.
def handle_rem(path)
  relative = @metadata.get_rpath(path)

  # (a call to @metadata.remove_with_path used to live here and is disabled)
  entry = @metadata.get_with_path(relative)
  return if entry.nil?

  Log.debug("[ManagerImpl] (handle_rem) path:#{path}")

  entry.remove
  @metadata.advertise(@publisher)
end
has_file?(remote) click to toggle source
# File lib/bitbroker/manager_impl.rb, line 223
# Tells whether the local metadata has an entry for remote['path'].
# Returns true or false.
def has_file?(remote)
  known = @metadata.get_with_path(remote['path'])
  !known.nil?
end
receive_advertise(data, from) click to toggle source
# File lib/bitbroker/manager_impl.rb, line 166
# Processes an advertise message: `data` is a list of hashes describing
# remote files (read with the keys 'path', 'size' and 'status'), `from`
# identifies the sender.
#
# Entries flagged as removed are deleted locally (metadata entry and file).
# Entries that differ from the local state are collected as deficients,
# requested from all nodes and appended to @deficients. Entries that are
# neither removed nor updated are left alone.
#
# The two predicates are declared with nested `def`s, so they are
# (re)defined as instance methods each time this method runs.
def receive_advertise(data, from)
  # true when the file is unknown locally, or when the local size differs
  # and the local entry is not flagged as removed
  def updated?(remote)
    case f = @metadata.get_with_path(remote['path'])
    when nil # this means target file doesn't exist in local.
      true
    else
      f.size != remote['size'] and
      not f.removed?
    end
  end

  # true when the file is known locally and the remote status has the
  # STATUS_REMOVED bit set
  def removed?(remote)
    case f = @metadata.get_with_path(remote['path'])
    when nil
      false
    else
      remote['status'].to_i & Metadata::FileInfo::STATUS_REMOVED > 0
    end
  end
  Log.debug("[ManagerImpl] (receive_advertise) <#{from}> data:#{data}")

  deficients = []
  data.each do |remote|
    if removed? remote
      Log.debug("[ManagerImpl] (receive_advertise) remove: #{remote}")

      # set file_activities
      @file_activities.push(FileActivity.remove(remote['path']))

      # remove FileInfo object which metadata has
      @metadata.remove_with_path(remote['path'])

      # remove actual file in local FS
      Solvant.new(@config[:dirpath], remote['path']).remove
    elsif updated? remote
      # `elsif`, not `else`: a bare `else updated? remote` evaluates the
      # predicate and discards it, which would re-request and truncate every
      # advertised file regardless of whether it changed.
      deficients.push(remote)

      fpath = "#{@config[:dirpath]}/#{remote['path']}"
      if FileTest.exist? fpath
        Log.debug("[ManagerImpl] trancated(#{fpath}, #{remote['size']})")

        # truncate files when target file is cut down
        File.truncate(fpath, remote['size'])
      end
    end
  end

  # request all deficient files; nothing to ask for when the list is empty
  @metadata.request_all(@publisher, deficients) unless deficients.empty?

  # record deficient files to get it from remote node
  @semaphore.synchronize do
    @deficients += deficients
  end
end
receive_request(data, from) click to toggle source
# File lib/bitbroker/manager_impl.rb, line 244
# Answers a request message: for every entry of `data` (hashes read with the
# key 'path') that has a local metadata entry, the file is sent to `from`
# via Solvant#upload_to.
#
# Entries without a local metadata entry are skipped; calling `path` on the
# nil lookup result would raise and abort the remaining uploads.
def receive_request(data, from)
  Log.debug("[ManagerImpl] (receive_request) <#{from}> data:#{data}")

  data.each do |remote|
    f = @metadata.get_with_path(remote['path'])
    if f.nil?
      Log.debug("[ManagerImpl] (receive_request) unknown path: #{remote['path']}")
      next
    end

    Solvant.new(@config[:dirpath], f.path).upload_to(@publisher, from)
  end
end
receive_request_all(data, from) click to toggle source
# File lib/bitbroker/manager_impl.rb, line 222
# Answers a request-all message: looks up every entry of `data` (hashes read
# with the key 'path') in the local metadata and, when at least one is
# known, sends the known entries (converted with to_h) back to `from` as a
# suggestion. Returns nil when none of the paths is known.
#
# has_file? is declared with a nested `def`, so it is (re)defined as an
# instance method each time this method runs; it is not called here.
def receive_request_all(data, from)
  def has_file?(remote)
    !@metadata.get_with_path(remote['path']).nil?
  end
  Log.debug("[ManagerImpl] (receive_request_all) <#{from}> data:#{data}")

  known = data.map { |entry| @metadata.get_with_path(entry['path']) }.compact
  return if known.empty?

  Log.debug("[ManagerImpl] (receive_request_all) files:#{known}")
  @metadata.suggestion(@publisher, known.map { |info| info.to_h }, from)
end
receive_suggestion(data, from) click to toggle source
# File lib/bitbroker/manager_impl.rb, line 235
# Records a suggestion message: every hash in `data` is tagged in place with
# the sender under the 'from' key, then the entries are appended to
# @suggestions while holding @semaphore.
#
# A new array is assigned rather than mutating the existing one.
# Returns the new @suggestions array.
def receive_suggestion(data, from)
  Log.debug("[ManagerImpl] (receive_suggestion) <#{from}> data:#{data}")

  data.each do |entry|
    entry['from'] = from
  end

  @semaphore.synchronize { @suggestions = @suggestions + data }
end
removed?(remote) click to toggle source
# File lib/bitbroker/manager_impl.rb, line 177
# Tells whether a remote entry marks a locally known file as removed.
#
# Returns false when the local metadata has no entry for remote['path'];
# otherwise true when remote['status'] (converted with to_i) has the
# Metadata::FileInfo::STATUS_REMOVED bit set.
def removed?(remote)
  local = @metadata.get_with_path(remote['path'])
  return false if local.nil?

  (remote['status'].to_i & Metadata::FileInfo::STATUS_REMOVED) > 0
end
updated?(remote) click to toggle source
# File lib/bitbroker/manager_impl.rb, line 167
# Tells whether a remote entry differs from the local state.
#
# Returns true when the local metadata has no entry for remote['path'].
# Otherwise returns true only when the local size differs from
# remote['size'] and the local entry does not report removed?.
def updated?(remote)
  local = @metadata.get_with_path(remote['path'])
  # no local entry: the target file doesn't exist locally
  return true if local.nil?

  local.size != remote['size'] && !local.removed?
end
validate(opts) click to toggle source
# File lib/bitbroker/manager_impl.rb, line 28
# Checks the options given to the constructor.
#
# opts - Hash whose :path entry must name an existing directory.
#
# Returns nil when the options are acceptable.
# Raises ArgumentError when :path is missing or is not a directory.
def validate(opts)
  path = opts[:path]

  # `raise Name("...")` would call a *method* called Name and fail with
  # NoMethodError; an exception class and a message are passed instead.
  # The nil check keeps File.directory? from raising TypeError.
  unless path && File.directory?(path)
    raise ArgumentError, "Specified path is not directory"
  end
end