class CI::Queue::Redis::Worker

Attributes

index[R]
total[R]

Public Class Methods

new(redis, config) click to toggle source
Calls superclass method CI::Queue::Redis::Base::new
# File lib/ci/queue/redis/worker.rb, line 17
def initialize(redis, config)
  @last_warning = nil
  @reserved_test = nil
  @shutdown_required = false
  super(redis, config)
end

Public Instance Methods

acknowledge(test) click to toggle source
# File lib/ci/queue/redis/worker.rb, line 90
def acknowledge(test)
  test_key = test.id
  raise_on_mismatching_test(test_key)
  eval_script(
    :acknowledge,
    keys: [key('running'), key('processed')],
    argv: [test_key],
  ) == 1
end
build() click to toggle source
# File lib/ci/queue/redis/worker.rb, line 86
def build
  @build ||= CI::Queue::Redis::BuildRecord.new(self, redis, config)
end
master?() click to toggle source
# File lib/ci/queue/redis/worker.rb, line 43
def master?
  @master
end
poll() { |fetch, last_warning| ... } click to toggle source
# File lib/ci/queue/redis/worker.rb, line 47
def poll
  wait_for_master
  until shutdown_required? || config.circuit_breakers.any?(&:open?) || exhausted? || max_test_failed?
    if test = reserve
      yield index.fetch(test), @last_warning
    else
      sleep 0.05
    end
  end
rescue *CONNECTION_ERRORS
end
populate(tests, random: Random.new) click to toggle source
# File lib/ci/queue/redis/worker.rb, line 24
def populate(tests, random: Random.new)
  @index = tests.map { |t| [t.id, t] }.to_h
  tests = Queue.shuffle(tests, random)
  push(tests.map(&:id))
  self
end
populated?() click to toggle source
# File lib/ci/queue/redis/worker.rb, line 31
def populated?
  !!defined?(@index)
end
requeue(test, offset: Redis.requeue_offset) click to toggle source
# File lib/ci/queue/redis/worker.rb, line 100
def requeue(test, offset: Redis.requeue_offset)
  test_key = test.id
  raise_on_mismatching_test(test_key)
  global_max_requeues = config.global_max_requeues(total)

  requeued = config.max_requeues > 0 && global_max_requeues > 0 && eval_script(
    :requeue,
    keys: [key('processed'), key('requeues-count'), key('queue'), key('running')],
    argv: [config.max_requeues, global_max_requeues, test_key, offset],
  ) == 1

  @reserved_test = test_key unless requeued
  requeued
end
retry_queue() click to toggle source
# File lib/ci/queue/redis/worker.rb, line 73
def retry_queue
  failures = build.failed_tests.to_set
  log = redis.lrange(key('worker', worker_id, 'queue'), 0, -1)
  log.select! { |id| failures.include?(id) }
  log.uniq!
  log.reverse!
  Retry.new(log, config, redis: redis)
end
retrying?() click to toggle source
# File lib/ci/queue/redis/worker.rb, line 60
def retrying?
  redis.exists?(key('worker', worker_id, 'queue'))
rescue *CONNECTION_ERRORS
  false
end
shutdown!() click to toggle source
# File lib/ci/queue/redis/worker.rb, line 35
def shutdown!
  @shutdown_required = true
end
shutdown_required?() click to toggle source
# File lib/ci/queue/redis/worker.rb, line 39
def shutdown_required?
  @shutdown_required
end
supervisor() click to toggle source
# File lib/ci/queue/redis/worker.rb, line 82
def supervisor
  Supervisor.new(redis_url, config)
end

Private Instance Methods

push(tests) click to toggle source
# File lib/ci/queue/redis/worker.rb, line 166
def push(tests)
  @total = tests.size

  if @master = redis.setnx(key('master-status'), 'setup')
    redis.multi do
      redis.lpush(key('queue'), tests) unless tests.empty?
      redis.set(key('total'), @total)
      redis.set(key('master-status'), 'ready')
    end
  end
  register
rescue *CONNECTION_ERRORS
  raise if @master
end
raise_on_mismatching_test(test) click to toggle source
# File lib/ci/queue/redis/worker.rb, line 127
def raise_on_mismatching_test(test)
  if @reserved_test == test
    @reserved_test = nil
  else
    raise ReservationError, "Acknowledged #{test.inspect} but #{@reserved_test.inspect} was reserved"
  end
end
register() click to toggle source
# File lib/ci/queue/redis/worker.rb, line 181
def register
  redis.sadd(key('workers'), worker_id)
end
reserve() click to toggle source
# File lib/ci/queue/redis/worker.rb, line 135
def reserve
  if @reserved_test
    raise ReservationError, "#{@reserved_test.inspect} is already reserved. " \
      "You have to acknowledge it before you can reserve another one"
  end

  @reserved_test = (try_to_reserve_lost_test || try_to_reserve_test)
end
timeout() click to toggle source
# File lib/ci/queue/redis/worker.rb, line 123
def timeout
  config.timeout
end
try_to_reserve_lost_test() click to toggle source
# File lib/ci/queue/redis/worker.rb, line 152
def try_to_reserve_lost_test
  lost_test = eval_script(
    :reserve_lost,
    keys: [key('running'), key('completed'), key('worker', worker_id, 'queue')],
    argv: [Time.now.to_f, timeout],
  )

  if lost_test
    build.record_warning(Warnings::RESERVED_LOST_TEST, test: lost_test, timeout: timeout)
  end

  lost_test
end
try_to_reserve_test() click to toggle source
# File lib/ci/queue/redis/worker.rb, line 144
def try_to_reserve_test
  eval_script(
    :reserve,
    keys: [key('queue'), key('running'), key('processed'), key('worker', worker_id, 'queue')],
    argv: [Time.now.to_f],
  )
end
worker_id() click to toggle source
# File lib/ci/queue/redis/worker.rb, line 119
def worker_id
  config.worker_id
end