module JobIteration::Iteration

Constants

SIMPLE_SERIALIZABLE_CLASSES

Public Class Methods

new(*arguments) click to toggle source
Calls superclass method
# File lib/job-iteration/iteration.rb, line 69
def initialize(*arguments)
  super
  self.times_interrupted = 0
  self.total_time = 0.0
  assert_implements_methods!
end

Public Instance Methods

deserialize(job_data) click to toggle source
Calls superclass method
# File lib/job-iteration/iteration.rb, line 85
def deserialize(job_data) # @private
  super
  self.cursor_position = job_data["cursor_position"]
  self.times_interrupted = job_data["times_interrupted"] || 0
  self.total_time = job_data["total_time"] || 0
end
perform(*params) click to toggle source
# File lib/job-iteration/iteration.rb, line 92
def perform(*params) # @private
  interruptible_perform(*params)
end
retry_job(*, **) click to toggle source
Calls superclass method
# File lib/job-iteration/iteration.rb, line 96
def retry_job(*, **)
  super unless defined?(@retried) && @retried
  @retried = true
end
serialize() click to toggle source
Calls superclass method
# File lib/job-iteration/iteration.rb, line 77
def serialize # @private
  super.merge(
    "cursor_position" => cursor_position,
    "times_interrupted" => times_interrupted,
    "total_time" => total_time,
  )
end

Private Instance Methods

adjust_total_time() click to toggle source
# File lib/job-iteration/iteration.rb, line 187
def adjust_total_time
  self.total_time += (Time.now.utc.to_f - start_time.to_f).round(6)
end
assert_enumerator!(enum) click to toggle source
# File lib/job-iteration/iteration.rb, line 191
    def assert_enumerator!(enum)
      return if enum.is_a?(Enumerator)

      raise ArgumentError, <<~EOS
        #build_enumerator is expected to return Enumerator object, but returned #{enum.class}.
        Example:
           def build_enumerator(params, cursor:)
            enumerator_builder.active_record_on_records(
              Shop.find(params[:shop_id]).products,
              cursor: cursor
            )
          end
      EOS
    end
assert_implements_methods!() click to toggle source
# File lib/job-iteration/iteration.rb, line 218
def assert_implements_methods!
  unless respond_to?(:each_iteration, true)
    raise(
      ArgumentError,
      "Iteration job (#{self.class}) must implement #each_iteration method"
    )
  end

  if respond_to?(:build_enumerator, true)
    parameters = method_parameters(:build_enumerator)
    unless valid_cursor_parameter?(parameters)
      raise ArgumentError, "Iteration job (#{self.class}) #build_enumerator " \
      "expects the keyword argument `cursor`"
    end
  else
    raise ArgumentError, "Iteration job (#{self.class}) must implement #build_enumerator " \
      "to provide a collection to iterate"
  end
end
assert_valid_cursor!(cursor) click to toggle source

The adapter must be able to serialize and deserialize the cursor back into an equivalent object. github.com/mperham/sidekiq/wiki/Best-Practices#1-make-your-job-parameters-small-and-simple

# File lib/job-iteration/iteration.rb, line 208
def assert_valid_cursor!(cursor)
  return if serializable?(cursor)

  raise CursorError.new(
    "Cursor must be composed of objects capable of built-in (de)serialization: " \
    "Strings, Integers, Floats, Arrays, Hashes, true, false, or nil.",
    cursor: cursor,
  )
end
enumerator_builder() click to toggle source
# File lib/job-iteration/iteration.rb, line 103
def enumerator_builder
  JobIteration.enumerator_builder.new(self)
end
interruptible_perform(*arguments) click to toggle source
# File lib/job-iteration/iteration.rb, line 107
def interruptible_perform(*arguments)
  self.start_time = Time.now.utc

  enumerator = nil
  ActiveSupport::Notifications.instrument("build_enumerator.iteration", iteration_instrumentation_tags) do
    enumerator = build_enumerator(*arguments, cursor: cursor_position)
  end

  unless enumerator
    logger.info("[JobIteration::Iteration] `build_enumerator` returned nil. " \
      "Skipping the job.")
    return
  end

  assert_enumerator!(enumerator)

  if executions == 1 && times_interrupted == 0
    run_callbacks(:start)
  else
    ActiveSupport::Notifications.instrument("resumed.iteration", iteration_instrumentation_tags)
  end

  completed = catch(:abort) do
    iterate_with_enumerator(enumerator, arguments)
  end

  run_callbacks(:shutdown)

  if run_complete_callbacks?(completed)
    run_callbacks(:complete)
    output_interrupt_summary
  end
end
iterate_with_enumerator(enumerator, arguments) click to toggle source
# File lib/job-iteration/iteration.rb, line 141
def iterate_with_enumerator(enumerator, arguments)
  arguments = arguments.dup.freeze
  found_record = false
  enumerator.each do |object_from_enumerator, index|
    # Deferred until 2.0.0
    # assert_valid_cursor!(index)

    record_unit_of_work do
      found_record = true
      each_iteration(object_from_enumerator, *arguments)
      self.cursor_position = index
    end

    next unless job_should_exit?
    self.executions -= 1 if executions > 1
    reenqueue_iteration_job
    return false
  end

  logger.info(
    "[JobIteration::Iteration] Enumerator found nothing to iterate! " \
    "times_interrupted=#{times_interrupted} cursor_position=#{cursor_position}"
  ) unless found_record

  adjust_total_time

  true
end
iteration_instrumentation_tags() click to toggle source
# File lib/job-iteration/iteration.rb, line 249
def iteration_instrumentation_tags
  { job_class: self.class.name }
end
job_should_exit?() click to toggle source
Calls superclass method
# File lib/job-iteration/iteration.rb, line 258
def job_should_exit?
  if ::JobIteration.max_job_runtime && start_time && (Time.now.utc - start_time) > ::JobIteration.max_job_runtime
    return true
  end

  JobIteration.interruption_adapter.call || (defined?(super) && super)
end
method_parameters(method_name) click to toggle source
# File lib/job-iteration/iteration.rb, line 238
def method_parameters(method_name)
  method = method(method_name)

  if defined?(T::Private::Methods)
    signature = T::Private::Methods.signature_for_method(method)
    method = signature.method if signature
  end

  method.parameters
end
output_interrupt_summary() click to toggle source
# File lib/job-iteration/iteration.rb, line 253
def output_interrupt_summary
  message = "[JobIteration::Iteration] Completed iterating. times_interrupted=%d total_time=%.3f"
  logger.info(Kernel.format(message, times_interrupted, total_time))
end
record_unit_of_work() { || ... } click to toggle source
# File lib/job-iteration/iteration.rb, line 170
def record_unit_of_work
  ActiveSupport::Notifications.instrument("each_iteration.iteration", iteration_instrumentation_tags) do
    yield
  end
end
reenqueue_iteration_job() click to toggle source
# File lib/job-iteration/iteration.rb, line 176
def reenqueue_iteration_job
  ActiveSupport::Notifications.instrument("interrupted.iteration", iteration_instrumentation_tags)
  logger.info("[JobIteration::Iteration] Interrupting and re-enqueueing the job cursor_position=#{cursor_position}")

  adjust_total_time
  self.times_interrupted += 1

  self.already_in_queue = true if respond_to?(:already_in_queue=)
  retry_job
end
run_complete_callbacks?(completed) click to toggle source
# File lib/job-iteration/iteration.rb, line 266
def run_complete_callbacks?(completed)
  # nil means that someone aborted the job but want to call the on_complete callback
  if completed.nil?
    completed = :finished
  end

  case completed
  when :finished, true then true
  # skip_complete_callbacks is returning from ThrottleEnumeratorand we do not want the on_complete callback to
  # be executed
  when false, :skip_complete_callbacks then false
  end
end
serializable?(object) click to toggle source
# File lib/job-iteration/iteration.rb, line 294
def serializable?(object)
  # Subclasses must be excluded, hence not using is_a? or ===.
  if object.instance_of?(Array)
    object.all? { |element| serializable?(element) }
  elsif object.instance_of?(Hash)
    object.all? { |key, value| serializable?(key) && serializable?(value) }
  else
    SIMPLE_SERIALIZABLE_CLASSES.any? { |klass| object.instance_of?(klass) }
  end
rescue NoMethodError
  # BasicObject doesn't respond to instance_of, but we can't serialize it anyway
  false
end
valid_cursor_parameter?(parameters) click to toggle source
# File lib/job-iteration/iteration.rb, line 280
def valid_cursor_parameter?(parameters)
  # this condition is when people use the splat operator.
  # def build_enumerator(*)
  return true if parameters == [[:rest]]

  parameters.each do |parameter_type, parameter_name|
    next unless parameter_name == :cursor
    return true if [:keyreq, :key].include?(parameter_type)
  end
  false
end