class MagicPipe::Senders::Async::Worker
Public Instance Methods
perform(decomposed_object, topic, time, client_name)
click to toggle source
# File lib/magic_pipe/senders/async.rb, line 12 def perform(decomposed_object, topic, time, client_name) client = MagicPipe.lookup_client(client_name) object = client.loader.load(decomposed_object) codec = client.codec metadata = { topic: topic, producer: client.config.producer_name, time: time.to_i, mime: codec::TYPE } envelope = Envelope.new( body: object, **metadata ) payload = codec.new(envelope).encode client.transport.submit!(payload, metadata) track_success(client.metrics, topic) rescue => e track_failure(client.metrics, topic) raise e end