class NewRelic::Agent::PipeChannelManager::Listener
Attributes
This attr_accessor intentionally provides unsynchronized access to the @pipes hash. It is used to look up the write end of the pipe from within the Resque child process, and must be unsynchronized in order to avoid a potential deadlock in which the PipeChannelManager::Listener
thread in the parent process is holding the @pipes_lock at the time of the fork.
This attr_accessor intentionally provides unsynchronized access to the @pipes hash. It is used to look up the write end of the pipe from within the Resque child process, and must be unsynchronized in order to avoid a potential deadlock in which the PipeChannelManager::Listener
thread in the parent process is holding the @pipes_lock at the time of the fork.
This attr_accessor intentionally provides unsynchronized access to the @pipes hash. It is used to look up the write end of the pipe from within the Resque child process, and must be unsynchronized in order to avoid a potential deadlock in which the PipeChannelManager::Listener
thread in the parent process is holding the @pipes_lock at the time of the fork.
Public Class Methods
Source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 140 def initialize @started = nil @pipes = {} @pipes_lock = Mutex.new @timeout = 360 @select_timeout = 60 end
Public Instance Methods
Source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 218 def close_all_pipes @pipes_lock.synchronize do @pipes.each do |id, pipe| # Needs else branch coverage pipe.close if pipe # rubocop:disable Style/SafeNavigation end @pipes = {} end end
Source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 153 def register_pipe(id) @pipes_lock.synchronize do @pipes[id] = Pipe.new end wakeup end
Source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 161 def start return if @started == true @started = true @thread = NewRelic::Agent::Threading::AgentThread.create('Pipe Channel Manager') do now = nil loop do clean_up_pipes pipes_to_listen_to = @pipes_lock.synchronize do @pipes.values.map { |pipe| pipe.out } + [wake.out] end if now NewRelic::Agent.record_metric( 'Supportability/Listeners', Process.clock_gettime(Process::CLOCK_REALTIME) - now ) end if ready = IO.select(pipes_to_listen_to, [], [], @select_timeout) now = Process.clock_gettime(Process::CLOCK_REALTIME) ready_pipes = ready[0] ready_pipes.each do |pipe| merge_data_from_pipe(pipe) unless pipe == wake.out end begin wake.out.read_nonblock(1) if ready_pipes.include?(wake.out) rescue IO::WaitReadable NewRelic::Agent.logger.error('Issue while reading from the ready pipe') NewRelic::Agent.logger.error("Ready pipes: #{ready_pipes.map(&:to_s)}, wake.out pipe: #{wake.out}") end end break unless should_keep_listening? end end sleep(0.001) # give time for the thread to spawn end
Source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 232 def started? @started end
Source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 209 def stop return unless @started == true stop_listener_thread close_all_pipes @wake.close @wake = nil end
Source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 203 def stop_listener_thread @started = false wakeup @thread.join end
Source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 228 def wake @wake ||= Pipe.new end
Source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 149 def wakeup wake.in << '.' end
Protected Instance Methods
Source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 268 def clean_up_pipes @pipes_lock.synchronize do @pipes.values.each do |pipe| if pipe.last_read + @timeout < Process.clock_gettime(Process::CLOCK_REALTIME) pipe.close unless pipe.closed? end end @pipes.reject! { |id, pipe| pipe.out.closed? } end end
Source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 279 def find_pipe_for_handle(out_handle) @pipes_lock.synchronize do @pipes.values.find { |pipe| pipe.out == out_handle } end end
Source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 238 def merge_data_from_pipe(pipe_handle) pipe = find_pipe_for_handle(pipe_handle) raw_payload = pipe.read if raw_payload && !raw_payload.empty? if raw_payload == Pipe::READY_MARKER pipe.after_fork_in_parent else payload = unmarshal(raw_payload) if payload endpoint, items = payload NewRelic::Agent.agent.merge_data_for_endpoint(endpoint, items) end end end pipe.close if pipe.eof? end
Source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 264 def should_keep_listening? @started || @pipes_lock.synchronize { @pipes.values.find { |pipe| !pipe.in.closed? } } end
Source
# File lib/new_relic/agent/pipe_channel_manager.rb, line 256 def unmarshal(data) Marshal.load(data) rescue StandardError => e ::NewRelic::Agent.logger.error('Failure unmarshalling message from Resque child process', e) ::NewRelic::Agent.logger.debug(NewRelic::Base64.encode64(data)) nil end