class Persist::Sharder
Constants
- MAX_CHAR
Attributes
Public Class Methods
Source
# File lib/rbbt/persist/tsv/sharder.rb, line 16
#
# Sets up a sharder rooted at +persistence_path+.
#
# persistence_path:: passed through Path.setup and kept in @persistence_path
# write::            kept in @writable; when truthy an empty shard table
#                    (@databases) is created immediately
# db_type::          kept in @db_type
# options::          kept in @options
# block::            kept in @shard_function
def initialize(persistence_path, write = false, db_type=nil, options = {}, &block)
  @persistence_path = Path.setup(persistence_path)
  @shard_function   = block
  @db_type          = db_type
  @options          = options
  @writable         = write
  @mutex            = Mutex.new

  # Only a writer starts out with an explicit (empty) shard table.
  @databases = {} if write
end
Public Instance Methods
Source
# File lib/rbbt/persist/tsv/sharder.rb, line 29
#
# Stores +value+ under +key+ by delegating to #[]=.
# Evaluates to +value+ (the result of the assignment expression).
def <<(key,value)
  self[key] = value
end
Source
# File lib/rbbt/persist/tsv/sharder.rb, line 214
#
# Looks +key+ up in the shard that #database selects for it.
# Returns nil when no shard database is available for the key.
# The +clean+ argument is accepted but not used here.
def [](key, clean=false)
  shard_db = database(key)
  shard_db.nil? ? nil : shard_db[key]
end
Source
# File lib/rbbt/persist/tsv/sharder.rb, line 210
#
# Writes +value+ under +key+ into the shard that #database selects for it.
# The +clean+ argument is accepted but not used here.
def []=(key, value, clean = false)
  shard_db = database(key)
  shard_db.send(:[]=, key, value)
end
Source
# File lib/rbbt/persist/tsv/sharder.rb, line 93
#
# Flags the sharder as closed, then hands over to the inherited #close.
# NOTE(review): the ancestor that provides #close is not visible from this
# listing - confirm one exists. This method does not itself touch the
# individual shard databases.
def close
  @closed = true
  super
end
Calls superclass method
Source
# File lib/rbbt/persist/tsv/sharder.rb, line 146
#
# Walks every entry via #each and gathers the results in an Array.
# With a block, each element is the block's result for (key, value);
# without one, each element is the [key, value] pair itself.
def collect
  collected = []
  each do |key, value|
    collected << (block_given? ? yield(key, value) : [key, value])
  end
  collected
end
Source
# File lib/rbbt/persist/tsv/sharder.rb, line 53
#
# Returns the shard database responsible for +key+, opening and caching it
# on first use.
#
# Keys containing "__tsv_" go to the "metadata" shard, which is always opened
# as "HDB"; every other key is routed by calling shard_function and opened
# with db_type. A shard file is only opened when the sharder is writable or
# the file already exists; otherwise nil is returned and nothing is cached.
def database(key)
  # key.to_s keeps non-String keys working: Integer and friends have no #=~
  # on Ruby >= 3.2, so matching the raw key against a regexp would raise.
  shard = key.to_s.include?('__tsv_') ? "metadata" : shard_function.call(key)
  return databases[shard] if databases.include?(shard)

  type = shard == 'metadata' ? "HDB" : db_type
  # Interpolation instead of `'shard-' << ...` so no string literal is mutated.
  path = File.join(persistence_path, "shard-#{shard}")

  # A missing shard on a read-only sharder is reported as nil, silently
  # (the original warning was permanently disabled with `if nil`).
  return nil unless writable || File.exist?(path)

  opened = Persist.open_database(path, writable, :clean, type, @options)
  databases[shard] = opened if opened
end
Source
# File lib/rbbt/persist/tsv/sharder.rb, line 134
#
# Yields every (key, value) pair of every shard database currently held in
# +databases+, one shard after another.
def each
  # Iterate over a snapshot of the values, as the original does.
  databases.values.each do |shard_db|
    shard_db.each { |k, v| yield k, v }
  end
end
Source
# File lib/rbbt/persist/tsv/sharder.rb, line 84
#
# Selects the entries whose keys are returned by #prefix for +key+.
def get_prefix(key)
  select(:key => prefix(key))
end
Source
# File lib/rbbt/persist/tsv/sharder.rb, line 142
#
# True when looking +key+ up through #[] yields anything other than nil.
def include?(key)
  !self[key].nil?
end
Source
# File lib/rbbt/persist/tsv/sharder.rb, line 206
#
# All keys of all shard databases, flattened into one Array, with the
# entries of TSV::ENTRY_KEYS removed.
def keys
  shard_keys = databases.values.map { |shard_db| shard_db.keys }.flatten
  shard_keys - TSV::ENTRY_KEYS.to_a
end
Source
# File lib/rbbt/persist/tsv/sharder.rb, line 200
#
# Copies every (key, values) pair of +hash+ into the sharder through #[]=.
# Returns whatever +hash.each+ returns.
def merge!(hash)
  hash.each { |key, values| self[key] = values }
end
Source
# File lib/rbbt/persist/tsv/sharder.rb, line 33
#
# Moves the sharder to +path+: records the new location and re-points each
# shard database at a file with the same basename under +path+.
def persistence_path=(path)
  @persistence_path = path
  databases.values.each do |db|
    shard_file = File.basename(db.persistence_path)
    db.persistence_path = File.join(path, shard_file)
  end
end
Source
# File lib/rbbt/persist/tsv/sharder.rb, line 80
#
# Range query from +key+ up to +key+ followed by MAX_CHAR.
# The two literal 1 arguments are forwarded to #range unchanged -
# presumably inclusive-bound flags of the underlying database; confirm.
def prefix(key)
  upper_bound = key + MAX_CHAR
  range(key, 1, upper_bound, 1)
end
Source
# File lib/rbbt/persist/tsv/sharder.rb, line 127
#
# Runs +range(*args)+ on every shard that is a TokyoCabinet::BDB and
# concatenates the results into one Array; other shards are skipped.
def range(*args)
  databases.values.each_with_object([]) do |shard_db, found|
    found.concat(shard_db.range(*args)) if TokyoCabinet::BDB === shard_db
  end
end
Source
# File lib/rbbt/persist/tsv/sharder.rb, line 98
#
# Currently always raises RuntimeError "SIOT" before doing anything else.
#
# NOTE(review): everything after the raise is unreachable. It is kept to
# show the apparent intent (close, switch each shard to read mode, clear the
# writable/closed flags, return self). If it is ever re-enabled, note that
# +databases.each+ with a single block parameter receives [shard, db] pairs
# when +databases+ is a Hash.
def read(force = false)
  raise "SIOT"

  return unless write? || closed || force

  self.close
  databases.each { |d| d.read }
  @writable = false
  @closed = false
  self
end
Source
# File lib/rbbt/persist/tsv/sharder.rb, line 188
#
# Under @mutex: switches to read mode when the sharder is closed or not
# already readable, yields, and always calls #close afterwards.
# Returns the block's value.
def read_and_close
  @mutex.synchronize do
    read if @closed || !read?
    begin
      yield
    ensure
      close
    end
  end
end
Source
# File lib/rbbt/persist/tsv/sharder.rb, line 237
#
# Total number of entries: the sum of +size+ over every shard database.
def size
  databases.inject(0) { |total, (_shard, db)| total + db.size }
end
Source
# File lib/rbbt/persist/tsv/sharder.rb, line 108
#
# Switches the sharder and every shard database to write mode.
#
# force:: when false, the call is a no-op (returning nil) if the sharder is
#         already writable and not closed; defaults to true.
#
# Closes the sharder first, calls +write+ on each shard database, then marks
# the sharder writable and open. Returns self.
def write(force = true)
  return if write? && !closed && !force

  close
  # +databases+ maps shard name => database; iterate the databases only.
  # (Iterating the Hash directly would hand [shard, db] pairs to the block.)
  databases.each_value { |db| db.write }
  @writable = true
  @closed = false
  self
end
Source
# File lib/rbbt/persist/tsv/sharder.rb, line 173
#
# Runs the given block with the sharder in write mode and closes it after.
#
# Holds the lock file derived from "<persistence_path>/write" (placed under
# TSV.lock_dir) via Misc.lock, and @mutex inside that. Switches to write mode
# when the sharder is closed or not already writable; #close always runs
# once the block finishes, even if it raises.
def write_and_close
  lock_filename = Persist.persistence_path(File.join(persistence_path, 'write'), {:dir => TSV.lock_dir})
  Misc.lock(lock_filename) do
    @mutex.synchronize do
      write if @closed || !write?
      begin
        yield
      ensure
        close
      end
    end
  end
end
Source
# File lib/rbbt/persist/tsv/sharder.rb, line 158
#
# Runs the given block with the sharder in write mode and calls #read after.
#
# Holds the lock file derived from "<persistence_path>/write" (placed under
# TSV.lock_dir) via Misc.lock, and @mutex inside that. Switches to write mode
# when the sharder is closed or not already writable; #read always runs once
# the block finishes, even if it raises.
# NOTE(review): #read as listed in this file raises "SIOT" unconditionally,
# so the ensure step will raise - confirm this is intended.
def write_and_read
  lock_filename = Persist.persistence_path(File.join(persistence_path, 'write'), {:dir => TSV.lock_dir})
  Misc.lock(lock_filename) do
    @mutex.synchronize do
      write if @closed || !write?
      begin
        yield
      ensure
        read
      end
    end
  end
end