class WaterDrop::Instrumentation::LoggerListener
Default listener that hooks up to our instrumentation and uses its events for logging It can be removed/replaced or anything without any harm to the Waterdrop flow @note It is a module as we can use it then as a part of the Karafka framework listener
as well as we can use it standalone
Public Class Methods
Source
# File lib/waterdrop/instrumentation/logger_listener.rb, line 20 def initialize(logger, log_messages: true) @logger = logger @log_messages = log_messages end
@param logger [Object] logger we want to use @param log_messages [Boolean] Should we report the messages content (payload and metadata)
with each message operation. This can be extensive, especially when producing a lot of messages. We provide this despite the fact that we only report payloads in debug, because Rails by default operates with debug level. This means, that when working with Rails in development, every single payload dispatched will go to logs. In majority of the cases this is extensive and simply floods the end user.
Public Instance Methods
Source
# File lib/waterdrop/instrumentation/logger_listener.rb, line 97 def on_buffer_flushed_async(event) messages = event[:messages] info(event, "Async flushing of #{messages.size} messages from the buffer") return unless log_messages? debug(event, messages) end
@param event [Dry::Events::Event] event that happened with the details
Source
# File lib/waterdrop/instrumentation/logger_listener.rb, line 108 def on_buffer_flushed_sync(event) messages = event[:messages] info(event, "Sync flushing of #{messages.size} messages from the buffer") return unless log_messages? debug(event, messages) end
@param event [Dry::Events::Event] event that happened with the details
Source
# File lib/waterdrop/instrumentation/logger_listener.rb, line 119 def on_buffer_purged(event) info(event, 'Successfully purging buffer') end
@param event [Dry::Events::Event] event that happened with the details
Source
# File lib/waterdrop/instrumentation/logger_listener.rb, line 141 def on_error_occurred(event) error = event[:error] type = event[:type] error(event, "Error occurred: #{error} - #{type}") end
@param event [Dry::Events::Event] event that happened with the error details
Source
# File lib/waterdrop/instrumentation/logger_listener.rb, line 75 def on_message_buffered(event) message = event[:message] info(event, "Buffering of a message to '#{message[:topic]}' topic") return unless log_messages? debug(event, [message]) end
@param event [Dry::Events::Event] event that happened with the details
Source
# File lib/waterdrop/instrumentation/logger_listener.rb, line 26 def on_message_produced_async(event) message = event[:message] info(event, "Message to '#{message[:topic]}' topic was delegated to a dispatch queue") return unless log_messages? debug(event, message) end
@param event [Dry::Events::Event] event that happened with the details
Source
# File lib/waterdrop/instrumentation/logger_listener.rb, line 37 def on_message_produced_sync(event) message = event[:message] info(event, "Sync producing of a message to '#{message[:topic]}' topic") return unless log_messages? debug(event, message) end
@param event [Dry::Events::Event] event that happened with the details
Source
# File lib/waterdrop/instrumentation/logger_listener.rb, line 86 def on_messages_buffered(event) messages = event[:messages] info(event, "Buffering of #{messages.size} messages") return unless log_messages? debug(event, [messages, messages.size]) end
@param event [Dry::Events::Event] event that happened with the details
Source
# File lib/waterdrop/instrumentation/logger_listener.rb, line 48 def on_messages_produced_async(event) messages = event[:messages] topics_count = messages.map { |message| "'#{message[:topic]}'" }.uniq.count info( event, "#{messages.size} messages to #{topics_count} topics were delegated to a dispatch queue" ) return unless log_messages? debug(event, messages) end
@param event [Dry::Events::Event] event that happened with the details
Source
# File lib/waterdrop/instrumentation/logger_listener.rb, line 63 def on_messages_produced_sync(event) messages = event[:messages] topics_count = messages.map { |message| "'#{message[:topic]}'" }.uniq.count info(event, "Sync producing of #{messages.size} messages to #{topics_count} topics") return unless log_messages? debug(event, messages) end
@param event [Dry::Events::Event] event that happened with the details
Source
# File lib/waterdrop/instrumentation/logger_listener.rb, line 131 def on_producer_closed(event) info(event, 'Closing producer') end
@param event [Dry::Events::Event] event that happened with the details @note While this says “Closing producer”, it produces a nice message with time taken:
"Closing producer took 12 ms" indicating it happened in the past.
Source
# File lib/waterdrop/instrumentation/logger_listener.rb, line 124 def on_producer_closing(event) info(event, 'Closing producer') end
@param event [Dry::Events::Event] event that happened with the details
Source
# File lib/waterdrop/instrumentation/logger_listener.rb, line 136 def on_producer_reloaded(event) info(event, 'Producer successfully reloaded') end
@param event [Dry::Events::Event] event that happened with the details
Source
# File lib/waterdrop/instrumentation/logger_listener.rb, line 154 def on_transaction_aborted(event) info(event, 'Aborting transaction') end
@param event [Dry::Events::Event] event that happened with the details
Source
# File lib/waterdrop/instrumentation/logger_listener.rb, line 159 def on_transaction_committed(event) info(event, 'Committing transaction') end
@param event [Dry::Events::Event] event that happened with the details
Source
# File lib/waterdrop/instrumentation/logger_listener.rb, line 178 def on_transaction_finished(event) info(event, 'Processing transaction') end
@param event [Dry::Events::Event] event that happened with the details
Source
# File lib/waterdrop/instrumentation/logger_listener.rb, line 164 def on_transaction_marked_as_consumed(event) message = event[:message] topic = message.topic partition = message.partition offset = message.offset loc = "#{topic}-#{partition}" info( event, "Marking message with offset #{offset} on #{loc} as consumed in a transaction" ) end
@param event [Dry::Events::Event] event that happened with the details
Source
# File lib/waterdrop/instrumentation/logger_listener.rb, line 149 def on_transaction_started(event) info(event, 'Starting transaction') end
@param event [Dry::Events::Event] event that happened with the details
Private Instance Methods
Source
# File lib/waterdrop/instrumentation/logger_listener.rb, line 191 def debug(event, log_message) @logger.debug("[#{event[:producer_id]}] #{log_message}") end
@param event [Dry::Events::Event] event that happened with the details @param log_message [String] message we want to publish
Source
# File lib/waterdrop/instrumentation/logger_listener.rb, line 207 def error(event, log_message) @logger.error("[#{event[:producer_id]}] #{log_message}") end
@param event [Dry::Events::Event] event that happened with the details @param log_message [String] message we want to publish
Source
# File lib/waterdrop/instrumentation/logger_listener.rb, line 197 def info(event, log_message) if event.payload.key?(:time) @logger.info("[#{event[:producer_id]}] #{log_message} took #{event[:time].round(2)} ms") else @logger.info("[#{event[:producer_id]}] #{log_message}") end end
@param event [Dry::Events::Event] event that happened with the details @param log_message [String] message we want to publish
Source
# File lib/waterdrop/instrumentation/logger_listener.rb, line 185 def log_messages? @log_messages end
@return [Boolean] should we report the messages details in the debug mode.