class Flor::WaitList
Constants
- DEFAULT_TIMEOUT
-
Regular waiters are message waiters, they wait for a message that matches a pattern
Row waiters are waiting for the pattern to realize in the database a better name would probably have been “query waiter”. Row waiters need their own thread for checking at interval. Row waiters can live in a different Ruby process from the Ruby process performing the executions.
Public Class Methods
Source
# File lib/flor/unit/wlist.rb, line 26 def initialize(unit) @unit = unit @unit.hooker.add('wlist', self) @mutex = Mutex.new @msg_waiters = [] @row_waiters = [] @unit.instance_eval do def wait(exid, opts=true, more=nil) @hooker['wlist'].wait(exid, opts, more) end end @row_thread = nil @row_thread_status = nil @row_frequency = @unit.conf['wtl_row_frequency'] || 1 end
Public Instance Methods
Source
# File lib/flor/unit/wlist.rb, line 53 def notify(executor, message) return [] unless message['consumed'] return [] if @msg_waiters.empty? @mutex.synchronize do to_remove = @msg_waiters.each_with_object([]) do |w, a| remove = w.notify(executor, message) a << w if remove end @msg_waiters -= to_remove end [] # no new messages end
Source
# File lib/flor/unit/wlist.rb, line 46 def shutdown @row_thread_status = :shutdown nil end
Source
# File lib/flor/unit/wlist.rb, line 36 def wait(exid, opts=true, more=nil) @hooker['wlist'].wait(exid, opts, more) end
Protected Instance Methods
Source
# File lib/flor/unit/wlist.rb, line 135 def check @mutex.synchronize do to_remove = [] rs = row_query_all @row_waiters.each do |w| remove = w.check(@unit, rs) to_remove << w if remove end @row_waiters -= to_remove end rescue => err @unit.logger.warn("#{self.class}#check()", err) end
Source
# File lib/flor/unit/wlist.rb, line 168 def row_query(klass, hs) return [] if hs.empty? q = @unit.send(klass).where(hs.shift) hs.each { |h| q = q.or(h) } q.all end
Source
# File lib/flor/unit/wlist.rb, line 156 def row_query_all exes, ptrs = @row_waiters.inject([ [], [] ]) { |a, w| es, ps = w.to_query_hashes a[0].concat(es) a[1].concat(ps) a } [ row_query(:executions, exes), row_query(:pointers, ptrs) ] end
Source
# File lib/flor/unit/wlist.rb, line 117 def start_row_thread return if @row_thread_status == :shutdown @row_thread = nil if @row_thread && ! @row_thread.alive? @row_thread_status = :running @row_thread ||= Thread.new do loop do sleep(@row_frequency) break if [ :stop, :shutdown ].include?(@row_thread_status) break if @row_waiters.empty? check end end end