class LogStash::Outputs::Pipe

Pipe output.

Pipe events to stdin of another program. You can use fields from the event as parts of the command. WARNING: This feature can cause logstash to fork off multiple children if you are not carefull with per-event commandline.

Public Instance Methods

receive(event) click to toggle source
# File lib/logstash/outputs/pipe.rb, line 35
def receive(event)
  return unless output?(event)

  command = event.sprintf(@command)
  pipe = get_pipe(command)

  if @message_format
    output = event.sprintf(@message_format) + "\n"
  else
    output = event.to_json
  end

  begin
    pipe.puts(output)
  rescue IOError, Errno::EPIPE => e
    @logger.error("Error writing to pipe, closing pipe.", :command => command, :pipe => pipe)
    drop_pipe(command)
  end

  close_stale_pipes
end
register() click to toggle source
# File lib/logstash/outputs/pipe.rb, line 29
def register
  @pipes = {}
  @last_stale_cleanup_cycle = Time.now
end
teardown() click to toggle source
# File lib/logstash/outputs/pipe.rb, line 57
def teardown
  @logger.info("Teardown: closing pipes")
  @pipes.each do |command, pipe|
    begin
      drop_pipe(command)
      @logger.debug("Closed pipe #{command}", :pipe => pipe)
    rescue Exception => e
      @logger.error("Excpetion while closing pipes.", :exception => e)
    end
  end
  finished
end

Private Instance Methods

close_stale_pipes() click to toggle source

every 10 seconds or so (triggered by events, but if there are no events there's no point closing files anyway)

# File lib/logstash/outputs/pipe.rb, line 72
def close_stale_pipes
  return if @ttl <= 0
  now = Time.now
  return unless now - @last_stale_cleanup_cycle >= @ttl
  @logger.info("Starting stale pipes cleanup cycle", :pipes => @pipes)
  inactive_pipes = @pipes.select { |command, pipe| not pipe.active }
  @logger.debug("%d stale pipes found" % inactive_pipes.count, :inactive_pipes => inactive_pipes)
  inactive_pipes.each do |command, pipe|
    drop_pipe(command)
  end
  # mark all pipes as inactive, a call to write will mark them as active again
  @pipes.each { |command, pipe| pipe.active = false }
  @last_stale_cleanup_cycle = now
end
drop_pipe(command) click to toggle source
# File lib/logstash/outputs/pipe.rb, line 87
def drop_pipe(command)
    return unless @pipes.include? command
    @logger.info("Closing pipe \"%s\"" % command)
    begin
      @pipes[command].close
    rescue Exception => e
      @logger.warn("Failed to close pipe.", :error => e, :command => command)
    end
    @pipes.delete(command)
end
get_pipe(command) click to toggle source
# File lib/logstash/outputs/pipe.rb, line 98
def get_pipe(command)
  return @pipes[command] if @pipes.include?(command)

  @logger.info("Opening pipe", :command => command)

  @pipes[command] = PipeWrapper.new(command, mode="a+")
end