class LogStash::Inputs::Redis

Read events from a redis. Supports both redis channels and also redis lists (using BLPOP)

For more information about redis, see <redis.io/>

## `batch_count` note

If you use the 'batch_count' setting, you must use a redis version 2.6.0 or newer. Anything older does not support the operations used by batching.

Public Instance Methods

register() click to toggle source
# File lib/logstash/inputs/redis.rb, line 58
def register
  require 'redis'
  @redis = nil
  @redis_url = "redis://#{@password}@#{@host}:#{@port}/#{@db}"

  # TODO remove after setting key and data_type to true
  if @queue
    if @key or @data_type
      raise RuntimeError.new(
        "Cannot specify queue parameter and key or data_type"
      )
    end
    @key = @queue
    @data_type = 'list'
  end

  if not @key or not @data_type
    raise RuntimeError.new(
      "Must define queue, or key and data_type parameters"
    )
  end
  # end TODO

  @logger.info("Registering redis", :identity => identity)
end
run(output_queue) click to toggle source
# File lib/logstash/inputs/redis.rb, line 240
def run(output_queue)
  if @data_type == 'list'
    listener_loop :list_listener, output_queue
  elsif @data_type == 'channel'
    listener_loop :channel_listener, output_queue
  else
    listener_loop :pattern_channel_listener, output_queue
  end
end
teardown() click to toggle source
# File lib/logstash/inputs/redis.rb, line 251
def teardown
  if @data_type == 'channel' and @redis
    @redis.unsubscribe
    @redis.quit
    @redis = nil
  end
  if @data_type == 'pattern_channel' and @redis
    @redis.punsubscribe
    @redis.quit
    @redis = nil
  end
end

Private Instance Methods

channel_listener(redis, output_queue) click to toggle source
# File lib/logstash/inputs/redis.rb, line 186
def channel_listener(redis, output_queue)
  redis.subscribe @key do |on|
    on.subscribe do |channel, count|
      @logger.info("Subscribed", :channel => channel, :count => count)
    end

    on.message do |channel, message|
      queue_event message, output_queue
    end

    on.unsubscribe do |channel, count|
      @logger.info("Unsubscribed", :channel => channel, :count => count)
    end
  end
end
connect() click to toggle source
# File lib/logstash/inputs/redis.rb, line 93
def connect
  redis = Redis.new(
    :host => @host,
    :port => @port,
    :timeout => @timeout,
    :db => @db,
    :password => @password.nil? ? nil : @password.value
  )
  load_batch_script(redis) if @data_type == 'list' && (@batch_count > 1)
  return redis
end
identity() click to toggle source

A string used to identify a redis instance in log messages TODO(sissel): Use instance variables for this once the @name config option is removed.

# File lib/logstash/inputs/redis.rb, line 88
def identity
  @name || "#{@redis_url} #{@data_type}:#{@key}"
end
list_listener(redis, output_queue) click to toggle source
# File lib/logstash/inputs/redis.rb, line 141
def list_listener(redis, output_queue)

  # blpop returns the 'key' read from as well as the item result
  # we only care about the result (2nd item in the list).
  item = redis.blpop(@key, 0)[1]

  # blpop failed or .. something?
  # TODO(sissel): handle the error
  return if item.nil?
  queue_event(item, output_queue)

  # If @batch_count is 1, there's no need to continue.
  return if @batch_count == 1

  begin
    redis.evalsha(@redis_script_sha, [@key], [@batch_count-1]).each do |item|
      queue_event(item, output_queue)
    end

    # Below is a commented-out implementation of 'batch fetch'
    # using pipelined LPOP calls. This in practice has been observed to
    # perform exactly the same in terms of event throughput as
    # the evalsha method. Given that the EVALSHA implementation uses
    # one call to redis instead of N (where N == @batch_count) calls,
    # I decided to go with the 'evalsha' method of fetching N items
    # from redis in bulk.
    #redis.pipelined do
      #error, item = redis.lpop(@key)
      #(@batch_count-1).times { redis.lpop(@key) }
    #end.each do |item|
      #queue_event(item, output_queue) if item
    #end
    # --- End commented out implementation of 'batch fetch'
  rescue Redis::CommandError => e
    if e.to_s =~ /NOSCRIPT/ then
      @logger.warn("Redis may have been restarted, reloading redis batch EVAL script", :exception => e);
      load_batch_script(redis)
      retry
    else
      raise e
    end
  end
end
listener_loop(listener, output_queue) click to toggle source

Since both listeners have the same basic loop, we've abstracted the outer loop.

# File lib/logstash/inputs/redis.rb, line 222
def listener_loop(listener, output_queue)
  while !finished?
    begin
      @redis ||= connect
      self.send listener, @redis, output_queue
    rescue Redis::CannotConnectError => e
      @logger.warn("Redis connection problem", :exception => e)
      sleep 1
      @redis = connect
    rescue => e # redis error
      @logger.warn("Failed to get event from redis", :name => @name,
                   :exception => e, :backtrace => e.backtrace)
      raise e
    end
  end # while !finished?
end
load_batch_script(redis) click to toggle source
# File lib/logstash/inputs/redis.rb, line 106
  def load_batch_script(redis)
    #A redis lua EVAL script to fetch a count of keys
    #in case count is bigger than current items in queue whole queue will be returned without extra nil values
    redis_script = <<EOF
          local i = tonumber(ARGV[1])
          local res = {}
          local length = redis.call('llen',KEYS[1])
          if length < i then i = length end
          while (i > 0) do
            local item = redis.call("lpop", KEYS[1])
            if (not item) then
              break
            end
            table.insert(res, item)
            i = i-1
          end
          return res
EOF
    @redis_script_sha = redis.script(:load, redis_script)
  end
pattern_channel_listener(redis, output_queue) click to toggle source
# File lib/logstash/inputs/redis.rb, line 203
def pattern_channel_listener(redis, output_queue)
  redis.psubscribe @key do |on|
    on.psubscribe do |channel, count|
      @logger.info("Subscribed", :channel => channel, :count => count)
    end

    on.pmessage do |ch, event, message|
      queue_event message, output_queue
    end

    on.punsubscribe do |channel, count|
      @logger.info("Unsubscribed", :channel => channel, :count => count)
    end
  end
end
queue_event(msg, output_queue) click to toggle source
# File lib/logstash/inputs/redis.rb, line 128
def queue_event(msg, output_queue)
  begin
    @codec.decode(msg) do |event|
      decorate(event)
      output_queue << event
    end
  rescue => e # parse or event creation error
    @logger.error("Failed to create event", :message => msg, :exception => e,
                  :backtrace => e.backtrace);
  end
end