class NewRelic::Agent::PipeChannelManager::Pipe
Expected initial sequence of events for Pipe
usage:
-
Pipe
is created in parent process (read and write ends open) -
Parent process forks
-
An after_fork hook is invoked in the child
-
From after_fork hook, child closes read end of pipe, and writes a ready marker on the pipe (
after_fork_in_child
). -
The parent receives the ready marker, and closes the write end of the pipe in response (
after_fork_in_parent
).
After this sequence of steps, an exit (whether clean or not) of the child will result in the pipe being marked readable again, and giving an EOF marker (nil) when read. Note that closing of the unused ends of the pipe in the parent and child processes is essential in order for the EOF to be correctly triggered. The ready marker mechanism is used because there’s no easy hook for after_fork in the parent process.
This class provides message framing (separation of individual messages), but not serialization. Serialization / deserialization is the responsibility of clients.
Message framing works like this:
Each message sent across the pipe is preceded by a length tag that specifies the length of the message that immediately follows, in bytes. The length tags are serialized as unsigned big-endian long values, (4 bytes each). This means that the maximum theoretical message size is 4 GB - much larger than we’d ever need or want for this application.
Constants
- NUM_LENGTH_BYTES
- READY_MARKER
Attributes
Public Class Methods
# File lib/new_relic/agent/pipe_channel_manager.rb, line 64 def initialize @out, @in = IO.pipe if defined?(::Encoding::ASCII_8BIT) @in.set_encoding(::Encoding::ASCII_8BIT) end @last_read = Process.clock_gettime(Process::CLOCK_REALTIME) @parent_pid = $$ end
Public Instance Methods
# File lib/new_relic/agent/pipe_channel_manager.rb, line 115 def after_fork_in_child @out.close unless @out.closed? write(READY_MARKER) end
# File lib/new_relic/agent/pipe_channel_manager.rb, line 120 def after_fork_in_parent @in.close unless @in.closed? end
# File lib/new_relic/agent/pipe_channel_manager.rb, line 73 def close @out.close unless @out.closed? @in.close unless @in.closed? end
# File lib/new_relic/agent/pipe_channel_manager.rb, line 124 def closed? @out.closed? && @in.closed? end
# File lib/new_relic/agent/pipe_channel_manager.rb, line 82 def deserialize_message_length(data) data.unpack('L>').first end
# File lib/new_relic/agent/pipe_channel_manager.rb, line 111 def eof? !@out.closed? && @out.eof? end
# File lib/new_relic/agent/pipe_channel_manager.rb, line 92 def read @in.close unless @in.closed? @last_read = Process.clock_gettime(Process::CLOCK_REALTIME) length_bytes = @out.read(NUM_LENGTH_BYTES) if length_bytes message_length = deserialize_message_length(length_bytes) if message_length @out.read(message_length) else length_hex = length_bytes.bytes.map { |b| b.to_s(16) }.join(' ') NewRelic::Agent.logger.error("Failed to deserialize message length from pipe. Bytes: [#{length_hex}]") nil end else NewRelic::Agent.logger.error('Failed to read bytes for length from pipe.') nil end end
# File lib/new_relic/agent/pipe_channel_manager.rb, line 78 def serialize_message_length(data) [data.bytesize].pack('L>') end
# File lib/new_relic/agent/pipe_channel_manager.rb, line 86 def write(data) @out.close unless @out.closed? @in << serialize_message_length(data) @in << data end