class Fluent::MessagePackEventStream

Public Class Methods

new(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil) click to toggle source

Keep cached_unpacker argument for existing plugins

# File lib/fluent/event.rb, line 206
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil)
  @data = data
  @size = size
  @unpacked_times = unpacked_times
  @unpacked_records = unpacked_records
end

Public Instance Methods

dup() click to toggle source
# File lib/fluent/event.rb, line 217
def dup
  if @unpacked_times
    self.class.new(@data.dup, nil, @size, unpacked_times: @unpacked_times, unpacked_records: @unpacked_records.map(&:dup))
  else
    self.class.new(@data.dup, nil, @size)
  end
end
each(unpacker: nil, &block) click to toggle source
# File lib/fluent/event.rb, line 256
def each(unpacker: nil, &block)
  ensure_unpacked!(unpacker: unpacker)
  @unpacked_times.each_with_index do |time, i|
    block.call(time, @unpacked_records[i])
  end
  nil
end
empty?() click to toggle source
# File lib/fluent/event.rb, line 213
def empty?
  @data.empty?
end
ensure_unpacked!(unpacker: nil) click to toggle source
# File lib/fluent/event.rb, line 236
def ensure_unpacked!(unpacker: nil)
  return if @unpacked_times && @unpacked_records
  @unpacked_times = []
  @unpacked_records = []
  (unpacker || Fluent::MessagePackFactory.msgpack_unpacker).feed_each(@data) do |time, record|
    @unpacked_times << time
    @unpacked_records << record
  end
  # @size should be updated always right after unpack.
  # The real size of unpacked objects are correct, rather than given size.
  @size = @unpacked_times.size
end
repeatable?() click to toggle source
# File lib/fluent/event.rb, line 232
def repeatable?
  true
end
size() click to toggle source
# File lib/fluent/event.rb, line 225
def size
  # @size is unbelievable always when @size == 0
  # If the number of events is really zero, unpacking events takes very short time.
  ensure_unpacked! if @size == 0
  @size
end
slice(index, num) click to toggle source

This method returns MultiEventStream, because there are no reason to surve binary serialized by msgpack.

# File lib/fluent/event.rb, line 251
def slice(index, num)
  ensure_unpacked!
  MultiEventStream.new(@unpacked_times.slice(index, num), @unpacked_records.slice(index, num))
end
to_msgpack_stream(time_int: false, packer: nil) click to toggle source
# File lib/fluent/event.rb, line 264
def to_msgpack_stream(time_int: false, packer: nil)
  # time_int is always ignored because @data is always packed binary in this class
  @data
end