class LogStash::Outputs::Redis

Sends events to a Redis database, using RPUSH for the 'list' data type and PUBLISH otherwise.

For more information about Redis, see <http://redis.io/>.

Public Instance Methods

congestion_check(key) click to toggle source
# File lib/logstash/outputs/redis.rb, line 177
# Block the caller while the redis list stored at +key+ is congested.
#
# Congestion checking is disabled entirely when @congestion_threshold is 0.
# Otherwise the list length is inspected at most once per
# @congestion_interval seconds for each key; while the length exceeds
# @congestion_threshold this method logs a warning and sleeps for
# @congestion_interval seconds before looking again.
#
# key - the redis list key about to be pushed to.
#
# Returns nothing of interest to callers.
def congestion_check(key)
  return if @congestion_threshold == 0

  # Check congestion only if enough time has passed since the last check
  # of this particular key.
  elapsed = Time.now.to_i - @congestion_check_times[key]
  return if elapsed < @congestion_interval

  # Don't push events to a redis key which has reached @congestion_threshold.
  while @redis.llen(key) > @congestion_threshold
    if @logger.warn?
      @logger.warn("Redis key size has hit a congestion threshold #{@congestion_threshold} suspending output for #{@congestion_interval} seconds")
    end
    sleep @congestion_interval
  end

  # Record the check time per key. Writing it anywhere else leaves the
  # per-key timestamp stale, so the interval throttle above never engages.
  @congestion_check_times[key] = Time.now.to_i
end
flush(events, key, teardown=false) click to toggle source

Called from Stud::Buffer#buffer_flush when there are events to flush.

# File lib/logstash/outputs/redis.rb, line 189
# Push a batch of buffered events onto the redis list at +key+.
#
# events   - the payloads to RPUSH.
# key      - the redis list key to push onto.
# teardown - when true the congestion check is skipped, so that the
#            final flush is not held up by a congested key.
#
# Connects lazily if no connection exists yet. Returns whatever
# @redis.rpush returns.
def flush(events, key, teardown=false)
  @redis = connect unless @redis

  # We should not block due to congestion on teardown; to support this
  # Stud::Buffer#buffer_flush should pass here the :final boolean value.
  if !teardown
    congestion_check(key)
  end

  @redis.rpush(key, events)
end
on_flush_error(e) click to toggle source

Called from Stud::Buffer#buffer_flush when an error occurs.

# File lib/logstash/outputs/redis.rb, line 197
# Handle a failed batch flush: log the failure and replace the connection.
#
# e - the exception raised while flushing.
#
# A warning is logged with this output's identity, the exception and its
# backtrace, and then @redis is reassigned to the result of a fresh
# connect call. Returns that new connection.
def on_flush_error(e)
  who = identity
  trace = e.backtrace

  @logger.warn(
    "Failed to send backlog of events to redis",
    :identity => who, :exception => e, :backtrace => trace
  )

  @redis = connect
end
receive(event) click to toggle source
# File lib/logstash/outputs/redis.rb, line 136
# Deliver a single event to redis.
#
# event - the event to send; it must respond to #to_json and #sprintf.
#
# Events rejected by output?(event) are ignored. In batch mode with the
# 'list' data type the serialized event is handed to buffer_receive
# (Stud::Buffer) and delivered later. Otherwise the event is serialized
# and sent immediately: RPUSH (after a congestion check) for 'list',
# PUBLISH for any other data type.
#
# An event whose JSON conversion raises Encoding::UndefinedConversionError
# or ArgumentError is logged and dropped. Any error while sending is
# logged; the method then sleeps @reconnect_interval seconds, discards the
# connection and retries, with no upper bound on the number of attempts.
def receive(event)
  return unless output?(event)

  if @batch && @data_type == 'list' # Don't use batched method for pubsub.
    # Stud::Buffer
    buffer_receive(event.to_json, event.sprintf(@key))
    return
  end

  key = event.sprintf(@key)
  # TODO(sissel): We really should not drop an event, but historically
  # we have dropped events that fail to be converted to json.
  # TODO(sissel): Find a way to continue passing events through even
  # if they fail to convert properly.
  begin
    payload = event.to_json
  rescue Encoding::UndefinedConversionError, ArgumentError
    # Report through the logger only; nothing is written to stdout.
    @logger.error("Failed to convert event to JSON. Invalid UTF-8, maybe?",
                  :event => event.inspect)
    return
  end

  begin
    @redis ||= connect
    if @data_type == 'list'
      congestion_check(key)
      @redis.rpush(key, payload)
    else
      @redis.publish(key, payload)
    end
  rescue => e
    @logger.warn("Failed to send event to redis", :event => event,
                 :identity => identity, :exception => e,
                 :backtrace => e.backtrace)
    sleep @reconnect_interval
    # Drop the connection so the retry reconnects.
    @redis = nil
    retry
  end
end
register() click to toggle source
# File lib/logstash/outputs/redis.rb, line 92
# Validate the configuration and prepare this output for use.
#
# Loads the redis library, resolves the legacy queue setting into
# key/data_type, initializes the Stud::Buffer batch buffer when batching is
# enabled, and resets the connection state (no connection, first host,
# empty per-key congestion timestamps).
#
# Raises RuntimeError when queue is combined with key or data_type, when
# neither queue nor both key and data_type are set, or when batch is
# requested for a data_type other than "list".
def register
  require 'redis'

  # TODO remove after setting key and data_type to true
  if @queue
    if @key || @data_type
      raise RuntimeError, "Cannot specify queue parameter and key or data_type"
    end
    @key, @data_type = @queue, 'list'
  end

  unless @key && @data_type
    raise RuntimeError, "Must define queue, or key and data_type parameters"
  end
  # end TODO

  if @batch
    unless @data_type == "list"
      raise RuntimeError, "batch is not supported with data_type #{@data_type}"
    end

    buffer_options = {
      :max_items => @batch_events,
      :max_interval => @batch_timeout,
      :logger => @logger
    }
    buffer_initialize(buffer_options)
  end

  @redis = nil
  @host.shuffle! if @shuffle_hosts
  @host_idx = 0

  # An unseen key starts out as "last checked one interval ago", so its
  # first congestion check is always due.
  @congestion_check_times = Hash.new do |times, redis_key|
    times[redis_key] = Time.now.to_i - @congestion_interval
  end
end
teardown() click to toggle source
# File lib/logstash/outputs/redis.rb, line 206
# Shut this output down.
#
# When batching is enabled, performs a final buffer flush. When the data
# type is 'channel' and a connection exists, the connection is closed with
# QUIT and forgotten. Connections for other data types are left as they
# are. Always returns nil.
def teardown
  buffer_flush(:final => true) if @batch

  pubsub = @data_type == 'channel'
  return nil if !pubsub || !@redis

  @redis.quit
  @redis = nil
end

Private Instance Methods

connect() click to toggle source
# File lib/logstash/outputs/redis.rb, line 217
# Open a connection to the next configured redis host.
#
# The entry at @host[@host_idx] is split on ':' into host and port; when
# the entry carries no port, @port is used. @host_idx is then advanced,
# wrapping back to 0 after the last entry, so successive calls rotate
# through the hosts. The connection parameters are logged at debug level
# before the password (if any) is added to them.
#
# Returns the new Redis client.
def connect
  host_part, port_part = @host[@host_idx].split(':')
  @current_host = host_part
  @current_port = port_part

  following = @host_idx + 1
  @host_idx = following >= @host.length ? 0 : following

  @current_port ||= @port

  params = {
    :host => @current_host,
    :port => @current_port,
    :timeout => @timeout,
    :db => @db
  }
  @logger.debug(params)

  params[:password] = @password.value if @password

  Redis.new(params)
end
identity() click to toggle source

A string used to identify a redis instance in log messages

# File lib/logstash/outputs/redis.rb, line 241
# Build the label used for this redis instance in log messages.
#
# Returns @name when it is set; otherwise a redis:// style string assembled
# from the password, current host and port, database, data type and key.
def identity
  return @name if @name

  endpoint = "#{@current_host}:#{@current_port}/#{@db}"
  "redis://#{@password}@#{endpoint} #{@data_type}:#{@key}"
end