class LogStash::Inputs::Ganglia
Reads Ganglia packets from the network via UDP.
Public Class Methods
new(params)
click to toggle source
Calls superclass method
LogStash::Inputs::Base::new
# File lib/logstash/inputs/ganglia.rb, line 26
# Sets up the input's initial state.
#
# Forwards +params+ to the superclass initializer (bare +super+ passes the
# same arguments through), turns off reverse DNS lookups for sockets via
# BasicSocket.do_not_reverse_lookup, and marks the input as not yet asked
# to shut down.
#
# @param params passed unchanged to the superclass initializer
def initialize(params)
  super

  # Process-wide socket setting: addresses are reported without a reverse
  # DNS lookup.
  BasicSocket.do_not_reverse_lookup = true
  @shutdown_requested = false
end
Public Instance Methods
parse_packet(packet)
click to toggle source
# File lib/logstash/inputs/ganglia.rb, line 97
# Decodes one raw packet and, for data packets, builds an event from it.
#
# Metadata packets are cached in @metadata under the metric's name and
# produce no event. Data packets are decoded against that cache and yield
# an event carrying "program" => "ganglia", the sender's hostname as
# "log_host", and the metric's dmax/tmax/slope/type/units copied from the
# cached metadata. Any other packet type is skipped.
#
# @param packet raw packet payload handed to GmonPacket.new
# @return [LogStash::Event, nil] the event for a valid data packet, nil for
#   metadata packets, invalid data packets and unknown packet types
def parse_packet(packet)
  gmonpacket = GmonPacket.new(packet)

  if gmonpacket.meta?
    # Extract the metadata from the packet
    meta = gmonpacket.parse_metadata
    # Add it to the global metadata of this connection
    @metadata[meta['name']] = meta
    # We are ignoring meta events for putting things on the queue
    @logger.debug("received a meta packet", @metadata)
    return nil
  elsif gmonpacket.data?
    data = gmonpacket.parse_data(@metadata)
    # Check if it was a valid data request
    return nil unless data

    event = LogStash::Event.new
    # Tag the event, not the decoded hash: `data` is discarded when this
    # method returns, so a tag stored there never reaches the output.
    event["program"] = "ganglia"
    event["log_host"] = data["hostname"]

    # Looked up once; the metric name does not change inside the loop.
    # NOTE(review): presumably parse_data returns nil when no metadata is
    # cached for this metric, so this lookup is non-nil here -- confirm.
    metric_meta = @metadata[data["name"]]
    %w{dmax tmax slope type units}.each do |info|
      event[info] = metric_meta[info]
    end
    return event
  else
    # Skipping unknown packet types
    return nil
  end
end
register()
click to toggle source
# File lib/logstash/inputs/ganglia.rb, line 33
# No-op: the body is empty, so nothing is set up here and nil is returned.
def register; end
run(output_queue)
click to toggle source
# File lib/logstash/inputs/ganglia.rb, line 37
# Runs the UDP listener and restarts it whenever it fails.
#
# Any StandardError raised by the listener is logged as a warning and, after
# a five second pause, the listener is started again. Once a shutdown has
# been requested the error is swallowed silently and the method returns.
#
# @param output_queue queue handed to udp_listener for emitted events
def run(output_queue)
  udp_listener(output_queue)
rescue => e
  # A requested shutdown ends the restart cycle.
  unless @shutdown_requested
    @logger.warn(
      "ganglia udp listener died",
      :address => "#{@host}:#{@port}",
      :exception => e,
      :backtrace => e.backtrace
    )
    sleep(5)
    retry
  end
end
teardown()
click to toggle source
# File lib/logstash/inputs/ganglia.rb, line 81
# Shuts the input down.
#
# The shutdown flag is raised before the socket is closed, so that #run
# sees it and does not restart the listener when the read fails.
def teardown
  @shutdown_requested = true

  close_udp

  finished
end
Private Instance Methods
close_udp()
click to toggle source
# File lib/logstash/inputs/ganglia.rb, line 88
# Best-effort close of the UDP socket, if there is one.
#
# Each half of the socket is closed independently and any StandardError from
# either close is deliberately ignored. @udp is always reset to nil.
#
# @return [nil]
def close_udp
  if @udp
    begin
      @udp.close_read
    rescue StandardError
      nil
    end

    begin
      @udp.close_write
    rescue StandardError
      nil
    end
  end

  @udp = nil
end
udp_listener(output_queue)
click to toggle source
# File lib/logstash/inputs/ganglia.rb, line 52
# Binds a UDP socket on @host:@port and pushes parsed events onto the queue.
#
# Loops forever: each received datagram is passed to parse_packet, and every
# non-nil event is decorated, stamped with the sender's IP address as "host",
# and appended to +output_queue+. The socket is closed on the way out,
# whether the loop ends through an exception or otherwise.
#
# @param output_queue receiver of events; only needs to respond to <<
def udp_listener(output_queue)
  @logger.info("Starting ganglia udp listener", :address => "#{@host}:#{@port}")

  # Drop any socket left over from an earlier run. Going through close_udp
  # keeps this best-effort, so a stale, already-closed socket cannot raise
  # here and prevent the rebind below.
  close_udp

  @udp = UDPSocket.new(Socket::AF_INET)
  @udp.bind(@host, @port)

  # Keep metadata collected by an earlier run of the listener, if any.
  @metadata ||= {}

  loop do
    packet, client = @udp.recvfrom(9000)
    # TODO(sissel): make this a codec...
    event = parse_packet(packet)
    next if event.nil?

    decorate(event)
    event["host"] = client[3] # the IP address
    output_queue << event
  end
ensure
  close_udp
end