class Flor::Storage
Constants
- CRECON_STATUSES
- FP_TYPES
- MESSAGE_COLUMNS
- POINTER_COLUMNS
- POINTS_TO_ARCHIVE
- RESCON_STATUSES
Attributes
might be useful for some implementations
Public Class Methods
Source
# File lib/flor/unit/storage.rb, line 950
# Decodes a stored blob: inflates it with Zlib, then parses the result as
# JSON. Returns nil when +content+ is nil (or false).
def from_blob(content)

  return nil unless content

  json = Zlib::Inflate.inflate(content)

  JSON.parse(json)
end
Source
# File lib/flor/unit/storage.rb, line 26
# Keeps a reference to the unit, reads the 'sto_archive' and 'sto_sync'
# entries of its conf, then connects to the database.
# A Mutex is only instantiated when 'sto_sync' is set.
def initialize(unit)

  @unit = unit
  @models = {}

  @archive = @unit.conf['sto_archive']

  @mutex = nil
  @mutex = Mutex.new if @unit.conf['sto_sync']

  @callbacks = {}

  connect
end
Source
# File lib/flor/unit/storage.rb, line 944
# Encodes +h+ for storage: dumps it as JSON, deflates it with Zlib and wraps
# the result with Sequel.blob. Returns nil when +h+ is nil (or false).
def to_blob(h)

  return nil unless h

  json = JSON.dump(h)
  deflated = Zlib::Deflate.deflate(json)

  Sequel.blob(deflated)
    #rescue => e; pp h; raise e
end
Public Instance Methods
Source
# File lib/flor/unit/storage.rb, line 297
# Returns true when flor_messages holds at least one row with status
# 'created'. Any error is logged as a warning and false is returned.
def any_message?

  synchronize do
    created = @db[:flor_messages].where(status: 'created')
    created.count > 0
  end

rescue => err

  @unit.logger.warn(
    "#{self.class}#any_message?()", err, '(returning false)')

  false
end
Source
# File lib/flor/unit/storage.rb, line 332
# Marks the given messages as consumed: archives them when @archive is set,
# discards them otherwise. On error, the messages are placed in
# Thread.current[:sto_errored_items] and the error is re-raised.
def consume(messages)

  @archive ?
    consume_and_archive(messages) :
    consume_and_discard(messages)

rescue => err

  Thread.current[:sto_errored_items] = messages

  raise err
end
Source
# File lib/flor/unit/storage.rb, line 45
# Returns the migration version currently recorded in the database, read
# from the first row of the migration table.
#
# +opts+ is handed to #migration_table_and_column to determine the table and
# the column to read.
#
# Returns nil when the table cannot be read (an error is raised while
# querying it) or when it holds no row yet.
def db_version(opts={})

  table, column = migration_table_and_column(opts)

  row =
    begin
      @db[table].first
    rescue StandardError
      nil # best effort: a table that cannot be read means "no version"
    end

  # an empty table yields a nil row, don't call #[] on it
  row ? row[column] : nil
end
Source
# File lib/flor/unit/storage.rb, line 129
# Deletes the rows of every table whose name starts with "flor_" and that
# has more than 2 columns.
def delete_tables

  @db.tables.each do |table|

    next unless table.to_s.match(/^flor_/)
    next unless @db[table].columns.size > 2

    @db[table].delete
  end
end
Deletes all the rows of the tables in the storage database whose names begin with “flor_” and that have more than 2 columns (the Sequel schema_info table has 1 column as of this writing). The tables themselves are not dropped.
Source
# File lib/flor/unit/storage.rb, line 542
# Returns the ntime of the first active timer when ordered by ntime, cut
# before its first '.', or nil when there is no active timer.
# Any error is logged as a warning and nil is returned.
def fetch_next_time

  timer =
    synchronize do
      @db[:flor_timers]
        .select(:ntime)
        .order(:ntime)
        .first(status: 'active')
    end

  return nil unless timer

  timer[:ntime].split('.').first

rescue => err

  @unit.logger.warn(
    "#{self.class}#fetch_next_time()", err, '(returning nil)')

  nil
end
Source
# File lib/flor/unit/storage.rb, line 312
# Returns all the active traps whose domain is one of the domains yielded by
# #split_domain for the given exid.
# Any error is logged as a warning and an empty array is returned.
def fetch_traps(exid)

  synchronize do
    active = traps.where(status: 'active')
    active.where(domain: split_domain(exid)).all
  end

rescue => err

  @unit.logger.warn(
    "#{self.class}#fetch_traps()", err, '(returning [])')

  []
end
Source
# File lib/flor/unit/storage.rb, line 136
# Loads the execution stored under +exid+.
#
# When no row is found, returns a fresh execution hash (no nodes, no
# counters, 'start' set to the current Flor.tstamp, 'size' 0).
# Otherwise returns the decoded content, with 'id' set to the row id and
# 'size' set to the size of the stored content.
# Fails when the stored content decodes to nothing.
def load_execution(exid)

  synchronize do

    row =
      @db[:flor_executions]
        .select(:id, :content)
        .first(exid: exid) # status active or terminated doesn't matter

    unless row
      return {
        'exid' => exid, 'nodes' => {}, 'counters' => {},
        'start' => Flor.tstamp, 'size' => 0 }
    end

    execution = from_blob(row[:content])

    fail("couldn't parse execution (db id #{row[:id].to_i})") \
      unless execution

    execution['id'] = row[:id].to_i
    execution['size'] = row[:content].size

    execution
  end
end
Source
# File lib/flor/unit/storage.rb, line 232
# Loads 'created' messages for at most exe_count + 2 exid rows and returns
# them as a hash { exid => [ message, ... ] }.
#
# Exids having a message whose status is not in CRECON_STATUSES are left out,
# as are messages whose status is in RESCON_STATUSES.
# Any error is logged as a warning and an empty hash is returned.
def load_messages(exe_count)

  exe_count += 2
    # load two more, could prove useful if they vanish like "petits pains"

  synchronize do

    busy_exids =
      @db[:flor_messages]
        .select(:exid)
        .exclude(status: CRECON_STATUSES)

    candidate_exids =
      @db[:flor_messages]
        .select(:exid)
        .exclude(exid: busy_exids)
        .exclude(status: RESCON_STATUSES)
        .limit(exe_count)

    @db[:flor_messages]
      .where(exid: candidate_exids, status: 'created')
      .each_with_object({}) { |msg, by_exid|
        (by_exid[msg[:exid]] ||= []) << msg }
  end

rescue => err

  @unit.logger.warn(
    "#{self.class}#load_messages()", err, '(returning {})')

  {}
end
Source
# File lib/flor/unit/storage.rb, line 90
# Runs the Sequel migrator against the database.
#
# The first Hash found among +to+, +from+ and +opts+ is used as the options.
# An Integer +to+ becomes opts[:target], an Integer +from+ becomes
# opts[:current] (unless already set).
# The migration directory is taken from the options, then from the unit
# conf, then from Flor.migration_dir.
def migrate(to=nil, from=nil, opts=nil)

  opts = [ to, from, opts ].grep(Hash).first || {}

  opts[:target] ||= to if to.is_a?(Integer)
  opts[:current] ||= from if from.is_a?(Integer)

  opts[:table], opts[:column] = migration_table_and_column(opts)
    #
    # defaults for the migration version table:
    # { table: :schema_info,
    #   column: :version }

  sparse =
    opts[:sparse_migrations] ||
    @unit.conf['db_sparse_migrations'] ||
    @unit.conf['sto_sparse_migrations']

  opts[:allow_missing_migration_files] = true \
    if sparse && ! opts.key?(:allow_missing_migration_files)

  dir =
    opts[:migrations] || opts[:migration_dir] ||
    @unit.conf['db_migrations'] || @unit.conf['db_migration_dir'] ||
    @unit.conf['sto_migrations'] || @unit.conf['sto_migration_dir'] ||
    Flor.migration_dir

  synchronize { Sequel::Migrator.run(@db, dir, opts) }
end
Source
# File lib/flor/unit/storage.rb, line 52
# Returns the highest version found among the ../migrations/*.rb files
# (relative to this file) whose name starts with 4 digits and an underscore.
# Returns nil when there is no such file.
def migration_version

  pattern = File.join(File.dirname(__FILE__), '../migrations/*.rb')

  versions = []

  Dir[pattern].each do |path|
    m = File.basename(path).match(/^(\d{4})_/)
    versions << m[1].to_i if m
  end

  versions.max
end
Source
# File lib/flor/unit/storage.rb, line 562
# Registers +block+ as a callback under +key+.
#
# +actions+ may be :any / 'any' (no filtering), an Array, a Symbol or a
# String of names separated by commas or semicolons. Anything else is
# treated like :any. Action names are stored as symbols.
def on(key, actions=[], &block)

  names =
    if [ :any, 'any' ].include?(actions) then []
    elsif actions.is_a?(Array) then actions
    elsif actions.is_a?(Symbol) then [ actions ]
    elsif actions.is_a?(String) then actions.split(/\s*[;,]\s*/)
    else []
    end

  @callbacks[key] ||= []
  @callbacks[key] << [ names.map(&:to_sym), block ]
end
Source
# File lib/flor/unit/storage.rb, line 160 def put_execution(exe) status = exe['nodes'].find { |_, n| n['status'].last['status'] != 'ended' } ? 'active' : 'terminated' id = exe['id'] if id exe['end'] ||= Flor.tstamp \ if status == 'terminated' exe['duration'] = Time.parse(exe['end']) - Time.parse(exe['start']) \ if exe['end'] end data = to_blob(exe) exe['size'] = data.size u = @unit.identifier transync do now = Flor.tstamp if id @db[:flor_executions] .where(id: id.to_i) .update( content: data, status: status, mtime: now, munit: u) callback(:executions, :update, id) else exe['id'] = @db[:flor_executions] .insert( domain: Flor.domain(exe['exid']), exid: exe['exid'], content: data, status: status, ctime: now, mtime: now, cunit: u, munit: u) .to_i callback(:executions, :insert, exe['id']) end remove_nodes(exe, status, now) update_pointers(exe, status, now) end exe # return the execution hash rescue => err Thread.current[:sto_errored_items] = [ exe ] raise err end
Source
# File lib/flor/unit/storage.rb, line 392
# Stores a single message, by handing it to #put_messages.
def put_message(m)

  batch = [ m ]

  put_messages(batch)
end
Source
# File lib/flor/unit/storage.rb, line 346
# Stores the given messages and wakes the unit up.
#
# Messages that already have a 'mid' are set back to status 'created',
# the others are inserted as new 'created' rows.
# +syn+ is passed to #synchronize.
#
# Returns nil when +ms+ is empty, else the max id of flor_messages.
# On error, +ms+ is placed in Thread.current[:sto_errored_items] and the
# error is re-raised.
def put_messages(ms, syn=true)

  return if ms.empty?

  n = Flor.tstamp
  u = @unit.identifier

  max_id =
    synchronize(syn) do

      stored, unstored = ms.partition { |m| m['mid'] }

      if stored.any?
        #
        # de-reserve any previously stored message, might happen
        # for "terminated" messages that got queued back to let
        # other messages get processed
        @db[:flor_messages]
          .where(id: stored.map { |m| m['mid'] })
          .update(status: 'created', mtime: n, munit: u)
      end

      if unstored.any?
        #
        # store new messages
        @db[:flor_messages]
          .import(
            MESSAGE_COLUMNS,
            unstored.map { |m|
              [ Flor.domain(m['exid']), m['exid'], m['point'], to_blob(m),
                'created', n, n, u, u ] })
      end

      @db[:flor_messages].max(:id)
    end

  @unit.wake_up

  max_id

rescue => err

  Thread.current[:sto_errored_items] = ms

  raise err
end
Source
# File lib/flor/unit/storage.rb, line 418
# Inserts an active timer for the given message, triggers the
# (:timers, :insert) callbacks and wakes the unit up.
#
# On error, the message is placed in Thread.current[:sto_errored_items] and
# the error is re-raised.
def put_timer(message)

  type, string = determine_type_and_schedule(message)

  next_time = compute_next_time(type, string)

  now = Flor.tstamp
  u = @unit.identifier

  id =
    synchronize do

      row = {
        domain: Flor.domain(message['exid']),
        exid: message['exid'],
        nid: message['nid'],
        onid: message['onid'] || message['nid'],
        bnid: message['nid'],
        type: type,
        schedule: string,
        ntime: next_time,
        content: to_blob(message),
        count: 0,
        status: 'active',
        ctime: now,
        mtime: now,
        cunit: u,
        munit: u }

      @db[:flor_timers].insert(row)
    end

  callback(:timers, :insert, id)

  @unit.wake_up

rescue => err

  Thread.current[:sto_errored_items] = [ message ]

  raise err
end
Source
# File lib/flor/unit/storage.rb, line 475
# Inserts an active trap described by +tra+ for the execution of +node+,
# triggers the (:traps, :insert) callbacks and returns traps[id] for the
# inserted row.
#
# On error, [ node, tra ] is placed in Thread.current[:sto_errored_items]
# and the error is re-raised.
def put_trap(node, tra)

  exid = node['exid']
  dom = Flor.domain(exid)
  now = Flor.tstamp
  u = @unit.identifier

  id =
    synchronize do

      #points = att_a('point', 'points', nil) ### TODO
      #tags = att_a('tag', 'tags', nil) #
      #heats = att_a('heat', 'heats', nil) #
      #heaps = att_a('heap', 'heaps', nil) #
      #names = att_a('name', 'names', nil) #
      #
      #opts[:heap] = theaps.split(',') if theaps
      #opts[:heat] = theats.split(',') if theats
      #
      row = {
        domain: dom,
        exid: exid,
        nid: tra['nid'],
        onid: tra['onid'] || tra['nid'],
        bnid: tra['bnid'],
        trange: tra['range'],
        tpoints: commaify(tra['points']),
        ttags: commaify(tra['tags']),
        theats: commaify(tra['heats']),
        theaps: commaify(tra['heaps']),
        content: to_blob(tra),
        status: 'active',
        ctime: now,
        mtime: now,
        cunit: u,
        munit: u }

      @db[:flor_traps].insert(row)
    end

  callback(:traps, :insert, id)

  traps[id]

rescue => err

  Thread.current[:sto_errored_items] = [ node, tra ]

  raise err
end
Source
# File lib/flor/unit/storage.rb, line 63
# Returns true when the version recorded in the database equals the version
# of the latest migration file.
def ready?

  current = db_version
  latest = migration_version

  current == latest
end
Source
# File lib/flor/unit/storage.rb, line 262
# Attempts to flag each given message as 'reserved'. A message is only
# updated when its row still has status 'created' and the mtime and munit
# the message was loaded with.
#
# Raises Sequel::Rollback within the #transync block as soon as one update
# does not touch exactly one row.
#
# Returns true when all the messages got reserved, false otherwise (also
# upon error, which is logged as a warning).
def reserve_all_messages(messages)

  now = Flor.tstamp
  reserved = 0

  transync do

    messages.each do |m|

      updated =
        @db[:flor_messages]
          .where(
            id: m[:id].to_i, status: 'created',
            mtime: m[:mtime], munit: m[:munit])
          .update(
            status: 'reserved', mtime: now, munit: @unit.identifier)

      raise Sequel::Rollback unless updated == 1

      reserved += 1
    end
  end

  reserved == messages.size
    # true means success: all the messages could be reserved,
    # executor is clear to work on the execution

rescue => err

  @unit.logger.warn(
    "#{self.class}#reserve_all_messages()", err, '(returning false)')

  false # failure
end
Source
# File lib/flor/unit/storage.rb, line 39
# Disconnects from the database.
def shutdown

  db = @db

  db.disconnect
    #p [ :disconnected, @db.object_id ]
end
Source
# File lib/flor/unit/storage.rb, line 68
# Runs the block, holding @mutex when there is one and +on+ is truthy.
# When +on+ is truthy, Thread.current[:sto_errored_items] is reset first.
# Returns what the block returns.
def synchronize(on=true, &block)

  Thread.current[:sto_errored_items] = nil if on

  return block.call unless @mutex && on

  @mutex.synchronize(&block)
end
Source
# File lib/flor/unit/storage.rb, line 524
# Inserts a row in flor_traces. A +text+ that is not a String is dumped as
# JSON first.
def trace(exid, nid, tracer, text)

  text = JSON.dump(text) unless text.is_a?(String)

  synchronize do

    row = {
      domain: Flor.domain(exid),
      exid: exid,
      nid: nid,
      tracer: tracer,
      text: text,
      ctime: Flor.tstamp,
      cunit: @unit.identifier }

    @db[:flor_traces].insert(row)
  end
end
Source
# File lib/flor/unit/storage.rb, line 79
# Runs the block within a database transaction, holding @mutex when there
# is one.
#
# When +on+ is truthy, Thread.current[:sto_errored_items] is reset and the
# block is wrapped in @db.transaction, whether a mutex is configured or not,
# so that a `raise Sequel::Rollback` in the block really rolls back and
# multi-statement writes stay atomic.
#
# When +on+ is falsy, the block is simply called; the caller is then
# expected to have set up locking and transaction by itself.
#
# Returns what the block (or the transaction) returns.
def transync(on=true, &block)

  Thread.current[:sto_errored_items] = nil if on

  return block.call unless on

  if @mutex
    @mutex.synchronize { @db.transaction(&block) }
  else
    @db.transaction(&block)
      # no mutex does not mean no transaction
  end
end
Source
# File lib/flor/unit/storage.rb, line 459
# For each timer returned by #load_timers, within a transaction of its own,
# reschedules the timer and triggers it, but only when the rescheduling
# touched exactly one row.
def trigger_timers

  synchronize do

    load_timers.each do |timer|

      @db.transaction do
        trigger_timer(timer) if reschedule_timer(timer) == 1
      end
    end
  end
end
Source
# File lib/flor/unit/storage.rb, line 397
# Sets back to 'created' the 'reserved' messages whose mtime is older than
# +max_sec+ seconds.
#
# Returns what the update returns, or -1 upon error (the error is logged as
# a warning).
def unreserve_messages(max_sec)

  tstamp = Flor.tstamp(Time.now - max_sec)
  tstamp = tstamp[0..tstamp.rindex('.')]
    # keep everything up to and including the last '.'

  synchronize do
    @db[:flor_messages]
      .where(status: 'reserved')
      .where { mtime < tstamp }
      .update(status: 'created')
  end

rescue => err

  # the message now states what is really returned (-1, not nil)
  @unit.logger.warn(
    "#{self.class}#unreserve_messages(#{max_sec})", err, '(returning -1)')

  -1 # not zero, to indicate a problem
end
Protected Instance Methods
Source
# File lib/flor/unit/storage.rb, line 591
# Turns a single element into its comma-joined string form: a regex tree
# yields its second entry as a string, anything else is split on commas
# (surrounding whitespace dropped) and joined again with ','.
def _commaify(o)

  return o[1].to_s if Flor.is_regex_tree?(o)

  #if o.is_a?(String)
  o.split(/\s*,\s*/).join(',')
end
Source
# File lib/flor/unit/storage.rb, line 884
# Calls +block+ with [ table, action, *rest ], truncated to the block's
# arity when that arity is not negative.
def call_back(block, table, action, *rest)

  args = [ table, action, *rest ]
  args = args[0, block.arity] unless block.arity < 0

  block.call(*args)
end
Source
# File lib/flor/unit/storage.rb, line 876
# Calls back each block registered for +table+ whose action list is empty
# or includes +action+.
def callback(table, action, *rest)

  registered = @callbacks[table] || []

  registered.each do |actions, block|

    next unless actions.empty? || actions.include?(action)

    call_back(block, table, action, *rest)
  end
end
Source
# File lib/flor/unit/storage.rb, line 600
# Turns +o+ (a regex tree, a single element or an array of elements) into
# one comma-joined string, each element going through #_commaify.
# Returns nil when +o+ is nil (or false).
def commaify(o)

  return nil unless o

  elts = (Flor.is_regex_tree?(o) || ! o.is_a?(Array)) ? [ o ] : o

  elts.map { |e| _commaify(e) }.join(',')
end
Source
# File lib/flor/unit/storage.rb, line 853
# Parses +string+ with the Fugit parser matching +type+ and returns the next
# time as a Flor.tstamp of the UTC time.
#
# When the parse result is a Time it is used as is, else its #next_time
# (from +from+, or from Time.now) is used.
def compute_next_time(type, string, from=nil)

  f =
    if type == 'cron'
      Fugit.parse_cron(string) || Fugit.parse_nat(string)
    elsif type == 'at'
      Fugit.parse_at(string)
    elsif type == 'in'
      Fugit.parse_duration(string)
    #elsif type == 'every'
    #  Fugit.parse_duration(string)
    else
      Fugit.parse(string)
    end

  nt =
    if f.is_a?(Time)
      f
    else
      f.next_time(from || Time.now) # local...
    end

  Flor.tstamp(nt.utc)
end
Source
# File lib/flor/unit/storage.rb, line 919
# Obtains the database via #derive_db and prepares it: gives it a
# flor_unit accessor pointing to the unit, enables connection validation
# when 'sto_connection_validation' is set and adds a DbLogger unless
# 'sto_db_logger' is explicitly false.
def connect

  @db = derive_db

  class << @db; attr_accessor :flor_unit; end
  @db.flor_unit = @unit

  cv = @unit.conf['sto_connection_validation']

  if cv

    timeout = (cv.is_a?(Numeric) || cv.is_a?(String)) ? cv.to_i : -1

    @db.extension(:connection_validator)
    @db.pool.connection_validation_timeout = timeout
      # NB: -1 means "check at every use"
  end

  unless @unit.conf['sto_db_logger'] == false
    @db_logger = DbLogger.new(@unit)
    @db.loggers << @db_logger
  end
end
Source
# File lib/flor/unit/storage.rb, line 610
# Within #transync, flags the stored messages (those with a 'mid') as
# 'consumed' and inserts, as 'consumed', the messages without a 'mid' whose
# point is listed in POINTS_TO_ARCHIVE.
def consume_and_archive(messages)

  transync do

    n = Flor.tstamp
    u = @unit.identifier

    mids = messages.collect { |m| m['mid'] }.uniq.compact

    @db[:flor_messages]
      .where(id: mids)
      .update(status: 'consumed', mtime: n, munit: u)

    archivable =
      messages.select { |m|
        ! m['mid'] && POINTS_TO_ARCHIVE.include?(m['point']) }

    rows =
      archivable.map { |m|
        [ Flor.domain(m['exid']), m['exid'], m['point'], to_blob(m),
          'consumed', n, n, u, u ] }

    @db[:flor_messages].import(MESSAGE_COLUMNS, rows)
  end
end
Source
# File lib/flor/unit/storage.rb, line 635
# Deletes the rows of the given messages that have a 'mid'.
def consume_and_discard(messages)

  synchronize do

    mids = messages.collect { |m| m['mid'] }.uniq.compact

    @db[:flor_messages].where(id: mids).delete
  end
end
Source
# File lib/flor/unit/storage.rb, line 901
# Returns the database to use: the 'sto_db' conf entry when set, else the
# constant named by 'sto_uri' when there is such a constant, else a new
# Sequel connection to 'sto_uri'.
#
# Fails with an ArgumentError when neither 'sto_db' nor 'sto_uri' is set.
def derive_db

  preset = @unit.conf['sto_db']

  return preset if preset

  uri = @unit.conf['sto_uri']

  unless uri
    fail ArgumentError.new("no 'sto_uri' conf, cannot connect to db")
  end

  begin
    Kernel.const_get(uri)
  rescue NameError
    Sequel.connect(uri)
  end
end
Source
# File lib/flor/unit/storage.rb, line 839
# Returns [ type, schedule_string ] for the given message.
#
# The message 'type' is used when present, else Fugit determines the type
# of the message 'string'; failing that, "every #{string}" is tried as a
# natural language cron. Returns nil when nothing fits.
def determine_type_and_schedule(message)

  type = message['type']
  string = message['string']

  if type
    [ type, string ]
  elsif (type = Fugit.determine_type(string))
    [ type, string ]
  else
    string = "every #{string}"
    Fugit.parse_nat(string) ? [ 'cron', string ] : nil
  end
end
# Returns the columns to use for flor_pointers rows: POINTER_COLUMNS, plus
# :content when the table has such a column. The result is memoized.
def pointer_columns

  return @pointer_columns if @pointer_columns

  with_content = @db[:flor_pointers].columns.include?(:content)

  @pointer_columns =
    with_content ? POINTER_COLUMNS + [ :content ] : POINTER_COLUMNS
end
Source
# File lib/flor/unit/storage.rb, line 957
# Instance-level shortcut to the class method of the same name.
def from_blob(content)

  self.class.from_blob(content)
end
Source
# File lib/flor/unit/storage.rb, line 646
# Returns the active timers whose ntime is now or in the past, ordered by
# ntime.
#
# Any error is logged as a warning and an empty array is returned.
def load_timers

  now = Flor.tstamp
    # was mistyped `Flor.tstam`, the resulting NoMethodError was swallowed
    # by the rescue below and no timer ever got loaded

  timers
    .where(status: 'active')
    .where { ntime <= now }
    .order(:ntime)
    .all

rescue => err

  @unit.logger.warn("#{self.class}#load_timers()", err, '(returning [])')

  []
end
Source
# File lib/flor/unit/storage.rb, line 579
# Returns [ table, column ] (both symbols) for the migration version table.
# Each is looked up in +opts+, then in the unit conf ('db_' then 'sto_'
# prefixed entries), and defaults to :schema_info and :version respectively.
def migration_table_and_column(opts={})

  table =
    opts[:migration_table] ||
    @unit.conf['db_migration_table'] ||
    @unit.conf['sto_migration_table'] ||
    :schema_info

  column =
    opts[:migration_column] ||
    @unit.conf['db_migration_column'] ||
    @unit.conf['sto_migration_column'] ||
    :version

  [ table.to_sym, column.to_sym ]
end
Source
# File lib/flor/unit/storage.rb, line 708
# Gets rid of the timers and traps of the execution that are not bound to
# one of its current nodes (all of them when the status is 'terminated').
# When archiving, the rows are flagged as 'removed', else they are deleted.
def remove_nodes(exe, status, now)

  exid = exe['exid']

  x = {}
  x = { nid: exe['nodes'].keys } unless status == 'terminated'
    # if 'terminated' include all nodes

  stale = lambda { |table| @db[table].where(exid: exid).exclude(x) }

  if @archive
    stale[:flor_timers].update(status: 'removed')
    stale[:flor_traps].update(status: 'removed')
  else
    stale[:flor_timers].delete
    stale[:flor_traps].delete
  end

  #@db[:flor_pointers].where(exid: exid).exclude(x).delete
    # done in update_pointers
end
Source
# File lib/flor/unit/storage.rb, line 666
# Updates the row of timer +t+ after it came due and returns what the
# update or delete returned.
#
# The row is only matched when it still has the status 'active' and the
# mtime and munit the timer was loaded with.
# Timers that are neither 'at' nor 'in' get their count incremented and a
# new ntime. 'at' and 'in' timers are flagged as 'triggered' when
# archiving, deleted otherwise. The matching :timers callbacks are then
# triggered.
def reschedule_timer(t)

  w = { id: t.id.to_i, status: 'active', mtime: t.mtime, munit: t.munit }

  row = @db[:flor_timers].where(w)

  r, action =
    if ! [ 'at', 'in' ].include?(t.type)
      [ row.update(
          count: t.count.to_i + 1,
          status: 'active',
          ntime: compute_next_time(t.type, t.schedule, t.ntime_t),
          mtime: Flor.tstamp,
          munit: @unit.identifier),
        :update ]
    elsif @archive
      [ row.update(
          count: t.count.to_i + 1,
          status: 'triggered',
          mtime: Flor.tstamp,
          munit: @unit.identifier),
        :update ]
    else
      [ row.delete, :delete ]
    end

  callback(:timers, action, w, t)

  r
end
Source
# File lib/flor/unit/storage.rb, line 869
# Returns the domain of +exid+ and all its parent domains, shortest first.
# A domain "a.b.c" yields [ "a", "a.b", "a.b.c" ].
def split_domain(exid)

  parts = Flor.domain(exid).split('.')

  (1..parts.size).map { |count| parts[0, count].join('.') }
end
Source
# File lib/flor/unit/storage.rb, line 956
# Instance-level shortcut to the class method of the same name.
def to_blob(h)

  self.class.to_blob(h)
end
Source
# File lib/flor/unit/storage.rb, line 661
# Stores the trigger message of timer +t+, via #put_messages with its
# +syn+ argument set to false.
def trigger_timer(t)

  message = t.to_trigger_message

  put_messages([ message ], false)
end
Source
# File lib/flor/unit/storage.rb, line 729
# Brings the flor_pointers rows of the execution in line with its nodes.
#
# When the status is 'terminated', all the pointers of the execution are
# deleted and nothing else happens. Otherwise, pointers whose type is in
# FP_TYPES or whose nid is not a current node are deleted, then a pointer
# row is built for each tag, each var of node '0', each task and each
# failure. Rows matching an existing pointer (same nid, type and name) are
# skipped, the others are imported. The (:pointers, :update) callbacks are
# triggered at the end.
#
# NOTE(review): each row built here carries 9 values, the last one being the
# content, and rows are imported with POINTER_COLUMNS, while
# #pointer_columns only appends :content when the table has that column.
# Confirm which of the two is meant to be used here.
def update_pointers(exe, status, now)

# Q Should we archive old pointers?
# A Well, it might be better to only archive the execution and leave
#   in there enough information...

  exid = exe['exid']

  if status == 'terminated'
    @db[:flor_pointers].where(exid: exid).delete
    return
  end

  @db[:flor_pointers]
    .where(exid: exid)
    .where(Sequel.|({ type: FP_TYPES }, Sequel.~(nid: exe['nodes'].keys)))
    .delete
      #
      # Delete all pointer to vars, their value might have changed,
      # let's reinsert them.
      # Delete pointers to gone nodes.

  dom = Flor.domain(exid)
  u = @unit.identifier

  # each row: [ domain, exid, nid, type, name, value, time, unit, content ]
  pointers = exe['nodes']
    .inject([]) { |a, (nid, node)|

      # add a pointer for each tag

      ts = node['tags']
      ts.each { |t| a << [ dom, exid, nid, 'tag', t, nil, now, u, nil ] } if ts

      # add a pointer for each var (if nid == '0')

      vs = nid == '0' ? node['vars'] : nil
      vs.each { |k, v|
        case v; when Numeric, String, TrueClass, FalseClass, NilClass
          a << [ dom, exid, '0', 'var', k, v.to_s, now, u, v ]
        when Array, Hash
          s = '(array)'; s = '(object)' if v.is_a?(Hash)
          a << [ dom, exid, '0', 'var', k, s, now, u, v ]
        else
          a << [ dom, exid, '0', 'var', k, nil, now, u, v ]
        end } if vs

      # add a pointer for the task if any

      if ta = node['task']
        tasker = ta['tasker']
        n = ta['name']; name = n.is_a?(String) ? n : JSON.dump(n)
        content = { message: node['message'], atts: node['atts'] }
        a << [ dom, exid, nid, 'tasker', tasker, name, now, u, content ]
      end

      # add a pointer for the error if any

      if fa = node['failure']
#puts "-" * 80; pp node; puts "-" * 80
        a << if er = fa['error']
          ni = fa['from'] || nid # not nid /!\
          nam = "#{er['kla']} l#{er['lin']}"
          val = er['msg']
          con = { error: fa, nid: ni }
          [ dom, exid, ni, 'failure', nam, val, now, u, con ]
        else
          nam = fa['tasker'] || 'failure'
          val = [ fa['attl'] || [], fa['attd'] || {} ]
            .collect(&:inspect).join(' ')
          con = { error: fa, nid: nid }
          [ dom, exid, nid, 'failure', nam, val, now, u, con ]
        end
      end

      # done

      a }

  cps = @db[:flor_pointers] # current pointers
    .where(exid: exid)
    .select(:nid, :type, :name)
    .all
  pointers.reject! { |_, _, ni, ty, na, _, _, _, _|
    cps.find { |cp| cp[:nid] == ni && cp[:type] == ty && cp[:name] == na } }
      #
      # don't insert when already inserted

  # turn the content (9th value) into a blob, when there is a content
  pointers.each { |ptr| c = ptr[8]; ptr[8] = to_blob(c) if c }

  @db[:flor_pointers]
    .import(
      POINTER_COLUMNS,
      pointers,
      skip_transaction: true)

  callback(:pointers, :update, exid)
end