class InfluxDB::LineProtocol::Parser

Line Protocol parser.

Constants

BACKSLASH

All the special bytes Line Protocol handles. In UTF-8, these are all single byte characters. Any multi-byte characters are just skipped as part of the tokens (measurement, tag key, tag value, …).

COMMA
DECIMAL_POINT
DIGIT_NINE
DIGIT_ZERO
EQUALS
HASH
LATIN_CAPITAL_LETTER_F

Start markers of a Boolean field value (not special anywhere else)

LATIN_CAPITAL_LETTER_T
LATIN_SMALL_LETTER_F
LATIN_SMALL_LETTER_T
MINUS_SIGN
NEWLINE
NULL
PLUS_SIGN

Start markers of a numeric field value (not special anywhere else)

QUOTATION_MARK

Start (and end) marker of a string field value (not special anywhere else)

SPACE
TAB
UTF_8
UTF_8_PACK_FORMAT

Public Class Methods

new(logger: nil, escapes: nil) click to toggle source
# File lib/influxdb/lineprotocol/parser.rb, line 30
def initialize(logger: nil, escapes: nil)
  if logger
    @log = logger
  else
    @log = ::Logger.new(STDERR)
    @log.level = :warn
  end
  case escapes
  when :compat
    @unescapes = InfluxDB::LineProtocol::CompatUnescapes.new
  else
    @unescapes = InfluxDB::LineProtocol::Unescapes.new
  end
  enter_whitespace0
end

Public Instance Methods

each_point(data) { |point| ... } click to toggle source

Parse the points from data.

If block is given, yields each point in data.

If block is not given, returns a list of points in data.

The data can be a String, or a single Integer or an Array of Integers. The Integers are assumed to be UTF-8 bytes.

# File lib/influxdb/lineprotocol/parser.rb, line 56
def each_point(data)
  buf = bytes(data)
  i = 0
  len = buf.size

  points = block_given? ? nil : []

  while i < len
    i = self.send(@state, buf, i, len)
    if @state == :complete
      if block_given?
        yield @point
      else
        points << @point
      end
      enter_whitespace0
    end
  end

  points
end

Private Instance Methods

bytes(data) click to toggle source
# File lib/influxdb/lineprotocol/parser.rb, line 651
def bytes(data)
  case data
  when nil
    [].freeze
  when Integer
    [data].freeze
  when String
    data.encode(UTF_8).bytes.freeze
  when Array
    data
  end
end
comment(buf, i, len) click to toggle source
# File lib/influxdb/lineprotocol/parser.rb, line 553
def comment(buf, i, len)
  i = line_end(buf, i, len)
  if i < len
    enter_whitespace0
    i += 1
  end
  i
end
decode(buf, start, i) click to toggle source
# File lib/influxdb/lineprotocol/parser.rb, line 601
def decode(buf, start, i)
  str = if @buf.nil?
          (start <= i) ? string(buf[start..i]) : ""
        else
          (start <= i) ? string(@buf + buf[start..i]) : string(@buf)
        end
  @buf = nil
  case @state
  when :measurement
    @unescapes.unescape(:measurement, str)
  when :tag_key
    @unescapes.unescape(:tag_key, str)
  when :tag_value
    @unescapes.unescape(:tag_value, str)
  when :field_key
    @unescapes.unescape(:field_key, str)
  when :field_value_boolean
    case str
    when 't', 'T', 'true', 'True'
      true
    when 'f', 'F', 'false', 'False'
      false
    else
      @log.error("invalid Boolean: #{str}")
      nil
    end
  when :field_value_numeric
    case str
    when /^[-+]?([0-9]*\.)?[0-9]+([eE][-+]?[0-9]+)?$/
      str.to_f
    when /^[+-]?[0-9]+[ui]$/
      str.to_i
    else
      @log.error("invalid number: #{str}")
      nil
    end
  when :field_value_string
    @unescapes.unescape(:string, str)
  when :timestamp
    case str
    when /^-?[0-9]+$/
      str.to_i
    else
      @log.error("invalid timestamp: #{str}")
    end
  else
    raise "error: decode: invalid state"
  end
end
enter_whitespace0() click to toggle source
# File lib/influxdb/lineprotocol/parser.rb, line 80
def enter_whitespace0
  @point = nil
  @state = :whitespace0
  @escaped = false
  @buf = nil
  @key = nil
end
field_key(buf, i, len) click to toggle source
# File lib/influxdb/lineprotocol/parser.rb, line 272
def field_key(buf, i, len)
  start = i
  while i < len
    if @escaped
      @escaped = false
      i += 1
    else
      c = buf[i]
      raise "unsupported input type" unless c.is_a? Integer
      case c
      when BACKSLASH
        @escaped = true
        i += 1
      when EQUALS
        @key = decode(buf, start, i-1)
        if @key == ""
          @log.error("field key: empty key")
          @state = :invalid
          return i
        end
        @state = :field_value
        return i+1
      when NEWLINE
        @log.error("field key: newline")
        @state = :invalid
        return i
      else
        i += 1
      end
    end
  end
  if i == len && start < i
    @buf.nil? ? @buf = buf[start..i-1] : @buf += buf[start..i-1]
  end
  i
end
field_value(buf, i, len) click to toggle source
# File lib/influxdb/lineprotocol/parser.rb, line 309
def field_value(buf, i, len)
  if i == len
    return len
  end
  c = buf[i]
  raise "unsupported input type" unless c.is_a? Integer
  case c
  when LATIN_CAPITAL_LETTER_F, LATIN_CAPITAL_LETTER_T, LATIN_SMALL_LETTER_F, LATIN_SMALL_LETTER_T
    @state = :field_value_boolean
    i
  when DIGIT_ZERO..DIGIT_NINE, PLUS_SIGN, MINUS_SIGN, DECIMAL_POINT
    @state = :field_value_numeric
    i
  when QUOTATION_MARK
    @state = :field_value_string
    i + 1
  else
    @log.error("field value: invalid")
    @state = :invalid
    i
  end
end
field_value_boolean(buf, i, len) click to toggle source
# File lib/influxdb/lineprotocol/parser.rb, line 332
def field_value_boolean(buf, i, len)
  start = i
  while i < len
    if @escaped
      @escaped = false
      i += 1
    else
      c = buf[i]
      raise "unsupported input type" unless c.is_a? Integer
      case c
      when BACKSLASH
        @escaped = true
        i += 1
      when COMMA
        value = decode(buf, start, i-1)
        if value.nil?
          @log.error("field value boolean: invalid boolean")
          @state = :invalid
          return i
        end
        @point[:values][@key] = value
        @key = nil
        @state = :field_key
        return i+1
      when NEWLINE
        value = decode(buf, start, i-1)
        if value.nil?
          @log.error("field value boolean: invalid boolean")
          enter_whitespace0
          return i + 1
        end
        @point[:values][@key] = value
        @key = nil
        @state = :complete
        return i+1
      when SPACE
        value = decode(buf, start, i-1)
        if value.nil?
          @log.error("field value boolean: invalid boolean")
          @state = :invalid
          return i
        end
        @point[:values][@key] = value
        @key = nil
        @state = :whitespace2
        return i
      else
        i += 1
      end
    end
  end
  if i == len && start < i
    @buf.nil? ? @buf = buf[start..i-1] : @buf += buf[start..i-1]
  end
  i
end
field_value_numeric(buf, i, len) click to toggle source
# File lib/influxdb/lineprotocol/parser.rb, line 389
def field_value_numeric(buf, i, len)
  start = i
  while i < len
    if @escaped
      @escaped = false
      i += 1
    else
      c = buf[i]
      raise "unsupported input type" unless c.is_a? Integer
      case c
      when BACKSLASH
        @escaped = true
        i += 1
      when COMMA
        value = decode(buf, start, i-1)
        if value.nil?
          @log.error("field value numeric: invalid number")
          @state = :invalid
          return i
        end
        @point[:values][@key] = value
        @key = nil
        @state = :field_key
        return i+1
      when NEWLINE
        value = decode(buf, start, i-1)
        if value.nil?
          @log.error("field value numeric: invalid number")
          @state = :invalid
          return i
        end
        @point[:values][@key] = value
        @key = nil
        @state = :complete
        return i+1
      when SPACE
        value = decode(buf, start, i-1)
        if value.nil?
          @log.error("field value numeric: invalid number")
          @state = :invalid
          return invalid(buf, i, len)
        end
        @point[:values][@key] = value
        @key = nil
        @state = :whitespace2
        return i
      else
        i += 1
      end
    end
  end
  if i == len && start < i
    @buf.nil? ? @buf = buf[start..i-1] : @buf += buf[start..i-1]
  end
  i
end
field_value_string(buf, i, len) click to toggle source
# File lib/influxdb/lineprotocol/parser.rb, line 446
def field_value_string(buf, i, len)
  start = i
  while i < len
    if @escaped
      @escaped = false
      i += 1
    else
      c = buf[i]
      raise "unsupported input type" unless c.is_a? Integer
      case c
      when BACKSLASH
        @escaped = true
        i += 1
      when QUOTATION_MARK
        value = decode(buf, start, i-1)
        if value.nil?
          @log.error("field value string: invalid string")
          @state = :invalid
          return i
        end
        @point[:values][@key] = value
        @key = nil
        @state = :field_value_string_end
        return i+1
      else
        i += 1
      end
    end
  end
  if i == len && start < i
    @buf.nil? ? @buf = buf[start..i-1] : @buf += buf[start..i-1]
  end
  i
end
field_value_string_end(buf, i, len) click to toggle source
# File lib/influxdb/lineprotocol/parser.rb, line 481
def field_value_string_end(buf, i, len)
  if i < len
    c = buf[i]
    raise "unsupported input type" unless c.is_a? Integer
    case c
    when COMMA
      @state = :field_key
      i + 1
    when NEWLINE
      @state = :complete
      i + 1
    when SPACE
      @state = :whitespace2
      i
    else
      @state = :invalid
      i
    end
  else
    len
  end
end
invalid(buf, i, len) click to toggle source
# File lib/influxdb/lineprotocol/parser.rb, line 562
def invalid(buf, i, len)
  i = line_end(buf, i, len)
  if i < len
    enter_whitespace0
    i += 1
  end
  i
end
line_end(buf, i, len) click to toggle source

Starting from position i, returns the index of the newline. Returns len if no such byte is found.

# File lib/influxdb/lineprotocol/parser.rb, line 574
def line_end(buf, i, len)
  while i < len
    c = buf[i]
    raise "unsupported input type" unless c.is_a? Integer
    if c == NEWLINE
      return i
    end
    i += 1
  end
  len
end
measurement(buf, i, len) click to toggle source
# File lib/influxdb/lineprotocol/parser.rb, line 145
def measurement(buf, i, len)
  start = i
  while i < len
    if @escaped
      @escaped = false
      i += 1
    else
      c = buf[i]
      raise "unsupported input type" unless c.is_a? Integer
      case c
      when BACKSLASH
        @escaped = true
        i += 1
      when COMMA # start of tag set.
        @point = {series: decode(buf, start, i-1), tags: {}, values: {}}
        @state = :tag_key
        return i+1
      when NEWLINE
        @log.error("measurement: missing fields")
        @state = :invalid
        return i
      when SPACE # start of field set
        @point = {series: decode(buf, start, i-1), values: {}}
        @state = :whitespace1
        return i + 1
      else # part of measurement
        i += 1
      end
    end
  end
  if i == len && start < i
    @buf.nil? ? @buf = buf[start..i-1] : @buf += buf[start..i-1]
  end
  i
end
string(buf) click to toggle source
# File lib/influxdb/lineprotocol/parser.rb, line 664
def string(buf)
  buf.pack(UTF_8_PACK_FORMAT).force_encoding(UTF_8)
end
tag_key(buf, i, len) click to toggle source
# File lib/influxdb/lineprotocol/parser.rb, line 181
def tag_key(buf, i, len)
  start = i
  while i < len
    if @escaped
      @escaped = false
      i += 1
    else
      c = buf[i]
      raise "unsupported input type" unless c.is_a? Integer
      case c
      when BACKSLASH
        @escaped = true
        i += 1
      when EQUALS
        @key = decode(buf, start, i-1)
        if @key == ""
          @log.error("tag_key: empty key")
          @state = :invalid
          return i
        end
        @state = :tag_value
        return i+1
      when NEWLINE
        @log.error("tag key: newline")
        @state = :invalid
        return i
      else
        i += 1
      end
    end
  end
  if i == len && start < i
    @buf.nil? ? @buf = buf[start..i-1] : @buf += buf[start..i-1]
  end
  i
end
tag_value(buf, i, len) click to toggle source
# File lib/influxdb/lineprotocol/parser.rb, line 218
def tag_value(buf, i, len)
  start = i
  while i < len
    if @escaped
      @escaped = false
      i += 1
    else
      c = buf[i]
      raise "unsupported input type" unless c.is_a? Integer
      case c
      when BACKSLASH
        @escaped = true
        i += 1
      when COMMA
        @point[:tags][@key] = decode(buf, start, i-1)
        @key = nil
        @state = :tag_key
        return i+1
      when NEWLINE
        @log.error("tag value: newline")
        @state = :invalid
        return i
      when SPACE
        @point[:tags][@key] = decode(buf, start, i-1)
        @key = nil
        @state = :field_key
        i, _ = whitespace(buf, i + 1, len)
        return i
      else
        i += 1
      end
    end
  end
  if i == len && start < i
    @buf.nil? ? @buf = buf[start..i-1] : @buf += buf[start..i-1]
  end
  i
end
timestamp(buf, i, len) click to toggle source
# File lib/influxdb/lineprotocol/parser.rb, line 519
def timestamp(buf, i, len)
  start = i
  while i < len
    if @escaped
      @escaped = false
      i += 1
    else
      c = buf[i]
      raise "unsupported input type" unless c.is_a? Integer
      case c
      when BACKSLASH
        @escaped = true
        i += 1
      when NEWLINE
        value = decode(buf, start, i-1)
        if value.nil?
          @log.error("timestamp: invalid timestamp")
          @state = :invalid
          return i
        end
        @point[:timestamp] = value
        @state = :complete
        return i + 1
      else
        i += 1
      end
    end
  end
  if i == len && start < i
    @buf.nil? ? @buf = buf[start..i-1] : @buf += buf[start..i-1]
  end
  i
end
whitespace(buf, i, len) click to toggle source

Starting from position i, return index of the first non-whitespace byte and the byte itself. Return len and nil if no such byte is found.

# File lib/influxdb/lineprotocol/parser.rb, line 589
def whitespace(buf, i, len)
  while i < len
    c = buf[i]
    raise "unsupported input type" unless c.is_a? Integer
    if c != SPACE && c != TAB && c != NULL
      return [i, c]
    end
    i += 1
  end
  [len, nil]
end
whitespace0(buf, i, len) click to toggle source
# File lib/influxdb/lineprotocol/parser.rb, line 121
def whitespace0(buf, i, len)
  # whitespace consumes TAB, SPACE, and NULL.
  # This method consumes NEWLINE, HASH, and COMMA.
  # BACKSLASH and EQUAL (of the special bytes) are valid measurement starts; they are not consumed.
  i, c = whitespace(buf, i, len)
  case c
  when nil # just whitespace
    len
  when COMMA
    @log.error "whitespace0: missing measurement"
    @state = :invalid
    i + 1
  when HASH # comment
    @state = :comment
    i + 1
  when NEWLINE
    i + 1
  else
    # don't advance i because the byte belongs to measurement
    @state = :measurement
    i
  end
end
whitespace1(buf, i, len) click to toggle source
# File lib/influxdb/lineprotocol/parser.rb, line 257
def whitespace1(buf, i, len)
  i, c = whitespace(buf, i, len)
  case c
  when nil
    len
  when NEWLINE
    @log.error("whitespace1: missing fields")
    @state = :invalid
    i
  else
    @state = :field_key
    i
  end
end
whitespace2(buf, i, len) click to toggle source
# File lib/influxdb/lineprotocol/parser.rb, line 504
def whitespace2(buf, i, len)
  i, c = whitespace(buf, i, len)
  case c
  when nil
    len
  when NEWLINE
    @log.error("whitespace2: missing timestamp")
    @state = :invalid
    i
  else
    @state = :timestamp
    i
  end
end