class InfluxDB::LineProtocol::Parser
Line Protocol parser.
Constants
- BACKSLASH
All the special bytes Line Protocol handles. In UTF-8, these are all single byte characters. Any multi-byte characters are just skipped as part of the tokens (measurement, tag key, tag value, …).
- COMMA
- DECIMAL_POINT
- DIGIT_NINE
- DIGIT_ZERO
- EQUALS
- HASH
- LATIN_CAPITAL_LETTER_F
Start markers of a Boolean field value (not special anywhere else)
- LATIN_CAPITAL_LETTER_T
- LATIN_SMALL_LETTER_F
- LATIN_SMALL_LETTER_T
- MINUS_SIGN
- NEWLINE
- NULL
- PLUS_SIGN
Start markers of a numeric field value (not special anywhere else)
- QUOTATION_MARK
Start (and end) marker of a string field value (not special anywhere else)
- SPACE
- TAB
- UTF_8
- UTF_8_PACK_FORMAT
Public Class Methods
# File lib/influxdb/lineprotocol/parser.rb, line 30 def initialize(logger: nil, escapes: nil) if logger @log = logger else @log = ::Logger.new(STDERR) @log.level = :warn end case escapes when :compat @unescapes = InfluxDB::LineProtocol::CompatUnescapes.new else @unescapes = InfluxDB::LineProtocol::Unescapes.new end enter_whitespace0 end
Public Instance Methods
Parse the points from data.
If block is given, yields each point in data.
If block is not given, returns a list of points in data.
The data can be a String, or a single Integer or an Array of Integers. The Integers are assumed to be UTF-8 bytes.
# File lib/influxdb/lineprotocol/parser.rb, line 56 def each_point(data) buf = bytes(data) i = 0 len = buf.size points = block_given? ? nil : [] while i < len i = self.send(@state, buf, i, len) if @state == :complete if block_given? yield @point else points << @point end enter_whitespace0 end end points end
Private Instance Methods
# File lib/influxdb/lineprotocol/parser.rb, line 651 def bytes(data) case data when nil [].freeze when Integer [data].freeze when String data.encode(UTF_8).bytes.freeze when Array data end end
# File lib/influxdb/lineprotocol/parser.rb, line 553 def comment(buf, i, len) i = line_end(buf, i, len) if i < len enter_whitespace0 i += 1 end i end
# File lib/influxdb/lineprotocol/parser.rb, line 601 def decode(buf, start, i) str = if @buf.nil? (start <= i) ? string(buf[start..i]) : "" else (start <= i) ? string(@buf + buf[start..i]) : string(@buf) end @buf = nil case @state when :measurement @unescapes.unescape(:measurement, str) when :tag_key @unescapes.unescape(:tag_key, str) when :tag_value @unescapes.unescape(:tag_value, str) when :field_key @unescapes.unescape(:field_key, str) when :field_value_boolean case str when 't', 'T', 'true', 'True' true when 'f', 'F', 'false', 'False' false else @log.error("invalid Boolean: #{str}") nil end when :field_value_numeric case str when /^[-+]?([0-9]*\.)?[0-9]+([eE][-+]?[0-9]+)?$/ str.to_f when /^[+-]?[0-9]+[ui]$/ str.to_i else @log.error("invalid number: #{str}") nil end when :field_value_string @unescapes.unescape(:string, str) when :timestamp case str when /^-?[0-9]+$/ str.to_i else @log.error("invalid timestamp: #{str}") end else raise "error: decode: invalid state" end end
# File lib/influxdb/lineprotocol/parser.rb, line 80 def enter_whitespace0 @point = nil @state = :whitespace0 @escaped = false @buf = nil @key = nil end
# File lib/influxdb/lineprotocol/parser.rb, line 272 def field_key(buf, i, len) start = i while i < len if @escaped @escaped = false i += 1 else c = buf[i] raise "unsupported input type" unless c.is_a? Integer case c when BACKSLASH @escaped = true i += 1 when EQUALS @key = decode(buf, start, i-1) if @key == "" @log.error("field key: empty key") @state = :invalid return i end @state = :field_value return i+1 when NEWLINE @log.error("field key: newline") @state = :invalid return i else i += 1 end end end if i == len && start < i @buf.nil? ? @buf = buf[start..i-1] : @buf += buf[start..i-1] end i end
# File lib/influxdb/lineprotocol/parser.rb, line 309 def field_value(buf, i, len) if i == len return len end c = buf[i] raise "unsupported input type" unless c.is_a? Integer case c when LATIN_CAPITAL_LETTER_F, LATIN_CAPITAL_LETTER_T, LATIN_SMALL_LETTER_F, LATIN_SMALL_LETTER_T @state = :field_value_boolean i when DIGIT_ZERO..DIGIT_NINE, PLUS_SIGN, MINUS_SIGN, DECIMAL_POINT @state = :field_value_numeric i when QUOTATION_MARK @state = :field_value_string i + 1 else @log.error("field value: invalid") @state = :invalid i end end
# File lib/influxdb/lineprotocol/parser.rb, line 332 def field_value_boolean(buf, i, len) start = i while i < len if @escaped @escaped = false i += 1 else c = buf[i] raise "unsupported input type" unless c.is_a? Integer case c when BACKSLASH @escaped = true i += 1 when COMMA value = decode(buf, start, i-1) if value.nil? @log.error("field value boolean: invalid boolean") @state = :invalid return i end @point[:values][@key] = value @key = nil @state = :field_key return i+1 when NEWLINE value = decode(buf, start, i-1) if value.nil? @log.error("field value boolean: invalid boolean") enter_whitespace0 return i + 1 end @point[:values][@key] = value @key = nil @state = :complete return i+1 when SPACE value = decode(buf, start, i-1) if value.nil? @log.error("field value boolean: invalid boolean") @state = :invalid return i end @point[:values][@key] = value @key = nil @state = :whitespace2 return i else i += 1 end end end if i == len && start < i @buf.nil? ? @buf = buf[start..i-1] : @buf += buf[start..i-1] end i end
# File lib/influxdb/lineprotocol/parser.rb, line 389 def field_value_numeric(buf, i, len) start = i while i < len if @escaped @escaped = false i += 1 else c = buf[i] raise "unsupported input type" unless c.is_a? Integer case c when BACKSLASH @escaped = true i += 1 when COMMA value = decode(buf, start, i-1) if value.nil? @log.error("field value numeric: invalid number") @state = :invalid return i end @point[:values][@key] = value @key = nil @state = :field_key return i+1 when NEWLINE value = decode(buf, start, i-1) if value.nil? @log.error("field value numeric: invalid number") @state = :invalid return i end @point[:values][@key] = value @key = nil @state = :complete return i+1 when SPACE value = decode(buf, start, i-1) if value.nil? @log.error("field value numeric: invalid number") @state = :invalid return invalid(buf, i, len) end @point[:values][@key] = value @key = nil @state = :whitespace2 return i else i += 1 end end end if i == len && start < i @buf.nil? ? @buf = buf[start..i-1] : @buf += buf[start..i-1] end i end
# File lib/influxdb/lineprotocol/parser.rb, line 446 def field_value_string(buf, i, len) start = i while i < len if @escaped @escaped = false i += 1 else c = buf[i] raise "unsupported input type" unless c.is_a? Integer case c when BACKSLASH @escaped = true i += 1 when QUOTATION_MARK value = decode(buf, start, i-1) if value.nil? @log.error("field value string: invalid string") @state = :invalid return i end @point[:values][@key] = value @key = nil @state = :field_value_string_end return i+1 else i += 1 end end end if i == len && start < i @buf.nil? ? @buf = buf[start..i-1] : @buf += buf[start..i-1] end i end
# File lib/influxdb/lineprotocol/parser.rb, line 481 def field_value_string_end(buf, i, len) if i < len c = buf[i] raise "unsupported input type" unless c.is_a? Integer case c when COMMA @state = :field_key i + 1 when NEWLINE @state = :complete i + 1 when SPACE @state = :whitespace2 i else @state = :invalid i end else len end end
# File lib/influxdb/lineprotocol/parser.rb, line 562 def invalid(buf, i, len) i = line_end(buf, i, len) if i < len enter_whitespace0 i += 1 end i end
Starting from position i, returns the index of the newline. Returns len if no such byte is found.
# File lib/influxdb/lineprotocol/parser.rb, line 574 def line_end(buf, i, len) while i < len c = buf[i] raise "unsupported input type" unless c.is_a? Integer if c == NEWLINE return i end i += 1 end len end
# File lib/influxdb/lineprotocol/parser.rb, line 145 def measurement(buf, i, len) start = i while i < len if @escaped @escaped = false i += 1 else c = buf[i] raise "unsupported input type" unless c.is_a? Integer case c when BACKSLASH @escaped = true i += 1 when COMMA # start of tag set. @point = {series: decode(buf, start, i-1), tags: {}, values: {}} @state = :tag_key return i+1 when NEWLINE @log.error("measurement: missing fields") @state = :invalid return i when SPACE # start of field set @point = {series: decode(buf, start, i-1), values: {}} @state = :whitespace1 return i + 1 else # part of measurement i += 1 end end end if i == len && start < i @buf.nil? ? @buf = buf[start..i-1] : @buf += buf[start..i-1] end i end
# File lib/influxdb/lineprotocol/parser.rb, line 664 def string(buf) buf.pack(UTF_8_PACK_FORMAT).force_encoding(UTF_8) end
# File lib/influxdb/lineprotocol/parser.rb, line 181 def tag_key(buf, i, len) start = i while i < len if @escaped @escaped = false i += 1 else c = buf[i] raise "unsupported input type" unless c.is_a? Integer case c when BACKSLASH @escaped = true i += 1 when EQUALS @key = decode(buf, start, i-1) if @key == "" @log.error("tag_key: empty key") @state = :invalid return i end @state = :tag_value return i+1 when NEWLINE @log.error("tag key: newline") @state = :invalid return i else i += 1 end end end if i == len && start < i @buf.nil? ? @buf = buf[start..i-1] : @buf += buf[start..i-1] end i end
# File lib/influxdb/lineprotocol/parser.rb, line 218 def tag_value(buf, i, len) start = i while i < len if @escaped @escaped = false i += 1 else c = buf[i] raise "unsupported input type" unless c.is_a? Integer case c when BACKSLASH @escaped = true i += 1 when COMMA @point[:tags][@key] = decode(buf, start, i-1) @key = nil @state = :tag_key return i+1 when NEWLINE @log.error("tag value: newline") @state = :invalid return i when SPACE @point[:tags][@key] = decode(buf, start, i-1) @key = nil @state = :field_key i, _ = whitespace(buf, i + 1, len) return i else i += 1 end end end if i == len && start < i @buf.nil? ? @buf = buf[start..i-1] : @buf += buf[start..i-1] end i end
# File lib/influxdb/lineprotocol/parser.rb, line 519 def timestamp(buf, i, len) start = i while i < len if @escaped @escaped = false i += 1 else c = buf[i] raise "unsupported input type" unless c.is_a? Integer case c when BACKSLASH @escaped = true i += 1 when NEWLINE value = decode(buf, start, i-1) if value.nil? @log.error("timestamp: invalid timestamp") @state = :invalid return i end @point[:timestamp] = value @state = :complete return i + 1 else i += 1 end end end if i == len && start < i @buf.nil? ? @buf = buf[start..i-1] : @buf += buf[start..i-1] end i end
Starting from position i, return index of the first non-whitespace byte and the byte itself. Return len and nil if no such byte is found.
# File lib/influxdb/lineprotocol/parser.rb, line 589 def whitespace(buf, i, len) while i < len c = buf[i] raise "unsupported input type" unless c.is_a? Integer if c != SPACE && c != TAB && c != NULL return [i, c] end i += 1 end [len, nil] end
# File lib/influxdb/lineprotocol/parser.rb, line 121 def whitespace0(buf, i, len) # whitespace consumes TAB, SPACE, and NULL. # This method consumes NEWLINE, HASH, and COMMA. # BACKSLASH and EQUAL (of the special bytes) are valid measurement starts; they are not consumed. i, c = whitespace(buf, i, len) case c when nil # just whitespace len when COMMA @log.error "whitespace0: missing measurement" @state = :invalid i + 1 when HASH # comment @state = :comment i + 1 when NEWLINE i + 1 else # don't advance i because the byte belongs to measurement @state = :measurement i end end
# File lib/influxdb/lineprotocol/parser.rb, line 257 def whitespace1(buf, i, len) i, c = whitespace(buf, i, len) case c when nil len when NEWLINE @log.error("whitespace1: missing fields") @state = :invalid i else @state = :field_key i end end
# File lib/influxdb/lineprotocol/parser.rb, line 504 def whitespace2(buf, i, len) i, c = whitespace(buf, i, len) case c when nil len when NEWLINE @log.error("whitespace2: missing timestamp") @state = :invalid i else @state = :timestamp i end end