class LogStash::Inputs::Ganglia

Reads Ganglia packets from the network via UDP.

Public Class Methods

new(params)
Calls superclass method LogStash::Inputs::Base::new
# File lib/logstash/inputs/ganglia.rb, line 26
# Builds the input and puts it into its initial "not shutting down" state.
#
# params - passed through unchanged to the superclass constructor by the
#          bare +super+ call.
def initialize(params)
  super

  # Global Ruby socket flag: do not resolve peer addresses to host names.
  BasicSocket.do_not_reverse_lookup = true

  # Read by #run to decide whether a dead listener should be restarted.
  @shutdown_requested = false
end

Public Instance Methods

parse_packet(packet)
# File lib/logstash/inputs/ganglia.rb, line 97
# Decodes one raw packet and turns it into an event when it carries data.
#
# Meta packets are not turned into events: their parsed content is stored in
# @metadata under its 'name' entry so later data packets can be annotated.
# Data packets produce an event tagged with "program" => "ganglia", the
# sender's "hostname" as "log_host", and the dmax/tmax/slope/type/units
# entries of the stored metadata for that name.
#
# packet - the raw payload handed to GmonPacket.new.
#
# Returns a LogStash::Event for a usable data packet, or nil for meta
# packets, unknown packet types, data packets GmonPacket could not parse and
# data packets whose metadata has not been stored yet.
def parse_packet(packet)
  gmonpacket = GmonPacket.new(packet)

  if gmonpacket.meta?
    # Extract the metadata from the packet and remember it for this listener
    meta = gmonpacket.parse_metadata
    @metadata[meta['name']] = meta

    # We are ignoring meta events for putting things on the queue
    @logger.debug("received a meta packet", @metadata)
    return nil
  end

  # Skipping unknown packet types
  return nil unless gmonpacket.data?

  data = gmonpacket.parse_data(@metadata)

  # Check if it was a valid data request
  return nil unless data

  # Without stored metadata the lookups below would call [] on nil and the
  # resulting NoMethodError would take the whole listener loop down.
  metric_meta = @metadata[data["name"]]
  unless metric_meta
    @logger.debug("dropping ganglia data packet without metadata", :name => data["name"])
    return nil
  end

  event = LogStash::Event.new
  # Tag the event itself; writing this into the throwaway data hash never
  # reached the event.
  event["program"] = "ganglia"
  event["log_host"] = data["hostname"]
  %w{dmax tmax slope type units}.each do |info|
    event[info] = metric_meta[info]
  end
  event
end
register()
# File lib/logstash/inputs/ganglia.rb, line 33
# Registration hook; this input has nothing to prepare here.
#
# Returns nil.
def register
  # Intentionally empty: the socket is created and bound in #udp_listener.
end
run(output_queue)
# File lib/logstash/inputs/ganglia.rb, line 37
# Runs the UDP listener and restarts it whenever it fails.
#
# Any StandardError escaping #udp_listener is logged as a warning, followed
# by a five second pause and another attempt. Once shutdown has been
# requested the error is swallowed and the method returns instead.
#
# output_queue - handed to #udp_listener unchanged.
#
# Returns whatever #udp_listener returns, or nil when it failed during
# shutdown.
def run(output_queue)
  udp_listener(output_queue)
rescue => error
  # A failure after #teardown set the flag is expected; do not restart.
  return if @shutdown_requested

  @logger.warn("ganglia udp listener died",
               :address => "#{@host}:#{@port}",
               :exception => error,
               :backtrace => error.backtrace)
  sleep(5)
  retry
end
teardown()
# File lib/logstash/inputs/ganglia.rb, line 81
# Stops the input: flags the shutdown, closes the UDP socket, then calls
# +finished+.
#
# Returns whatever +finished+ returns (helper defined outside this view;
# presumably inherited from the base input class - confirm).
def teardown
  # Must be set before the socket is closed: #run checks this flag to decide
  # whether a failing listener should be restarted.
  @shutdown_requested = true
  close_udp
  finished
end

Private Instance Methods

close_udp()
# File lib/logstash/inputs/ganglia.rb, line 88
# Closes both directions of the UDP socket, if there is one, and forgets it.
#
# Errors raised while closing are deliberately ignored. Calling this with no
# socket is a no-op apart from resetting @udp.
#
# Returns nil.
def close_udp
  if @udp
    [:close_read, :close_write].each do |half_close|
      begin
        @udp.public_send(half_close)
      rescue StandardError
        nil # best effort: a failure on one half must not stop the other
      end
    end
  end
  @udp = nil
end
udp_listener(output_queue)
# File lib/logstash/inputs/ganglia.rb, line 52
# Binds a UDP socket on @host:@port and pushes an event onto output_queue
# for every received packet that #parse_packet turns into one.
#
# Each event is passed to +decorate+ and gets the sender's IP address as its
# "host" field before being queued. @metadata is created on first use and
# kept across restarts. The socket is always released on the way out,
# whether the loop ends normally or by an exception.
#
# output_queue - any object responding to <<.
#
# Raises whatever the socket calls, #parse_packet or +decorate+ raise.
def udp_listener(output_queue)
  @logger.info("Starting ganglia udp listener", :address => "#{@host}:#{@port}")

  # Drop any socket left over from an earlier run through the shared helper,
  # so a socket that is already closed or broken cannot abort the restart.
  close_udp

  @udp = UDPSocket.new(Socket::AF_INET)
  @udp.bind(@host, @port)

  @metadata ||= {}
  loop do
    packet, client = @udp.recvfrom(9000)
    # TODO(sissel): make this a codec...
    event = parse_packet(packet)
    next if event.nil?

    decorate(event)
    event["host"] = client[3] # the IP address
    output_queue << event
  end
ensure
  close_udp
end