class Balsamique
Constants
- ACCELERATE_RETRIES
- ACCELERATE_RETRIES_SHA
- DEQUEUE_TASK
Lua script
DEQUEUE_TASK
takes keys [args_h, tasks_h, questats_h, retries_h, task1_z, ...],
and args [timestamp_f, retry_delay, tmod]. It performs a conditional ZPOP on task1_z, where the condition is that the score of the first item is <= timestamp_f. If nothing is available to ZPOP, it tries task2_z, etc. If an id is returned from any ZPOP, it increments the retry count in retries_h and reschedules the task accordingly. Then it writes stats info to questats_h, and returns the job information from args_h and tasks_h.
- DEQUEUE_TASK_SHA
- ENQUEUE_JOB
Lua script
ENQUEUE_JOB
takes keys [tasks_h, args_h, jobstat_h, task1_z, queues_h, uniq_h],
and args [tasks, args, run_at, uniq]. uniq is optional. If it’s present, the script first checks to see if the key uniq is already set in the hash uniq_h. If so, the negative of the integer value therein is returned and the script does nothing else. Otherwise, an integer id is written as that value, the tasks_h hash gets the value of tasks (JSON-encoded task list) written under the key id, the args_h hash gets the value args written under the key id, and task1_z gets id zadded with score run_at. Also, task1_z is written to jobstat_h under the key id. The value returned from the operation is the id. A successful enqueueing is thus signaled by the return of the job id, while an enqueueing blocked by the uniq_in_flight constraint returns minus the blocking id.
- ENQUEUE_JOB_SHA
- FAIL_TASK
- FAIL_TASK_SHA
- REPORT_COMPLETE
- REPORT_COMPLETE_SHA
- REPORT_POP
- REPORT_POP_SHA
- REPORT_RETRY_DELAY
- RESCHEDULE
- RESCHEDULE_SHA
- RETRY_DELAY
- SET_ID_FLOOR
- SET_ID_FLOOR_SHA
- STATS_CHUNK
- STATS_SLICE
- SUCCEED_TASK
- SUCCEED_TASK_SHA
Public Class Methods
# File lib/balsamique.rb, line 74
# Combines a chunk index and a slice-within-chunk index into a timestamp.
#
# chunk - chunk index (each chunk spans STATS_CHUNK slices).
# slice - slice index inside that chunk.
#
# Returns the absolute slice number multiplied by STATS_SLICE.
def self.assemble_timestamp(chunk, slice)
  absolute_slice = chunk * STATS_CHUNK + slice
  absolute_slice * STATS_SLICE
end
# File lib/balsamique.rb, line 47
# Finds the last entry of the task list that has more than one element
# and returns that entry's first element.
#
# tasks - list of task entries (each entry responds to #size and #first).
#
# Returns the first element of the matching entry, or nil if none matches.
def self.current_task(tasks)
  tasks.reverse_each do |entry|
    return entry.first if entry.size > 1
  end
  nil
end
# File lib/balsamique.rb, line 68
# Decodes a base-36 string into an integer.
#
# s - value responding to #to_i(base).
#
# Returns the decoded Integer.
def self.dec36(s)
  radix = 36
  s.to_i(radix)
end
# File lib/balsamique.rb, line 77
# Decodes a base-36 chunk and slice and assembles them into a timestamp.
#
# echunk - base-36 encoded chunk index.
# eslice - base-36 encoded slice index.
#
# Returns the result of assemble_timestamp on the decoded pair.
def self.dec36_assemble_timestamp(echunk, eslice)
  chunk = self.dec36(echunk)
  slice = self.dec36(eslice)
  self.assemble_timestamp(chunk, slice)
end
# File lib/balsamique.rb, line 65
# Encodes an integer as a base-36 string.
#
# i - value responding to #to_s(base).
#
# Returns the encoded String.
def self.enc36(i)
  radix = 36
  i.to_s(radix)
end
# File lib/balsamique.rb, line 71
# Splits a timestamp with slice_timestamp and base-36 encodes each part.
#
# ts - timestamp to split.
#
# Returns an Array with one encoded String per part.
def self.enc36_slice_timestamp(ts)
  parts = self.slice_timestamp(ts)
  parts.map { |part| self.enc36(part) }
end
# File lib/balsamique.rb, line 6
# Stores the Redis client and derives every Redis key name (or key prefix)
# used by the instance from the namespace.
#
# redis     - client object, returned later by #redis.
# namespace - String prepended to every key (defaults to 'bQ').
def initialize(redis, namespace = 'bQ')
  @redis = redis
  key_for = ->(suffix) { namespace + suffix }

  # Prefixes: a queue / stats chunk / env topic name is appended to these.
  @que_prefix = key_for.call(':que:')
  @questats_prefix = key_for.call(':questats:')
  @env_prefix = key_for.call(':env:')

  # Complete key names.
  @status = key_for.call(':status')
  @queues = key_for.call(':queues')
  @retries = key_for.call(':retries')
  @failures = key_for.call(':failures')
  @failz = key_for.call(':failz')
  @unique = key_for.call(':unique')
  @tasks = key_for.call(':tasks')
  @args = key_for.call(':args')

  # The report queue lives under the ordinary queue prefix.
  @report_queue = @que_prefix + '_report'
end
# File lib/balsamique.rb, line 42
# Finds the first entry of the task list that has exactly one element
# and returns that element.
#
# tasks - list of task entries (each entry responds to #size and #first).
#
# Returns the entry's first element, or nil if no entry matches.
def self.next_task(tasks)
  pending = tasks.find { |entry| entry.size == 1 }
  pending ? pending.first : nil
end
# File lib/balsamique.rb, line 61
# Splits a timestamp into a chunk index and a slice index within that chunk.
#
# ts - timestamp; truncated with #to_i before slicing.
#
# Returns a two-element Array [chunk, slice_within_chunk].
def self.slice_timestamp(ts)
  absolute_slice = ts.to_i / STATS_SLICE
  [absolute_slice / STATS_CHUNK, absolute_slice % STATS_CHUNK]
end
# File lib/balsamique.rb, line 52
# Removes a leading prefix from a string.
#
# str    - the string to strip.
# prefix - the expected leading text.
#
# Returns the remainder of str after the prefix, or nil when str does not
# begin with prefix.
def self.strip_prefix(str, prefix)
  prefix_len = prefix.size
  return nil unless str[0, prefix_len] == prefix
  str[prefix_len, str.size - prefix_len]
end
Public Instance Methods
# File lib/balsamique.rb, line 289
# Runs the ACCELERATE_RETRIES script against a single queue.
#
# queue     - queue name; appended to the queue key prefix.
# incr      - first script argument (defaults to 0.001).
# timestamp - second script argument (defaults to the current integer time).
#
# Returns whatever redis_eval returns for the script.
def accelerate_retries(queue, incr = 0.001, timestamp = Time.now.to_i)
  queue_key = @que_prefix + queue
  script_args = [incr, timestamp]
  redis_eval(ACCELERATE_RETRIES_SHA, ACCELERATE_RETRIES, [queue_key], script_args)
end
# File lib/balsamique.rb, line 548
# Runs the REPORT_COMPLETE script for a job id, passing the report queue
# and retries keys.
#
# id - job id handed to the script as its only argument.
#
# Returns whatever redis_eval returns for the script.
def complete_report(id)
  script_keys = [@report_queue, @retries]
  redis_eval(REPORT_COMPLETE_SHA, REPORT_COMPLETE, script_keys, [id])
end
# File lib/balsamique.rb, line 409
# Parses a comma-separated job status string.
#
# The string has the form "queue,ts[,index,ts]...": the first timestamp is
# stored at position 0, and every following (index, timestamp) pair stores
# its timestamp at the given position.
#
# status - the encoded status String.
#
# Returns [queue, timestamps], where timestamps is an Array of Floats
# (positions never mentioned in the string stay nil).
def decode_job_status(status)
  queue, first_ts, *pairs = status.split(',')
  timestamps = [first_ts.to_f]
  pairs.each_slice(2) do |index, ts|
    timestamps[index.to_i] = ts.to_f
  end
  [queue, timestamps]
end
# File lib/balsamique.rb, line 345
# Deletes a queue's key and removes that key from the queues hash, both
# inside one MULTI block.
#
# queue - queue name (converted with #to_s).
#
# Returns true when the last reply of the transaction (the HDEL) equals 1.
def delete_queue(queue)
  queue_key = @que_prefix + queue.to_s
  replies = redis.multi do |tx|
    tx.del(queue_key)
    tx.hdel(@queues, queue_key)
  end
  replies.last == 1
end
# File lib/balsamique.rb, line 171
# Runs the DEQUEUE_TASK script over the queues of the given tasks.
#
# tasks       - task names; each becomes a queue key after the fixed keys.
# retry_delay - second script argument (defaults to rand_delay).
# timestamp   - first script argument (defaults to the current time).
#
# Returns nil when the script yields nothing, otherwise a Hash with :id,
# :args and :tasks (both JSON-decoded) and :retries.
def dequeue(tasks, retry_delay = rand_delay, timestamp = Time.now.to_f)
  stats_chunk, stats_slice = self.class.enc36_slice_timestamp(timestamp)
  questats_key = @questats_prefix + stats_chunk
  queue_keys = tasks.map { |task| @que_prefix + task.to_s }
  keys = [@args, @tasks, questats_key, @retries] + queue_keys
  reply = redis_eval(
    DEQUEUE_TASK_SHA, DEQUEUE_TASK, keys,
    [timestamp, retry_delay, stats_slice])
  return nil unless reply

  id, job_args, job_tasks, retries = reply
  {
    id: id,
    args: JSON.parse(job_args),
    tasks: JSON.parse(job_tasks),
    retries: retries
  }
end
# File lib/balsamique.rb, line 128
# Enqueues a job on the queue of its next pending task via ENQUEUE_JOB.
#
# tasks          - task list; serialized with #to_json.
# args           - job arguments; serialized with #to_json.
# uniq_in_flight - optional uniqueness key, appended to the script
#                  arguments only when truthy.
# run_at         - third script argument (defaults to the current time).
#
# Returns [false, nil] when the task list has no next task; otherwise
# [script_result > 0, script_result.abs.to_s].
def enqueue(tasks, args, uniq_in_flight = nil, run_at = Time.now.to_f)
  first_task = self.class.next_task(tasks)
  return false, nil unless first_task

  keys = [@tasks, @args, @status, @que_prefix + first_task.to_s, @queues, @unique]
  argv = [tasks.to_json, args.to_json, run_at]
  argv.push(uniq_in_flight) if uniq_in_flight
  job_id = redis_eval(ENQUEUE_JOB_SHA, ENQUEUE_JOB, keys, argv)
  [job_id > 0, job_id.abs.to_s]
end
# File lib/balsamique.rb, line 269
# Records a task failure by running the FAIL_TASK script.
#
# id        - job id.
# task      - task name; appended to the queue key prefix.
# details   - failure details; serialized with JSON.generate.
# timestamp - second script argument (defaults to the current time).
#
# Returns true when the script's return value equals id.
def fail(id, task, details, timestamp = Time.now.to_f)
  queue_key = @que_prefix + task
  encoded_details = JSON.generate(details)
  keys = [@tasks, @status, @retries, @failz, @failures, @report_queue]
  reply = redis_eval(FAIL_TASK_SHA, FAIL_TASK, keys, [id, timestamp, queue_key, encoded_details])
  id == reply
end
# File lib/balsamique.rb, line 341
# Fetches failure keys with get_failz and expands them with get_failures.
#
# args - forwarded unchanged to get_failz.
#
# Returns the value of get_failures for the fetched keys.
def failures(*args)
  failz = get_failz(*args)
  get_failures(failz)
end
# File lib/balsamique.rb, line 481
# Adds the decoded :args and :tasks of each job to its status hash.
#
# statuses - Hash of job id => status Hash; each status Hash is updated in
#            place with :args and :tasks (nil when Redis has no value).
#
# Returns the Array of [id, raw_args, raw_tasks] triples that were
# processed (empty when statuses is empty).
def fill_args_tasks(statuses)
  ids = statuses.keys
  # HMGET with no fields is rejected by Redis, so skip the round trip.
  return [] if ids.empty?

  args, tasks = redis.multi do |r|
    r.hmget(@args, ids)
    r.hmget(@tasks, ids)
  end
  ids.zip(args, tasks).each do |id, a, t|
    statuses[id][:args] = a && JSON.parse(a)
    statuses[id][:tasks] = t && JSON.parse(t)
  end
end
# File lib/balsamique.rb, line 464
# Adds a :failures entry to the status of every job that has failures.
#
# For each status with a :task and more than one timestamp, a failure key
# "id,queue,n" is built for every timestamp after the first (n counts from
# 1). The keys are resolved with get_failures.
#
# statuses - Hash of job id => status Hash (with :task and :timestamps).
#
# Returns statuses, updated in place.
def fill_job_failures(statuses)
  failz = {}
  statuses.each do |id, status|
    task = status[:task]
    next unless task

    timestamps = status[:timestamps]
    next unless timestamps.size > 1

    queue = @que_prefix + task
    timestamps.each_with_index do |ts, attempt|
      # Position 0 is the base timestamp, not a failure.
      next if attempt.zero?
      failz["#{id},#{queue},#{attempt}"] = ts
    end
  end
  get_failures(failz).each { |id, failures| statuses[id][:failures] = failures }
  statuses
end
# File lib/balsamique.rb, line 570
# Reads values from the env hash of a topic.
#
# topic - topic name (converted with #to_s and appended to the env prefix).
# keys  - nil to fetch the whole hash, or a list of fields to fetch.
#
# Returns the full hash when keys is nil, {} when keys is empty, otherwise
# a Hash mapping each requested key to its value.
def get_env(topic, keys = nil)
  hkey = @env_prefix + topic.to_s
  return redis.hgetall(hkey) if keys.nil?
  return {} if keys.empty?

  values = redis.hmget(hkey, keys)
  keys.zip(values).each_with_object({}) do |(key, value), env|
    env[key] = value
  end
end
# File lib/balsamique.rb, line 310
# Resolves failure keys into per-job lists of failure records.
#
# failz - Hash of failure key ("id,queue,retries") => timestamp.
#
# Returns a Hash of job id => Array of records, each with :task, :retries,
# :ts and :details (JSON-decoded, or nil when nothing is stored). The Hash
# yields a fresh empty Array for ids it does not contain.
def get_failures(failz)
  result = Hash.new { Array.new }
  fkeys = failz.keys
  return result if fkeys.empty?

  stored = redis.hmget(@failures, fkeys)
  fkeys.each_with_index do |key, position|
    details = stored[position]
    id, queue, retries = key.split(',')
    record = {
      task: self.class.strip_prefix(queue, @que_prefix),
      retries: retries.to_i,
      ts: failz[key],
      details: details && JSON.parse(details)
    }
    # The default block does not store its array, so assign it back.
    result[id] = result[id] << record
  end
  result
end
# File lib/balsamique.rb, line 327
# Reads failure keys and their scores from the failz sorted set.
#
# earliest - lower score bound (defaults to 0).
# latest   - upper score bound (defaults to the current time).
# limit    - maximum number of entries; a negative value reads in reverse
#            score order (ZREVRANGEBYSCORE) using its absolute value.
#
# Returns a Hash of member => score.
def get_failz(earliest = 0, latest = Time.now.to_f, limit = -100)
  entries =
    if limit < 0
      redis.zrevrangebyscore(
        @failz, latest, earliest, limit: [0, -limit], with_scores: true)
    else
      redis.zrangebyscore(
        @failz, earliest, latest, limit: [0, limit], with_scores: true)
    end
  entries.each_with_object({}) do |entry, failz|
    failz[entry[0]] = entry[1]
  end
end
# File lib/balsamique.rb, line 451
# Looks up and decodes the stored status of the given jobs.
#
# ids - job ids to look up.
#
# Returns a Hash of id => { task:, timestamps: } containing only the ids
# that have a stored status; {} when no ids are given.
def job_status(*ids)
  # HMGET with no fields is rejected by Redis, so answer directly.
  return {} if ids.empty?

  statuses = redis.hmget(@status, *ids)
  result = {}
  ids.zip(statuses).each do |(id, status)|
    next unless status
    queue, timestamps = decode_job_status(status)
    result[id] = {
      task: self.class.strip_prefix(queue, @que_prefix),
      timestamps: timestamps
    }
  end
  result
end
# File lib/balsamique.rb, line 535
# Runs the REPORT_POP script on the report queue.
#
# timestamp - first script argument (defaults to the current time); the
#             second argument is REPORT_RETRY_DELAY.
#
# Returns the JSON-decoded script result, or the falsy result unchanged.
def pop_report(timestamp = Time.now.to_f)
  raw = redis_eval(
    REPORT_POP_SHA, REPORT_POP, [@report_queue, @retries],
    [timestamp, REPORT_RETRY_DELAY])
  raw ? JSON.parse(raw) : raw
end
# File lib/balsamique.rb, line 513
# Puts a job id on the report queue, clearing its report-queue retry
# entry in the same MULTI block.
#
# id        - job id to add.
# timestamp - score for the sorted-set entry (defaults to the current time).
#
# Returns the result of the MULTI block.
def push_report(id, timestamp = Time.now.to_f)
  retry_field = "#{id},#{@report_queue}"
  redis.multi do |tx|
    tx.hdel(@retries, retry_field)
    tx.zadd(@report_queue, timestamp, id)
  end
end
# File lib/balsamique.rb, line 553
# Writes the pairs of a Hash into the env hash of a topic with HMSET.
#
# topic - topic name (converted with #to_s and appended to the env prefix).
# h     - Hash of field => value to store.
#
# Returns nil when h is empty, otherwise true when HMSET replied 'OK'.
def put_env(topic, h)
  return if h.empty?

  field_values = h.flat_map { |field, value| [field, value] }
  hkey = @env_prefix + topic.to_s
  'OK' == redis.hmset(hkey, *field_values)
end
# File lib/balsamique.rb, line 390
# Counts the entries of a queue with ZCARD.
#
# queue - queue name; appended to the queue key prefix.
#
# Returns the ZCARD reply, or 0 when the reply is falsy.
def queue_length(queue)
  queue_key = @que_prefix + queue
  count = redis.zcard(queue_key)
  count || 0
end
# File lib/balsamique.rb, line 394
# Lists the first entries of a queue with their scores and retry counts.
#
# queue - queue name; appended to the queue key prefix.
# n     - maximum number of entries to read (defaults to 1000).
#
# Returns a Hash of id => { ts:, retries: }; {} when the queue is empty.
def queue_peek(queue, n = 1000)
  qkey = @que_prefix + queue
  peeked = {}
  redis.zrange(qkey, 0, n - 1, with_scores: true).each do |id, score|
    peeked[id] = { ts: score }
  end
  return peeked if peeked.empty?

  ids = peeked.keys
  retry_fields = ids.map { |id| "#{id},#{qkey}" }
  counts = redis.hmget(@retries, retry_fields)
  ids.each_with_index do |id, position|
    # A missing retry entry is nil, which #to_i turns into 0.
    peeked[id][:retries] = counts[position].to_i
  end
  peeked
end
# File lib/balsamique.rb, line 493
# Collects queue statistics from the most recent stats chunks.
#
# chunks - number of chunks to read, counting back from the chunk that
#          contains latest (defaults to 3).
# latest - timestamp locating the newest chunk (defaults to the current time).
#
# Each stored field has the form "queue,stat,slice".
#
# Returns a nested Hash: stat => timestamp => queue => Integer value.
def queue_stats(chunks = 3, latest = Time.now.to_f)
  last_chunk, _last_slice = self.class.slice_timestamp(latest)
  stats = {}
  (0..(chunks - 1)).each do |offset|
    chunk_ts = self.class.enc36(last_chunk - offset)
    stats_chunk = redis.hgetall(@questats_prefix + chunk_ts)
    next unless stats_chunk

    stats_chunk.each do |field, val|
      queue_key, stat, slice = field.split(',')
      queue = self.class.strip_prefix(queue_key, @que_prefix)
      timestamp = self.class.dec36_assemble_timestamp(chunk_ts, slice)
      by_timestamp = (stats[stat] ||= {})
      by_queue = (by_timestamp[timestamp] ||= {})
      by_queue[queue] = val.to_i
    end
  end
  stats
end
# File lib/balsamique.rb, line 353
# Lists the names of the queues recorded in the queues hash.
#
# Returns an Array with each field of the hash stripped of the queue key
# prefix.
def queues
  queue_keys = redis.hgetall(@queues).keys
  queue_keys.map { |queue_key| self.class.strip_prefix(queue_key, @que_prefix) }
end
# File lib/balsamique.rb, line 358
# Summarizes every queue recorded in the queues hash.
#
# For each queue key three commands are issued inside one MULTI block
# (ZRANGE of the first entry with scores, ZCOUNT from 0 to now, ZCARD), so
# the replies for queue number i sit at positions 3i, 3i + 1 and 3i + 2.
# Each value of the queues hash has the form "last_id,last_ts".
#
# Returns {} when no queues are recorded, otherwise a Hash of queue name =>
# { current_ts:, last_id:, last_ts:, total:, ready:, next_id:, next_ts: }.
def queues_info
  qs_info = redis.hgetall(@queues)
  return {} if qs_info.empty?

  now = Time.now.to_f
  queue_keys = qs_info.keys
  details = redis.multi do |tx|
    queue_keys.each do |key|
      tx.zrange(key, 0, 0, withscores: true)
      tx.zcount(key, 0, now)
      tx.zcard(key)
    end
  end

  queue_keys.each_with_index.each_with_object({}) do |(key, position), info|
    base = 3 * position
    last_id, last_ts = qs_info[key].split(',')
    next_info = details[base].first
    next_id, next_ts = next_info ? [next_info.first, next_info.last] : [nil, nil]
    info[self.class.strip_prefix(key, @que_prefix)] = {
      current_ts: now,
      last_id: last_id,
      last_ts: last_ts.to_f,
      total: details[base + 2],
      ready: details[base + 1],
      next_id: next_id,
      next_ts: next_ts
    }
  end
end
# File lib/balsamique.rb, line 27
# Picks a randomized delay no longer than the given one.
#
# delay - the maximum delay (defaults to RETRY_DELAY).
#
# Returns delay reduced by a random amount of up to half of delay.
def rand_delay(delay = RETRY_DELAY)
  jitter = 0.5 * rand() * delay
  delay - jitter
end
# File lib/balsamique.rb, line 31
# Returns the Redis client stored in @redis.
def redis
  client = @redis
  client
end
# File lib/balsamique.rb, line 35
# Runs a Lua script by its SHA, falling back to EVAL with the script
# source when Redis does not have the script cached.
#
# cmd_sha - SHA digest of the script.
# cmd     - script source, used only for the fallback.
# keys    - Redis keys passed to the script.
# argv    - arguments passed to the script.
#
# Returns the script result.
# Raises Redis::CommandError for any failure other than a missing script.
def redis_eval(cmd_sha, cmd, keys, argv)
  redis.evalsha(cmd_sha, keys, argv)
rescue Redis::CommandError => e
  # Only a NOSCRIPT reply means the script is not cached. Any other error
  # came from running the script, and running it again through EVAL could
  # repeat writes it already made.
  raise unless e.message.include?('NOSCRIPT')
  puts "[INFO] Balsamique falling back to EVAL for #{cmd_sha}"
  redis.eval(cmd, keys, argv)
end
# File lib/balsamique.rb, line 420
# Removes a job and the data stored for it.
#
# Inside one MULTI block the job is removed from its queue (when the status
# names a key under the queue prefix) together with its retry entry and one
# failure entry per timestamp after the first; its args and tasks are
# always deleted. Afterwards the status is read again: if it is unchanged
# the status and any unique entry are deleted, and if it changed in the
# meantime the whole removal is repeated.
#
# id - job id.
#
# Returns nil when the job has no status (before or after the MULTI block).
def remove_job(id)
  status = redis.hget(@status, id)
  return unless status

  queue, timestamps = decode_job_status(status)
  rkey = "#{id},#{queue}"
  # Failure fields are numbered from 1, one per timestamp after the first.
  rkeys = (1...timestamps.size).map { |attempt| "#{rkey},#{attempt}" }

  redis.multi do |tx|
    if queue.start_with?(@que_prefix)
      tx.zrem(queue, id)
      tx.hdel(@retries, rkey)
      unless rkeys.empty?
        tx.hdel(@failures, rkeys)
        tx.zrem(@failz, rkeys)
      end
    end
    tx.hdel(@args, id)
    tx.hdel(@tasks, id)
  end

  check_status = redis.hget(@status, id)
  return unless check_status

  if check_status == status
    redis.hdel(@status, id)
    uid = redis.hget(@unique, id)
    redis.hdel(@unique, [id, uid]) if uid
  else
    # The status changed while we were cleaning up; start over.
    remove_job(id)
  end
end
# File lib/balsamique.rb, line 305
# Runs the RESCHEDULE script for a set of job ids on one queue.
#
# queue     - queue name; appended to the queue key prefix.
# ids       - Array of job ids, passed after the timestamp.
# timestamp - first script argument (defaults to the current time).
#
# Returns whatever redis_eval returns for the script.
def reschedule(queue, ids, timestamp = Time.now.to_f)
  queue_key = @que_prefix + queue
  script_args = [timestamp] + ids
  redis_eval(RESCHEDULE_SHA, RESCHEDULE, [queue_key], script_args)
end
# File lib/balsamique.rb, line 561
# Removes values from the env hash of a topic.
#
# topic - topic name (converted with #to_s and appended to the env prefix).
# keys  - nil to delete the whole hash, or a list of fields to delete.
#
# Returns the DEL reply when keys is nil, the HDEL reply when keys is
# non-empty, and nil when keys is empty.
def rm_env(topic, keys = nil)
  hkey = @env_prefix + topic.to_s
  return redis.del(hkey) if keys.nil?

  redis.hdel(hkey, keys) unless keys.empty?
end
# File lib/balsamique.rb, line 89
# Runs the SET_ID_FLOOR script on the unique hash.
#
# id_floor - value converted with #to_i and passed as the only argument.
#
# Returns whatever redis_eval returns for the script.
def set_id_floor(id_floor)
  floor = id_floor.to_i
  redis_eval(SET_ID_FLOOR_SHA, SET_ID_FLOOR, [@unique], [floor])
end
# File lib/balsamique.rb, line 228
# Records the success of a job's current task via the SUCCEED_TASK script.
#
# id        - job id.
# tasks     - task list; serialized with #to_json. Its current task names
#             the queue being left; when it has a next task, that queue
#             key and the queues hash are appended to the script keys.
# timestamp - second script argument (defaults to the current time).
#
# Returns true when the script's return value equals id.
def succeed(id, tasks, timestamp = Time.now.to_f)
  current_task = self.class.current_task(tasks)
  next_task = self.class.next_task(tasks)
  keys = [
    @tasks, @status, @retries, @report_queue, @failz, @failures,
    @que_prefix + current_task
  ]
  argv = [id, timestamp, tasks.to_json]
  keys.push(@que_prefix + next_task, @queues) if next_task
  reply = redis_eval(SUCCEED_TASK_SHA, SUCCEED_TASK, keys, argv)
  id == reply
end