class Balsamique

Constants

ACCELERATE_RETRIES
ACCELERATE_RETRIES_SHA
DEQUEUE_TASK

Lua script DEQUEUE_TASK takes keys

[args_h, tasks_h, questats_h, retries_h, task1_z, ...],

and args [timestamp_f, retry_delay, tmod]. It performs a conditional ZPOP on task1_z, where the condition is that the score of the first item is <= timestamp_f. If nothing is available to ZPOP, it tries task2_z, etc. If an id is returned from any ZPOP, it increments the retry count in retries_h and reschedules the task accordingly. Then it writes stats info to questats_h, and returns the job information from args_h and tasks_h.

DEQUEUE_TASK_SHA
ENQUEUE_JOB

Lua script ENQUEUE_JOB takes keys

tasks_h, args_h, jobstat_h, task1_z, queues_h, uniq_h

and args [tasks, args, run_at, uniq]. uniq is optional. If it’s present, the script first checks to see if the key uniq is already set in the hash uniq_h. If so, the negative of the integer value therein is returned and the script does nothing. Otherwise, an integer id is written as that value, the tasks_h hash gets the value of tasks (JSON-encoded task list) written under the key id, the args_h hash gets the value args written under the key id, and task1_z gets id zadded with score run_at. Also, task1_z is written to jobstat_h under the key id. The value returned from the operation is the id. A successful enqueueing is thus signaled by the return of the job id, while an enqueueing blocked by the uniq_in_flight constraint returns minus the blocking id.

ENQUEUE_JOB_SHA
FAIL_TASK
FAIL_TASK_SHA
REPORT_COMPLETE
REPORT_COMPLETE_SHA
REPORT_POP
REPORT_POP_SHA
REPORT_RETRY_DELAY
RESCHEDULE
RESCHEDULE_SHA
RETRY_DELAY
SET_ID_FLOOR
SET_ID_FLOOR_SHA
STATS_CHUNK
STATS_SLICE
SUCCEED_TASK
SUCCEED_TASK_SHA

Public Class Methods

assemble_timestamp(chunk, slice) click to toggle source
# File lib/balsamique.rb, line 74
# Rebuilds a timestamp from a (chunk, slice) pair. The pair is first
# combined into one slice index (chunk * STATS_CHUNK + slice), which is
# then scaled by STATS_SLICE.
def self.assemble_timestamp(chunk, slice)
  slice_index = chunk * STATS_CHUNK + slice
  slice_index * STATS_SLICE
end
current_task(tasks) click to toggle source
# File lib/balsamique.rb, line 47
# Scans +tasks+ from the end and returns the first element of the last
# entry that has more than one element; returns nil when no entry does.
def self.current_task(tasks)
  latest = tasks.reverse_each.find { |entry| entry.size > 1 }
  latest.first if latest
end
dec36(s) click to toggle source
# File lib/balsamique.rb, line 68
# Decodes the base-36 string +s+ into an Integer (String#to_i semantics,
# so unparsable input yields 0).
def self.dec36(s)
  radix = 36
  s.to_i(radix)
end
dec36_assemble_timestamp(echunk, eslice) click to toggle source
# File lib/balsamique.rb, line 77
# Decodes a base-36 encoded chunk and slice with dec36, then hands both
# to assemble_timestamp and returns its result.
def self.dec36_assemble_timestamp(echunk, eslice)
  chunk = self.dec36(echunk)
  slice = self.dec36(eslice)
  self.assemble_timestamp(chunk, slice)
end
enc36(i) click to toggle source
# File lib/balsamique.rb, line 65
# Encodes the integer +i+ as a base-36 string.
def self.enc36(i)
  radix = 36
  i.to_s(radix)
end
enc36_slice_timestamp(ts) click to toggle source
# File lib/balsamique.rb, line 71
# Splits +ts+ with slice_timestamp and returns each resulting component
# encoded in base 36 (via enc36), in the same order.
def self.enc36_slice_timestamp(ts)
  parts = self.slice_timestamp(ts)
  parts.map { |part| self.enc36(part) }
end
new(redis, namespace = 'bQ') click to toggle source
# File lib/balsamique.rb, line 6
# Stores the Redis client and precomputes every key name (or key prefix)
# used by this instance, all of them rooted at +namespace+.
#
# redis     - the Redis client object used for all commands.
# namespace - String prepended to every key; defaults to 'bQ'.
def initialize(redis, namespace = 'bQ')
  @redis = redis

  # Prefixes: a queue name, stats chunk or env topic is appended later.
  @que_prefix, @questats_prefix, @env_prefix =
    %w[:que: :questats: :env:].map { |suffix| namespace + suffix }

  # Fixed key names.
  @status, @queues, @retries, @failures =
    %w[:status :queues :retries :failures].map { |suffix| namespace + suffix }
  @failz, @unique, @tasks, @args =
    %w[:failz :unique :tasks :args].map { |suffix| namespace + suffix }

  # The report queue lives under the ordinary queue prefix.
  @report_queue = @que_prefix + '_report'
end
next_task(tasks) click to toggle source
# File lib/balsamique.rb, line 42
# Returns the first element of the first entry in +tasks+ that has exactly
# one element, or nil when there is no such entry.
def self.next_task(tasks)
  pending = tasks.find { |entry| entry.size == 1 }
  pending.first if pending
end
slice_timestamp(ts) click to toggle source
# File lib/balsamique.rb, line 61
# Splits a timestamp into [chunk, slice]: the timestamp is truncated to an
# integer and divided by STATS_SLICE to get a slice index, which is then
# split into quotient and remainder with respect to STATS_CHUNK.
def self.slice_timestamp(ts)
  (ts.to_i / STATS_SLICE).divmod(STATS_CHUNK)
end
strip_prefix(str, prefix) click to toggle source
# File lib/balsamique.rb, line 52
# Returns +str+ with the leading +prefix+ removed, or nil when +str+ does
# not begin with +prefix+.
def self.strip_prefix(str, prefix)
  return unless str.start_with?(prefix)
  str[prefix.size..-1]
end

Public Instance Methods

accelerate_retries(queue, incr = 0.001, timestamp = Time.now.to_i) click to toggle source
# File lib/balsamique.rb, line 289
# Runs the ACCELERATE_RETRIES script against one queue.
#
# queue     - queue name; a String or Symbol (converted with to_s, like
#             delete_queue, dequeue and enqueue do).
# incr      - passed to the script as its first argument (default 0.001).
# timestamp - passed to the script as its second argument (default: now,
#             as an integer).
#
# Returns whatever the script returns. The script body is not visible
# here; presumably it moves retry times forward -- confirm against the
# Lua source.
def accelerate_retries(queue, incr = 0.001, timestamp = Time.now.to_i)
  queue_key = @que_prefix + queue.to_s
  redis_eval(ACCELERATE_RETRIES_SHA, ACCELERATE_RETRIES,
    [queue_key], [incr, timestamp])
end
complete_report(id) click to toggle source
# File lib/balsamique.rb, line 548
# Runs the REPORT_COMPLETE script for job +id+, giving it the report queue
# and the retries hash as keys. Returns the script's reply.
def complete_report(id)
  keys = [@report_queue, @retries]
  redis_eval(REPORT_COMPLETE_SHA, REPORT_COMPLETE, keys, [id])
end
decode_job_status(status) click to toggle source
# File lib/balsamique.rb, line 409
# Parses a comma-separated job status string of the form
#   "queue,ts[,index,ts]..."
# into [queue, timestamps]. The first timestamp lands at index 0; every
# following (index, ts) pair stores ts at that index, so gaps stay nil.
def decode_job_status(status)
  queue, first_ts, *retry_fields = status.split(',')
  timestamps = [first_ts.to_f]
  retry_fields.each_slice(2) do |index, ts|
    timestamps[index.to_i] = ts.to_f
  end
  [queue, timestamps]
end
delete_queue(queue) click to toggle source
# File lib/balsamique.rb, line 345
# Deletes a queue's key and removes it from the queues hash in a single
# MULTI. Returns true when the last reply (from the HDEL on the queues
# hash) equals 1.
def delete_queue(queue)
  queue_key = @que_prefix + queue.to_s
  replies = redis.multi do |tx|
    tx.del(queue_key)
    tx.hdel(@queues, queue_key)
  end
  replies.last == 1
end
dequeue(tasks, retry_delay = rand_delay, timestamp = Time.now.to_f) click to toggle source
# File lib/balsamique.rb, line 171
# Tries to dequeue one job from the queues named in +tasks+ by running the
# DEQUEUE_TASK script.
#
# tasks       - queue names, tried in the given order (each is to_s'd).
# retry_delay - passed to the script; defaults to a randomized delay.
# timestamp   - the current time as a float; also selects the stats chunk
#               hash and the stats slice passed to the script.
#
# Returns nil when the script returns nothing, otherwise a hash with :id,
# :args (JSON-decoded), :tasks (JSON-decoded) and :retries.
def dequeue(tasks, retry_delay = rand_delay, timestamp = Time.now.to_f)
  chunk36, slice36 = self.class.enc36_slice_timestamp(timestamp)
  queue_keys = tasks.map { |task| @que_prefix + task.to_s }
  keys = [@args, @tasks, @questats_prefix + chunk36, @retries] + queue_keys
  reply = redis_eval(
    DEQUEUE_TASK_SHA, DEQUEUE_TASK, keys,
    [timestamp, retry_delay, slice36])
  return unless reply

  id, raw_args, raw_tasks, retries = reply
  { id: id, args: JSON.parse(raw_args), tasks: JSON.parse(raw_tasks),
    retries: retries }
end
enqueue(tasks, args, uniq_in_flight = nil, run_at = Time.now.to_f) click to toggle source
# File lib/balsamique.rb, line 128
# Enqueues a job by running the ENQUEUE_JOB script on the queue of the
# first not-yet-run task in +tasks+ (as chosen by next_task).
#
# tasks          - the task list; sent to the script JSON-encoded.
# args           - the job arguments; sent to the script JSON-encoded.
# uniq_in_flight - optional uniqueness key; appended to the script's
#                  arguments only when truthy.
# run_at         - score under which the job is scheduled (default: now).
#
# Returns [false, nil] when there is no next task. Otherwise returns
# [reply > 0, reply.abs.to_s], i.e. a success flag plus the id the script
# returned (negated by the script when the job was blocked).
def enqueue(tasks, args, uniq_in_flight = nil, run_at = Time.now.to_f)
  first_pending = self.class.next_task(tasks)
  return false, nil unless first_pending

  keys = [
    @tasks, @args, @status, @que_prefix + first_pending.to_s,
    @queues, @unique]
  argv = [tasks.to_json, args.to_json, run_at]
  argv.push(uniq_in_flight) if uniq_in_flight
  reply = redis_eval(ENQUEUE_JOB_SHA, ENQUEUE_JOB, keys, argv)
  [reply > 0, reply.abs.to_s]
end
fail(id, task, details, timestamp = Time.now.to_f) click to toggle source
# File lib/balsamique.rb, line 269
# Records a failure of +task+ for job +id+ by running the FAIL_TASK script.
#
# id        - the job id.
# task      - name of the failing task; String or Symbol (converted with
#             to_s, matching how dequeue and enqueue build queue keys).
# details   - failure details; sent to the script JSON-encoded.
# timestamp - time of the failure (default: now).
#
# Returns true when the script's reply equals +id+, false otherwise.
def fail(id, task, details, timestamp = Time.now.to_f)
  keys = [@tasks, @status, @retries, @failz, @failures, @report_queue]
  argv = [id, timestamp, @que_prefix + task.to_s, JSON.generate(details)]
  id == redis_eval(FAIL_TASK_SHA, FAIL_TASK, keys, argv)
end
failures(*args) click to toggle source
# File lib/balsamique.rb, line 341
# Convenience wrapper: looks up failure keys with get_failz (forwarding all
# arguments unchanged) and expands them with get_failures.
def failures(*args)
  recent = get_failz(*args)
  get_failures(recent)
end
fill_args_tasks(statuses) click to toggle source
# File lib/balsamique.rb, line 481
# Adds :args and :tasks entries to each status hash in +statuses+ (a hash
# keyed by job id), reading both from Redis in one MULTI. Stored values
# are JSON-decoded; a missing value becomes nil.
#
# Returns the array of [id, raw_args, raw_tasks] triples that was iterated
# (an empty array when +statuses+ is empty); +statuses+ is modified in
# place.
def fill_args_tasks(statuses)
  ids = statuses.keys
  # HMGET with no fields is a Redis error, and there is nothing to fill.
  return [] if ids.empty?

  args, tasks = redis.multi do |r|
    r.hmget(@args, ids)
    r.hmget(@tasks, ids)
  end
  ids.zip(args, tasks).each do |id, a, t|
    statuses[id][:args] = a && JSON.parse(a)
    statuses[id][:tasks] = t && JSON.parse(t)
  end
end
fill_job_failures(statuses) click to toggle source
# File lib/balsamique.rb, line 464
# Adds a :failures entry to every status in +statuses+ (a hash keyed by
# job id) that has a task and more than one timestamp.
#
# For each such job one failure key "id,queue,n" is built per timestamp
# after the first, mapped to that timestamp, and the whole set is resolved
# through get_failures.
#
# Returns +statuses+, modified in place.
def fill_job_failures(statuses)
  failz = {}
  # get_failures keys its result by the id parsed back out of the failure
  # key, which is always a String. Remember the caller's original key so
  # non-String ids (e.g. Integers) are matched up again.
  original_ids = {}
  statuses.each do |id, status|
    next unless (task = status[:task])
    timestamps = status[:timestamps]
    next unless timestamps.size > 1
    original_ids[id.to_s] = id
    queue = @que_prefix + task
    timestamps.drop(1).each_with_index do |ts, i|
      failz["#{id},#{queue},#{i + 1}"] = ts
    end
  end
  get_failures(failz).each do |id, failures|
    statuses[original_ids.fetch(id, id)][:failures] = failures
  end
  statuses
end
get_env(topic, keys = nil) click to toggle source
# File lib/balsamique.rb, line 570
# Reads values from the env hash for +topic+.
#
# keys - nil to fetch the whole hash (HGETALL); an empty collection to get
#        {} without touching Redis; otherwise the fields to fetch (HMGET).
#
# Returns a hash of field => value; with explicit +keys+, fields without a
# value map to nil.
def get_env(topic, keys = nil)
  hkey = @env_prefix + topic.to_s
  return redis.hgetall(hkey) if keys.nil?
  return {} if keys.empty?

  values = redis.hmget(hkey, keys)
  keys.zip(values).each_with_object({}) do |(field, value), env|
    env[field] = value
  end
end
get_failures(failz) click to toggle source
# File lib/balsamique.rb, line 310
# Expands a hash of failure keys ("id,queue,retries" => timestamp) into a
# hash of job id => array of failure records.
#
# Each record has :task (queue with the queue prefix stripped), :retries
# (integer), :ts (the value from +failz+) and :details (the JSON-decoded
# entry from the failures hash, or nil when there is none).
#
# The returned hash yields a fresh empty array for unknown ids.
def get_failures(failz)
  result = Hash.new { [] }
  fkeys = failz.keys
  return result if fkeys.empty?

  stored = redis.hmget(@failures, fkeys)
  fkeys.zip(stored) do |key, details|
    id, queue, retry_no = key.split(',')
    record = {
      task: self.class.strip_prefix(queue, @que_prefix),
      retries: retry_no.to_i,
      ts: failz[key],
      details: details && JSON.parse(details)
    }
    # The default block does not store the array, so assign it back.
    result[id] = result[id] << record
  end
  result
end
get_failz(earliest = 0, latest = Time.now.to_f, limit = -100) click to toggle source
# File lib/balsamique.rb, line 327
# Reads failure keys and their scores from the failz sorted set, limited
# to scores between +earliest+ and +latest+.
#
# limit - a negative value returns up to -limit entries from the high end
#         of the score range (ZREVRANGEBYSCORE); otherwise up to limit
#         entries from the low end (ZRANGEBYSCORE). Default: -100.
#
# Returns a hash of member => score.
def get_failz(earliest = 0, latest = Time.now.to_f, limit = -100)
  pairs =
    if limit >= 0
      redis.zrangebyscore(
        @failz, earliest, latest, limit: [0, limit], with_scores: true)
    else
      redis.zrevrangebyscore(
        @failz, latest, earliest, limit: [0, -limit], with_scores: true)
    end
  pairs.each_with_object({}) { |pair, acc| acc[pair[0]] = pair[1] }
end
job_status(*ids) click to toggle source
# File lib/balsamique.rb, line 451
# Looks up the status of one or more jobs.
#
# ids - job ids, given either as separate arguments or as a single array.
#
# Returns a hash keyed by id, holding :task (the job's queue with the queue
# prefix stripped) and :timestamps (as decoded by decode_job_status). Ids
# without a stored status are omitted. With no ids, returns {} without
# contacting Redis.
def job_status(*ids)
  ids = ids.flatten
  # HMGET with no fields is a Redis error.
  return {} if ids.empty?

  statuses = redis.hmget(@status, *ids)
  result = {}
  ids.zip(statuses).each do |id, status|
    next unless status
    queue, timestamps = decode_job_status(status)
    result[id] = {
      task: self.class.strip_prefix(queue, @que_prefix),
      timestamps: timestamps }
  end
  result
end
pop_report(timestamp = Time.now.to_f) click to toggle source
# File lib/balsamique.rb, line 535
# Runs the REPORT_POP script on the report queue with the given timestamp
# and REPORT_RETRY_DELAY. Returns the JSON-decoded reply, or the falsy
# reply itself when the script returns nothing.
def pop_report(timestamp = Time.now.to_f)
  keys = [@report_queue, @retries]
  raw = redis_eval(
    REPORT_POP_SHA, REPORT_POP, keys, [timestamp, REPORT_RETRY_DELAY])
  raw && JSON.parse(raw)
end
push_report(id, timestamp = Time.now.to_f) click to toggle source
# File lib/balsamique.rb, line 513
# Puts job +id+ on the report queue with score +timestamp+ and, in the same
# MULTI, deletes the job's retry entry for the report queue. Returns the
# MULTI's result.
def push_report(id, timestamp = Time.now.to_f)
  retry_field = "#{id},#{@report_queue}"
  redis.multi do |tx|
    tx.hdel(@retries, retry_field)
    tx.zadd(@report_queue, timestamp, id)
  end
end
put_env(topic, h) click to toggle source
# File lib/balsamique.rb, line 553
# Writes every key/value pair of +h+ into the env hash for +topic+ with a
# single HMSET. Returns nil without touching Redis when +h+ is empty;
# otherwise true exactly when Redis replies 'OK'.
def put_env(topic, h)
  return if h.empty?

  hkey = @env_prefix + topic.to_s
  field_values = h.flat_map { |field, value| [field, value] }
  'OK' == redis.hmset(hkey, *field_values)
end
queue_length(queue) click to toggle source
# File lib/balsamique.rb, line 390
# Returns the number of entries in +queue+ (ZCARD on the queue's key), or
# 0 when Redis returns nothing.
#
# queue - queue name; String or Symbol (converted with to_s, as
#         delete_queue, dequeue and enqueue do).
def queue_length(queue)
  redis.zcard(@que_prefix + queue.to_s) || 0
end
queue_peek(queue, n = 1000) click to toggle source
# File lib/balsamique.rb, line 394
# Returns the first +n+ entries of +queue+ without removing them.
#
# queue - queue name; String or Symbol (converted with to_s, as
#         delete_queue, dequeue and enqueue do).
# n     - maximum number of entries to return (default 1000).
#
# Returns a hash of id => { ts: score, retries: count }, in queue order.
# The retry count comes from the retries hash and is 0 when absent. The
# retries lookup is skipped when the queue is empty.
def queue_peek(queue, n = 1000)
  qkey = @que_prefix + queue.to_s
  result = {}
  redis.zrange(qkey, 0, n - 1, with_scores: true).each do |id, ts|
    result[id] = { ts: ts }
  end
  return result if result.empty?

  ids = result.keys
  retries = redis.hmget(@retries, ids.map { |id| "#{id},#{qkey}" })
  ids.zip(retries).each { |id, r| result[id][:retries] = r.to_i }
  result
end
queue_stats(chunks = 3, latest = Time.now.to_f) click to toggle source
# File lib/balsamique.rb, line 493
# Collects queue statistics from the most recent +chunks+ stats hashes,
# counting back from the chunk that contains +latest+.
#
# Each stats field is "queue,stat,slice". Values are returned as
#   stats[stat][timestamp][queue] = integer value
# where timestamp is rebuilt from the chunk and the field's slice, and
# queue has the queue prefix stripped.
def queue_stats(chunks = 3, latest = Time.now.to_f)
  newest_chunk, _slice = self.class.slice_timestamp(latest)
  stats = {}
  0.upto(chunks - 1) do |offset|
    chunk36 = self.class.enc36(newest_chunk - offset)
    counters = redis.hgetall(@questats_prefix + chunk36)
    next unless counters

    counters.each do |field, count|
      queue_key, stat, slice36 = field.split(',')
      queue = self.class.strip_prefix(queue_key, @que_prefix)
      timestamp = self.class.dec36_assemble_timestamp(chunk36, slice36)
      by_time = (stats[stat] ||= {})
      by_queue = (by_time[timestamp] ||= {})
      by_queue[queue] = count.to_i
    end
  end
  stats
end
queues() click to toggle source
# File lib/balsamique.rb, line 353
# Lists the names of all queues recorded in the queues hash, i.e. its
# field names with the queue prefix stripped.
def queues
  registered = redis.hgetall(@queues)
  registered.each_key.map do |queue_key|
    self.class.strip_prefix(queue_key, @que_prefix)
  end
end
queues_info() click to toggle source
# File lib/balsamique.rb, line 358
# Returns a summary of every queue recorded in the queues hash, keyed by
# queue name (queue key with the prefix stripped). Returns {} when no
# queue is recorded.
#
# Per queue:
#   :current_ts       - the time this method ran
#   :last_id/:last_ts - parsed from the queue's "id,ts" entry in the
#                       queues hash
#   :total            - ZCARD of the queue
#   :ready            - ZCOUNT of entries scored between 0 and now
#   :next_id/:next_ts - member and score of the first entry, or nil when
#                       the queue is empty
def queues_info
  qs_info = redis.hgetall(@queues)
  return {} if qs_info.empty?

  now = Time.now.to_f
  queue_keys = qs_info.keys
  details = redis.multi do |r|
    queue_keys.each do |key|
      # Three replies per queue, consumed in this order below.
      r.zrange(key, 0, 0, with_scores: true)
      r.zcount(key, 0, now)
      r.zcard(key)
    end
  end

  result = {}
  queue_keys.zip(details.each_slice(3).to_a).each do |key, (head, ready, total)|
    queue = self.class.strip_prefix(key, @que_prefix)
    last_id, last_ts = qs_info[key].split(',')
    # head is a list holding at most one [member, score] pair.
    next_id, next_ts = head.first
    result[queue] = {
      current_ts: now,
      last_id: last_id, last_ts: last_ts.to_f,
      total: total, ready: ready,
      next_id: next_id, next_ts: next_ts
    }
  end
  result
end
rand_delay(delay = RETRY_DELAY) click to toggle source
# File lib/balsamique.rb, line 27
# Returns +delay+ reduced by a random amount of up to half of itself, so
# the result lies between half of +delay+ (exclusive) and +delay+
# (inclusive).
def rand_delay(delay = RETRY_DELAY)
  jitter = 0.5 * rand * delay
  delay - jitter
end
redis() click to toggle source
# File lib/balsamique.rb, line 31
# Accessor for the Redis client handed to the constructor.
def redis; @redis; end
redis_eval(cmd_sha, cmd, keys, argv) click to toggle source
# File lib/balsamique.rb, line 35
# Runs a Lua script, preferring the cached form.
#
# cmd_sha - SHA1 digest of the script, tried first with EVALSHA.
# cmd     - the script source, sent with EVAL when the server does not
#           have the script cached.
# keys    - KEYS passed to the script.
# argv    - ARGV passed to the script.
#
# Returns the script's reply.
# Raises Redis::CommandError for any command error other than a missing
# script.
def redis_eval(cmd_sha, cmd, keys, argv)
  redis.evalsha(cmd_sha, keys, argv)
rescue Redis::CommandError => e
  # Only NOSCRIPT means "not cached yet". Any other error comes from a
  # script that did run (or a server problem); re-running it through EVAL
  # would execute the script a second time.
  raise unless e.message.start_with?('NOSCRIPT')
  puts "[INFO] Balsamique falling back to EVAL for #{cmd_sha}"
  redis.eval(cmd, keys, argv)
end
remove_job(id) click to toggle source
# File lib/balsamique.rb, line 420
# Removes job +id+ and its bookkeeping entries from Redis.
#
# Returns nil when the job has no status entry (either at the start or by
# the time the first batch of deletions has run). Otherwise the return
# value is whatever the last Redis call (or the recursive call) returns.
def remove_job(id)
  return unless (status = redis.hget(@status, id))
  queue, timestamps = decode_job_status(status)
  redis.multi do |r|
    # Queue-related entries are only touched when the status names a key
    # under the queue prefix.
    if queue.start_with?(@que_prefix)
      r.zrem(queue, id)
      rkey = "#{id},#{queue}"
      r.hdel(@retries, rkey)
      # One failure key ("id,queue,n") per timestamp after the first.
      rkeys = []
      timestamps.drop(1).each_with_index do |ts, i|
        rkeys << rkey + ",#{i + 1}"
      end
      if rkeys.size > 0
        r.hdel(@failures, rkeys)
        r.zrem(@failz, rkeys)
      end
    end
    r.hdel(@args, id)
    r.hdel(@tasks, id)
  end
  # Re-read the status outside the MULTI: the status entry is only deleted
  # if it still equals the value the deletions above were based on.
  return unless (check_status = redis.hget(@status, id))
  if check_status == status
    redis.hdel(@status, id)
    # If the unique hash has an entry under this id, delete both that
    # entry and the entry keyed by its value.
    if (uid = redis.hget(@unique, id))
      redis.hdel(@unique, [id, uid])
    end
  else
    # The status changed in the meantime (presumably the job moved on
    # concurrently -- confirm), so start over against the new status.
    remove_job(id)
  end
end
reschedule(queue, ids, timestamp = Time.now.to_f) click to toggle source
# File lib/balsamique.rb, line 305
# Runs the RESCHEDULE script on +queue+ for the given job ids.
#
# queue     - queue name; String or Symbol (converted with to_s, as
#             delete_queue, dequeue and enqueue do).
# ids       - an array of job ids, or a single id.
# timestamp - passed to the script ahead of the ids (default: now).
#
# Returns the script's reply. The script body is not visible here;
# presumably it re-scores the given ids at +timestamp+ -- confirm against
# the Lua source.
def reschedule(queue, ids, timestamp = Time.now.to_f)
  redis_eval(RESCHEDULE_SHA, RESCHEDULE,
    [@que_prefix + queue.to_s], [timestamp, *ids])
end
rm_env(topic, keys = nil) click to toggle source
# File lib/balsamique.rb, line 561
# Removes entries from the env hash for +topic+.
#
# keys - nil deletes the whole hash (DEL); a non-empty collection deletes
#        just those fields (HDEL); an empty collection does nothing and
#        returns nil.
#
# Returns the Redis reply of the command that was issued, if any.
def rm_env(topic, keys = nil)
  hkey = @env_prefix + topic.to_s
  return redis.del(hkey) if keys.nil?

  redis.hdel(hkey, keys) unless keys.empty?
end
set_id_floor(id_floor) click to toggle source
# File lib/balsamique.rb, line 89
# Runs the SET_ID_FLOOR script on the unique hash, passing +id_floor+
# converted to an integer. Returns the script's reply.
def set_id_floor(id_floor)
  floor = id_floor.to_i
  redis_eval(SET_ID_FLOOR_SHA, SET_ID_FLOOR, [@unique], [floor])
end
succeed(id, tasks, timestamp = Time.now.to_f) click to toggle source
# File lib/balsamique.rb, line 228
# Reports that the current task of job +id+ finished successfully by
# running the SUCCEED_TASK script.
#
# id        - the job id.
# tasks     - the job's task list; sent to the script JSON-encoded. The
#             current and next task are derived from it with current_task
#             and next_task.
# timestamp - passed to the script (default: now).
#
# The script receives the current task's queue key, plus the next task's
# queue key and the queues hash when there is a next task.
#
# Returns true when the script's reply equals +id+, false otherwise.
def succeed(id, tasks, timestamp = Time.now.to_f)
  finished = self.class.current_task(tasks)
  upcoming = self.class.next_task(tasks)
  keys = [
    @tasks, @status, @retries, @report_queue, @failz, @failures,
    @que_prefix + finished]
  keys.push(@que_prefix + upcoming, @queues) if upcoming
  argv = [id, timestamp, tasks.to_json]
  id == redis_eval(SUCCEED_TASK_SHA, SUCCEED_TASK, keys, argv)
end