class Fluent::MessagePackEventStream
Public Class Methods
new(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil)
click to toggle source
Keep cached_unpacker argument for existing plugins
# File lib/fluent/event.rb, line 206 def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil) @data = data @size = size @unpacked_times = unpacked_times @unpacked_records = unpacked_records end
Public Instance Methods
dup()
click to toggle source
# File lib/fluent/event.rb, line 217 def dup if @unpacked_times self.class.new(@data.dup, nil, @size, unpacked_times: @unpacked_times, unpacked_records: @unpacked_records.map(&:dup)) else self.class.new(@data.dup, nil, @size) end end
each(unpacker: nil, &block)
click to toggle source
# File lib/fluent/event.rb, line 256 def each(unpacker: nil, &block) ensure_unpacked!(unpacker: unpacker) @unpacked_times.each_with_index do |time, i| block.call(time, @unpacked_records[i]) end nil end
empty?()
click to toggle source
# File lib/fluent/event.rb, line 213 def empty? @data.empty? end
ensure_unpacked!(unpacker: nil)
click to toggle source
# File lib/fluent/event.rb, line 236 def ensure_unpacked!(unpacker: nil) return if @unpacked_times && @unpacked_records @unpacked_times = [] @unpacked_records = [] (unpacker || Fluent::MessagePackFactory.msgpack_unpacker).feed_each(@data) do |time, record| @unpacked_times << time @unpacked_records << record end # @size should be updated always right after unpack. # The real size of unpacked objects are correct, rather than given size. @size = @unpacked_times.size end
repeatable?()
click to toggle source
# File lib/fluent/event.rb, line 232 def repeatable? true end
size()
click to toggle source
# File lib/fluent/event.rb, line 225 def size # @size is unbelievable always when @size == 0 # If the number of events is really zero, unpacking events takes very short time. ensure_unpacked! if @size == 0 @size end
slice(index, num)
click to toggle source
This method returns MultiEventStream
, because there are no reason to surve binary serialized by msgpack.
# File lib/fluent/event.rb, line 251 def slice(index, num) ensure_unpacked! MultiEventStream.new(@unpacked_times.slice(index, num), @unpacked_records.slice(index, num)) end
to_msgpack_stream(time_int: false, packer: nil)
click to toggle source
# File lib/fluent/event.rb, line 264 def to_msgpack_stream(time_int: false, packer: nil) # time_int is always ignored because @data is always packed binary in this class @data end