class LogStash::Outputs::Pipe
Pipe
output.
Pipe
events to stdin of another program. You can use fields from the event as parts of the command. WARNING: This feature can cause logstash to fork off multiple children if you are not carefull with per-event commandline.
Public Instance Methods
receive(event)
click to toggle source
# File lib/logstash/outputs/pipe.rb, line 35 def receive(event) return unless output?(event) command = event.sprintf(@command) pipe = get_pipe(command) if @message_format output = event.sprintf(@message_format) + "\n" else output = event.to_json end begin pipe.puts(output) rescue IOError, Errno::EPIPE => e @logger.error("Error writing to pipe, closing pipe.", :command => command, :pipe => pipe) drop_pipe(command) end close_stale_pipes end
register()
click to toggle source
# File lib/logstash/outputs/pipe.rb, line 29 def register @pipes = {} @last_stale_cleanup_cycle = Time.now end
teardown()
click to toggle source
# File lib/logstash/outputs/pipe.rb, line 57 def teardown @logger.info("Teardown: closing pipes") @pipes.each do |command, pipe| begin drop_pipe(command) @logger.debug("Closed pipe #{command}", :pipe => pipe) rescue Exception => e @logger.error("Excpetion while closing pipes.", :exception => e) end end finished end
Private Instance Methods
close_stale_pipes()
click to toggle source
every 10 seconds or so (triggered by events, but if there are no events there's no point closing files anyway)
# File lib/logstash/outputs/pipe.rb, line 72 def close_stale_pipes return if @ttl <= 0 now = Time.now return unless now - @last_stale_cleanup_cycle >= @ttl @logger.info("Starting stale pipes cleanup cycle", :pipes => @pipes) inactive_pipes = @pipes.select { |command, pipe| not pipe.active } @logger.debug("%d stale pipes found" % inactive_pipes.count, :inactive_pipes => inactive_pipes) inactive_pipes.each do |command, pipe| drop_pipe(command) end # mark all pipes as inactive, a call to write will mark them as active again @pipes.each { |command, pipe| pipe.active = false } @last_stale_cleanup_cycle = now end
drop_pipe(command)
click to toggle source
# File lib/logstash/outputs/pipe.rb, line 87 def drop_pipe(command) return unless @pipes.include? command @logger.info("Closing pipe \"%s\"" % command) begin @pipes[command].close rescue Exception => e @logger.warn("Failed to close pipe.", :error => e, :command => command) end @pipes.delete(command) end
get_pipe(command)
click to toggle source
# File lib/logstash/outputs/pipe.rb, line 98 def get_pipe(command) return @pipes[command] if @pipes.include?(command) @logger.info("Opening pipe", :command => command) @pipes[command] = PipeWrapper.new(command, mode="a+") end