class LogStash::Outputs::Redis
send events to a redis database using RPUSH
For more information about redis, see <redis.io/>
Public Instance Methods
congestion_check(key)
click to toggle source
# File lib/logstash/outputs/redis.rb, line 177 def congestion_check(key) return if @congestion_threshold == 0 if (Time.now.to_i - @congestion_check_times[key]) >= @congestion_interval # Check congestion only if enough time has passed since last check. while @redis.llen(key) > @congestion_threshold # Don't push event to redis key which has reached @congestion_threshold. @logger.warn? and @logger.warn("Redis key size has hit a congestion threshold #{@congestion_threshold} suspending output for #{@congestion_interval} seconds") sleep @congestion_interval end @congestion_check_time = Time.now.to_i end end
flush(events, key, teardown=false)
click to toggle source
called from Stud::Buffer#buffer_flush when there are events to flush
# File lib/logstash/outputs/redis.rb, line 189 def flush(events, key, teardown=false) @redis ||= connect # we should not block due to congestion on teardown # to support this Stud::Buffer#buffer_flush should pass here the :final boolean value. congestion_check(key) unless teardown @redis.rpush(key, events) end
on_flush_error(e)
click to toggle source
called from Stud::Buffer#buffer_flush when an error occurs
# File lib/logstash/outputs/redis.rb, line 197 def on_flush_error(e) @logger.warn("Failed to send backlog of events to redis", :identity => identity, :exception => e, :backtrace => e.backtrace ) @redis = connect end
receive(event)
click to toggle source
# File lib/logstash/outputs/redis.rb, line 136 def receive(event) return unless output?(event) if @batch and @data_type == 'list' # Don't use batched method for pubsub. # Stud::Buffer buffer_receive(event.to_json, event.sprintf(@key)) return end key = event.sprintf(@key) # TODO(sissel): We really should not drop an event, but historically # we have dropped events that fail to be converted to json. # TODO(sissel): Find a way to continue passing events through even # if they fail to convert properly. begin payload = event.to_json rescue Encoding::UndefinedConversionError, ArgumentError puts "FAILUREENCODING" @logger.error("Failed to convert event to JSON. Invalid UTF-8, maybe?", :event => event.inspect) return end begin @redis ||= connect if @data_type == 'list' congestion_check(key) @redis.rpush(key, payload) else @redis.publish(key, payload) end rescue => e @logger.warn("Failed to send event to redis", :event => event, :identity => identity, :exception => e, :backtrace => e.backtrace) sleep @reconnect_interval @redis = nil retry end end
register()
click to toggle source
# File lib/logstash/outputs/redis.rb, line 92 def register require 'redis' # TODO remove after setting key and data_type to true if @queue if @key or @data_type raise RuntimeError.new( "Cannot specify queue parameter and key or data_type" ) end @key = @queue @data_type = 'list' end if not @key or not @data_type raise RuntimeError.new( "Must define queue, or key and data_type parameters" ) end # end TODO if @batch if @data_type != "list" raise RuntimeError.new( "batch is not supported with data_type #{@data_type}" ) end buffer_initialize( :max_items => @batch_events, :max_interval => @batch_timeout, :logger => @logger ) end @redis = nil if @shuffle_hosts @host.shuffle! end @host_idx = 0 @congestion_check_times = Hash.new { |h,k| h[k] = Time.now.to_i - @congestion_interval } end
teardown()
click to toggle source
# File lib/logstash/outputs/redis.rb, line 206 def teardown if @batch buffer_flush(:final => true) end if @data_type == 'channel' and @redis @redis.quit @redis = nil end end
Private Instance Methods
connect()
click to toggle source
# File lib/logstash/outputs/redis.rb, line 217 def connect @current_host, @current_port = @host[@host_idx].split(':') @host_idx = @host_idx + 1 >= @host.length ? 0 : @host_idx + 1 if not @current_port @current_port = @port end params = { :host => @current_host, :port => @current_port, :timeout => @timeout, :db => @db } @logger.debug(params) if @password params[:password] = @password.value end Redis.new(params) end
identity()
click to toggle source
A string used to identify a redis instance in log messages
# File lib/logstash/outputs/redis.rb, line 241 def identity @name || "redis://#{@password}@#{@current_host}:#{@current_port}/#{@db} #{@data_type}:#{@key}" end