class LogStash::Inputs::Redis
Read events from a Redis instance. Supports both Redis channels and Redis lists (using BLPOP).
For more information about Redis, see <http://redis.io/>
## `batch_count` note
If you use the 'batch_count' setting, you must use Redis version 2.6.0 or newer. Batching relies on a server-side Lua script invoked through EVALSHA, which older versions do not support.
Public Instance Methods
register()
click to toggle source
# File lib/logstash/inputs/redis.rb, line 58
# Prepares the input before it is run.
#
# Loads the redis client library on demand, clears any cached connection,
# builds the URL string that `identity` includes in log messages, and folds
# the legacy `queue` setting into `key` + `data_type`.
#
# Raises RuntimeError when `queue` is combined with `key` or `data_type`, or
# when `queue` is absent and `key`/`data_type` are not both set.
def register
  require 'redis'
  @redis = nil
  # NOTE(review): @password is interpolated through its to_s here, while
  # `connect` uses @password.value — confirm to_s never exposes the secret.
  @redis_url = "redis://#{@password}@#{@host}:#{@port}/#{@db}"

  # TODO remove after setting key and data_type to true
  if @queue
    raise RuntimeError, "Cannot specify queue parameter and key or data_type" if @key || @data_type

    # A bare `queue` setting means "read from this list".
    @key = @queue
    @data_type = 'list'
  end

  unless @key && @data_type
    raise RuntimeError, "Must define queue, or key and data_type parameters"
  end
  # end TODO

  @logger.info("Registering redis", :identity => identity)
end
run(output_queue)
click to toggle source
# File lib/logstash/inputs/redis.rb, line 240
# Entry point of the input: selects the listener that matches @data_type and
# hands it to `listener_loop` together with the output queue.
#
# 'list' and 'channel' map to their own listeners; every other value falls
# through to the pattern-channel listener.
def run(output_queue)
  listener =
    case @data_type
    when 'list'    then :list_listener
    when 'channel' then :channel_listener
    else                :pattern_channel_listener
    end

  listener_loop listener, output_queue
end
teardown()
click to toggle source
# File lib/logstash/inputs/redis.rb, line 251
# Shuts down a pub/sub connection.
#
# For 'channel' inputs the client is unsubscribed, for 'pattern_channel'
# inputs it is punsubscribed; in both cases the connection is then quit and
# the cached client dropped. Any other data type (including 'list'), or a
# missing client, leaves everything untouched.
def teardown
  return unless @redis

  case @data_type
  when 'channel'
    @redis.unsubscribe
  when 'pattern_channel'
    @redis.punsubscribe
  else
    return
  end

  @redis.quit
  @redis = nil
end
Private Instance Methods
channel_listener(redis, output_queue)
click to toggle source
# File lib/logstash/inputs/redis.rb, line 186
# Subscribes to the channel named by @key and forwards every published
# message to `queue_event`. Subscribe and unsubscribe notifications are only
# logged.
#
# redis        - client used for the subscription
# output_queue - queue handed through to `queue_event`
def channel_listener(redis, output_queue)
  redis.subscribe(@key) do |subscription|
    subscription.subscribe do |channel, count|
      @logger.info("Subscribed", :channel => channel, :count => count)
    end

    # Only the payload matters; the channel name is not used.
    subscription.message do |_channel, payload|
      queue_event(payload, output_queue)
    end

    subscription.unsubscribe do |channel, count|
      @logger.info("Unsubscribed", :channel => channel, :count => count)
    end
  end
end
connect()
click to toggle source
# File lib/logstash/inputs/redis.rb, line 93
# Builds a new Redis client from the configured host, port, timeout, db and
# password, and returns it.
#
# When reading from a list with a batch count above one, the batch-fetch
# script is loaded through `load_batch_script` before the client is returned.
def connect
  # @password wraps the secret; unwrap it only when one was configured.
  secret = @password.value unless @password.nil?

  client = Redis.new(
    :host => @host,
    :port => @port,
    :timeout => @timeout,
    :db => @db,
    :password => secret
  )

  if @data_type == 'list' && (@batch_count > 1)
    load_batch_script(client)
  end

  client
end
identity()
click to toggle source
A string used to identify a redis instance in log messages. TODO(sissel): Use instance variables for this once the @name config option is removed.
# File lib/logstash/inputs/redis.rb, line 88
# Returns the label used for this input in log messages: the configured
# @name when there is one, otherwise "<redis url> <data type>:<key>".
def identity
  return @name if @name

  "#{@redis_url} #{@data_type}:#{@key}"
end
list_listener(redis, output_queue)
click to toggle source
# File lib/logstash/inputs/redis.rb, line 141
# Pops one item from the list named by @key (blocking), queues it, and — when
# @batch_count is greater than one — fetches up to @batch_count-1 further
# items in a single EVALSHA call and queues those as well.
#
# redis        - client to read from
# output_queue - queue handed through to `queue_event`
#
# A Redis::CommandError mentioning NOSCRIPT triggers a reload of the batch
# script followed by a retry; any other Redis::CommandError is re-raised.
def list_listener(redis, output_queue)
  # blpop returns the 'key' read from as well as the item result;
  # we only care about the result (2nd item in the list).
  reply = redis.blpop(@key, 0)

  # blpop failed or .. something?
  # The reply must be checked before it is indexed: a nil reply would
  # otherwise raise NoMethodError on `[1]` instead of returning quietly.
  # TODO(sissel): handle the error
  return if reply.nil?

  first_item = reply[1]
  return if first_item.nil?
  queue_event(first_item, output_queue)

  # If @batch_count is 1, there's no need to continue.
  return if @batch_count == 1

  begin
    redis.evalsha(@redis_script_sha, [@key], [@batch_count-1]).each do |batched_item|
      queue_event(batched_item, output_queue)
    end

    # Below is a commented-out implementation of 'batch fetch'
    # using pipelined LPOP calls. This in practice has been observed to
    # perform exactly the same in terms of event throughput as
    # the evalsha method. Given that the EVALSHA implementation uses
    # one call to redis instead of N (where N == @batch_count) calls,
    # I decided to go with the 'evalsha' method of fetching N items
    # from redis in bulk.
    #redis.pipelined do
      #error, item = redis.lpop(@key)
      #(@batch_count-1).times { redis.lpop(@key) }
    #end.each do |item|
      #queue_event(item, output_queue) if item
    #end
    # --- End commented out implementation of 'batch fetch'
  rescue Redis::CommandError => e
    if e.to_s =~ /NOSCRIPT/
      # The server no longer knows the script SHA; load it again and retry.
      @logger.warn("Redis may have been restarted, reloading redis batch EVAL script", :exception => e)
      load_batch_script(redis)
      retry
    else
      raise
    end
  end
end
listener_loop(listener, output_queue)
click to toggle source
Since all of the listeners share the same basic loop, the outer loop is abstracted into this method.
# File lib/logstash/inputs/redis.rb, line 222
# Runs the given listener method over and over until `finished?` is true,
# connecting to redis on demand.
#
# listener     - name (Symbol) of the listener method to invoke
# output_queue - queue passed through to the listener
#
# Connection failures are logged and retried after a one-second pause. Any
# other error is logged and re-raised to the caller.
def listener_loop(listener, output_queue)
  until finished?
    begin
      @redis ||= connect
      send(listener, @redis, output_queue)
    rescue Redis::CannotConnectError => e
      @logger.warn("Redis connection problem", :exception => e)
      # Drop the client instead of reconnecting right here: an exception
      # raised inside a rescue clause is not caught by its sibling clauses,
      # so a failed reconnect at this point would escape the loop. Clearing
      # @redis lets the protected `@redis ||= connect` above retry it.
      @redis = nil
      sleep 1
    rescue => e # redis error
      @logger.warn("Failed to get event from redis", :name => @name, :exception => e, :backtrace => e.backtrace)
      raise
    end
  end # until finished?
end
load_batch_script(redis)
click to toggle source
# File lib/logstash/inputs/redis.rb, line 106
# Registers the batch-fetch Lua script with the given redis client and
# remembers the SHA it returns in @redis_script_sha, which `list_listener`
# later passes to EVALSHA.
def load_batch_script(redis)
  # A redis lua EVAL script to fetch a count of keys.
  # In case count is bigger than current items in queue, the whole queue
  # will be returned without extra nil values.
  lua_source = <<EOF
          local i = tonumber(ARGV[1])
          local res = {}
          local length = redis.call('llen',KEYS[1])
          if length < i then i = length end
          while (i > 0) do
            local item = redis.call("lpop", KEYS[1])
            if (not item) then
              break
            end
            table.insert(res, item)
            i = i-1
          end
          return res
EOF
  @redis_script_sha = redis.script(:load, lua_source)
end
pattern_channel_listener(redis, output_queue)
click to toggle source
# File lib/logstash/inputs/redis.rb, line 203
# Pattern-subscribes to @key and forwards every matching message to
# `queue_event`. Subscribe and unsubscribe notifications are only logged.
#
# redis        - client used for the subscription
# output_queue - queue handed through to `queue_event`
def pattern_channel_listener(redis, output_queue)
  redis.psubscribe(@key) do |subscription|
    subscription.psubscribe do |channel, count|
      @logger.info("Subscribed", :channel => channel, :count => count)
    end

    # Only the payload (third argument) is used; the first two are ignored.
    subscription.pmessage do |_ch, _event, payload|
      queue_event(payload, output_queue)
    end

    subscription.punsubscribe do |channel, count|
      @logger.info("Unsubscribed", :channel => channel, :count => count)
    end
  end
end
queue_event(msg, output_queue)
click to toggle source
# File lib/logstash/inputs/redis.rb, line 128
# Decodes a raw redis message with the configured codec and pushes each
# resulting event, after decoration, onto the output queue.
#
# msg          - raw payload read from redis
# output_queue - destination the events are appended to with `<<`
#
# Any StandardError raised while decoding or queueing is logged and
# swallowed so that one bad message does not stop the listener.
def queue_event(msg, output_queue)
  @codec.decode(msg) do |decoded|
    decorate(decoded)
    output_queue << decoded
  end
rescue => e # parse or event creation error
  @logger.error("Failed to create event", :message => msg, :exception => e, :backtrace => e.backtrace)
end