class Fluent::SomeOutput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_snowplow.rb, line 11
def configure(conf)
  super
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_snowplow.rb, line 33
def format(tag, time, record)
  [tag, time, record].to_msgpack
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_snowplow.rb, line 15
def start
  super

  @emitter = SnowplowTracker::Emitter.new(@host, {
    buffer_size: @buffer_size,
    protocol: @protocol,
    method: @method,
    on_success: ->(_) { log.debug("Flush with success on snowplow") },
    on_failure: ->(_, _) { raise "Error when flushing to snowplow" }
  })

  @trackers = {}
end
stop() click to toggle source
# File lib/fluent/plugin/out_snowplow.rb, line 29
def stop
  @tracker.flush
end
tracker_for(application) click to toggle source
# File lib/fluent/plugin/out_snowplow.rb, line 37
def tracker_for(application)
  @trackers[application] ||= SnowplowTracker::Tracker.new(@emitter, nil, nil, application)
  @trackers[application]
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_snowplow.rb, line 42
def write(chunk)
  application, tracker = nil, nil

  chunk.msgpack_each do |_, _, record|
    schema = record['schema']
    message = JSON.parse record['message']
    true_timestamp = record['true_timestamp']
    application = record['application']
    contexts = JSON.parse record.fetch('contexts', "[]")
    tracker = tracker_for(application)

    contexts = contexts.map do |context|
      context_schema = context['schema']
      context_message = context['message']

      SnowplowTracker::SelfDescribingJson.new(context_schema, context_message)
    end

    self_describing_json = SnowplowTracker::SelfDescribingJson.new(schema, message)
    tracker.track_self_describing_event(self_describing_json, contexts, SnowplowTracker::TrueTimestamp.new(true_timestamp.to_i))
  end

  tracker.flush
end