class DbSucker::Application::EventedResultset

Constants

SetAlreadyClosedError

Public Class Methods

new() click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 7
# Sets up an empty, open resultset: an empty store, a monitor guarding
# it, and two condition variables created from that monitor (one
# signalled when values arrive, one signalled on close).
def initialize
  @closed  = false
  @store   = []
  @monitor = Monitor.new
  # Both condition variables belong to @monitor, so they may only be
  # waited on / signalled while that monitor is held.
  @value_signal = @monitor.new_cond
  @close_signal = @monitor.new_cond
end

Public Instance Methods

<<(*args)
Alias for: push
[](which) click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 80
# Calls #wait first, then looks up the entry stored at +which+ and
# returns its last element (the data part of a [group, data] pair).
# Returns nil when there is no entry at +which+.
# NOTE(review): `try` is presumably ActiveSupport's Object#try - confirm.
def [](which)
  wait
  entry = @store[which]
  entry.try(:last)
end
close!() click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 35
# Marks the resultset as closed and wakes every thread waiting on
# either condition variable (value waiters first, then close waiters).
# Always returns true.
def close!
  sync do
    @closed = true
    [@value_signal, @close_signal].each(&:broadcast)
  end
  true
end
closed?() click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 44
# Reads the closed flag while holding the monitor and returns it.
def closed?
  sync do
    @closed
  end
end
each(&block) click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 64
# Calls #wait first. With a block, passes the data part of every stored
# [group, data] pair to it and returns the store; without a block,
# returns an array holding the second element of every stored pair.
# NOTE(review): `second` is presumably ActiveSupport's Array#second - confirm.
def each(&block)
  wait
  return @store.map(&:second) unless block

  @store.each { |_group, data| block.call(data) }
end
each_line(&block) click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 90
# Streams the data part of every entry to +block+, starting from the
# first entry, and keeps going until no entry is left and the resultset
# is closed.
#
# The read position is kept in Thread.current under this object's id; it
# is reset before the loop starts and again (via ensure) when it ends.
#
# Entries are fetched as [group, data] pairs through #getx so that a
# stored payload of nil/false is not mistaken for "nothing available".
# Previously such a payload on a closed set ended the loop early and
# silently dropped every entry after it; now every stored entry is
# yielded, including nil/false payloads.
def each_line(&block)
  Thread.current[object_id.to_s] = nil
  loop do
    pair = getx
    unless pair
      # No entry was returned: stop once the set is closed, otherwise retry.
      break if closed?
      next
    end
    block.call(pair.last)
  end
ensure
  Thread.current[object_id.to_s] = nil
end
each_linex(&block) click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 104
# Like #each_line, but passes both the group and the data of every
# entry to +block+. Runs until no entry is left and the resultset is
# closed.
#
# The read position is kept in Thread.current under this object's id; it
# is reset before the loop starts and again (via ensure) when it ends.
#
# End-of-data is detected on the pair returned by #getx rather than on
# the payload, so an entry whose data is nil/false is still yielded and
# no longer terminates the loop early on a closed set.
def each_linex(&block)
  Thread.current[object_id.to_s] = nil
  loop do
    pair = getx
    unless pair
      # No entry was returned: stop once the set is closed, otherwise retry.
      break if closed?
      next
    end
    group, entry = pair
    block.call(group, entry)
  end
ensure
  Thread.current[object_id.to_s] = nil
end
eachx(&block) click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 85
# Calls #wait first, then iterates the raw store, handing each stored
# entry to +block+. Returns whatever Array#each returns for the given
# block (the store itself, or an enumerator when no block is passed).
def eachx(&block)
  wait
  pairs = @store
  pairs.each(&block)
end
empty?() click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 48
# Returns whether the store currently holds no entries; the check is
# made while holding the monitor.
def empty?
  sync do
    @store.empty?
  end
end
enq(data, group = nil) click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 19
# Appends +data+ to the store as a [group, data] pair and wakes all
# threads waiting for a value. The group is converted with to_sym when
# possible (nil stays nil).
#
# Raises SetAlreadyClosedError when the resultset is already closed.
# NOTE(review): `try` is presumably ActiveSupport's Object#try - confirm.
def enq(data, group = nil)
  sync do
    if closed?
      raise SetAlreadyClosedError, "failed to enqueue data: resultset is already closed"
    end

    group_key = group.try(:to_sym)
    @store.push([group_key, data])
    @value_signal.broadcast
  end
end
for_group(group) click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 60
# Returns the data of every stored entry whose group equals +group+
# (converted with to_sym when possible). Entries whose data is nil are
# left out of the result. Reads the store directly, without waiting
# for the resultset to be closed.
# NOTE(review): `try` is presumably ActiveSupport's Object#try - confirm.
def for_group(group)
  wanted = group.try(:to_sym)
  @store.each_with_object([]) do |(grp, data), matches|
    matches << data if grp == wanted && !data.nil?
  end
end
gets() click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 118
# Returns the data of the next unread entry for the calling thread, or
# false when the resultset is closed and nothing is left to read.
#
# The read position is kept in Thread.current under this object's id and
# starts before the first entry. While the set is open and no further
# entry exists, the call blocks on the value signal.
#
# The wait is re-checked after every wake-up (wait_while) instead of
# being performed once: a wake-up that brings neither data nor a close
# would otherwise make this method return false on a still-open set.
# NOTE(review): `try` is presumably ActiveSupport's Object#try - confirm.
def gets
  sync do
    key = object_id.to_s
    Thread.current[key] ||= -1
    @value_signal.wait_while { !closed? && !@store[Thread.current[key] + 1] }

    next_index = Thread.current[key] + 1
    if @store[next_index]
      Thread.current[key] = next_index
      @store[next_index].try(:last)
    else
      false
    end
  end
end
getx() click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 133
# Returns the next unread entry (the stored [group, data] pair) for the
# calling thread, or false when the resultset is closed and nothing is
# left to read.
#
# The read position is kept in Thread.current under this object's id and
# starts before the first entry. While the set is open and no further
# entry exists, the call blocks on the value signal.
#
# The wait is re-checked after every wake-up (wait_while) instead of
# being performed once: a wake-up that brings neither data nor a close
# would otherwise make this method return false on a still-open set.
def getx
  sync do
    key = object_id.to_s
    Thread.current[key] ||= -1
    @value_signal.wait_while { !closed? && !@store[Thread.current[key] + 1] }

    next_index = Thread.current[key] + 1
    if @store[next_index]
      Thread.current[key] = next_index
      @store[next_index]
    else
      false
    end
  end
end
join(*a) click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 75
# Calls #wait, collects the data values via #each (called without a
# block) and joins them, forwarding all arguments to the join call on
# the collected values.
def join(*a)
  wait
  values = each
  values.join(*a)
end
push(*args) click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 27
# Enqueues every given argument (without a group) through #enq while
# holding the monitor, then returns the store.
def push(*args)
  sync do
    args.each do |item|
      enq(item)
    end
    @store
  end
end
Also aliased as: <<
sync(&block) click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 15
# Runs +block+ while holding this resultset's monitor and returns the
# block's result.
def sync(&block)
  @monitor.synchronize(&block)
end
wait() click to toggle source
# File lib/db_sucker/application/evented_resultset.rb, line 52
# Blocks the calling thread until the resultset has been closed.
#
# Returns nil when the set was already closed on entry, true otherwise.
#
# The close signal is waited on with wait_until, so the closed flag is
# re-checked after every wake-up; a single unguarded wait could return
# on a wake-up that was not caused by an actual close.
def wait
  sync do
    return if closed?
    @close_signal.wait_until { closed? }
  end
  true
end