class Deimos::Consumer

Basic consumer class. Inherit from this class and override either consume or consume_batch, depending on the delivery mode of your listener. `consume` -> use `delivery :message` or `delivery :batch` `consume_batch` -> use `delivery :inline_batch`

Public Class Methods

decoder() click to toggle source

@return [Deimos::SchemaBackends::Base]

# File lib/deimos/consumer.rb, line 21
def decoder
  @decoder ||= Deimos.schema_backend(schema: config[:schema],
                                     namespace: config[:namespace])
end
key_decoder() click to toggle source

@return [Deimos::SchemaBackends::Base]

# File lib/deimos/consumer.rb, line 27
def key_decoder
  @key_decoder ||= Deimos.schema_backend(schema: config[:key_schema],
                                         namespace: config[:namespace])
end

Public Instance Methods

decode_key(key) click to toggle source

Helper method to decode an encoded key. @param key [String] @return [Object] the decoded key.

# File lib/deimos/consumer.rb, line 36
def decode_key(key)
  return nil if key.nil?

  config = self.class.config
  unless config[:key_configured]
    raise 'No key config given - if you are not decoding keys, please use '\
      '`key_config plain: true`'
  end

  if config[:key_field]
    self.class.decoder.decode_key(key, config[:key_field])
  elsif config[:key_schema]
    self.class.key_decoder.decode(key, schema: config[:key_schema])
  else # no encoding
    key
  end
end

Private Instance Methods

_error(exception, payload, metadata) click to toggle source

@param exception [Exception] @param payload [Hash] @param metadata [Hash]

# File lib/deimos/consumer.rb, line 96
def _error(exception, payload, metadata)
  Deimos.config.tracer&.set_error(@span, exception)

  raise if Deimos.config.consumers.reraise_errors ||
           Deimos.config.consumers.fatal_error&.call(exception, payload, metadata) ||
           fatal_error?(exception, payload, metadata)
end
_report_time_delayed(payload, metadata) click to toggle source
# File lib/deimos/consumer.rb, line 66
def _report_time_delayed(payload, metadata)
  return if payload.nil? || payload['timestamp'].blank?

  begin
    time_delayed = Time.now.in_time_zone - payload['timestamp'].to_datetime
  rescue ArgumentError
    Deimos.config.logger.info(
      message: "Error parsing timestamp! #{payload['timestamp']}"
    )
    return
  end
  Deimos.config.metrics&.histogram('handler', time_delayed, tags: %W(
                                     time:time_delayed
                                     topic:#{metadata[:topic]}
                                   ))
end
_with_span() { || ... } click to toggle source
# File lib/deimos/consumer.rb, line 56
def _with_span
  @span = Deimos.config.tracer&.start(
    'deimos-consumer',
    resource: self.class.name.gsub('::', '-')
  )
  yield
ensure
  Deimos.config.tracer&.finish(@span)
end
fatal_error?(_error, _payload, _metadata) click to toggle source

Overrideable method to determine if a given error should be considered “fatal” and always be reraised. @param _error [Exception] @param _payload [Hash] @param _metadata [Hash] @return [Boolean]

# File lib/deimos/consumer.rb, line 89
def fatal_error?(_error, _payload, _metadata)
  false
end