class LogStash::Inputs::DrupalDblog

Retrieve watchdog log events from a Drupal installation with DBLog enabled. The events are pulled out directly from the database. The original events are not deleted, and on every consecutive run only new events are pulled.

The last watchdog event id that was processed is stored in the Drupal variable table with the name “logstash_last_wid”. Delete this variable or set it to 0 if you want to re-import all events.

More info on DBLog: drupal.org/documentation/modules/dblog

Public Instance Methods

config_init(params) click to toggle source
Calls superclass method LogStash::Config::Mixin#config_init
# File lib/logstash/inputs/drupal_dblog.rb, line 71
def config_init(params)
  super

  dbs = {}
  valid = true

  @databases.each do |name, rawUri|
    uri = URI(rawUri)

    dbs[name] = {
      "site" => name,
      "scheme" => uri.scheme,
      "host" => uri.host,
      "user" => uri.user,
      "password" => uri.password,
      "database" => uri.path.sub('/', ''),
      "port" => uri.port.to_i
    }

    if not (
      uri.scheme and not uri.scheme.empty?\
      and uri.host and not uri.host.empty?\
      and uri.user and not uri.user.empty?\
      and uri.password\
      and uri.path and not uri.path.sub('/', '').empty?
    )
      @logger.error("Drupal DBLog: Invalid database URI for #{name} : #{rawUri}")
      valid = false
    end
    if not uri.scheme == 'mysql'
      @logger.error("Drupal DBLog: Only mysql databases are supported.")
      valid = false
    end
  end

  if not valid
    @logger.error("Config validation failed.")
    exit 1
  end

  @databases = dbs
end
register() click to toggle source
# File lib/logstash/inputs/drupal_dblog.rb, line 60
def register
  require "php_serialize"

  if RUBY_PLATFORM == 'java'
    require "logstash/inputs/drupal_dblog/jdbcconnection"
  else
    require "mysql2"
  end
end
run(output_queue) click to toggle source
# File lib/logstash/inputs/drupal_dblog.rb, line 115
def run(output_queue)
  @logger.info("Initializing drupal_dblog")

  loop do
    @logger.debug("Drupal DBLog: Starting to fetch new watchdog entries")
    start = Time.now.to_i

    @databases.each do |name, db|
      @logger.debug("Drupal DBLog: Checking database #{name}")
      check_database(output_queue, db)
      @logger.info("Drupal DBLog: Retrieved all new watchdog messages from #{name}")
    end

    timeTaken = Time.now.to_i - start
    @logger.info("Drupal DBLog: Fetched all new watchdog entries in #{timeTaken} seconds")

    # If fetching of all databases took less time than the interval,
    # sleep a bit.
    sleepTime = @interval * 60 - timeTaken
    if sleepTime > 0
      @logger.debug("Drupal DBLog: Sleeping for #{sleepTime} seconds")
      sleep(sleepTime)
    end
  end # loop
end

Private Instance Methods

build_event(row) click to toggle source
# File lib/logstash/inputs/drupal_dblog.rb, line 279
def build_event(row)
  # Convert unix timestamp
  timestamp = Time.at(row["timestamp"]).to_datetime.iso8601

  msg = row["message"]
  vars = {}

  # Unserialize the variables, and construct the message
  if row['variables'] != 'N;'
    vars = PHP.unserialize(row["variables"])

    if vars.is_a?(Hash)
      vars.each_pair do |k, v|
        if msg.scan(k).length() > 0
          msg = msg.gsub(k.to_s, v.to_s)
        else
          # If not inside the message, add var as an additional field
          row["variable_" + k] = v
        end
      end
    end
  end

  row.delete("message")
  row.delete("variables")
  row.delete("timestamp")

  row["severity"] = row["severity"].to_i

  if @add_usernames and @usermap.has_key?(row["uid"])
    row["user"] = @usermap[row["uid"]]
  end

  entry = {
    "@timestamp" => timestamp,
    "tags" => [],
    "type" => "watchdog",
    "site" => @sitename,
    "message" => msg
  }.merge(row)

  return LogStash::Event.new(entry)
end
check_database(output_queue, db) click to toggle source
# File lib/logstash/inputs/drupal_dblog.rb, line 170
def check_database(output_queue, db)

  begin
    # connect to the MySQL server
    initialize_client(db)
  rescue Exception => e
    @logger.error("Could not connect to database: " + e.message)
    return
  end #begin

  begin
    @sitename = db["site"]

    @usermap = @add_usernames ? get_usermap : nil

    # Retrieve last pulled watchdog entry id
    initialLastWid = get_last_wid
    lastWid = nil


    if initialLastWid == false
      lastWid = 0
      set_last_wid(0, true)
    else
      lastWid = initialLastWid
    end

    # Fetch new entries, and create the event
    while true
      results = get_db_rows(lastWid)
      if results.length() < 1
        break
      end

      @logger.debug("Fetched " + results.length().to_s + " database rows")

      results.each do |row|
        event = build_event(row)
        if event
          decorate(event)
          output_queue << event
          lastWid = row['wid'].to_s
        end
      end

      set_last_wid(lastWid, false)
    end
  rescue Exception => e
    @logger.error("Error while fetching messages: ", :error => e.message)
  end # begin

  # Close connection
  @client.close
end
get_db_rows(lastWid) click to toggle source
# File lib/logstash/inputs/drupal_dblog.rb, line 225
def get_db_rows(lastWid)
  query = 'SELECT * from watchdog WHERE wid > ' + lastWid.to_s + " ORDER BY wid asc LIMIT " + @bulksize.to_s
  return @client.query(query)
end
get_last_wid() click to toggle source
# File lib/logstash/inputs/drupal_dblog.rb, line 241
def get_last_wid
  result = @client.query('SELECT value FROM variable WHERE name="logstash_last_wid"')
  lastWid = false

  if result.count() > 0
    tmp = result.first()["value"].gsub("i:", "").gsub(";", "")
    lastWid = tmp.to_i.to_s == tmp ? tmp : "0"
  end

  return lastWid
end
get_usermap() click to toggle source
# File lib/logstash/inputs/drupal_dblog.rb, line 267
def get_usermap
  map = {}

  @client.query("SELECT uid, name FROM users").each do |row|
    map[row["uid"]] = row["name"]
  end

  map[0] = "guest"
  return map
end
initialize_client(db) click to toggle source
# File lib/logstash/inputs/drupal_dblog.rb, line 142
def initialize_client(db)
  if db["scheme"] == 'mysql'

    if not db["port"] > 0
      db["port"] = 3306
    end

    if RUBY_PLATFORM == 'java'
      @client = LogStash::DrupalDblogJavaMysqlConnection.new(
          db["host"],
          db["user"],
          db["password"],
          db["database"],
          db["port"]
      )
    else
      @client = Mysql2::Client.new(
          :host => db["host"],
          :port => db["port"],
          :username => db["user"],
          :password => db["password"],
          :database => db["database"]
      )
    end
  end
end
set_last_wid(wid, insert) click to toggle source
# File lib/logstash/inputs/drupal_dblog.rb, line 254
def set_last_wid(wid, insert)
  wid = PHP.serialize(wid.to_i)

  # Update last import wid variable
  if insert
    # Does not exist yet, so insert
    @client.query('INSERT INTO variable (name, value) VALUES("logstash_last_wid", "' + wid + '")')
  else
    @client.query('UPDATE variable SET value="' + wid + '" WHERE name="logstash_last_wid"')
  end
end
update_sitename() click to toggle source
# File lib/logstash/inputs/drupal_dblog.rb, line 231
def update_sitename
  if @sitename == ""
    result = @client.query('SELECT value FROM variable WHERE name="site_name"')
    if result.first()
      @sitename = PHP.unserialize(result.first()['value'])
    end
  end
end