class LogStash::Inputs::Sqlite

Read rows from an sqlite database.

This is most useful in cases where you are logging directly to a table. Any tables being watched must have an 'id' column that is monotonically increasing.

All tables are read by default except:

## Example

% sqlite /tmp/example.db
sqlite> CREATE TABLE weblogs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    ip STRING,
    request STRING,
    response INTEGER);
sqlite> INSERT INTO weblogs (ip, request, response) 
    VALUES ("1.2.3.4", "/index.html", 200);

Then with this logstash config:

input {
  sqlite {
    path => "/tmp/example.db"
    type => weblogs
  }
}
output {
  stdout {
    debug => true
  }
}

Sample output:

{
  "@source"      => "sqlite://sadness/tmp/x.db",
  "@tags"        => [],
  "@fields"      => {
    "ip"       => "1.2.3.4",
    "request"  => "/index.html",
    "response" => 200
  },
  "@timestamp"   => "2013-05-29T06:16:30.850Z",
  "@source_host" => "sadness",
  "@source_path" => "/tmp/x.db",
  "@message"     => "",
  "@type"        => "foo"
}

Constants

SINCE_TABLE

Public Instance Methods

get_all_tables(db) click to toggle source
# File lib/logstash/inputs/sqlite.rb, line 114
def get_all_tables(db)
  return db["SELECT * FROM sqlite_master WHERE type = 'table' AND tbl_name != '#{SINCE_TABLE}' AND tbl_name NOT LIKE 'sqlite_%'"].map { |t| t[:name] }.select { |n| !@exclude_tables.include?(n) }
end
get_n_rows_from_table(db, table, offset, limit) click to toggle source
# File lib/logstash/inputs/sqlite.rb, line 119
def get_n_rows_from_table(db, table, offset, limit)
  dataset = db["SELECT * FROM #{table}"]
  return db["SELECT * FROM #{table} WHERE (id > #{offset}) ORDER BY 'id' LIMIT #{limit}"].map { |row| row }
end
get_placeholder(db, table) click to toggle source
# File lib/logstash/inputs/sqlite.rb, line 87
def get_placeholder(db, table)
  since = db[SINCE_TABLE]
  x = since.where(:table => "#{table}")
  if x[:place].nil?
    init_placeholder(db, table) 
    return 0
  else
    @logger.debug("placeholder already exists, it is #{x[:place]}")
    return x[:place][:place]
  end
end
init_placeholder(db, table) click to toggle source
# File lib/logstash/inputs/sqlite.rb, line 100
def init_placeholder(db, table)
  @logger.debug("init placeholder for #{table}")
  since = db[SINCE_TABLE]
  since.insert(:table => table, :place => 0)
end
init_placeholder_table(db) click to toggle source
# File lib/logstash/inputs/sqlite.rb, line 75
def init_placeholder_table(db)
  begin
    db.create_table SINCE_TABLE do 
      String :table
      Int    :place
    end
  rescue
    @logger.debug("since tables already exists")
  end
end
register() click to toggle source
# File lib/logstash/inputs/sqlite.rb, line 125
def register
  require "sequel"
  require "jdbc/sqlite3" 
  @host = Socket.gethostname
  @logger.info("Registering sqlite input", :database => @path)
  @db = Sequel.connect("jdbc:sqlite:#{@path}") 
  @tables = get_all_tables(@db)
  @table_data = {}
  @tables.each do |table|
    init_placeholder_table(@db)
    last_place = get_placeholder(@db, table)
    @table_data[table] = { :name => table, :place => last_place }
  end
end
run(queue) click to toggle source
# File lib/logstash/inputs/sqlite.rb, line 141
def run(queue)
  sleep_min = 0.01
  sleep_max = 5
  sleeptime = sleep_min

  begin
    @logger.debug("Tailing sqlite db", :path => @path)
    loop do
      count = 0
      @table_data.each do |k, table|
        table_name = table[:name]
        offset = table[:place]
        @logger.debug("offset is #{offset}", :k => k, :table => table_name)
        rows = get_n_rows_from_table(@db, table_name, offset, @batch)
        count += rows.count
        rows.each do |row| 
          event = LogStash::Event.new("host" => @host, "db" => @db)
          decorate(event)
          # store each column as a field in the event.
          row.each do |column, element|
            next if column == :id
            event[column.to_s] = element
          end
          queue << event
          @table_data[k][:place] = row[:id]
        end
        # Store the last-seen row in the database
        update_placeholder(@db, table_name, @table_data[k][:place])
      end

      if count == 0
        # nothing found in that iteration
        # sleep a bit
        @logger.debug("No new rows. Sleeping.", :time => sleeptime)
        sleeptime = [sleeptime * 2, sleep_max].min
        sleep(sleeptime)
      else
        sleeptime = sleep_min
      end
    end # loop
  end # begin/rescue
end
update_placeholder(db, table, place) click to toggle source
# File lib/logstash/inputs/sqlite.rb, line 107
def update_placeholder(db, table, place)
  @logger.debug("set placeholder to #{place}")
  since = db[SINCE_TABLE]
  since.where(:table => table).update(:place => place)
end