class BitBroker::ManagerImpl
Public Class Methods
new(opts)
click to toggle source
# File lib/bitbroker/manager_impl.rb, line 3
# Set up the manager state from user-supplied options.
#
# opts - Hash; the keys read here are :path, :name and :mqconfig.
#
# Raises whatever #validate raises when opts[:path] is not acceptable.
def initialize(opts)
  # validate user created arguments
  validate(opts)

  ### prepare brokers
  @config = {
    :mqconfig => opts[:mqconfig],
    :label => opts[:name],
    :dirpath => form_dirpath(opts[:path]),
  }
  @metadata = Metadata.new(@config[:dirpath])
  @publisher = Publisher.new(@config)

  # Two distinct arrays. A chained `@deficients = @suggestions = []` would
  # bind both names to one object, so an in-place mutation of either list
  # (e.g. Array#delete) would silently show up in the other.
  @deficients = []
  @suggestions = []
  @semaphore = Mutex.new

  # internal variable in this class to know who modified/created file
  @file_activities = []
end
Public Instance Methods
do_start_collector()
click to toggle source
# File lib/bitbroker/manager_impl.rb, line 87
# Start the collector thread.
#
# The thread loops forever: it looks for an entry of @deficients for which
# at least one entry of @suggestions has the same 'path', picks one of those
# suggestions at random, asks @metadata to request it from the suggesting
# node (candidate['from']), and then drops the satisfied entries from both
# lists.
#
# Returns the Thread.
def do_start_collector
  Thread.new do
    loop do
      deficient = nil
      candidate = nil

      # Read both shared lists under the same lock that guards their writes,
      # and scan every deficient entry rather than only the first one so that
      # a file nobody has offered yet cannot block the files behind it.
      @semaphore.synchronize do
        @deficients.each do |wanted|
          offers = @suggestions.select { |x| x['path'] == wanted['path'] }
          next if offers.empty?

          deficient = wanted
          candidate = offers.sample
          break
        end
      end

      if candidate
        # The request itself is issued outside the lock.
        @metadata.request(@publisher, [candidate], candidate['from'])

        @semaphore.synchronize do
          @suggestions = @suggestions.reject { |x| x['path'] == deficient['path'] }
          @deficients.delete(deficient)
        end
      end

      Thread.pass
    end
  end
end
do_start_data_receiver()
click to toggle source
# File lib/bitbroker/manager_impl.rb, line 137
# Start a thread that consumes messages delivered through
# Subscriber#recv_data and writes each one into the shared directory.
#
# Returns the Thread.
def do_start_data_receiver
  Thread.new do
    subscriber = Subscriber.new(@config)

    subscriber.recv_data do |payload, _sender|
      target = MessagePack.unpack(payload)['path']
      Log.debug("[ManagerImpl] (data_receiver) path: #{target}")

      # Register the path before the file is written; handle_add/handle_mod
      # look for such an entry (presumably to avoid re-uploading data that
      # just arrived from a peer -- confirm against FileActivity).
      activity = FileActivity.create(target)
      @file_activities.push(activity)

      Solvant.load_binary(@config[:dirpath], payload)
    end
  end
end
do_start_metadata_receiver()
click to toggle source
# File lib/bitbroker/manager_impl.rb, line 109
# Start a thread that consumes messages delivered through
# Subscriber#recv_metadata and dispatches them on msg['type'].
# Messages of any other type are ignored.
#
# Returns the Thread.
def do_start_metadata_receiver
  Thread.new do
    Subscriber.new(@config).recv_metadata do |message, sender|
      kind = message['type']

      case kind
      when Metadata::TYPE_ADVERTISE
        receive_advertise(message['data'], sender)
      when Metadata::TYPE_REQUEST_ALL
        receive_request_all(message['data'], sender)
      end
    end
  end
end
do_start_observer()
click to toggle source
# File lib/bitbroker/manager_impl.rb, line 32
# Start a thread that runs an Observer on the shared directory and forwards
# every reported path to handle_mod / handle_add / handle_rem.
#
# The three handlers are ordinary sibling methods rather than `def`s nested
# in this method body: a nested `def` is re-executed (and the method
# re-defined) on every call and is only available after the first call.
#
# Returns the Thread.
def do_start_observer
  Thread.new do
    # The Observer block receives three collections; judging by the names
    # they hold modified, added and removed paths.
    Observer.new(@config[:dirpath]) do |mod, add, rem|
      mod.each { |x| handle_mod(x) }
      add.each { |x| handle_add(x) }
      rem.each { |x| handle_rem(x) }
    end
  end
end

# Handle a newly appeared path: create its metadata entry and upload it,
# unless @file_activities already holds an entry for the same relative path,
# in which case that entry is consumed and nothing is uploaded.
def handle_add(path)
  Log.debug("[ManagerImpl] (handle_add) path:#{path}")

  rpath = @metadata.get_rpath(path)
  obj = @file_activities.find { |x| x.path == rpath }
  if obj
    @file_activities.delete(obj)
  else
    # create metadata info
    @metadata.create(rpath)

    # upload target file
    Solvant.new(@metadata.dir, rpath).upload(@publisher)
  end
end

# Handle a modified path: upload it, refresh its file info and advertise,
# unless @file_activities already holds an entry for the same relative path.
def handle_mod(path)
  Log.debug("[ManagerImpl] (handle_mod) path:#{path}")

  rpath = @metadata.get_rpath(path)
  obj = @file_activities.find { |x| x.path == rpath }
  if obj
    @file_activities.delete(obj)
  else
    # upload target file
    Solvant.new(@metadata.dir, rpath).upload(@publisher)

    # update fileinfo -- the lookup can return nil (handle_rem checks for
    # it too); calling #update on nil would raise inside the observer thread.
    file = @metadata.get_with_path(rpath)
    if file
      file.update
      @metadata.advertise(@publisher)
    end
  end
end

# Handle a removed path: mark the known file info as removed and advertise.
# Paths without file info are ignored.
def handle_rem(path)
  rpath = @metadata.get_rpath(path)

  #@metadata.remove_with_path(rpath)
  file = @metadata.get_with_path(rpath)
  if file != nil
    Log.debug("[ManagerImpl] (handle_rem) path:#{path}")
    file.remove
    @metadata.advertise(@publisher)
  end
end
do_start_p_data_receiver()
click to toggle source
# File lib/bitbroker/manager_impl.rb, line 151
# Start a thread that consumes messages delivered through
# Subscriber#recv_p_data and writes each one into the shared directory.
#
# Returns the Thread.
def do_start_p_data_receiver
  Thread.new do
    Subscriber.new(@config).recv_p_data do |blob, _origin|
      unpacked = MessagePack.unpack(blob)
      file_path = unpacked['path']
      Log.debug("[ManagerImpl] (p_data_receiver) path: #{file_path}")

      # Register the path before writing the file; handle_add/handle_mod
      # consume such entries instead of uploading (presumably to suppress an
      # echo of data that just arrived -- confirm against FileActivity).
      @file_activities.push(FileActivity.create(file_path))
      Solvant.load_binary(@config[:dirpath], blob)
    end
  end
end
do_start_p_metadata_receiver()
click to toggle source
# File lib/bitbroker/manager_impl.rb, line 123
# Start a thread that consumes messages delivered through
# Subscriber#recv_p_metadata and dispatches them on msg['type'].
# Messages of any other type are ignored.
#
# Returns the Thread.
def do_start_p_metadata_receiver
  Thread.new do
    Subscriber.new(@config).recv_p_metadata do |message, sender|
      kind = message['type']

      case kind
      when Metadata::TYPE_SUGGESTION
        receive_suggestion(message['data'], sender)
      when Metadata::TYPE_REQUEST
        receive_request(message['data'], sender)
      end
    end
  end
end
form_dirpath(path)
click to toggle source
# File lib/bitbroker/manager_impl.rb, line 25
# Strip every trailing '/' from a directory path.
#
# path - String
#
# Returns a String without trailing slashes. Note that a path consisting
# only of slashes (e.g. "/") becomes "".
#
# Implemented with a single anchored substitution instead of recursing once
# per slash, so a long run of trailing slashes cannot exhaust the stack.
def form_dirpath(path)
  path.sub(%r{/+\z}, '')
end
handle_add(path)
click to toggle source
# File lib/bitbroker/manager_impl.rb, line 33
# React to a newly appeared path.
#
# If @file_activities holds an entry whose path equals the relative path,
# that entry is removed and nothing else happens. Otherwise a metadata entry
# is created and the file is uploaded through @publisher.
def handle_add(path)
  Log.debug("[ManagerImpl] (handle_add) path:#{path}")

  relative = @metadata.get_rpath(path)
  pending = @file_activities.find { |activity| activity.path == relative }

  # A recorded activity for this path is consumed instead of uploading.
  return @file_activities.delete(pending) if pending

  # create metadata info
  @metadata.create(relative)

  # upload target file
  uploader = Solvant.new(@metadata.dir, relative)
  uploader.upload(@publisher)
end
handle_mod(path)
click to toggle source
# File lib/bitbroker/manager_impl.rb, line 48
# React to a modified path.
#
# If @file_activities holds an entry whose path equals the relative path,
# that entry is removed and nothing else happens. Otherwise the file is
# uploaded through @publisher and, when metadata knows the file, its file
# info is updated and the metadata is advertised.
def handle_mod(path)
  Log.debug("[ManagerImpl] (handle_mod) path:#{path}")

  rpath = @metadata.get_rpath(path)
  obj = @file_activities.find { |x| x.path == rpath }
  if obj
    @file_activities.delete(obj)
  else
    # upload target file
    Solvant.new(@metadata.dir, rpath).upload(@publisher)

    # update fileinfo -- the lookup can return nil (handle_rem checks for
    # it too); calling #update on nil would raise NoMethodError.
    file = @metadata.get_with_path(rpath)
    if file
      file.update
      @metadata.advertise(@publisher)
    end
  end
end
handle_rem(path)
click to toggle source
# File lib/bitbroker/manager_impl.rb, line 65
# React to a removed path.
#
# When metadata has file info for the relative path, that info's #remove is
# called and the metadata is advertised through @publisher. Paths without
# file info are ignored.
def handle_rem(path)
  relative = @metadata.get_rpath(path)

  #@metadata.remove_with_path(rpath)
  info = @metadata.get_with_path(relative)
  return if info.nil?

  Log.debug("[ManagerImpl] (handle_rem) path:#{path}")
  info.remove
  @metadata.advertise(@publisher)
end
has_file?(remote)
click to toggle source
# File lib/bitbroker/manager_impl.rb, line 223
# Tell whether metadata has an entry for the path named by a remote record.
#
# remote - Hash-like record; only remote['path'] is read.
#
# Returns true or false.
def has_file?(remote)
  entry = @metadata.get_with_path(remote['path'])
  !entry.nil?
end
receive_advertise(data, from)
click to toggle source
# File lib/bitbroker/manager_impl.rb, line 166
# Process an advertisement of file records sent by another node.
#
# data - Array of records; the keys read are 'path', 'size' and 'status'.
# from - identifier of the sender (only logged here).
#
# For every record:
# * if it is flagged as removed and known locally, the local metadata entry
#   and the local file are removed;
# * otherwise, if it differs from the local state (see #updated?), it is
#   collected as deficient and an existing local file is resized to the
#   advertised size.
# All deficient records are then requested via @metadata.request_all and
# appended to @deficients.
def receive_advertise(data, from)
  Log.debug("[ManagerImpl] (receive_advertise) <#{from}> data:#{data}")

  deficients = []
  data.each do |remote|
    if removed?(remote)
      Log.debug("[ManagerImpl] (receive_advertise) remove: #{remote}")

      # set file_activities
      @file_activities.push(FileActivity.remove(remote['path']))

      # remove FileInfo object which metadata has
      @metadata.remove_with_path(remote['path'])

      # remove actual file in local FS
      Solvant.new(@config[:dirpath], remote['path']).remove
    elsif updated?(remote)
      # `elsif`, not `else`: with a bare `else` the result of updated? is
      # computed and thrown away, so every non-removed record would be
      # requested and truncated on each advertisement.
      deficients.push(remote)

      fpath = "#{@config[:dirpath]}/#{remote['path']}"
      if FileTest.exist?(fpath)
        Log.debug("[ManagerImpl] trancated(#{fpath}, #{remote['size']})")

        # truncate files when target file is cut down
        File.truncate(fpath, remote['size'])
      end
    end
  end

  # Nothing is missing: do not publish an empty request.
  return if deficients.empty?

  # request all deficients files
  @metadata.request_all(@publisher, deficients)

  # record deficient files to get it from remote node
  @semaphore.synchronize do
    @deficients += deficients
  end
end

# True when the remote record should be fetched: the path is unknown
# locally, or the local size differs and the local entry is not removed.
# (Defined as a sibling method instead of a `def` nested in
# receive_advertise, which would be re-defined on every call.)
def updated?(remote)
  f = @metadata.get_with_path(remote['path'])

  # this means target file doesn't exist in local.
  return true if f.nil?

  f.size != remote['size'] && !f.removed?
end

# True when the path is known locally and the remote record's 'status' has
# the Metadata::FileInfo::STATUS_REMOVED bit set.
def removed?(remote)
  return false if @metadata.get_with_path(remote['path']).nil?

  (remote['status'].to_i & Metadata::FileInfo::STATUS_REMOVED) > 0
end
receive_request(data, from)
click to toggle source
# File lib/bitbroker/manager_impl.rb, line 244
# Answer a request for files by uploading each requested file to the sender.
#
# data - Array of records; only 'path' is read.
# from - identifier of the requesting node, passed to Solvant#upload_to.
#
# Records whose path is unknown to @metadata are skipped.
def receive_request(data, from)
  Log.debug("[ManagerImpl] (receive_request) <#{from}> data:#{data}")

  data.each do |remote|
    f = @metadata.get_with_path(remote['path'])

    # The lookup returns nil for an unknown path; without this guard a single
    # such record raises NoMethodError and the remaining records are dropped.
    if f.nil?
      Log.debug("[ManagerImpl] (receive_request) unknown path: #{remote['path']}")
      next
    end

    Solvant.new(@config[:dirpath], f.path).upload_to(@publisher, from)
  end
end
receive_request_all(data, from)
click to toggle source
# File lib/bitbroker/manager_impl.rb, line 222
# Answer a broadcast request: tell the sender which of the requested files
# this node knows about.
#
# data - Array of records; only 'path' is read.
# from - identifier of the requesting node.
#
# The known files are converted with #to_h and handed to
# @metadata.suggestion together with the sender; nothing is sent when none
# of the requested paths is known.
def receive_request_all(data, from)
  Log.debug("[ManagerImpl] (receive_request_all) <#{from}> data:#{data}")

  files = data.map { |f| @metadata.get_with_path(f['path']) }.compact
  return if files.empty?

  Log.debug("[ManagerImpl] (receive_request_all) files:#{files}")
  @metadata.suggestion(@publisher, files.map { |x| x.to_h }, from)
end

# Tell whether metadata has an entry for remote['path'].
# Kept as a sibling method: it used to be a `def` nested in
# receive_request_all, where it was never called and was re-defined on
# every invocation.
def has_file?(remote)
  !@metadata.get_with_path(remote['path']).nil?
end
receive_suggestion(data, from)
click to toggle source
# File lib/bitbroker/manager_impl.rb, line 235
# Record suggestions received from another node.
#
# data - Array of records; each one is tagged in place with 'from'.
# from - identifier of the suggesting node.
#
# The tagged records are appended to @suggestions while holding @semaphore.
def receive_suggestion(data, from)
  Log.debug("[ManagerImpl] (receive_suggestion) <#{from}> data:#{data}")

  # Remember who made each offer; the collector reads candidate['from'].
  data.each do |offer|
    offer['from'] = from
  end

  @semaphore.synchronize { @suggestions = @suggestions + data }
end
removed?(remote)
click to toggle source
# File lib/bitbroker/manager_impl.rb, line 177
# Tell whether a remote record announces the removal of a locally known file.
#
# remote - record; 'path' and 'status' are read.
#
# Returns false when metadata has no entry for the path; otherwise true
# exactly when the Metadata::FileInfo::STATUS_REMOVED bit is set in
# remote['status'].
def removed?(remote)
  known = @metadata.get_with_path(remote['path'])
  return false if known.nil?

  flags = remote['status'].to_i
  (flags & Metadata::FileInfo::STATUS_REMOVED) > 0
end
updated?(remote)
click to toggle source
# File lib/bitbroker/manager_impl.rb, line 167
# Tell whether a remote record differs from the local state.
#
# remote - record; 'path' and 'size' are read.
#
# Returns true when metadata has no entry for the path, or when the local
# entry's size differs from remote['size'] and the local entry does not
# report removed?; false otherwise.
def updated?(remote)
  local = @metadata.get_with_path(remote['path'])

  # this means target file doesn't exist in local.
  return true if local.nil?

  local.size != remote['size'] && !local.removed?
end
validate(opts)
click to toggle source
# File lib/bitbroker/manager_impl.rb, line 28
# Check the user-supplied options.
#
# opts - Hash; only opts[:path] is inspected.
#
# Returns nil when opts[:path] names an existing directory.
# Raises InvalidArgument otherwise (including when :path is missing or nil).
#
# NOTE(review): InvalidArgument is assumed to be an exception class that is
# resolvable from this scope -- confirm where it is declared.
def validate(opts)
  path = opts[:path]
  return if path && File.directory?(path)

  # `raise Klass, message` -- writing InvalidArgument("...") would be parsed
  # as a call to a *method* named InvalidArgument.
  raise InvalidArgument, "Specified path is not directory"
end