class Postqueue::Queue

The Postqueue processor processes items in a single Postqueue table.

The Postqueue processor processes items in a single Postqueue table.

The Postqueue processor processes items in a single Postqueue table.

Constants

VALID_PROCESSING_VALUES

Attributes

default_batch_size[RW]

The default batch size. Will be used if no specific batch size is defined for an operation.

item_class[RW]

The AR::Base class to use. You would only change this if you want to run the queue in a different database or in a different table.

max_attemps[R]

maximum number of processing attempts.

Public Class Methods

new() { |self| ... } click to toggle source
# File lib/postqueue/queue.rb, line 27
# Builds a queue with its default configuration: items are stored via
# ::Postqueue::Item, the default batch size is 1, max_attemps is 5 and the
# processing mode is :async.
#
# Registers the built-in "test" and "fail" handlers, the :missing_handler
# fallback and the default callbacks, then yields the queue to the optional
# block so that the caller can adjust the configuration.
def initialize(&block)
  @item_class = ::Postqueue::Item
  @default_batch_size = 1
  @max_attemps = 5
  @idempotent_operations = {}
  @batch_sizes = {}
  @processing = :async

  # "test" only logs the entity ids it receives.
  on "test" do |_op, entity_ids|
    Postqueue.logger.info "[test] processing entity_ids: #{entity_ids.inspect}"
  end

  # "fail" always raises.
  on "fail" do |_op, entity_ids|
    raise "Postqueue test failure, w/entity_ids: #{entity_ids.inspect}"
  end

  # Fallback that run_callback uses when no handler matches the op.
  on :missing_handler do |op, entity_ids|
    raise MissingHandler, queue: self, op: op, entity_ids: entity_ids
  end

  set_default_callbacks

  yield self if block
end

Public Instance Methods

assert_valid_op!(op) click to toggle source
# File lib/postqueue/queue/callback.rb, line 17
# Verifies that +op+ can be used as an operation name: either the special
# :missing_handler symbol or any String.
#
# Returns nil for a valid op, raises ArgumentError otherwise.
def assert_valid_op!(op)
  acceptable = op == :missing_handler || op.is_a?(String)
  raise ArgumentError, "Invalid op #{op.inspect}, must be a string" unless acceptable
end
batch_size(op:) click to toggle source
# File lib/postqueue/queue.rb, line 71
# Returns the batch size to use for +op+: the per-op value if one was
# configured, otherwise default_batch_size, otherwise 1.
def batch_size(op:)
  per_op = @batch_sizes[op]
  return per_op if per_op

  default_batch_size || 1
end
calculate_batch_size(op:, max_batch_size:) click to toggle source
# File lib/postqueue/queue/select_and_lock.rb, line 60
# Determines how many items to fetch for +op+.
#
# Starts from the batch size configured for the op. Anything below 2 yields
# 1; without a max_batch_size the configured value is used as is; otherwise
# the smaller of the two values is returned.
def calculate_batch_size(op:, max_batch_size:)
  configured = batch_size(op: op)

  if configured < 2
    1
  elsif !max_batch_size
    configured
  elsif max_batch_size < configured
    max_batch_size
  else
    configured
  end
end
enqueue(op:, entity_id:) click to toggle source
# File lib/postqueue/queue.rb, line 79
# Enqueues +entity_id+ for +op+ via the item class; duplicates are ignored
# when the op is registered as idempotent.
#
# If at least one item was enqueued, the processing mode decides what
# happens next:
#
# :async  - nothing,
# :sync   - the queue is processed until no item for +op+ is left,
# :verify - MissingHandler is raised unless a callback exists for +op+.
#
# Returns whatever the item class reported as enqueued.
def enqueue(op:, entity_id:)
  enqueued_count = item_class.enqueue(
    op: op,
    entity_id: entity_id,
    ignore_duplicates: idempotent_operation?(op)
  )

  if enqueued_count > 0
    mode = processing

    if mode == :sync
      process_until_empty(op: op)
    elsif mode == :verify && !callback_for(op: op)
      raise(MissingHandler, queue: self, op: op, entity_ids: [entity_id])
    end
  end

  enqueued_count
end
idempotent_operation?(op) click to toggle source
# File lib/postqueue/queue.rb, line 75
# Tells whether +op+ is flagged as idempotent. An explicit per-op entry
# wins; otherwise the entry for "*" is used, and false if there is none.
def idempotent_operation?(op)
  flags = @idempotent_operations
  return flags[op] if flags.key?(op)

  flags.fetch("*", false)
end
logger() click to toggle source
# File lib/postqueue/queue/logging.rb, line 34
# Returns the logger used by this queue, which is the Postqueue-wide logger.
def logger
  Postqueue.logger
end
on(op, batch_size: nil, idempotent: nil, &block) click to toggle source
# File lib/postqueue/queue/callback.rb, line 24
# Registers +block+ as the handler for +op+ and optionally configures the op.
#
# op         - a String, or :missing_handler (see assert_valid_op!).
# batch_size - optional batch size to use for this op.
# idempotent - optional idempotency flag for this op; nil leaves the
#              current setting untouched.
#
# Raises ArgumentError for an invalid op, or when a batch size or an
# idempotency flag is passed for the default op "*". Nothing is registered
# when an error is raised.
#
# Returns self, so that calls can be chained.
def on(op, batch_size: nil, idempotent: nil, &block)
  assert_valid_op! op

  # Validate all arguments before touching any state, so that a rejected
  # call does not leave a half-registered handler behind.
  if op == "*"
    raise ArgumentError, "Can't set per-op batchsize for op '*'" if batch_size
    raise ArgumentError, "Can't idempotent for default op '*'" unless idempotent.nil?
  end

  callbacks[op] = block
  @batch_sizes[op] = batch_size if batch_size
  @idempotent_operations[op] = idempotent unless idempotent.nil?

  self
end
process(op: nil, batch_size: nil) click to toggle source

Processes up to batch_size entries

process batch_size: 100

# File lib/postqueue/queue/processing.rb, line 7
# Processes one batch of queue items inside a transaction of the item class
# and returns the result of process_inside_transaction.
#
# op         - restricts processing to this op when given.
# batch_size - upper limit for the number of items in the batch.
def process(op: nil, batch_size: nil)
  item_class.transaction { process_inside_transaction(op: op, batch_size: batch_size) }
end
process_one(op: nil) click to toggle source

processes a single entry

# File lib/postqueue/queue/processing.rb, line 14
# Processes at most one queue item by calling process with a batch size
# of 1. Returns whatever process returns.
#
# op - restricts processing to this op when given.
def process_one(op: nil)
  process(op: op, batch_size: 1)
end
process_until_empty(op: nil, batch_size: nil) click to toggle source
# File lib/postqueue/queue/processing.rb, line 18
# Calls process repeatedly until a run reports 0 processed items.
#
# Returns the sum of the counts reported by the individual runs.
def process_until_empty(op: nil, batch_size: nil)
  total = 0

  loop do
    handled = process(op: op, batch_size: batch_size)
    return total if handled == 0

    total += handled
  end

  total
end
processing(processing = nil) click to toggle source

Sets or returns the processing mode. When setting, the mode must be one of :async, :sync or :verify.

# File lib/postqueue/queue.rb, line 18
# Combined reader/writer for the processing mode.
#
# Without an argument (or with nil) the current mode is returned. With an
# argument the mode is changed, provided the value is listed in
# VALID_PROCESSING_VALUES; otherwise ArgumentError is raised and the mode is
# left unchanged.
def processing(processing = nil)
  return @processing if processing.nil?

  if VALID_PROCESSING_VALUES.include?(processing)
    @processing = processing
  else
    raise ArgumentError, "Invalid processing value, must be one of #{VALID_PROCESSING_VALUES.inspect}"
  end
end
run(&block) click to toggle source
# File lib/postqueue/queue/runner.rb, line 4
# Combined reader/writer for the runner block: stores +block+ when one is
# given and returns the currently stored runner (nil if none was set).
def run(&block)
  return @run unless block

  @run = block
end
run!() click to toggle source
# File lib/postqueue/queue/runner.rb, line 9
# Invokes the runner block with this queue as its argument and returns the
# block's result. Installs the default runner first if no runner has been
# set.
def run!
  set_default_runner unless @run
  @run.call(self)
end
select_and_lock(relation, limit:) click to toggle source

Selects and locks up to limit unlocked items in the queue; without a limit, all matching unlocked items are selected. Used by select_and_lock_batch.

# File lib/postqueue/queue/select_and_lock.rb, line 15
# Runs +relation+, narrowed down by upcoming, as a locking query and returns
# the items found by the item class.
#
# limit - maximum number of rows to select; no LIMIT clause is added when
#         it is nil.
def select_and_lock(relation, limit:)
  # FOR UPDATE SKIP LOCKED selects and locks entries, but skips those that
  # are already locked - preventing this transaction from being locked.
  locking_sql = "#{upcoming(relation).to_sql} FOR UPDATE SKIP LOCKED"
  locking_sql = "#{locking_sql} LIMIT #{limit}" if limit

  item_class.find_by_sql(locking_sql)
end
select_and_lock_batch(op:, max_batch_size:) click to toggle source

Returns a batch of queue items for processing. These queue items are chosen depending on the passed-in op: and max_batch_size: settings (if any).

All selected queue items will have the same op value. If an op: value is passed in, that one is used as a filter condition; otherwise the op value of the first queue entry is used instead.

This method selects and locks at most max_batch_size items. If the batch size configured in the queue for that op is smaller than the value passed in here, the configured one is used instead.

Returns an array of item objects.

# File lib/postqueue/queue/select_and_lock.rb, line 38
# Selects and locks one batch of items that all share the same op.
#
# The first selectable item (restricted to +op+ when given) determines the
# op of the batch. The batch size is then calculated for that op; when it
# is 1 or less only that first item is returned, otherwise up to that many
# items with the same op are selected and locked.
#
# Returns an array of items, which is empty when nothing could be selected.
def select_and_lock_batch(op:, max_batch_size:)
  scope = item_class.all
  scope = scope.where(op: op) if op

  head = select_and_lock(scope, limit: 1).first

  if head
    limit = calculate_batch_size(op: head.op, max_batch_size: max_batch_size)

    if limit > 1
      select_and_lock(scope.where(op: head.op), limit: limit)
    else
      [ head ]
    end
  else
    []
  end
end
select_and_lock_duplicates(op:, entity_ids:) click to toggle source
# File lib/postqueue/queue/select_and_lock.rb, line 52
# Selects and locks, without a limit, the queue items for +op+ whose
# entity_id is one of +entity_ids+.
#
# Raises ArgumentError when +op+ is missing. Returns an empty array without
# querying when +entity_ids+ is empty.
def select_and_lock_duplicates(op:, entity_ids:)
  raise ArgumentError, "Missing op argument" unless op

  if entity_ids.empty?
    []
  else
    candidates = item_class.where(op: op, entity_id: entity_ids)
    select_and_lock(candidates, limit: nil)
  end
end
set_default_callbacks() click to toggle source
# File lib/postqueue/queue.rb, line 53
# Installs the default on_exception and after_processing callbacks.
#
# The default exception callback re-raises the exception it is given. The
# default after_processing callback logs, at info level, which op and
# entity ids were processed, how long processing took, and the average and
# maximum queue times reported by the timing object.
def set_default_callbacks
  # Raise the exception object explicitly instead of relying on $! being
  # set, so the callback also works when invoked outside of a rescue clause.
  on_exception do |e, _, _|
    raise e
  end

  after_processing do |op, entity_ids, timing|
    processing_time = format("%.3f secs", timing.processing_time)
    avg_queue_time  = format("%.3f secs (avg)", timing.avg_queue_time)
    max_queue_time  = format("%.3f secs (max)", timing.max_queue_time)

    msg = "processing '#{op}' for id(s) #{entity_ids.join(',')}: " \
          "processing #{entity_ids.length} items took #{processing_time}" \
          ", queue_time: #{avg_queue_time}/#{max_queue_time}"

    Postqueue.logger.info msg
  end
end
set_default_runner() click to toggle source
# File lib/postqueue/queue/runner.rb, line 14
# Installs the default runner: an endless loop that processes the queue
# until it is empty, sleeps for one second and starts over, logging both
# steps at debug level.
def set_default_runner
  poll_forever = proc do |queue|
    loop do
      queue.logger.debug "#{queue}: Processing until empty"
      queue.process_until_empty
      queue.logger.debug "#{queue}: sleeping"
      sleep 1
    end
  end

  run(&poll_forever)
end
to_s() click to toggle source
# File lib/postqueue/queue/logging.rb, line 38
# A queue is represented by the table name of its item class.
def to_s
  item_class.table_name
end

Private Instance Methods

after_processing(&block) click to toggle source
# File lib/postqueue/queue/logging.rb, line 6
# Combined reader/writer for the after_processing callback.
#
# With a block, the block is validated and stored; without one, the
# currently stored callback is returned (nil if none was set).
#
# Raises ArgumentError if the block's arity is rejected by the check below;
# the previously stored callback is kept in that case.
def after_processing(&block)
  if block
    # Validate before storing, so a rejected block never replaces a
    # working callback.
    # NOTE(review): this predicate also rejects blocks with arity -1 or -2
    # (e.g. |*args|) although they could take three arguments - confirm
    # whether that is intended.
    if block.arity > -3 && block.arity != 3
      raise ArgumentError, "Invalid after_processing block: must accept 3 arguments"
    end

    @after_processing = block
  end

  @after_processing
end
callback_for(op:) click to toggle source
# File lib/postqueue/queue/callback.rb, line 47
# Looks up the handler for +op+, falling back to the handler registered
# for "*". Returns nil when neither exists.
def callback_for(op:)
  specific = callbacks[op]
  return specific if specific

  callbacks["*"]
end
callbacks() click to toggle source
# File lib/postqueue/queue/callback.rb, line 43
# Returns the hash of registered handlers, keyed by op. The hash is created
# on first access.
def callbacks
  @callbacks = {} unless @callbacks
  @callbacks
end
log_exception(exception, op, entity_ids) click to toggle source
# File lib/postqueue/queue/logging.rb, line 17
# Logs, at warn level, that +exception+ was caught while processing +op+
# for the given entity ids.
def log_exception(exception, op, entity_ids)
  log = logger
  message = "processing '#{op}' for id(s) #{entity_ids.inspect}: caught #{exception}"
  log.warn message
end
on_exception(&block) click to toggle source
# File lib/postqueue/queue/logging.rb, line 21
# Combined reader/writer for the exception callback.
#
# With a block, the block is validated and stored; without one, the
# currently stored callback is returned (nil if none was set).
#
# Raises ArgumentError if the block's arity is rejected by the check below;
# the previously stored callback is kept in that case.
def on_exception(&block)
  if block
    # Validate before storing, so a rejected block never replaces a
    # working callback.
    # NOTE(review): this predicate also rejects blocks with arity -1 or -2
    # (e.g. |*args|) although they could take three arguments - confirm
    # whether that is intended.
    if block.arity > -3 && block.arity != 3
      raise ArgumentError, "Invalid on_exception block: must accept 3 arguments"
    end

    @on_exception = block
  end

  @on_exception
end
process_inside_transaction(op:, batch_size:) click to toggle source

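The actual processing. Returns the number of processed items; this is 0 if no matching item could be selected, or if processing failed and the exception callback did not re-raise.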

# File lib/postqueue/queue/processing.rb, line 31
# Processes a single batch: selects and locks the items, runs the callback
# for their op, invokes the after_processing callback and deletes the
# processed items.
#
# op         - restricts the batch to this op when given.
# batch_size - upper limit for the number of items in the batch.
#
# If anything fails after the batch has been selected, the items are
# postponed, the exception is logged and the on_exception callback is
# invoked. An error raised while selecting the batch is passed on
# unchanged, since there is nothing to postpone at that point.
#
# Returns the number of processed items, or 0 when no item was selected or
# when processing failed and the on_exception callback did not raise.
def process_inside_transaction(op:, batch_size:)
  items = select_and_lock_batch(op: op, max_batch_size: batch_size)
  match = items.first
  return 0 unless match

  entity_ids = items.map(&:entity_id)
  timing = run_callback(op: match.op, entity_ids: entity_ids)

  after_processing.call(match.op, entity_ids, timing)
  item_class.where(id: items.map(&:id)).delete_all

  # even though we try not to enqueue duplicates we cannot guarantee that,
  # since concurrent enqueue transactions might still insert duplicates.
  # That's why we explicitly remove all non-failed duplicates here.
  if idempotent_operation?(match.op)
    duplicates = select_and_lock_duplicates(op: match.op, entity_ids: entity_ids)
    item_class.where(id: duplicates.map(&:id)).delete_all unless duplicates.empty?
  end

  entity_ids.length
rescue => e
  # No batch was selected, so items/match are not usable here. Re-raise the
  # original error instead of masking it with a NoMethodError on nil.
  raise unless match

  item_class.postpone items.map(&:id)
  log_exception(e, match.op, entity_ids)
  on_exception.call(e, match.op, entity_ids)
  0
end
run_callback(op:, entity_ids:) click to toggle source
# File lib/postqueue/queue/callback.rb, line 51
    # Runs the handler for +op+ on +entity_ids+ and measures it.
    #
    # Before the handler runs, the average and maximum age (in seconds) of
    # the queue entries with these entity ids is queried. The handler is the
    # callback found for the op, or the :missing_handler callback if there
    # is none.
    #
    # Returns a Timing built from the average queue time, the maximum queue
    # time and the measured processing time.
    def run_callback(op:, entity_ids:)
      stats = item_class.find_by_sql(<<-SQL).first
        SELECT extract('epoch' from AVG(now() - created_at)) AS avg,
               extract('epoch' from MAX(now() - created_at)) AS max
        FROM #{item_class.table_name} WHERE entity_id IN (#{entity_ids.join(',')})
      SQL

      elapsed = Benchmark.realtime do
        handler = callback_for(op: op) || callbacks.fetch(:missing_handler)
        handler.call(op, entity_ids)
      end

      Timing.new(stats.avg, stats.max, elapsed)
    end