class Workerholic::WorkerBalancer

Attributes

alive[R]
auto[R]
queues[RW]
storage[R]
thread[R]
workers[R]

Public Class Methods

new(opts = {}) click to toggle source
# File lib/workerholic/worker_balancer.rb, line 6
# Sets up a balancer: creates the storage wrapper, loads the current queues
# through it and records the workers that will be distributed over them.
#
# opts[:workers]      - workers to balance; an empty array is used when absent
# opts[:auto_balance] - when truthy, #start runs the auto-balancing loop
#                       instead of the even one
def initialize(opts = {})
  @storage = Storage::RedisWrapper.new
  # Must come after @storage: fetch_queues reads queue names via `storage`.
  @queues = fetch_queues
  @workers = opts[:workers] || []
  # Loop condition polled by the balancing threads.
  @alive = true
  @logger = LogManager.new
  @auto = opts[:auto_balance]
end

Public Instance Methods

kill() click to toggle source
# File lib/workerholic/worker_balancer.rb, line 23
# Stops the background balancing thread.
#
# The thread only exists once #start has run, so calling this earlier is a
# harmless no-op instead of a NoMethodError on nil.
#
# Returns the killed Thread, or nil when no thread was started.
def kill
  thread&.kill
end
start() click to toggle source
# File lib/workerholic/worker_balancer.rb, line 15
# Starts the balancing loop selected by the `auto` flag: the auto-balancing
# loop when it is truthy, the even distribution loop otherwise.
def start
  return auto_balance_workers if auto

  evenly_balance_workers
end

Private Instance Methods

assign_one_worker_per_queue() click to toggle source
# File lib/workerholic/worker_balancer.rb, line 87
# Pairs workers and queues position by position (first worker with first
# queue, and so on) until either collection runs out.
#
# Returns the number of workers that received a queue.
def assign_one_worker_per_queue
  pair_count = [queues.size, workers.size].min
  pair_count.times { |slot| workers[slot].queue = queues[slot] }
  pair_count
end
assign_workers_to_queue(queue, workers_count, total_workers_count) click to toggle source
# File lib/workerholic/worker_balancer.rb, line 124
# Points a contiguous run of workers at +queue+.
#
# queue               - queue whose name the workers should consume from
# workers_count       - how many workers to assign
# total_workers_count - index of the first worker to assign (i.e. the number
#                       of workers already assigned)
#
# The requested range is clipped to the workers that actually exist: rounded
# per-queue counts can add up to more than the remaining workers, and indexing
# past the end used to raise NoMethodError on nil.
#
# Returns total_workers_count.
def assign_workers_to_queue(queue, workers_count, total_workers_count)
  return total_workers_count unless workers_count > 0

  # Array#[start, length] truncates at the end and yields nil past it.
  targets = workers.to_a[total_workers_count, workers_count] || []
  # Each worker gets its own Queue instance, as before.
  targets.each { |worker| worker.queue = Queue.new(queue.name) }

  total_workers_count
end
auto_balance_workers() click to toggle source
# File lib/workerholic/worker_balancer.rb, line 59
# Spawns the background thread that, for as long as `alive` is truthy,
# recomputes the auto-balanced distribution, logs the balancer stats and then
# pauses for one second. The thread is stored in @thread and returned.
def auto_balance_workers
  rebalance = lambda do
    auto_balanced_workers_distribution
    output_balancer_stats

    sleep 1
  end

  # `alive` is checked before every pass, including the first one.
  @thread = Thread.new { rebalance.call while alive }
end
auto_balanced_workers_distribution() click to toggle source
# File lib/workerholic/worker_balancer.rb, line 70
# One pass of the auto-balancing strategy:
#   1. reload the queues,
#   2. give each queue one worker,
#   3. share the remaining workers between the IO queues in proportion to
#      their job counts,
#   4. hand a single leftover worker (if any) to distribute_unassigned_worker.
def auto_balanced_workers_distribution
  self.queues = fetch_queues

  assigned = assign_one_worker_per_queue

  # Float division on purpose: the average may be fractional.
  spare_workers = (workers.size - assigned).to_f
  jobs_per_worker = total_jobs / spare_workers

  distribute_unassigned_worker(provision_queues(io_queues, jobs_per_worker, assigned))
end
current_workers_count_per_queue() click to toggle source
# File lib/workerholic/worker_balancer.rb, line 158
# Tallies how many workers are currently attached to each queue.
#
# Returns a Hash of queue name => worker count. Workers without a queue are
# ignored, and queues with no workers have no entry.
def current_workers_count_per_queue
  workers.each_with_object({}) do |worker, counts|
    next unless worker.queue

    queue_name = worker.queue.name
    counts[queue_name] = counts.fetch(queue_name, 0) + 1
  end
end
distribute_unassigned_worker(total_workers_count) click to toggle source
# File lib/workerholic/worker_balancer.rb, line 136
# When exactly one worker is left over after distribution, assigns the last
# worker to the IO queue holding the most jobs (the first one on ties).
# Does nothing for any other number of leftover workers.
#
# total_workers_count - number of workers already assigned
#
# Each queue's size is read exactly once. The previous implementation rebuilt
# the whole size list for every candidate, which was quadratic and could end
# up assigning nil if a size changed between two reads.
def distribute_unassigned_worker(total_workers_count)
  return unless workers.size - total_workers_count == 1

  workers[workers.size - 1].queue = io_queues.max_by(&:size)
end
evenly_balance_workers() click to toggle source
# File lib/workerholic/worker_balancer.rb, line 29
# Spawns the background thread that, for as long as `alive` is truthy,
# recomputes the even distribution, logs the balancer stats and then pauses
# for one second. The thread is stored in @thread and returned.
def evenly_balance_workers
  rebalance = lambda do
    evenly_balanced_workers_distribution
    output_balancer_stats

    sleep 1
  end

  # `alive` is checked before every pass, including the first one.
  @thread = Thread.new { rebalance.call while alive }
end
evenly_balanced_workers_distribution() click to toggle source
# File lib/workerholic/worker_balancer.rb, line 40
# One pass of the even strategy: reload the queues, give each queue one
# worker, then give every queue the same integer share of the remaining
# workers. A single leftover worker (if any) is handled by
# distribute_unassigned_worker.
def evenly_balanced_workers_distribution
  self.queues = fetch_queues

  already_assigned = assign_one_worker_per_queue
  spare_workers = workers.size - already_assigned

  # Thread the running offset through the fold instead of mutating a local.
  final_offset = queues.inject(already_assigned) do |offset, queue|
    share = round(spare_workers / queues.size)
    assign_workers_to_queue(queue, share, offset)
    offset + share
  end

  distribute_unassigned_worker(final_offset)
end
fetch_queues() click to toggle source
# File lib/workerholic/worker_balancer.rb, line 83
# Builds one Queue object per queue name reported by the storage wrapper.
#
# Returns an Array of Queue instances, in the order storage returned the names.
def fetch_queues
  names = storage.fetch_queue_names
  names.map { |name| Queue.new(name) }
end
io_queues() click to toggle source
# File lib/workerholic/worker_balancer.rb, line 101
# Queues whose name ends in "-io". When no queue matches, every queue is
# returned instead, so callers always have something to balance over
# (unless there are no queues at all).
def io_queues
  matching = queues.select { |candidate| candidate.name.match(/.*-io$/) }
  return queues if matching.empty?

  matching
end
output_balancer_stats() click to toggle source
# File lib/workerholic/worker_balancer.rb, line 140
    # Logs, for every queue, its name, its job count and the number of workers
    # currently attached to it.
    #
    # When there are no queues at all, logs "DONE" and raises Interrupt.
    #
    # The per-queue worker tally is computed once per call rather than once
    # per queue: it does not depend on the queue being printed.
    def output_balancer_stats
      queues_with_size = queues.map { |q| { name: q.name, size: q.size } }

      if queues_with_size.empty?
        @logger.info("DONE")
        raise Interrupt
      end

      workers_per_queue = current_workers_count_per_queue

      queues_with_size.each do |q|
        output = <<~LOG
          Queue #{q[:name]}:
          => #{q[:size]} jobs
          => #{workers_per_queue[q[:name]]} workers
        LOG
        @logger.info(output)
      end

      nil
    end
provision_queues(qs, average_jobs_count_per_worker, total_workers_count) click to toggle source
# File lib/workerholic/worker_balancer.rb, line 111
# Gives each queue in +qs+ a number of workers proportional to its size.
#
# qs                            - queues to provision
# average_jobs_count_per_worker - target number of jobs per worker
# total_workers_count           - number of workers already assigned; used as
#                                 the starting offset for new assignments
#
# Returns the updated count of assigned workers.
#
# When the average is zero or NaN (no pending jobs, e.g. 0 / 0.0), no sensible
# proportion exists: the per-queue ratio would be NaN/Infinity and rounding it
# raises FloatDomainError. In that case nothing is provisioned.
def provision_queues(qs, average_jobs_count_per_worker, total_workers_count)
  # NaN > 0 is false, so this also covers a NaN average.
  return total_workers_count unless average_jobs_count_per_worker > 0

  qs.each do |q|
    workers_count = q.size / average_jobs_count_per_worker
    workers_count = round(workers_count)

    assign_workers_to_queue(q, workers_count, total_workers_count)

    total_workers_count += workers_count
  end

  total_workers_count
end
round(n) click to toggle source
# File lib/workerholic/worker_balancer.rb, line 130
# Rounds to the nearest integer, except that exact halves go down
# (2.5 => 2), unlike Float#round which would give 3.
def round(n)
  fractional_part = n % 1
  fractional_part == 0.5 ? n.floor : n.round
end
total_jobs() click to toggle source
# File lib/workerholic/worker_balancer.rb, line 97
# Sum of the sizes of all IO queues; 0 when there are none.
def total_jobs
  io_queues.inject(0) { |jobs, queue| jobs + queue.size }
end