class WaterDrop::Clients::Buffered
Client used to buffer messages that we send out in specs and other places.
Attributes
Public Class Methods
Source
# File lib/waterdrop/clients/buffered.rb, line 10
def initialize(*args)
  super
  @messages = []
  @topics = Hash.new { |k, v| k[v] = [] }
  @transaction_active = false
  @transaction_messages = []
  @transaction_topics = Hash.new { |k, v| k[v] = [] }
  @transaction_level = 0
end
@param args [Object] anything accepted by `Clients::Dummy`
Calls superclass method WaterDrop::Clients::Dummy::new
Public Instance Methods
Source
# File lib/waterdrop/clients/buffered.rb, line 70
def abort_transaction
  @transaction_level -= 1
  @transaction_topics.clear
  @transaction_messages.clear
  @transaction_active = false
end
Aborts the transaction and discards the messages buffered within it
Source
# File lib/waterdrop/clients/buffered.rb, line 38
def begin_transaction
  @transaction_level += 1
  @transaction_active = true
end
Starts the transaction on a given level
Source
# File lib/waterdrop/clients/buffered.rb, line 44
def commit_transaction
  @transaction_level -= 1

  # Transfer transactional data on success
  @transaction_topics.each do |topic, messages|
    @topics[topic] += messages
  end

  @messages += @transaction_messages

  @transaction_topics.clear
  @transaction_messages.clear
  @transaction_active = false
end
Finishes the given level of the transaction and transfers the messages buffered within it to the main buffers
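The transaction methods above stage messages separately and only merge them on commit. The following is a minimal sketch of that behaviour, assuming a hypothetical stand-in class named `TransactionBufferSketch`. It copies only the buffering logic from the listings, leaves out `@transaction_level` and the superclass call, and is not the WaterDrop class itself.

```ruby
# Hypothetical stand-in that mirrors the buffering logic from the listings.
# It does not inherit from Clients::Dummy and returns no delivery handle.
class TransactionBufferSketch
  attr_reader :messages

  def initialize
    @messages = []
    @topics = Hash.new { |hash, topic| hash[topic] = [] }
    @transaction_active = false
    @transaction_messages = []
    @transaction_topics = Hash.new { |hash, topic| hash[topic] = [] }
  end

  def begin_transaction
    @transaction_active = true
  end

  # Routes the message to the staging buffers while a transaction is active
  def produce(message)
    if @transaction_active
      @transaction_topics[message.fetch(:topic)] << message
      @transaction_messages << message
    else
      @topics[message.fetch(:topic)] << message
      @messages << message
    end
    nil
  end

  # Moves staged messages into the main buffers, then empties the staging area
  def commit_transaction
    @transaction_topics.each { |topic, messages| @topics[topic] += messages }
    @messages += @transaction_messages
    clear_staging
  end

  # Drops staged messages without touching the main buffers
  def abort_transaction
    clear_staging
  end

  def messages_for(topic)
    @topics[topic]
  end

  private

  def clear_staging
    @transaction_topics.clear
    @transaction_messages.clear
    @transaction_active = false
  end
end

buffer = TransactionBufferSketch.new

buffer.begin_transaction
buffer.produce(topic: 'events', payload: 'a')
visible_while_staged = buffer.messages.size # staged, so not in the main buffer yet
buffer.commit_transaction
visible_after_commit = buffer.messages.size

buffer.begin_transaction
buffer.produce(topic: 'events', payload: 'b')
buffer.abort_transaction
visible_after_abort = buffer.messages_for('events').size # 'b' was discarded
```

Because the commit uses `+=`, it builds new arrays instead of appending in place. In this sketch, an array obtained from `messages` or `messages_for` before the commit does not show the committed messages; it has to be fetched again afterwards.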
Source
# File lib/waterdrop/clients/buffered.rb, line 79
def messages_for(topic)
  @topics[topic]
end
Returns messages produced to a given topic
@param topic [String]
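Since the topics hash is built with a default block in the constructor, a lookup for a topic that never received a message yields an empty array, not `nil`. The sketch below reproduces only that plain-Ruby hash behaviour; the variable names are illustrative and nothing here calls WaterDrop.

```ruby
# Same construction as in the constructor listing, with descriptive block names
topics = Hash.new { |hash, topic| hash[topic] = [] }
topics['orders'] << { topic: 'orders', payload: '1' }

known = topics['orders']
unknown = topics['never_used'] # the default block runs and stores a new empty array

known_size = known.size
unknown_empty = unknown.empty?
key_created = topics.key?('never_used') # the lookup itself created the key
```

In a spec this means an "expect no messages" check can be written against an empty array without a `nil` guard.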
Source
# File lib/waterdrop/clients/buffered.rb, line 24
def produce(message)
  if @transaction_active
    @transaction_topics[message.fetch(:topic)] << message
    @transaction_messages << message
  else
    # We pre-validate the message payload, so topic is ensured to be present
    @topics[message.fetch(:topic)] << message
    @messages << message
  end

  super(**message.to_h)
end
“Produces” a message to Kafka: it acknowledges the message locally and adds it to the internal buffer
@param message [Hash] `WaterDrop::Producer#produce_sync` message hash
@return [Dummy::Handle] fake delivery handle that can be materialized into a report
Calls superclass method WaterDrop::Clients::Dummy#produce
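The listing relies on `fetch(:topic)` and notes that the topic is validated beforehand. As a small illustration of why that matters, the sketch below shows what `Hash#fetch` does on a plain hash without a `:topic` key. The hash is a made-up example and no producer validation is involved.

```ruby
# A bare hash standing in for a message that skipped validation (illustrative only)
message = { payload: 'no topic here' }

error_class =
  begin
    message.fetch(:topic)
    nil
  rescue KeyError => e
    e.class # Hash#fetch raises KeyError when the key is absent and no default is given
  end

# With the key present, fetch simply returns the value
topic = { topic: 'events', payload: 'ok' }.fetch(:topic)
```

Using `fetch` instead of `[]` means a missing topic fails loudly; with `[]` the message would be filed under a `nil` key.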
Source
# File lib/waterdrop/clients/buffered.rb, line 85
def reset
  @transaction_level = 0
  @transaction_active = false
  @transaction_topics.clear
  @transaction_messages.clear
  @messages.clear
  @topics.each_value(&:clear)
end
Clears the internal buffer. Used in between specs so messages do not leak out
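`reset` empties the per-topic arrays with `each_value(&:clear)` and does not drop the hash entries. A rough sketch of the consequence, using a standalone hash built the same way as in the constructor (the names are illustrative):

```ruby
topics = Hash.new { |hash, topic| hash[topic] = [] }
topics['events'] << { topic: 'events', payload: 'a' }

held = topics['events']    # a reference a spec helper might keep around
topics.each_value(&:clear) # same call as in the reset listing

held_empty = held.empty?                     # the held array was emptied in place
same_object = held.equal?(topics['events'])  # and it is still the array stored in the hash
keys_kept = topics.keys                      # topic keys survive the reset
```

Clearing in place means an array held from before the reset is emptied too, so stale messages cannot leak into the next spec through it.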
Source
# File lib/waterdrop/clients/buffered.rb, line 65
def send_offsets_to_transaction(_consumer, _tpl, _timeout)
  nil
end
Fakes storing the offset in a transactional fashion
@param _consumer [#consumer_group_metadata_pointer] any consumer from which we can obtain the librdkafka consumer group metadata pointer
@param _tpl [Rdkafka::Consumer::TopicPartitionList] consumer tpl for offset storage
@param _timeout [Integer] ms timeout