class CI::Queue::Redis::Worker
Attributes
index[R]
total[R]
Public Class Methods
new(redis, config)
click to toggle source
Calls superclass method
CI::Queue::Redis::Base::new
# File lib/ci/queue/redis/worker.rb, line 17
# Initializes the worker's local bookkeeping and then hands both arguments to
# the superclass constructor.
#
# @param redis  passed through unchanged to the superclass constructor
# @param config passed through unchanged to the superclass constructor
def initialize(redis, config)
  # Start with no shutdown requested, nothing reserved and no warning recorded.
  @shutdown_required = false
  @reserved_test = nil
  @last_warning = nil
  # Bare `super` forwards the method's own arguments (redis, config).
  super
end
Public Instance Methods
acknowledge(test)
click to toggle source
# File lib/ci/queue/redis/worker.rb, line 90
# Acknowledges the given test by running the :acknowledge script against the
# 'running' and 'processed' keys.
#
# @param test an object responding to #id
# @return [Boolean] true when the script returns 1
# @raise [ReservationError] (via raise_on_mismatching_test) when the test's id
#   is not the one currently reserved by this worker
def acknowledge(test)
  test_id = test.id
  # Also clears the local reservation when the ids match.
  raise_on_mismatching_test(test_id)

  script_result = eval_script(
    :acknowledge,
    keys: [key('running'), key('processed')],
    argv: [test_id],
  )
  script_result == 1
end
build()
click to toggle source
# File lib/ci/queue/redis/worker.rb, line 86
# Returns the BuildRecord for this worker, creating it on first use and
# reusing the same instance afterwards.
def build
  return @build if @build

  @build = CI::Queue::Redis::BuildRecord.new(self, redis, config)
end
master?()
click to toggle source
# File lib/ci/queue/redis/worker.rb, line 43
# Reports the value stored in @master. That variable is assigned in #push from
# the result of the SETNX on the 'master-status' key; until #push has run it
# is unset, so this returns nil.
def master?
  @master
end
poll() { |fetch, last_warning| ... }
click to toggle source
# File lib/ci/queue/redis/worker.rb, line 47
# Waits for the master, then keeps reserving tests and yielding them until a
# stop condition is met: shutdown requested, any circuit breaker open, queue
# exhausted, or the maximum number of failed tests reached.
#
# @yield [test, last_warning] the entry of #index for the reserved id, and the
#   current value of @last_warning
# @return [nil]
def poll
  wait_for_master
  until shutdown_required? || config.circuit_breakers.any?(&:open?) || exhausted? || max_test_failed?
    reserved_id = reserve
    if reserved_id
      yield index.fetch(reserved_id), @last_warning
    else
      # Nothing could be reserved right now; back off briefly before retrying.
      sleep 0.05
    end
  end
rescue *CONNECTION_ERRORS
  # Connection errors end polling quietly; the method then returns nil.
end
populate(tests, random: Random.new)
click to toggle source
# File lib/ci/queue/redis/worker.rb, line 24
# Builds the local id => test index, shuffles the tests with the given random
# source and pushes their ids.
#
# @param tests  collection of objects responding to #id
# @param random [Random] random source handed to Queue.shuffle
# @return [self]
def populate(tests, random: Random.new)
  # On duplicate ids the later test wins, as with Array#to_h.
  @index = tests.each_with_object({}) { |t, by_id| by_id[t.id] = t }
  shuffled = Queue.shuffle(tests, random)
  push(shuffled.map(&:id))
  self
end
populated?()
click to toggle source
# File lib/ci/queue/redis/worker.rb, line 31
# Tells whether @index has been assigned (which #populate does).
#
# @return [Boolean]
def populated?
  defined?(@index) ? true : false
end
requeue(test, offset: Redis.requeue_offset)
click to toggle source
# File lib/ci/queue/redis/worker.rb, line 100
# Attempts to requeue the given test through the :requeue script.
#
# The script is only run when both the per-test limit (config.max_requeues)
# and the global limit (config.global_max_requeues(total)) are positive.
#
# @param test   an object responding to #id
# @param offset value passed to the script as its last argument
# @return [Boolean] true when the script ran and returned 1, false otherwise
# @raise [ReservationError] (via raise_on_mismatching_test) when the test's id
#   is not the one currently reserved
def requeue(test, offset: Redis.requeue_offset)
  test_id = test.id
  raise_on_mismatching_test(test_id)
  global_limit = config.global_max_requeues(total)

  requeues_allowed = config.max_requeues > 0 && global_limit > 0
  requeued = requeues_allowed && eval_script(
    :requeue,
    keys: [key('processed'), key('requeues-count'), key('queue'), key('running')],
    argv: [config.max_requeues, global_limit, test_id, offset],
  ) == 1

  # raise_on_mismatching_test cleared the reservation; restore it when the
  # test was not requeued.
  @reserved_test = test_id unless requeued
  requeued
end
retry_queue()
click to toggle source
# File lib/ci/queue/redis/worker.rb, line 73
# Builds a Retry queue from this worker's own log, keeping only the ids that
# the build reports as failed.
#
# Duplicates are dropped (first occurrence kept) and the remaining ids are
# handed to Retry in reverse log order.
def retry_queue
  failed_ids = build.failed_tests.to_set
  worker_log = redis.lrange(key('worker', worker_id, 'queue'), 0, -1)
  to_retry = worker_log.select { |id| failed_ids.include?(id) }.uniq.reverse
  Retry.new(to_retry, config, redis: redis)
end
retrying?()
click to toggle source
# File lib/ci/queue/redis/worker.rb, line 60
# Checks whether this worker's own queue key already exists in Redis.
#
# @return the result of redis.exists? for that key, or false when a
#   connection error occurs
def retrying?
  worker_log_key = key('worker', worker_id, 'queue')
  redis.exists?(worker_log_key)
rescue *CONNECTION_ERRORS
  false
end
shutdown!()
click to toggle source
# File lib/ci/queue/redis/worker.rb, line 35
# Flags that a shutdown has been requested; #poll checks this flag (through
# #shutdown_required?) before each reservation attempt.
#
# @return [true]
def shutdown!
  @shutdown_required = true
end
shutdown_required?()
click to toggle source
# File lib/ci/queue/redis/worker.rb, line 39
# Returns the shutdown flag: false after #initialize, true once #shutdown!
# has been called.
def shutdown_required?
  @shutdown_required
end
supervisor()
click to toggle source
# File lib/ci/queue/redis/worker.rb, line 82
# Creates a new Supervisor from this worker's redis_url and config. A fresh
# instance is built on every call (no memoization).
def supervisor
  Supervisor.new(redis_url, config)
end
Private Instance Methods
push(tests)
click to toggle source
# File lib/ci/queue/redis/worker.rb, line 166
# Records the number of tests locally, tries to become the master by claiming
# the 'master-status' key with SETNX, and registers this worker.
#
# When the SETNX succeeds, the queue contents, the total and the 'ready'
# status are written in a single MULTI transaction.
#
# @param tests [Array] test ids to enqueue
# @return the result of #register, or nil when a connection error is
#   swallowed
# @raise one of CONNECTION_ERRORS, re-raised only when this worker had become
#   the master; other workers swallow connection errors
def push(tests)
  @total = tests.size

  @master = redis.setnx(key('master-status'), 'setup')
  if @master
    # Issue the commands on the object yielded by `multi`, not on `redis`.
    # Newer redis-rb releases deprecate calling the client inside the block
    # and then send such commands outside the transaction. Older releases
    # yield the client itself, so this form works on both.
    redis.multi do |transaction|
      transaction.lpush(key('queue'), tests) unless tests.empty?
      transaction.set(key('total'), @total)
      transaction.set(key('master-status'), 'ready')
    end
  end
  register
rescue *CONNECTION_ERRORS
  raise if @master
end
raise_on_mismatching_test(test)
click to toggle source
# File lib/ci/queue/redis/worker.rb, line 127
# Verifies that the given id is the one this worker currently holds and, when
# it is, releases the local reservation.
#
# @param test the test id to compare against @reserved_test
# @return [nil]
# @raise [ReservationError] when the id differs from @reserved_test; the
#   reservation is left untouched in that case
def raise_on_mismatching_test(test)
  unless @reserved_test == test
    raise ReservationError, "Acknowledged #{test.inspect} but #{@reserved_test.inspect} was reserved"
  end

  @reserved_test = nil
end
register()
click to toggle source
# File lib/ci/queue/redis/worker.rb, line 181
# Adds this worker's id to the 'workers' set in Redis and returns whatever
# SADD returns.
def register
  workers_key = key('workers')
  redis.sadd(workers_key, worker_id)
end
reserve()
click to toggle source
# File lib/ci/queue/redis/worker.rb, line 135
# Reserves the next test id for this worker, trying a lost test first and
# falling back to the regular queue.
#
# @return the reserved id, or the falsy result of try_to_reserve_test when
#   nothing could be reserved
# @raise [ReservationError] when a previous reservation is still held
def reserve
  if @reserved_test
    raise ReservationError, "#{@reserved_test.inspect} is already reserved. " \
      "You have to acknowledge it before you can reserve another one"
  end

  # The regular queue is only consulted when no lost test was picked up.
  lost = try_to_reserve_lost_test
  @reserved_test = lost || try_to_reserve_test
end
timeout()
click to toggle source
# File lib/ci/queue/redis/worker.rb, line 123
# Shortcut for the timeout configured on this worker's config. It is passed to
# the :reserve_lost script and reported in the lost-test warning.
def timeout
  config.timeout
end
try_to_reserve_lost_test()
click to toggle source
# File lib/ci/queue/redis/worker.rb, line 152
# Runs the :reserve_lost script with the current time and the configured
# timeout. When the script returns a test, a RESERVED_LOST_TEST warning is
# recorded on the build.
#
# @return the script's result (falsy when no lost test was reserved)
def try_to_reserve_lost_test
  script_keys = [key('running'), key('completed'), key('worker', worker_id, 'queue')]
  reclaimed = eval_script(:reserve_lost, keys: script_keys, argv: [Time.now.to_f, timeout])

  build.record_warning(Warnings::RESERVED_LOST_TEST, test: reclaimed, timeout: timeout) if reclaimed
  reclaimed
end
try_to_reserve_test()
click to toggle source
# File lib/ci/queue/redis/worker.rb, line 144
# Runs the :reserve script against the shared queue keys and this worker's own
# queue key, passing the current time as the only argument.
#
# @return the script's result
def try_to_reserve_test
  script_keys = [
    key('queue'),
    key('running'),
    key('processed'),
    key('worker', worker_id, 'queue'),
  ]
  eval_script(:reserve, keys: script_keys, argv: [Time.now.to_f])
end
worker_id()
click to toggle source
# File lib/ci/queue/redis/worker.rb, line 119
# Shortcut for the worker id configured on this worker's config. It is used to
# build the per-worker queue key and to register in the 'workers' set.
def worker_id
  config.worker_id
end