class LogStash::Inputs::Collectd
Public Class Methods
new(params)
click to toggle source
Calls superclass method
LogStash::Inputs::Base::new
# File lib/logstash/inputs/collectd.rb, line 59 def initialize(params) super BasicSocket.do_not_reverse_lookup = true @idbyte = 0 @length = 0 @prev_typenum = 0 @header = []; @body = [] @timestamp = Time.now().utc @collectd = {} @types = {} end
Public Instance Methods
get_types(paths)
click to toggle source
# File lib/logstash/inputs/collectd.rb, line 113 def get_types(paths) # Get the typesdb paths.each do |path| @logger.info("Getting Collectd typesdb info", :typesdb => path.to_s) File.open(path, 'r').each_line do |line| typename, *line = line.strip.split next if typename.nil? || if typename[0,1] != '#' # Don't process commented or blank lines v = line.collect { |l| l.strip.split(":")[0] } @types[typename] = v end end end @logger.debug("Collectd Types", :types => @types.to_s) end
get_values(id, body)
click to toggle source
# File lib/logstash/inputs/collectd.rb, line 156 def get_values(id, body) retval = '' case id when 0,2,3,4,5,100 #=> String types retval = body.pack("C*") retval = retval[0..-2] when 1 # Time # Time here, in bit-shifted format. Parse bytes into UTC. byte1, byte2 = body.pack("C*").unpack("NN") retval = Time.at(( ((byte1 << 32) + byte2))).utc when 7,101 #=> Numeric types retval = body.slice!(0..7).pack("C*").unpack("E")[0] when 8 # Time, Hi-Res # Time here, in bit-shifted format. Parse bytes into UTC. byte1, byte2 = body.pack("C*").unpack("NN") retval = Time.at(( ((byte1 << 32) + byte2) * (2**-30) )).utc when 9 # Interval, Hi-Res byte1, byte2 = body.pack("C*").unpack("NN") retval = (((byte1 << 32) + byte2) * (2**-30)).to_i when 6 # Values val_bytes = body.slice!(0..1) val_count = val_bytes.pack("C*").unpack("n") if body.length % 9 == 0 # Should be 9 fields count = 0 retval = [] types = body.slice!(0..((body.length/9)-1)) while body.length > 0 vtype = vt_map(types[count]) case types[count] when 0, 3; v = body.slice!(0..7).pack("C*").unpack("Q>")[0] when 1; v = body.slice!(0..7).pack("C*").unpack("E")[0] when 2; v = body.slice!(0..7).pack("C*").unpack("q>")[0] else; v = 0 end retval << v count += 1 end else @logger.error("Incorrect number of data fields for collectd record", :body => body.to_s) end end # Populate some state variables based on their type... case id when 2 if @plugin != retval # Zero-out @plugin_instance when @plugin changes @plugin_instance = '' @collectd.delete('plugin_instance') end @plugin = retval when 0; @cdhost = retval when 3; @plugin_instance = retval when 4; @cdtype = retval when 5; @type_instance = retval when 1,8; @timestamp = retval end return retval end
register()
click to toggle source
# File lib/logstash/inputs/collectd.rb, line 72 def register @udp = nil if @typesdb.nil? if __FILE__ =~ /^file:\/.+!.+/ begin # Running from a jar, assume types.db is at the root. jar_path = [__FILE__.split("!").first, "/types.db"].join("!") @typesdb = [jar_path] rescue => ex raise "Failed to cache, due to: #{ex}\n#{ex.backtrace}" end else if File.exists?("types.db") @typesdb = ["types.db"] elsif File.exists?("vendor/collectd/types.db") @typesdb = ["vendor/collectd/types.db"] else raise "You must specify 'typesdb => ...' in your collectd input" end end end @logger.info("Using internal types.db", :typesdb => @typesdb.to_s) end
run(output_queue)
click to toggle source
# File lib/logstash/inputs/collectd.rb, line 97 def run(output_queue) begin # get types get_types(@typesdb) # collectd server collectd_listener(output_queue) rescue LogStash::ShutdownSignal # do nothing, shutdown was requested. rescue => e @logger.warn("Collectd listener died", :exception => e, :backtrace => e.backtrace) sleep(5) retry end # begin end
teardown()
click to toggle source
# File lib/logstash/inputs/collectd.rb, line 302 def teardown @udp.close if @udp && !@udp.closed? end
type_map(id)
click to toggle source
# File lib/logstash/inputs/collectd.rb, line 129 def type_map(id) case id when 0; return "host" when 1,8; return "@timestamp" when 2; return "plugin" when 3; return "plugin_instance" when 4; return "collectd_type" when 5; return "type_instance" when 6; return "values" when 9; return "interval" when 100; return "message" when 101; return "severity" end end
vt_map(id)
click to toggle source
# File lib/logstash/inputs/collectd.rb, line 145 def vt_map(id) case id when 0; return "COUNTER" when 1; return "GAUGE" when 2; return "DERIVE" when 3; return "ABSOLUTE" else; return 'UNKNOWN' end end
Private Instance Methods
collectd_listener(output_queue)
click to toggle source
# File lib/logstash/inputs/collectd.rb, line 228 def collectd_listener(output_queue) @logger.info("Starting Collectd listener", :address => "#{@host}:#{@port}") if @udp && ! @udp.closed? @udp.close end @udp = UDPSocket.new(Socket::AF_INET) @udp.bind(@host, @port) loop do payload, client = @udp.recvfrom(@buffer_size) payload.each_byte do |byte| # According to the documentation for the binary protocol # it takes 4 bytes to define the header: # The first 2 bytes are the type number, # the second 2 bytes are the length of the message. # So, until we have looped 4 times (@idbyte is our counter) # append the byte to the @header if @idbyte < 4 @header << byte # Now that we have looped exactly 4 times... elsif @idbyte == 4 @typenum = (@header[0] << 1) + @header[1] # @typenum gets the first 2 bytes @length = (@header[2] << 1) + @header[3] # @length gets the second 2 bytes @body << byte # @body begins with the current byte # And if we've looped more than 4, up until the length of the message (now defined) elsif @idbyte > 4 && @idbyte < @length @body << byte # append the current byte to @body end # So long as we have @length and we've reached it, it's time to parse if @length > 0 && @idbyte == @length-1 field = type_map(@typenum) # Get the field name based on type if @typenum < @prev_typenum # We've started over, generate an event if @prune_intervals generate_event(@collectd, output_queue) unless @prev_typenum == 7 or @prev_typenum == 9 else generate_event(@collectd, output_queue) end @collectd.clear # Empty @collectd @collectd['host'] = @cdhost # Reset these from state @collectd['collectd_type'] = @cdtype @collectd['plugin'] = @plugin @collectd['plugin_instance'] = @plugin_instance @collectd['@timestamp'] = @timestamp end # Here is where we actually fill @collectd values = get_values(@typenum, @body) if values.kind_of?(Array) if values.length > 1 # Only do this iteration on multi-value arrays values.each_with_index {|value, x| @collectd[@types[@collectd['collectd_type']][x]] = values[x]} else # Otherwise it's a single value @collectd['value'] = values[0] # So name it 'value' accordingly end elsif field != nil # Not an array, make sure it's non-empty @collectd[field] = values # Append values to @collectd under key field end @prev_typenum = @typenum # All bytes in the collectd event have now been processed. Reset counters, header & body. @idbyte = 0; @length = 0; @header.clear; @body.clear; else # Increment the byte positional counter @idbyte += 1 end # End of if @length > 0 && @idbyte == @length-1 end # End of payload.each_byte do |byte| loop end # End of loop do, payload, client = @udp.recvfrom(@buffer_size) ensure if @udp @udp.close_read rescue nil @udp.close_write rescue nil end end
generate_event(data, output_queue)
click to toggle source
# File lib/logstash/inputs/collectd.rb, line 215 def generate_event(data, output_queue) # Prune these *specific* keys if they exist and are empty. # This is better than looping over all keys every time. data.delete('type_instance') if data['type_instance'] == "" data.delete('plugin_instance') if data['plugin_instance'] == "" # As crazy as it sounds, this is where we actually send our events to the queue! event = LogStash::Event.new data.each {|k, v| event[k] = data[k]} decorate(event) output_queue << event end