class Postqueue::Queue
The Postqueue processor processes items in a single Postqueue table.
Constants
- VALID_PROCESSING_VALUES
Attributes
The default batch size. Will be used if no specific batch size is defined for an operation.
The AR::Base class to use. You would only change this if you want to run the queue in a different database or in a different table.
maximum number of processing attempts.
Public Class Methods
# File lib/postqueue/queue.rb, line 27
# Builds a queue with its default settings, registers the built-in "test",
# "fail" and :missing_handler callbacks, installs the default callbacks and
# finally yields the queue to the optional block for further configuration.
def initialize(&block)
  @batch_sizes = {}
  @item_class = ::Postqueue::Item
  @default_batch_size = 1
  # (sic) the spelling of this ivar is kept as is: other code may read it.
  @max_attemps = 5
  @idempotent_operations = {}
  @processing = :async

  on "test" do |_op, entity_ids|
    Postqueue.logger.info "[test] processing entity_ids: #{entity_ids.inspect}"
  end

  on "fail" do |_op, entity_ids|
    raise "Postqueue test failure, w/entity_ids: #{entity_ids.inspect}"
  end

  # Fallback used by run_callback when no callback matches an op.
  on :missing_handler do |op, entity_ids|
    raise MissingHandler, queue: self, op: op, entity_ids: entity_ids
  end

  set_default_callbacks

  yield self if block
end
Public Instance Methods
# File lib/postqueue/queue/callback.rb, line 17
# Accepts the special :missing_handler op and any String op; everything else
# is rejected.
#
# @raise [ArgumentError] if op is neither :missing_handler nor a String
def assert_valid_op!(op)
  return if op == :missing_handler || op.is_a?(String)

  raise ArgumentError, "Invalid op #{op.inspect}, must be a string"
end
# File lib/postqueue/queue.rb, line 71
# Returns the batch size configured for +op+, falling back to the default
# batch size and finally to 1.
def batch_size(op:)
  configured = @batch_sizes[op]
  return configured if configured

  default_batch_size || 1
end
# File lib/postqueue/queue/select_and_lock.rb, line 60
# Combines the batch size recommended for +op+ with the caller's optional
# upper bound: a recommendation below 2 always yields 1; otherwise the
# smaller of the recommendation and +max_batch_size+ (when given) is used.
def calculate_batch_size(op:, max_batch_size:)
  recommended = batch_size(op: op)

  if recommended < 2
    1
  elsif max_batch_size
    [max_batch_size, recommended].min
  else
    recommended
  end
end
# File lib/postqueue/queue.rb, line 79
# Enqueues +entity_id+ for +op+ via the item class and returns whatever the
# item class reports as the number of enqueued items. When something was
# enqueued, the processing mode decides what happens next:
#
# - :async  - nothing
# - :sync   - the queue is processed until empty for this op
# - :verify - raises MissingHandler unless a callback exists for the op
def enqueue(op:, entity_id:)
  count = item_class.enqueue(op: op, entity_id: entity_id, ignore_duplicates: idempotent_operation?(op))

  if count > 0
    case processing
    when :sync
      process_until_empty(op: op)
    when :verify
      unless callback_for(op: op)
        raise(MissingHandler, queue: self, op: op, entity_ids: [entity_id])
      end
    end
  end

  count
end
# File lib/postqueue/queue.rb, line 75
# Returns the idempotency setting registered for +op+; if none is registered,
# the setting for "*" is used, and false if that is missing as well.
def idempotent_operation?(op)
  return @idempotent_operations[op] if @idempotent_operations.key?(op)

  @idempotent_operations.fetch("*", false)
end
# File lib/postqueue/queue/logging.rb, line 34
# The logger used by this queue; delegates to Postqueue.logger.
def logger
  Postqueue.logger
end
# File lib/postqueue/queue/callback.rb, line 24
# Registers +block+ as the callback for +op+, optionally with a per-op batch
# size and idempotency flag. Returns the queue itself so calls can be chained.
#
# @param op [String, Symbol] a String op, or :missing_handler
# @param batch_size [Integer, nil] per-op batch size; not allowed for "*"
# @param idempotent [Boolean, nil] per-op idempotency; not allowed for "*"
# @raise [ArgumentError] for an invalid op, or when batch_size/idempotent is
#   given for the "*" op
def on(op, batch_size: nil, idempotent: nil, &block)
  assert_valid_op! op

  # Validate all options before touching any state, so that a rejected call
  # does not leave a half-registered callback behind.
  if op == "*"
    raise ArgumentError, "Can't set per-op batchsize for op '*'" if batch_size
    raise ArgumentError, "Can't set idempotent for default op '*'" unless idempotent.nil?
  end

  callbacks[op] = block
  @batch_sizes[op] = batch_size if batch_size
  @idempotent_operations[op] = idempotent unless idempotent.nil?

  self
end
Processes up to batch_size entries, for example: process batch_size: 100
# File lib/postqueue/queue/processing.rb, line 7
# Runs one processing step inside a transaction of the item class and returns
# the result of that step.
def process(op: nil, batch_size: nil)
  item_class.transaction { process_inside_transaction(op: op, batch_size: batch_size) }
end
processes a single entry
# File lib/postqueue/queue/processing.rb, line 14
# Shortcut for processing with a batch size of 1.
def process_one(op: nil)
  process(batch_size: 1, op: op)
end
# File lib/postqueue/queue/processing.rb, line 18
# Calls #process repeatedly until it reports 0 processed items and returns
# the sum of all reported counts.
def process_until_empty(op: nil, batch_size: nil)
  total = 0
  while (processed = process(op: op, batch_size: batch_size)) != 0
    total += processed
  end
  total
end
Sets or returns the processing mode. When set, it must be one of :async, :sync or :verify.
# File lib/postqueue/queue.rb, line 18
# Combined reader/writer for the processing mode: without an argument the
# current mode is returned, with an argument the mode is validated against
# VALID_PROCESSING_VALUES, stored and returned.
#
# @raise [ArgumentError] if the given mode is not in VALID_PROCESSING_VALUES
def processing(processing = nil)
  if processing.nil?
    @processing
  elsif VALID_PROCESSING_VALUES.include?(processing)
    @processing = processing
  else
    raise ArgumentError, "Invalid processing value, must be one of #{VALID_PROCESSING_VALUES.inspect}"
  end
end
# File lib/postqueue/queue/runner.rb, line 4
# With a block: stores it as the runner and returns it. Without a block:
# returns the currently stored runner (nil if none was set).
def run(&block)
  block ? (@run = block) : @run
end
# File lib/postqueue/queue/runner.rb, line 9
# Invokes the runner with this queue, installing the default runner first if
# none has been configured.
def run!
  @run || set_default_runner
  @run.call(self)
end
Selects and locks up to limit unlocked items in the queue. Used by select_and_lock_batch.
# File lib/postqueue/queue/select_and_lock.rb, line 15
# Builds the SQL for the upcoming items of +relation+, appends a row-locking
# clause and an optional LIMIT, and loads the matching items.
def select_and_lock(relation, limit:)
  # FOR UPDATE SKIP LOCKED selects and locks entries, but skips those that
  # are already locked - preventing this transaction from being locked.
  clauses = [upcoming(relation).to_sql, "FOR UPDATE SKIP LOCKED"]
  clauses << "LIMIT #{limit}" if limit

  item_class.find_by_sql(clauses.join(" "))
end
Returns a batch of queue items for processing. These queue items are chosen depending on the passed-in op: and batch_size: settings (if any).
All selected queue items will have the same op value. If an op: value is passed in, that one is used as a filter condition; otherwise the op value of the first queue entry is used instead.
This method will select and lock at most batch_size items. If the batch size configured in the queue is smaller than the value passed in here, the configured one is used instead.
Returns an array of item objects.
# File lib/postqueue/queue/select_and_lock.rb, line 38
# Locks the first matching item to determine the op to work on, then locks
# up to the calculated batch size of items with that op.
#
# Returns [] when nothing matches, and just the first item when the
# calculated batch size is 1 or less.
def select_and_lock_batch(op:, max_batch_size:)
  scope = item_class.all
  scope = scope.where(op: op) if op

  head = select_and_lock(scope, limit: 1).first
  return [] unless head

  limit = calculate_batch_size(op: head.op, max_batch_size: max_batch_size)
  if limit <= 1
    [head]
  else
    select_and_lock(scope.where(op: head.op), limit: limit)
  end
end
# File lib/postqueue/queue/select_and_lock.rb, line 52
# Selects and locks, without a limit, the items matching +op+ and any of the
# given +entity_ids+. Returns [] right away for an empty id list.
#
# @raise [ArgumentError] if op is nil or false
def select_and_lock_duplicates(op:, entity_ids:)
  raise ArgumentError, "Missing op argument" unless op
  return [] if entity_ids.empty?

  select_and_lock(item_class.where(op: op, entity_id: entity_ids), limit: nil)
end
# File lib/postqueue/queue.rb, line 53
# Installs the default hooks: exceptions are re-raised, and every processed
# batch is logged at info level together with its timing figures.
def set_default_callbacks
  on_exception do |e, _, _|
    e.send :raise
  end

  after_processing do |op, entity_ids, timing|
    took = format('%.3f secs', timing.processing_time)
    avg = format('%.3f secs (avg)', timing.avg_queue_time)
    max = format('%.3f secs (max)', timing.max_queue_time)

    Postqueue.logger.info(
      "processing '#{op}' for id(s) #{entity_ids.join(',')}: " \
      "processing #{entity_ids.length} items took #{took}" \
      ", queue_time: #{avg}/#{max}"
    )
  end
end
# File lib/postqueue/queue/runner.rb, line 14
# Installs the default runner: an endless loop that drains the queue and then
# sleeps for one second before trying again.
def set_default_runner
  run do |queue|
    loop do
      queue.logger.debug("#{queue}: Processing until empty")
      queue.process_until_empty

      queue.logger.debug("#{queue}: sleeping")
      sleep(1)
    end
  end
end
# File lib/postqueue/queue/logging.rb, line 38
# A queue is rendered as the table name of its item class.
def to_s
  item_class.table_name
end
Private Instance Methods
# File lib/postqueue/queue/logging.rb, line 6
# With a block: validates and stores it as the after_processing hook.
# Without a block: returns the currently stored hook.
#
# A block is accepted when its arity is exactly 3 or is -3 or lower.
#
# @raise [ArgumentError] if the block's arity is not acceptable
def after_processing(&block)
  if block
    # Validate before storing, so that a rejected block never replaces a
    # previously installed hook.
    if block.arity > -3 && block.arity != 3
      raise ArgumentError, "Invalid after_processing block: must accept 3 arguments"
    end

    @after_processing = block
  end

  @after_processing
end
# File lib/postqueue/queue/callback.rb, line 47
# Looks up the callback registered for +op+, falling back to the "*" callback.
# Returns nil if neither exists.
def callback_for(op:)
  specific = callbacks[op]
  specific || callbacks["*"]
end
# File lib/postqueue/queue/callback.rb, line 43
# The op => callback registry, created on first access.
def callbacks
  @callbacks = {} unless @callbacks
  @callbacks
end
# File lib/postqueue/queue/logging.rb, line 17
# Logs, at warn level, which exception was caught while processing +op+ for
# the given entity ids.
def log_exception(exception, op, entity_ids)
  message = "processing '#{op}' for id(s) #{entity_ids.inspect}: caught #{exception}"
  logger.warn(message)
end
# File lib/postqueue/queue/logging.rb, line 21
# With a block: validates and stores it as the on_exception hook.
# Without a block: returns the currently stored hook.
#
# A block is accepted when its arity is exactly 3 or is -3 or lower.
#
# @raise [ArgumentError] if the block's arity is not acceptable
def on_exception(&block)
  if block
    # Validate before storing, so that a rejected block never replaces a
    # previously installed hook.
    if block.arity > -3 && block.arity != 3
      raise ArgumentError, "Invalid on_exception block: must accept 3 arguments"
    end

    @on_exception = block
  end

  @on_exception
end
The actual processing. Returns the number of processed items, or 0 if no item was selected or if processing failed and the exception handler did not re-raise.
# File lib/postqueue/queue/processing.rb, line 31
# Selects and locks one batch, runs its callback, deletes the processed items
# and returns how many entity ids were processed. Returns 0 when no item could
# be selected.
#
# If processing the batch raises, the selected items are postponed, the error
# is logged and handed to the on_exception hook, and 0 is returned (unless
# the hook raises).
def process_inside_transaction(op:, batch_size:)
  # Selection happens outside the rescue below: if it fails there are no
  # items to postpone, so the error is left to propagate unchanged.
  items = select_and_lock_batch(op: op, max_batch_size: batch_size)
  match = items.first
  return 0 unless match

  entity_ids = items.map(&:entity_id)

  begin
    timing = run_callback(op: match.op, entity_ids: entity_ids)

    after_processing.call(match.op, entity_ids, timing)
    item_class.where(id: items.map(&:id)).delete_all

    # even though we try not to enqueue duplicates we cannot guarantee that,
    # since concurrent enqueue transactions might still insert duplicates.
    # That's why we explicitly remove all non-failed duplicates here.
    if idempotent_operation?(match.op)
      duplicates = select_and_lock_duplicates(op: match.op, entity_ids: entity_ids)
      item_class.where(id: duplicates.map(&:id)).delete_all unless duplicates.empty?
    end

    entity_ids.length
  rescue => e
    item_class.postpone items.map(&:id)
    log_exception(e, match.op, entity_ids)
    on_exception.call(e, match.op, entity_ids)
    0
  end
end
# File lib/postqueue/queue/callback.rb, line 51
# Looks up the average and maximum queue time of the items with the given
# entity ids, runs the callback for +op+ (or the :missing_handler callback if
# none matches), and returns a Timing built from the average queue time, the
# maximum queue time and the measured processing time.
def run_callback(op:, entity_ids:)
  # The ids are passed as a bind value instead of being interpolated into the
  # statement, leaving quoting to the item class.
  # NOTE(review): assumes item_class.find_by_sql accepts ActiveRecord's
  # [sql, *binds] array form and expands an Array bind inside IN (?) - confirm.
  # NOTE(review): the query matches on entity_id only, not on op, so items of
  # other ops with the same ids are included - confirm this is intended.
  sql = <<-SQL
    SELECT extract('epoch' from AVG(now() - created_at)) AS avg,
           extract('epoch' from MAX(now() - created_at)) AS max
    FROM #{item_class.table_name}
    WHERE entity_id IN (?)
  SQL
  queue_time = item_class.find_by_sql([sql, entity_ids]).first

  processing_time = Benchmark.realtime do
    callback = callback_for(op: op) || callbacks.fetch(:missing_handler)
    callback.call(op, entity_ids)
  end

  Timing.new(queue_time.avg, queue_time.max, processing_time)
end