class Sidekiq::Job::Setter
This helper class encapsulates the set options for ‘set`, e.g.
SomeJob.set(queue: 'foo').perform_async(....)
Public Class Methods
Source
# File lib/sidekiq/job.rb, line 187 def initialize(klass, opts) @klass = klass # NB: the internal hash always has stringified keys @opts = opts.transform_keys(&:to_s) # ActiveJob compatibility interval = @opts.delete("wait_until") || @opts.delete("wait") at(interval) if interval end
Public Instance Methods
Source
# File lib/sidekiq/job.rb, line 205 def perform_async(*args) if @opts["sync"] == true perform_inline(*args) else @klass.client_push(@opts.merge("args" => args, "class" => @klass)) end end
Source
# File lib/sidekiq/job.rb, line 251 def perform_bulk(args, batch_size: 1_000) client = @klass.build_client client.push_bulk(@opts.merge("class" => @klass, "args" => args, :batch_size => batch_size)) end
Source
# File lib/sidekiq/job.rb, line 258 def perform_in(interval, *args) at(interval).perform_async(*args) end
interval
must be a timestamp, numeric or something that acts
numeric (like an activesupport time interval).
Also aliased as: perform_at
Source
# File lib/sidekiq/job.rb, line 215 def perform_inline(*args) raw = @opts.merge("args" => args, "class" => @klass) # validate and normalize payload item = normalize_item(raw) queue = item["queue"] # run client-side middleware cfg = Sidekiq.default_configuration result = cfg.client_middleware.invoke(item["class"], item, queue, cfg.redis_pool) do item end return nil unless result # round-trip the payload via JSON msg = Sidekiq.load_json(Sidekiq.dump_json(item)) # prepare the job instance klass = Object.const_get(msg["class"]) job = klass.new job.jid = msg["jid"] job.bid = msg["bid"] if job.respond_to?(:bid) # run the job through server-side middleware result = cfg.server_middleware.invoke(job, msg, msg["queue"]) do # perform it job.perform(*msg["args"]) true end return nil unless result # jobs do not return a result. they should store any # modified state. true end
Explicit inline execution of a job. Returns nil if the job did not execute, true otherwise.
Also aliased as: perform_sync
Source
# File lib/sidekiq/job.rb, line 197 def set(options) hash = options.transform_keys(&:to_s) interval = hash.delete("wait_until") || @opts.delete("wait") @opts.merge!(hash) at(interval) if interval self end
Private Instance Methods
Source
# File lib/sidekiq/job.rb, line 265 def at(interval) int = interval.to_f now = Time.now.to_f ts = ((int < 1_000_000_000) ? now + int : int) # Optimization to enqueue something now that is scheduled to go out now or in the past @opts["at"] = ts if ts > now self end