class S3DataPacker::ThreadSet

Attributes

lock[R]
queue[R]
workers[R]

Public Class Methods

new(opts ={}) click to toggle source
# File lib/s3_data_packer/thread_set.rb, line 5
# Builds an empty thread set: no workers, finish flag lowered, a new mutex
# and a new S3DataPacker::Queue.
#
# @param opts [Hash] accepted but not read by this method
def initialize(opts = {})
  @finish = false
  @workers = []
  @lock = Mutex.new
  @queue = S3DataPacker::Queue.new
end

Public Instance Methods

dead?() click to toggle source
# File lib/s3_data_packer/thread_set.rb, line 24
# Reports whether every worker thread has stopped running.
#
# Thread#status is nil for a thread that terminated with an exception and
# false for one that terminated normally, so a worker has stopped whenever
# its status is falsy. Each worker is checked individually so that a mix of
# normal and abnormal terminations still counts as dead; comparing the set
# of distinct statuses against [nil] or [false] misses that case.
#
# @return [Boolean] true when there is at least one worker and none of them
#   is still running; false when any worker is alive or there are no workers
def dead?
  !workers.empty? && workers.none?(&:status)
end
finish!() click to toggle source
# File lib/s3_data_packer/thread_set.rb, line 39
# Raises the finish flag. The flag alone does not make #finished? true;
# that predicate also requires the queue to be empty.
#
# @return [true]
def finish!
  @finish = true
end
finished?() click to toggle source
# File lib/s3_data_packer/thread_set.rb, line 43
# Tells whether the finish flag has been raised and the queue holds no items.
#
# @return [Boolean]
def finished?
  # Without the finish signal the queue size is irrelevant.
  return false unless @finish == true

  queue.size == 0
end
kill!() click to toggle source
# File lib/s3_data_packer/thread_set.rb, line 28
# Logs how many workers are about to be killed, then calls #kill on each.
#
# @return [Array] the values returned by each worker's #kill
def kill!
  log('All', "Killing #{workers.size} workers")
  workers.map { |worker| worker.kill }
end
lock_wait_time() click to toggle source
# File lib/s3_data_packer/thread_set.rb, line 16
# Wait time used when a worker hits a ThreadError, read from
# S3DataPacker.config.thread_lock_wait_time on first use and cached.
#
# @return [Object] the configured value
def lock_wait_time
  return @lock_wait_time if @lock_wait_time

  @lock_wait_time = S3DataPacker.config.thread_lock_wait_time
end
log(id, message, level = :info) click to toggle source
# File lib/s3_data_packer/thread_set.rb, line 47
# Writes a message to the logger, prefixed with the thread identifier.
#
# The level is dispatched with public_send, so only the logger's public
# methods can be selected. A plain send would also reach private methods
# (for example those inherited from Kernel).
#
# @param id [Object] identifier interpolated after "Thread "
# @param message [Object] text interpolated after the identifier
# @param level [Symbol, String] name of the logger method to call
# @return [Object] whatever the logger method returns
# @raise [NoMethodError] when level is not a public method of the logger
def log(id, message, level = :info)
  logger.public_send(level, "Thread #{id}: #{message}")
end
reset!() click to toggle source
# File lib/s3_data_packer/thread_set.rb, line 33
# Lowers the finish flag and discards the worker list, but only once #dead?
# reports true. Otherwise nothing is changed.
#
# @return [Array, nil] the new empty worker list, or nil when nothing was reset
def reset!
  if dead?
    @finish = false
    @workers = []
  end
end
spawn_thread!(id) { |item| ... } click to toggle source
# File lib/s3_data_packer/thread_set.rb, line 55
# Starts one worker thread and appends it to the worker list.
#
# The worker loops until #finished? is true. On each pass it fetches an item
# from the queue; a truthy item is yielded to the given block, otherwise the
# worker sleeps for #wait_time before trying again.
#
# @param id [Object] identifier used in this worker's log lines
# @yield [item] called for each truthy item fetched from the queue
# @return [Array<Thread>] the worker list with the new thread appended
def spawn_thread!(id, &block)
  @workers << Thread.new do
    log id, "Started!"
    loop do
      if finished?
        log id, "Finish signal up and no more work to pull - Exiting"
        break
      end
      item = queue.fetch!
      if item
        log id, "Processing item #{item}", :debug
        begin
          yield item
        rescue ThreadError
          # NOTE(review): presumably raised by lock contention inside the
          # block - confirm. `retry` re-runs the begin body, so the same item
          # is yielded again, and there is no limit on the number of retries.
          log id, "Locked, retry in #{lock_wait_time}", :warn
          sleep(lock_wait_time)
          retry
        end
      else
        log id, "No more work found, sleeping for #{wait_time}"
        sleep(wait_time)
      end
    rescue Exception => e
      # Record what failed, at error level, then re-raise the same exception
      # so it still propagates out of the loop body.
      log id, "Unexpected error! #{e.class}: #{e.message}", :error
      raise
    end
  end
end
spawn_threads!(&block) click to toggle source
# File lib/s3_data_packer/thread_set.rb, line 84
# Logs the configured worker count and starts that many workers, numbered
# from zero, each receiving the given block.
#
# @yield [item] forwarded to every spawned worker
# @return [Integer] the number of workers requested
def spawn_threads!(&block)
  count = thread_count
  logger.info("Spawning #{count} threads")
  count.times { |index| spawn_thread!(index, &block) }
end
thread_count() click to toggle source
# File lib/s3_data_packer/thread_set.rb, line 20
# Number of workers to spawn, read from S3DataPacker.config.thread_count on
# first use and cached.
#
# @return [Object] the configured value
def thread_count
  return @thread_count if @thread_count

  @thread_count = S3DataPacker.config.thread_count
end
wait!() click to toggle source
# File lib/s3_data_packer/thread_set.rb, line 51
# Joins every worker, blocking until each one has finished.
#
# @return [Array] the values returned by each worker's #join
def wait!
  workers.map { |worker| worker.join }
end
wait_time() click to toggle source
# File lib/s3_data_packer/thread_set.rb, line 12
# Sleep time used by a worker that found no item in the queue, read from
# S3DataPacker.config.thread_sleep_time on first use and cached.
#
# @return [Object] the configured value
def wait_time
  return @wait_time if @wait_time

  @wait_time = S3DataPacker.config.thread_sleep_time
end

Private Instance Methods

logger() click to toggle source
# File lib/s3_data_packer/thread_set.rb, line 93
# Logger used by this thread set, taken from S3DataPacker.logger on first
# use and cached.
#
# @return [Object] the logger
def logger
  return @logger if @logger

  @logger = S3DataPacker.logger
end