class Stashquery::Query
Constants
- DEFAULT_FIELD
Attributes
end_date[R]
num_results[R]
query_finished[R]
start_date[R]
Public Class Methods
new(conf = {})
click to toggle source
# File lib/stash-query/query.rb, line 24 def initialize(conf = {}) @config = {} @config[:host] = conf[:host] || "ls2-es-lb.int.tropo.com" @config[:port] = conf[:port] || "9200" if conf[:index_prefixes].is_a? Array and ! conf[:index_prefixes].empty? @config[:index_prefixes] = conf[:index_prefixes] else @config[:index_prefixes] = [ "logstash-" ] end @config[:scroll_size] = conf[:scroll_size] || "100" @config[:scroll_time] = conf[:scroll_time] || "30m" @config[:output] = conf[:output_file] || nil @query = conf[:query] || nil @tags = conf[:tags] || nil @start_date = conf[:start_date] @end_date = conf[:end_date] @config[:write_fields] = [] set_write_fields(conf[:write_fields]) @config[:delimiter] = conf[:delimiter] || ',' @num_results = 0 @query_finished = false @scroll_ids = Array.new if conf[:do_print] @config[:print] = true require 'progress_bar' end ## Do this better unless Query.validate_date(@start_date) and Query.validate_date(@end_date) raise "Improper date format entered" end ## Cleanup output file. Probably a better way to do this. unless @config[:output].nil? begin File.truncate(@config[:output],0) rescue end end @es_conn = connect_to_es run_query sort_file end
validate_date(str)
click to toggle source
# File lib/stash-query/query.rb, line 70 def self.validate_date(str) return true if str =~ /20[0-9]{2}-(0[1-9]|1[012])-(0[1-9]|[12][0-9]|3[01])T[012][0-9]:[0-5][0-9]:[0-5][0-9]\.[0-9]{3}Z/ return nil end
Private Instance Methods
clean_scroll_ids()
click to toggle source
# File lib/stash-query/query.rb, line 260 def clean_scroll_ids ## Delete the scroll_ids to free up resources on the ES cluster ## Have to use direct API call until elasticsearch-ruby supports this @scroll_ids.uniq.each do |scroll| puts "DELETE SCROLL:#{scroll}" if $debug #puts begin Curl.delete("#{@config[:host]}:#{@config[:port]}/_search/scroll/#{scroll}") rescue puts "Delete failed" if $debug end end end
connect_to_es()
click to toggle source
# File lib/stash-query/query.rb, line 134 def connect_to_es ## Try a different transporter if $new_transport require 'typhoeus' require 'typhoeus/adapters/faraday' transport_conf = lambda do |f| #f.response :logger f.adapter :typhoeus end end ## Connect to ES server begin if $new_transport transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new hosts: [ { host: @config[:host], port: @config[:port] }], &transport_conf es = Elasticsearch::Client.new transport: transport else es = Elasticsearch::Client.new(:host => @config[:host], :port => @config[:port]) end rescue raise "Could not connect to Elasticsearch cluster: #{@config[:host]}:#{@config[:port]}" end return es end
flush_to_file(hit_list)
click to toggle source
# File lib/stash-query/query.rb, line 86 def flush_to_file(hit_list) return if @config[:output].nil? if hit_list.is_a? String hit_list = hit_list.split("\n") end File.open(@config[:output], 'a') do |file| begin file.puts(generate_output(hit_list)) rescue => e puts "Error writing to file." raise e exit end end end
generate_output(hit_list)
click to toggle source
# File lib/stash-query/query.rb, line 116 def generate_output(hit_list) output_data = [] hit_list.each do |event| event_data = [] if @config[:write_fields].include?('_all') event['_source'].keys.each do |field| event_data << "#{event['_source'][field]}".gsub("\n", '') end else @config[:write_fields].each do |field| event_data << "#{event['_source'][field] if event['_source'][field]}".gsub("\n", '') end end output_data << event_data.join(@config[:delimiter]) end output_data end
get_indices()
click to toggle source
# File lib/stash-query/query.rb, line 161 def get_indices indexes = Array.new start_str = @start_date.split('T').first.split('-').join('.') s_year = start_str.split('.').first.to_i s_mo = start_str.split('.')[1].to_i s_day = start_str.split('.').last.to_i start_date = Date.new(s_year, s_mo, s_day) end_str = @end_date.split('T').first.split('-').join('.') e_year = end_str.split('.').first.to_i e_mo = end_str.split('.')[1].to_i e_day = end_str.split('.').last.to_i end_date = Date.new(e_year, e_mo, e_day) (start_date..end_date).map do |day| day = day.strftime('%Y.%m.%d') @config[:index_prefixes].each do |prefix| indexes << "#{prefix}#{day}" end end return indexes end
run_query()
click to toggle source
# File lib/stash-query/query.rb, line 184 def run_query queries = Array.new queries << @query if @query queries << @tags if @tags if @start_date and @end_date time_range = "@timestamp:[#{@start_date} TO #{@end_date}]" queries << "#{time_range}" indexes = get_indices else indexes [ '_all' ] end query = queries.join(' AND ') ## Make sure each index exists good_indexes = Array.new unless indexes.include?('_all') indexes.each do |index| good_indexes << index if @es_conn.indices.exists index: index end indexes = good_indexes else indexes = [ '_all' ] end puts "Using these indices: #{indexes.join(',')}" if $debug index_str = indexes.join(',') res = @es_conn.search index: index_str, q: query, search_type: 'scan', scroll: @config[:scroll_time], size: @config[:scroll_size], df: 'message' scroll_id = res['_scroll_id'] @scroll_ids << res['_scroll_id'] @num_results = res['hits']['total'] puts "Found #{@num_results} results" if @config[:print] puts res.inspect if $debug if @config[:output] bar = ProgressBar.new(@num_results) if @config[:print] hit_list = Array.new total_lines = 0 if $debug while true res['hits']['hits'].each do |hit| bar.increment! if @config[:print] hit_list << hit if hit_list.length % $flush_buffer == 0 flush_to_file hit_list hit_list = Array.new end end total_lines += res['hits']['hits'].length if $debug # Continue scroll through data begin res = @es_conn.scroll scroll: @config[:scroll_time], body: scroll_id scroll_id = res['_scroll_id'] @scroll_ids << res['_scroll_id'] rescue => e puts res.inspect raise e end begin break if res['hits']['hits'].length < 1 rescue => e raise e end end flush_to_file hit_list end @query_finished = true clean_scroll_ids end
set_write_fields(fields)
click to toggle source
# File lib/stash-query/query.rb, line 102 def set_write_fields(fields) if fields.is_a? Array if fields.empty? @config[:write_fields] << DEFAULT_FIELD else @config[:write_fields] = fields end elsif fields.is_a? String @config[:write_fields] = [ fields ] else @config[:write_fields] = [ DEFAULT_FIELD ] end end
sort_file()
click to toggle source
# File lib/stash-query/query.rb, line 77 def sort_file unless @config[:output].nil? arr = File.readlines(@config[:output]).sort File.open(@config[:output], 'w') do |f| f.puts arr end end end