class Fluent::SomeOutput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_snowplow.rb, line 11
# Hands the plugin configuration straight to the superclass implementation;
# this class adds no configuration handling of its own here.
#
# @param conf the configuration object, forwarded unchanged
def configure(conf)
  super(conf)
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_snowplow.rb, line 33
# Serializes one event as a three-element [tag, time, record] array via
# to_msgpack, the same triple that #write later reads back with msgpack_each.
#
# @param tag the event tag
# @param time the event time
# @param record the event payload
# @return the result of calling to_msgpack on the triple
def format(tag, time, record)
  entry = [tag, time, record]
  entry.to_msgpack
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_snowplow.rb, line 15
# Runs the superclass start logic, then creates the single Snowplow emitter
# used by this plugin and resets the per-application tracker cache.
def start
  super

  # Callbacks handed to the emitter: success is logged at debug level,
  # failure raises so the error is not silently dropped.
  success_callback = ->(_) { log.debug("Flush with success on snowplow") }
  failure_callback = ->(_, _) { raise "Error when flushing to snowplow" }

  emitter_options = {
    buffer_size: @buffer_size,
    protocol: @protocol,
    method: @method,
    on_success: success_callback,
    on_failure: failure_callback
  }

  @emitter = SnowplowTracker::Emitter.new(@host, emitter_options)
  # Filled lazily by tracker_for, keyed by application.
  @trackers = {}
end
stop()
click to toggle source
# File lib/fluent/plugin/out_snowplow.rb, line 29
# Flushes every tracker held in the @trackers cache.
#
# The cache is the @trackers hash (application => tracker) that #start
# initializes; there is no single @tracker instance variable, so flushing
# has to go through the hash values.
def stop
  # Nothing to flush when the cache was never initialized (stop without start).
  return unless @trackers

  @trackers.each_value(&:flush)
end
tracker_for(application)
click to toggle source
# File lib/fluent/plugin/out_snowplow.rb, line 37
# Returns the tracker cached for the given application, building and caching
# a new one on the shared @emitter the first time the application is seen.
#
# @param application the cache key, also passed as the fourth argument to
#   SnowplowTracker::Tracker.new
# @return the cached or newly created tracker
def tracker_for(application)
  cached = @trackers[application]
  return cached if cached

  @trackers[application] = SnowplowTracker::Tracker.new(@emitter, nil, nil, application)
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_snowplow.rb, line 42
# Tracks every record of a buffer chunk as a Snowplow self-describing event,
# then flushes once after the whole chunk has been processed.
#
# Each record is read by string key: 'schema', 'message' (a JSON string),
# 'true_timestamp', 'application' and, optionally, 'contexts' (a JSON string
# holding an array of objects with 'schema' and 'message' keys; defaults to
# "[]" when the key is absent).
#
# @param chunk [#msgpack_each] chunk yielding (tag, time, record) triples
# @raise [JSON::ParserError] when 'message' or 'contexts' is not valid JSON
def write(chunk)
  tracker = nil

  chunk.msgpack_each do |_, _, record|
    message = JSON.parse(record['message'])
    raw_contexts = JSON.parse(record.fetch('contexts', "[]"))
    tracker = tracker_for(record['application'])

    contexts = raw_contexts.map do |context|
      SnowplowTracker::SelfDescribingJson.new(context['schema'], context['message'])
    end
    event = SnowplowTracker::SelfDescribingJson.new(record['schema'], message)
    # to_i turns a missing (nil) or string timestamp into an Integer.
    true_timestamp = SnowplowTracker::TrueTimestamp.new(record['true_timestamp'].to_i)

    tracker.track_self_describing_event(event, contexts, true_timestamp)
  end

  # An empty chunk never assigns a tracker; skip the flush rather than
  # calling it on nil.
  # NOTE(review): only the last record's tracker is flushed. tracker_for builds
  # every tracker on the same @emitter, so this presumably drains events for
  # all applications in the chunk - confirm against the tracker library.
  tracker.flush if tracker
end