class DbSucker::Application::EventedResultset
Constants
- SetAlreadyClosedError
Public Class Methods
new()
click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 7
# Sets up an empty, open resultset.
#
# State created here:
# * @store        - array that collects the enqueued entries
# * @closed       - flag, starts out false
# * @monitor      - Monitor guarding the state above
# * @value_signal - condition variable on @monitor
# * @close_signal - condition variable on @monitor
def initialize
  @closed  = false
  @store   = []
  @monitor = Monitor.new
  # Both condition variables belong to the same monitor.
  @value_signal = @monitor.new_cond
  @close_signal = @monitor.new_cond
end
Public Instance Methods
[](which)
click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 80
# Calls #wait first, then looks up the stored entry at index +which+.
#
# Returns the last element of that entry, or nil when no entry exists
# at that index.
def [](which)
  wait
  entry = @store[which]
  entry.try(:last)
end
close!()
click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 35
# Marks the resultset as closed and broadcasts on both condition
# variables (value signal first, then close signal) while holding the
# monitor.
#
# Always returns true.
def close!
  sync do
    @closed = true
    [@value_signal, @close_signal].each(&:broadcast)
  end
  true
end
closed?()
click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 44
# Reads the closed flag while holding the monitor.
#
# Returns the current value of @closed.
def closed?
  sync do
    @closed
  end
end
each(&block)
click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 64
# Calls #wait first, then walks the stored entries.
#
# With a block: calls it with the data part of every entry and returns
# the underlying store array.
# Without a block: returns a new array holding the second element of
# every entry.
def each(&block)
  wait
  return @store.map(&:second) unless block

  @store.each { |_group, data| block.call(data) }
end
each_line(&block)
click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 90
# Resets the calling thread's read cursor, then repeatedly fetches the
# next entry via #getx and yields its data value to the block. The loop
# ends once #getx has nothing more to return and the set is closed. The
# cursor is cleared again on the way out, including when an exception
# is raised.
#
# Entries whose data is nil or false are skipped and never yielded.
# They no longer end the iteration early: the end is detected from the
# absence of an entry, not from a falsy data value, so entries stored
# after a nil/false value are still delivered once the set is closed.
#
# Returns nil.
def each_line(&block)
  Thread.current[object_id.to_s] = nil
  loop do
    entry = getx
    unless entry
      # No entry available: finished if closed, otherwise try again.
      break if closed?
      next
    end
    data = entry.last
    next unless data # falsy payloads are skipped, never yielded
    block.call(data)
  end
ensure
  Thread.current[object_id.to_s] = nil
end
each_linex(&block)
click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 104
# Resets the calling thread's read cursor, then repeatedly fetches the
# next entry via #getx and yields it to the block as (group, data). The
# loop ends once #getx has nothing more to return and the set is
# closed. The cursor is cleared again on the way out, including when an
# exception is raised.
#
# Entries whose data is nil or false are skipped and never yielded.
# They no longer end the iteration early: the end is detected from
# #getx returning no entry, not from a falsy data value, so entries
# stored after a nil/false value are still delivered once the set is
# closed.
#
# Returns nil.
def each_linex(&block)
  Thread.current[object_id.to_s] = nil
  loop do
    pair = getx
    unless pair
      # No entry available: finished if closed, otherwise try again.
      break if closed?
      next
    end
    group, entry = pair
    next unless entry # falsy payloads are skipped, never yielded
    block.call(group, entry)
  end
ensure
  Thread.current[object_id.to_s] = nil
end
eachx(&block)
click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 85
# Calls #wait first, then iterates the raw store, handing each stored
# entry to the given block.
#
# Returns whatever Array#each returns for the given block.
def eachx(&block)
  wait
  entries = @store
  entries.each(&block)
end
empty?()
click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 48
# Checks, while holding the monitor, whether the store has no entries.
#
# Returns true when the store is empty, false otherwise.
def empty?
  sync do
    @store.empty?
  end
end
enq(data, group = nil)
click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 19
# Appends +data+ to the store under the monitor and broadcasts on the
# value signal.
#
# data  - the value to store
# group - optional group label; stored as a symbol, or nil when not given
#
# Raises SetAlreadyClosedError when the resultset is already closed.
def enq(data, group = nil)
  sync do
    if closed?
      raise SetAlreadyClosedError, "failed to enqueue data: resultset is already closed"
    end

    group_key = group.try(:to_sym)
    @store << [group_key, data]
    @value_signal.broadcast
  end
end
for_group(group)
click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 60
# Collects the data of every stored entry whose group equals +group+.
#
# group - group label; converted with try(:to_sym) before comparing, so
#         nil selects the entries stored without a group
#
# Returns a new array of the matching data values with nil values
# removed. The store is read as-is; this method does not call #wait.
def for_group(group)
  # The lookup key does not change per entry, so compute it once.
  wanted = group.try(:to_sym)
  @store.map { |grp, data| grp == wanted ? data : nil }.compact
end
gets()
click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 118
# Fetches the next entry for the calling thread via #getx and returns
# only its data part.
#
# The cursor handling and waiting are identical to #getx, so this
# method delegates to it instead of duplicating that logic.
#
# Returns the data of the next entry (which may itself be nil), or
# false when #getx has no entry to return.
def gets
  entry = getx
  entry ? entry.last : false
end
getx()
click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 133
# Returns the next unread entry for the calling thread.
#
# Each thread keeps its own read cursor in a thread-local slot keyed by
# this object's id; the cursor starts at -1 (nothing read yet).
#
# While the set is open and no entry exists past the cursor, the
# calling thread waits on the value signal. The predicate is rechecked
# after every wakeup, so a wakeup that brings neither data nor a close
# does not return early.
#
# Returns the stored [group, data] entry and advances the cursor, or
# false when the set is closed and has no entry past the cursor.
def getx
  sync do
    key = object_id.to_s
    Thread.current[key] ||= -1
    upcoming = Thread.current[key] + 1

    @value_signal.wait_while { !closed? && !@store[upcoming] }

    if @store[upcoming]
      Thread.current[key] = upcoming
      @store[upcoming]
    else
      false
    end
  end
end
join(*a)
click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 75
# Calls #wait first, then joins the values returned by a block-less
# #each call.
#
# a - arguments forwarded unchanged to the join call
#
# Returns the result of that join call.
def join(*a)
  wait
  values = each
  values.join(*a)
end
push(*args)
click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 27
# Enqueues every given argument, in order, via #enq (without a group)
# while holding the monitor for the whole batch.
#
# Returns the underlying store array.
def push(*args)
  sync do
    args.each do |item|
      enq(item)
    end
    @store
  end
end
Also aliased as: <<
sync(&block)
click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 15
# Runs the given block while holding this resultset's monitor.
#
# Returns whatever the monitor's synchronize call returns for the block.
def sync(&block)
  @monitor.synchronize(&block)
end
wait()
click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 52
# Blocks the calling thread until the resultset has been closed.
#
# The closed flag is rechecked after every wakeup of the close signal,
# so a wakeup that happens before the set is actually closed does not
# release the caller early.
#
# Returns nil when the set was already closed on entry, true after
# having waited for the close.
def wait
  sync do
    return if closed?
    @close_signal.wait_until { closed? }
  end
  true
end