class LogStash::Inputs::Sqlite
Read rows from an sqlite database.
This is most useful in cases where you are logging directly to a table. Any tables being watched must have an 'id' column that is monotonically increasing.
All tables are read by default except:
- ones matching 'sqlite_%' - these are internal/administrative tables for sqlite
- 'since_table' - this is used by this plugin to track state.
## Example
% sqlite /tmp/example.db sqlite> CREATE TABLE weblogs ( id INTEGER PRIMARY KEY AUTOINCREMENT, ip STRING, request STRING, response INTEGER); sqlite> INSERT INTO weblogs (ip, request, response) VALUES ("1.2.3.4", "/index.html", 200);
Then with this logstash config:
input { sqlite { path => "/tmp/example.db" type => weblogs } } output { stdout { debug => true } }
Sample output:
{ "@source" => "sqlite://sadness/tmp/x.db", "@tags" => [], "@fields" => { "ip" => "1.2.3.4", "request" => "/index.html", "response" => 200 }, "@timestamp" => "2013-05-29T06:16:30.850Z", "@source_host" => "sadness", "@source_path" => "/tmp/x.db", "@message" => "", "@type" => "foo" }
Constants
- SINCE_TABLE
Public Instance Methods
get_all_tables(db)
click to toggle source
# File lib/logstash/inputs/sqlite.rb, line 114
# List the names of the tables this input should read.
#
# Queries sqlite_master for entries of type 'table', leaving out the
# SINCE_TABLE bookkeeping table and anything named like 'sqlite_%', then
# drops every name present in @exclude_tables.
#
# db - database handle; indexed with a SQL string and expected to yield
#      rows that respond to [:name].
#
# Returns an Array of table names.
def get_all_tables(db)
  candidates = db["SELECT * FROM sqlite_master WHERE type = 'table' AND tbl_name != '#{SINCE_TABLE}' AND tbl_name NOT LIKE 'sqlite_%'"]
  names = candidates.map { |entry| entry[:name] }
  names.reject { |name| @exclude_tables.include?(name) }
end
get_n_rows_from_table(db, table, offset, limit)
click to toggle source
# File lib/logstash/inputs/sqlite.rb, line 119
# Fetch the next batch of rows from +table+.
#
# db     - database handle; indexed with a SQL string to obtain the rows.
# table  - name of the table to read.
# offset - only rows whose id is greater than this value are returned.
# limit  - maximum number of rows to return.
#
# NOTE: table, offset and limit are interpolated into the SQL text as-is,
# so they must not come from untrusted input.
#
# Returns an Array of rows ordered by ascending id.
def get_n_rows_from_table(db, table, offset, limit)
  # Order by the id *column*. The previous ORDER BY 'id' sorted by a constant
  # string literal, which left the row order (and therefore which rows LIMIT
  # kept) up to the query planner.
  db["SELECT * FROM #{table} WHERE (id > #{offset}) ORDER BY id LIMIT #{limit}"].to_a
end
get_placeholder(db, table)
click to toggle source
# File lib/logstash/inputs/sqlite.rb, line 87
# Look up the last-seen id recorded for +table+ in SINCE_TABLE.
#
# When no row exists for the table yet, one is created through
# init_placeholder and 0 is returned.
#
# db    - database handle; db[SINCE_TABLE] must give a dataset that supports
#         where(...).first.
# table - name of the watched table.
#
# Returns the stored :place value, or 0 for a table seen for the first time.
def get_placeholder(db, table)
  # Fetch the row itself. Indexing the dataset with a bare :place symbol
  # filters on the truthiness of the place column, so a stored placeholder of
  # 0 looked like "no row" and a duplicate row was inserted on every start.
  row = db[SINCE_TABLE].where(:table => table.to_s).first

  if row.nil?
    init_placeholder(db, table)
    return 0
  end

  @logger.debug("placeholder already exists, it is #{row[:place]}")
  row[:place]
end
init_placeholder(db, table)
click to toggle source
# File lib/logstash/inputs/sqlite.rb, line 100
# Record a starting placeholder of 0 for +table+ in SINCE_TABLE.
#
# db    - database handle; db[SINCE_TABLE] must support insert.
# table - name of the watched table.
#
# Returns whatever the dataset's insert returns.
def init_placeholder(db, table)
  @logger.debug("init placeholder for #{table}")
  db[SINCE_TABLE].insert(:table => table, :place => 0)
end
init_placeholder_table(db)
click to toggle source
# File lib/logstash/inputs/sqlite.rb, line 75
# Create the SINCE_TABLE bookkeeping table with a String :table column and an
# Int :place column.
#
# Any StandardError raised while creating it is swallowed and logged at debug
# level as the table already existing.
#
# db - database handle that responds to create_table.
def init_placeholder_table(db)
  db.create_table SINCE_TABLE do
    String :table
    Int :place
  end
rescue
  @logger.debug("since tables already exists")
end
register()
click to toggle source
# File lib/logstash/inputs/sqlite.rb, line 125
# Prepare the input: load the database libraries, connect to the sqlite file
# at @path, discover the tables to watch and load each table's last-seen id.
#
# Sets @host, @db, @tables and @table_data. @table_data maps each table name
# to a hash of { :name => table, :place => last_seen_id }.
def register
  # Loaded here rather than at file load time, as in the original method.
  require "sequel"
  require "jdbc/sqlite3"
  @host = Socket.gethostname
  @logger.info("Registering sqlite input", :database => @path)
  @db = Sequel.connect("jdbc:sqlite:#{@path}")
  @tables = get_all_tables(@db)
  @table_data = {}

  # The bookkeeping table only needs creating once, not once per watched
  # table. The guard keeps the old behaviour of creating nothing when there
  # are no tables to watch.
  init_placeholder_table(@db) unless @tables.empty?

  @tables.each do |table|
    last_place = get_placeholder(@db, table)
    @table_data[table] = { :name => table, :place => last_place }
  end
end
run(queue)
click to toggle source
# File lib/logstash/inputs/sqlite.rb, line 141
# Poll every table in @table_data forever, pushing one event per new row onto
# +queue+.
#
# Each pass reads up to @batch rows per table with an id greater than the
# table's last-seen id, turns every row into a LogStash::Event (each column
# except :id becomes a field) and records the new last-seen id. When a pass
# finds no rows at all the loop sleeps, doubling the delay each idle pass up
# to 5 seconds; any pass that finds rows resets the delay to its minimum.
#
# queue - object that accepts events through <<.
#
# Does not return.
def run(queue)
  sleep_min = 0.01
  sleep_max = 5
  sleeptime = sleep_min

  @logger.debug("Tailing sqlite db", :path => @path)
  loop do
    count = 0
    @table_data.each do |k, table|
      table_name = table[:name]
      offset = table[:place]
      @logger.debug("offset is #{offset}", :k => k, :table => table_name)
      rows = get_n_rows_from_table(@db, table_name, offset, @batch)

      # Nothing new for this table: the stored placeholder is already
      # current, so skip the database write entirely.
      next if rows.empty?

      count += rows.size
      rows.each do |row|
        event = LogStash::Event.new("host" => @host, "db" => @db)
        decorate(event)
        # store each column as a field in the event.
        row.each do |column, element|
          next if column == :id
          event[column.to_s] = element
        end
        queue << event
        table[:place] = row[:id]
      end

      # Store the last-seen row in the database
      update_placeholder(@db, table_name, table[:place])
    end

    if count.zero?
      # nothing found in that iteration
      # sleep a bit
      @logger.debug("No new rows. Sleeping.", :time => sleeptime)
      sleeptime = [sleeptime * 2, sleep_max].min
      sleep(sleeptime)
    else
      sleeptime = sleep_min
    end
  end
end
update_placeholder(db, table, place)
click to toggle source
# File lib/logstash/inputs/sqlite.rb, line 107
# Persist +place+ as the last-seen id for +table+ in SINCE_TABLE.
#
# db    - database handle; db[SINCE_TABLE] must support where(...).update.
# table - name of the watched table.
# place - id of the last row that was read.
#
# Returns whatever the dataset's update returns.
def update_placeholder(db, table, place)
  @logger.debug("set placeholder to #{place}")
  db[SINCE_TABLE].where(:table => table).update(:place => place)
end