class DbSucker::Application::SlotPool

Constants

PoolAlreadyClosedError
SlotAllocationError

Public Class Methods

expose(what, &how) click to toggle source
# File lib/db_sucker/application/slot_pool.rb, line 19
# Class-level macro that defines an instance method named +what+.
#
# The generated method runs the +how+ block on the receiving instance (via
# instance_exec) from inside a +sync+ block and returns the block's result.
# Positional arguments given to the generated method are passed on to +how+.
# A block given to the generated method is accepted but NOT forwarded.
#
# what - name of the method to define
# how  - block providing the method body
def self.expose(what, &how)
  define_method(what) do |*call_args, &_unused_block|
    sync do
      instance_exec(*call_args, &how)
    end
  end
end
new(slots = 1, name = nil) click to toggle source
# File lib/db_sucker/application/slot_pool.rb, line 7
# Builds a new pool in the open state with empty waiting and active lists.
#
# slots - value stored in @slots (default: 1)
# name  - optional value stored in @name (default: nil)
def initialize(slots = 1, name = nil)
  @name, @slots = name, slots
  @closed = @softclosed = false
  @waiting, @active = [], []

  # Both condition variables belong to the same monitor.
  @monitor = Monitor.new
  @signal = @monitor.new_cond
  @closed_signal = @monitor.new_cond
end

Public Instance Methods

aquire(tthr = nil) click to toggle source
# File lib/db_sucker/application/slot_pool.rb, line 111
# Queues a slot request and suspends the calling thread until it is signalled.
#
# tthr - thread the slot is requested for (default: the calling thread)
#
# Raises PoolAlreadyClosedError when closed? is truthy.
# Returns true.
#
# NOTE(review): Thread#signal and Thread#wait are not core Ruby; presumably
# they are project extensions for per-thread wake/suspend - confirm.
def aquire(tthr = nil)
  caller_thread = Thread.current
  tthr ||= caller_thread

  sync do
    if closed?
      raise PoolAlreadyClosedError, "slot pool has already been closed, cannot aquire slot"
    end
    @waiting.push([caller_thread, tthr])
    # let threads blocked on @signal re-check the queue
    @signal.broadcast
  end

  # wake the target thread first, then suspend the caller
  tthr.signal
  caller_thread.wait
  true
end
aquired?(tthr = nil) click to toggle source
# File lib/db_sucker/application/slot_pool.rb, line 82
# Tells whether a thread is currently listed in @active.
#
# tthr - thread to look up (default: the calling thread)
#
# Returns true or false.
def aquired?(tthr = nil)
  subject = tthr || Thread.current
  @active.include?(subject)
end
close() click to toggle source
# File lib/db_sucker/application/slot_pool.rb, line 40
# Flags the pool as closed and broadcasts @signal, both inside a sync block.
#
# Returns true.
def close
  sync do
    @closed = true
    # wake every thread waiting on @signal
    @signal.broadcast
  end

  true
end
close!() click to toggle source
# File lib/db_sucker/application/slot_pool.rb, line 48
# Closes the pool and blocks until active? is no longer truthy.
#
# Calls close, then waits on @closed_signal inside the same sync block. The
# predicate is re-checked after every wakeup: a condition variable may be
# woken without the awaited state being reached (spurious wakeup or a
# broadcast that raced with another change), so a single `wait if active?`
# could return while work is still active.
#
# Returns true.
def close!
  sync do
    close
    @closed_signal.wait while active?
  end
  true
end
dequeue_waiting!() click to toggle source
# File lib/db_sucker/application/slot_pool.rb, line 60
# Empties @waiting, signalling the waiting thread of each removed entry, and
# then broadcasts @signal. Everything happens inside one sync block.
#
# Entries are [waiting_thread, target_thread] pairs; only the first element
# is signalled here.
#
# Returns the result of @signal.broadcast.
def dequeue_waiting!
  sync do
    until @waiting.none?
      waiter, _target = @waiting.shift
      waiter.signal
    end
    @signal.broadcast
  end
end
puts(*a) click to toggle source
# File lib/db_sucker/application/slot_pool.rb, line 78
# Routes output through the object stored in the main thread's :app
# thread-local instead of Kernel#puts.
#
# a - arguments passed through unchanged
#
# Returns whatever that object's puts returns.
def puts(*a)
  app = Thread.main[:app]
  app.puts(*a)
end
qindex(thr = nil) click to toggle source
# File lib/db_sucker/application/slot_pool.rb, line 70
# Reports the 1-based position of a thread in the waiting queue.
#
# thr - target thread to look for (default: the calling thread); compared
#       against the second element of each @waiting entry
#
# Returns the position as an Integer, or false when the thread is not queued.
def qindex(thr = nil)
  thr ||= Thread.current
  sync do
    position = @waiting.index { |(_waiter, target)| target == thr }
    if position
      position + 1
    else
      false
    end
  end
end
release(tthr = nil) click to toggle source
# File lib/db_sucker/application/slot_pool.rb, line 125
# Removes a thread from @active and notifies waiters.
#
# tthr - thread whose slot is released (default: the calling thread)
#
# When the thread was not in @active nothing is broadcast and nil is
# returned. Otherwise @signal is broadcast, and @closed_signal is broadcast
# as well if @active is now empty and closed? is truthy.
def release(tthr = nil)
  sync do
    owner = tthr || Thread.current
    if @active.delete(owner)
      @signal.broadcast
      @closed_signal.broadcast if @active.empty? && closed?
    end
  end
end
softclose!() click to toggle source
# File lib/db_sucker/application/slot_pool.rb, line 55
# Marks the pool as soft-closed and then flushes the waiting queue.
#
# The flag is set before dequeue_waiting! runs.
#
# Returns the result of dequeue_waiting!.
def softclose!
  @softclosed = true
  dequeue_waiting!
end
sync(&block) click to toggle source
# File lib/db_sucker/application/slot_pool.rb, line 25
# Runs the given block while holding @monitor.
#
# Returns the block's result.
def sync(&block)
  @monitor.synchronize(&block)
end
wait_aquired(tthr = nil) click to toggle source
# File lib/db_sucker/application/slot_pool.rb, line 86
# Blocks until +tthr+ is no longer listed in the waiting queue, handing out
# slots to queued waiters on every pass.
#
# tthr - target thread to wait for (default: the calling thread)
#
# Each pass runs inside sync: queued entries are dequeued while slots? is
# truthy, then the method returns nil if qindex(tthr) is falsy; otherwise it
# waits on @signal and repeats.
def wait_aquired tthr = nil
  tthr ||= Thread.current
  #tthr.wait(0.1) until qindex(tthr)
  loop do
    sync do
      #puts "<#{Time.current.to_f}-#{tthr[:current_task]}> wait for index"
      #puts "<#{Time.current.to_f}-#{tthr[:current_task]}> has #{available_slots} slots"
      # Dequeue from the front of @waiting for as long as slots? allows.
      # NOTE(review): slots? is defined outside this method - presumably
      # "free slots remain"; confirm.
      while slots? && @waiting.any?
        _wthr, _tthr = @waiting.shift
        #puts "<#{Time.current.to_f}-#{_tthr[:current_task]}> running now"
        # When soft-closed, entries are still dequeued and signalled but the
        # target is not recorded in @active.
        @active.push(_tthr) unless @softclosed
        # NOTE(review): Thread#signal is not core Ruby - presumably a project
        # extension that wakes a thread; confirm.
        _tthr.signal
        _wthr.signal
      end
      # Our target is not (or no longer) queued: nothing left to wait for.
      unless qindex(tthr)
        #puts "<#{Time.current.to_f}-#{tthr[:current_task]}> return"
        return
      end
      #puts "<#{Time.current.to_f}-#{tthr[:current_task]}> wait"
      # Still queued: sleep until @signal is signalled, then re-check.
      @signal.wait #(1)
      #puts "<#{Time.current.to_f}-#{tthr[:current_task]}> wait DONE"
    end
  end
end