class LogStash::Filters::Elasticsearch
Attributes
Public Class Methods
Source
# File lib/logstash/filters/elasticsearch.rb, line 155
# Validates a config value, adding handling for the `:uri_or_empty` validator.
# Any other validator is delegated untouched to the parent implementation.
#
# For `:uri_or_empty`, a single empty value is accepted as-is; everything else
# is handed to the parent implementation to be validated as a `:uri`.
#
# @param value [Array<Object>]
# @param validator [nil,Array,Symbol]
# @return [Array(true,Object)] on success, `true` and the coerced value
# @return [Array(false,String)] on failure, `false` and the failure reason
def self.validate_value(value, validator)
  return super unless validator == :uri_or_empty

  candidates = hash_or_array(deep_replace(value))

  if candidates.size == 1 && candidates.first.empty?
    # a lone empty value is treated as "nothing configured"
    [true, candidates.first]
  else
    super(candidates, :uri)
  end
end
@override to handle `proxy => ''` as if none was set @param value [Array<Object>] @param validator [nil,Array,Symbol] @return [Array(true,Object)]: if validation is a success, a tuple containing `true` and the coerced value @return [Array(false,String)]: if validation is a failure, a tuple containing `false` and the failure reason.
Public Instance Methods
Source
# File lib/logstash/filters/elasticsearch.rb, line 191
# Runs the configured Elasticsearch lookup for +event+ and copies values from
# the response onto the event.
#
# The search uses either the query DSL template (sent as the request body) or
# the query string (sent as `q`, together with size and optional sort). The
# total hit count is stored under `[@metadata][total_hits]`. Values listed in
# @fields are read from each hit's `_source`, values listed in @docinfo_fields
# from the hit itself, and @aggregation_fields from the response aggregations.
#
# Any error raised during the lookup is logged as a warning and the event is
# tagged with every tag in @tag_on_failure. `filter_matched` is only called
# when no error occurred and the response had hits or aggregations.
#
# @param event [LogStash::Event]
def filter(event)
  found = false

  begin
    search_params = { :index => event.sprintf(@index) }

    if @query_dsl
      query = LogStash::Json.load(event.sprintf(@query_dsl))
      search_params[:body] = query
    else
      query = event.sprintf(@query)
      search_params[:q] = query
      search_params[:size] = result_size
      search_params[:sort] = @sort if @enable_sort
    end

    @logger.debug("Querying elasticsearch for lookup", :params => search_params)

    response = get_client.search(search_params)
    if response["_shards"].include?("failures")
      raise "Elasticsearch query error: #{response["_shards"]["failures"]}"
    end

    event.set("[@metadata][total_hits]", extract_total_from_hits(response['hits']))

    hits = response["hits"]["hits"]
    unless hits.nil? || hits.empty?
      found = true

      # one hit is stored as a scalar, several hits as an array
      scalar_or_list = ->(values) { values.size > 1 ? values : values.first }

      @fields.each do |source_key, event_key|
        key_path = extract_path(source_key)
        from_source = hits.map { |hit| extract_value(hit["_source"], key_path) }
        set_to_event_target(event, event_key, scalar_or_list.call(from_source))
      end

      @docinfo_fields.each do |source_key, event_key|
        key_path = extract_path(source_key)
        from_hit = hits.map { |hit| extract_value(hit, key_path) }
        set_to_event_target(event, event_key, scalar_or_list.call(from_hit))
      end
    end

    aggregations = response["aggregations"]
    unless aggregations.nil? || aggregations.empty?
      found = true
      @aggregation_fields.each do |agg_name, ls_field|
        set_to_event_target(event, ls_field, aggregations[agg_name])
      end
    end
  rescue => e
    # the amount of logged detail grows with the logger's verbosity
    if @logger.trace?
      @logger.warn("Failed to query elasticsearch for previous event", :index => @index, :query => query, :event => event.to_hash, :error => e.message, :backtrace => e.backtrace)
    elsif @logger.debug?
      @logger.warn("Failed to query elasticsearch for previous event", :index => @index, :error => e.message, :backtrace => e.backtrace)
    else
      @logger.warn("Failed to query elasticsearch for previous event", :index => @index, :error => e.message)
    end
    @tag_on_failure.each { |tag| event.tag(tag) }
  else
    filter_matched(event) if found
  end
end
Source
# File lib/logstash/filters/elasticsearch.rb, line 257
# Builds the User-Agent string passed to the Elasticsearch client.
#
# The result has the shape:
#   logstash/<LOGSTASH_VERSION> (OS=<name>-<version>-<arch>; JVM=<vendor>-<version>) logstash-<plugin_type>-<config_name>/<plugin_version>
# e.g. the leading part looks like
#   logstash/7.14.1 (OS=Linux-5.4.0-84-generic-amd64; JVM=AdoptOpenJDK-11.0.11)
#
# @return [String]
def prepare_user_agent
  # read the OS / JVM details from the Java system properties
  os_name, os_version, os_arch, jvm_vendor, jvm_version =
    %w[os.name os.version os.arch java.vendor java.version].map do |property|
      java.lang.System.getProperty(property)
    end

  plugin_version = Gem.loaded_specs['logstash-filter-elasticsearch'].version

  "logstash/#{LOGSTASH_VERSION} (OS=#{os_name}-#{os_version}-#{os_arch}; JVM=#{jvm_vendor}-#{jvm_version}) logstash-#{@plugin_type}-#{config_name}/#{plugin_version}"
end
Public only so that it can be reused in testing.
Source
# File lib/logstash/filters/elasticsearch.rb, line 166
# Prepares the plugin for use: loads the optional query template, validates
# the query/authentication settings, resolves hosts and credentials from the
# cloud settings, then checks the connection.
#
# @raise [RuntimeError] when the configured query template file is empty
def register
  # Load the query DSL from the template file, when one is configured
  if @query_template
    raise "template is empty" if File.zero?(@query_template)

    # File.read opens, reads and closes the file in one call, so no file
    # handle is left open after the template has been loaded
    @query_dsl = File.read(@query_template)
  end

  validate_query_settings
  fill_hosts_from_cloud_id
  setup_ssl_params!
  validate_authentication
  fill_user_password_from_cloud_auth

  @hosts = Array(@hosts).map { |host| host.to_s } # potential SafeURI#to_s

  test_connection!
  setup_serverless

  # the transport patch is only loaded for the "elasticsearch_transport" client type
  if get_client.es_transport_client_type == "elasticsearch_transport"
    require_relative "elasticsearch/patches/_elasticsearch_transport_http_manticore"
  end
end
Private Instance Methods
Source
# File lib/logstash/filters/elasticsearch.rb, line 283
# Returns the options hash handed to the Elasticsearch client. The hash is
# built once from the plugin settings and memoized in @client_options.
#
# @return [Hash{Symbol=>Object}]
def client_options
  return @client_options if @client_options

  @client_options = {
    :user => @user,
    :password => @password,
    :api_key => @api_key,
    :proxy => @proxy,
    :ssl => client_ssl_options,
    :retry_on_failure => @retry_on_failure,
    :retry_on_status => @retry_on_status,
    :user_agent => prepare_user_agent,
    :custom_headers => @custom_headers
  }
end
Source
# File lib/logstash/filters/elasticsearch.rb, line 297
# Builds the SSL options hash for the Elasticsearch client from the plugin's
# `ssl_*` settings.
#
# @return [Hash{Symbol=>Object}]
# @raise [LogStash::ConfigurationError] on conflicting or incomplete SSL settings
def client_ssl_options
  ssl_options = { :enabled => @ssl_enabled }

  # If the deprecated `ssl` option was explicitly provided, it keeps the same behavior
  # setting up all the client SSL configs even if ssl => false. Otherwise, it should use
  # the @ssl_enabled value as it was either explicitly set by the `ssl_enabled` option or
  # inferred from the hosts scheme.
  return ssl_options unless @ssl_enabled || original_params.include?('ssl')

  ssl_options[:enabled] = true

  ca_list = params['ssl_certificate_authorities']
  truststore_path = params['ssl_truststore_path']
  certificate = params['ssl_certificate']
  keystore_path = params['ssl_keystore_path']

  # each kind of material may be configured in one form only
  if ca_list && truststore_path
    raise LogStash::ConfigurationError, 'Use either "ssl_certificate_authorities/ca_file" or "ssl_truststore_path" when configuring the CA certificate'
  end

  if certificate && keystore_path
    raise LogStash::ConfigurationError, 'Use either "ssl_certificate" or "ssl_keystore_path/keystore" when configuring client certificates'
  end

  if ca_list&.any?
    if ca_list.size > 1
      raise LogStash::ConfigurationError, 'Multiple values on "ssl_certificate_authorities" are not supported by this plugin'
    end
    ssl_options[:ca_file] = ca_list.first
  end

  setup_client_ssl_store(ssl_options, 'truststore', truststore_path)
  setup_client_ssl_store(ssl_options, 'keystore', keystore_path)
  logger.debug("Keystore for client certificate", :keystore => keystore_path) if keystore_path

  # certificate and key must be given together
  client_key = params["ssl_key"]
  if certificate
    raise LogStash::ConfigurationError, 'Using an "ssl_certificate" requires an "ssl_key"' unless client_key
    ssl_options[:client_cert] = certificate
    ssl_options[:client_key] = client_key
  elsif !client_key.nil?
    raise LogStash::ConfigurationError, 'An "ssl_certificate" is required when using an "ssl_key"'
  end

  verification_mode = params["ssl_verification_mode"]
  if verification_mode == 'none'
    logger.warn "You have enabled encryption but DISABLED certificate verification, " +
                "to make sure your data is secure set `ssl_verification_mode => full`"
    ssl_options[:verify] = :disable
  elsif !verification_mode.nil?
    # Manticore's :default maps to Apache HTTP Client's DefaultHostnameVerifier,
    # which is the modern STRICT verifier that replaces the deprecated StrictHostnameVerifier
    ssl_options[:verify] = :default
  end

  ssl_options[:cipher_suites] = params["ssl_cipher_suites"] if params.include?("ssl_cipher_suites")

  protocols = params['ssl_supported_protocols']
  ssl_options[:protocols] = protocols if protocols&.any?

  ssl_options[:trust_strategy] = trust_strategy_for_ca_trusted_fingerprint

  ssl_options
end
Source
# File lib/logstash/filters/elasticsearch.rb, line 380
# Splits a path reference into its path elements.
#
# A reference wrapped in brackets (e.g. "[a][b]") is split on "][" after the
# outer brackets are removed; any other reference is returned as a
# single-element array.
#
# @param path_reference [String]
# @return [Array<String>]
def extract_path(path_reference)
  bracketed = path_reference.start_with?('[') && path_reference.end_with?(']')

  if bracketed
    path_reference[1...-1].split('][')
  else
    [path_reference]
  end
end
get an array of path elements from a path reference
Source
# File lib/logstash/filters/elasticsearch.rb, line 401
# Given a "hits" object from an Elasticsearch response, returns the total
# number of hits in the result set.
#
# Elasticsearch 7.x produces an object containing `value` and `relation` in
# order to enable unambiguous reporting when the total is only a lower bound;
# when such an object is received, its `value` is returned.
#
# @param hits [Hash{String=>Object}]
# @return [Integer]
def extract_total_from_hits(hits)
  total = hits['total']
  total.kind_of?(Hash) ? total['value'] : total
end
Given a “hits” object from an Elasticsearch
response, return the total number of hits in the result set. @param hits [Hash{String=>Object}] @return [Integer]
Source
# File lib/logstash/filters/elasticsearch.rb, line 390
# Given a Hash and an array of path fragments, returns the value at the path.
#
# Returns nil as soon as a fragment cannot be followed, either because the key
# is absent or because the value reached so far is not a keyed structure
# (e.g. a String, a number or nil). An empty path returns +source+ itself.
#
# @param source [Hash{String=>Object}]
# @param path [Array<String>]
# @return [Object, nil]
def extract_value(source, path)
  path.reduce(source) do |memo, old_key_fragment|
    # Only descend into keyed structures: on a String, `include?`/`[]` would
    # perform a substring match, and scalars or nil do not respond to it at all.
    break unless memo.respond_to?(:key?) && memo.key?(old_key_fragment)
    memo[old_key_fragment]
  end
end
given a Hash and an array of path fragments, returns the value at the path @param source [Hash{String=>Object}] @param path [Array{String}] @return [Object]
Source
# File lib/logstash/filters/elasticsearch.rb, line 448
# When a cloud_id is configured, replaces @hosts with the host URI parsed from
# it. Does nothing when no cloud_id is set.
#
# @raise [LogStash::ConfigurationError] when hosts were also set to a
#   non-default value
def fill_hosts_from_cloud_id
  return unless @cloud_id

  hosts_explicitly_set = @hosts && !hosts_default?(@hosts)
  if hosts_explicitly_set
    raise LogStash::ConfigurationError, 'Both cloud_id and hosts specified, please only use one of those.'
  end

  @hosts = parse_host_uri_from_cloud_id(@cloud_id)
end
Source
# File lib/logstash/filters/elasticsearch.rb, line 441
# When cloud_auth is configured, sets @user and @password from it and mirrors
# both into params. Does nothing when no cloud_auth is set.
def fill_user_password_from_cloud_auth
  return unless @cloud_auth

  credentials = parse_user_password_from_cloud_auth(@cloud_auth)
  @user, @password = credentials

  params['user'] = @user
  params['password'] = @password
  [@user, @password]
end
Source
# File lib/logstash/filters/elasticsearch.rb, line 373
# Returns the shared client, creating it on first use. Creation happens inside
# `synchronize`, where the memo is checked again before `new_client` is called.
def get_client
  return @shared_client if @shared_client

  synchronize { @shared_client ||= new_client }
end
Source
# File lib/logstash/filters/elasticsearch.rb, line 412
# Tells whether +hosts+ is a single-element Array while the original params
# contain no 'hosts' key.
#
# @param hosts [Object]
# @return [Boolean]
def hosts_default?(hosts)
  return false unless hosts.is_a?(Array)
  return false unless hosts.size == 1

  !original_params.key?('hosts')
end
Source
# File lib/logstash/filters/elasticsearch.rb, line 367
# Builds a new LogStash::Filters::ElasticsearchClient from the logger, the
# configured hosts and the client options.
def new_client
  # NOTE: could pass cloud-id/cloud-auth to client but than we would need to be stricter on ES version requirement
  # and also LS parsing might differ from ES client's parsing so for consistency we do not pass cloud options ...
  client_class = LogStash::Filters::ElasticsearchClient
  client_class.new(@logger, @hosts, client_options)
end
Source
# File lib/logstash/filters/elasticsearch.rb, line 457
# Turns a cloud_id setting into a SafeURI built from the scheme and host that
# LogStash::Util::CloudSettingId extracts from it.
#
# @param cloud_id [String]
# @return [LogStash::Util::SafeURI]
# @raise [LogStash::ConfigurationError] when cloud_id support cannot be loaded
#   or the value is rejected by CloudSettingId
def parse_host_uri_from_cloud_id(cloud_id)
  require 'logstash/util/safe_uri'

  begin
    # might not be available on older LS
    require 'logstash/util/cloud_setting_id'
  rescue LoadError
    raise LogStash::ConfigurationError, 'The cloud_id setting is not supported by your version of Logstash, ' +
                                        'please upgrade your installation (or set hosts instead).'
  end

  parsed_id =
    begin
      LogStash::Util::CloudSettingId.new(cloud_id) # already does append ':{port}' to host
    rescue ArgumentError => e
      # re-word the message so that it names the plugin setting
      raise LogStash::ConfigurationError, e.message.to_s.sub(/Cloud Id/i, 'cloud_id')
    end

  LogStash::Util::SafeURI.new("#{parsed_id.elasticsearch_scheme}://#{parsed_id.elasticsearch_host}")
end
Source
# File lib/logstash/filters/elasticsearch.rb, line 475
# Extracts the username and password from a cloud_auth setting using
# LogStash::Util::CloudSettingAuth.
#
# @param cloud_auth [String, LogStash::Util::Password]
# @return [Array] the username followed by the password
# @raise [LogStash::ConfigurationError] when cloud_auth support cannot be
#   loaded or the value is rejected by CloudSettingAuth
def parse_user_password_from_cloud_auth(cloud_auth)
  begin
    # might not be available on older LS
    require 'logstash/util/cloud_setting_auth'
  rescue LoadError
    raise LogStash::ConfigurationError, 'The cloud_auth setting is not supported by your version of Logstash, ' +
                                        'please upgrade your installation (or set user/password instead).'
  end

  # unwrap a Password to get at the underlying value
  raw_auth = cloud_auth.is_a?(LogStash::Util::Password) ? cloud_auth.value : cloud_auth

  parsed_auth =
    begin
      LogStash::Util::CloudSettingAuth.new(raw_auth)
    rescue ArgumentError => e
      # re-word the message so that it names the plugin setting
      raise LogStash::ConfigurationError, e.message.to_s.sub(/Cloud Auth/i, 'cloud_auth')
    end

  [parsed_auth.username, parsed_auth.password]
end
Source
# File lib/logstash/filters/elasticsearch.rb, line 277
# Sets +value_to_set+ on the event. When a target is configured the field is
# written as "[target][new_key]"; otherwise +new_key+ is used as given.
#
# @param event [LogStash::Event]
# @param new_key [String] name of the field to set
# @param value_to_set [Array] values to set
# @return [void]
def set_to_event_target(event, new_key, value_to_set)
  field_reference = new_key
  field_reference = "[#{target}][#{new_key}]" if target

  event.set(field_reference, value_to_set)
end
If @target is defined, creates a nested structure to inject the result into the target field; if it is not defined, sets the top-level event field directly. @param event [LogStash::Event] @param new_key [String] name of the field to set @param value_to_set [Array] values to set @return [void]
Source
# File lib/logstash/filters/elasticsearch.rb, line 359
# Copies the store path, and the store's type and password when present in
# params, into +ssl_options+. Does nothing when +store_path+ is not set.
#
# @param ssl_options [Hash] options hash, updated in place
# @param kind is a string [truststore|keystore]
# @param store_path [String, nil]
def setup_client_ssl_store(ssl_options, kind, store_path)
  return unless store_path

  type_setting = "ssl_#{kind}_type"
  password_setting = "ssl_#{kind}_password"

  ssl_options[kind.to_sym] = store_path
  ssl_options[:"#{kind}_type"] = params[type_setting] if params.include?(type_setting)
  ssl_options[:"#{kind}_password"] = params[password_setting].value if params.include?(password_setting)
end
@param kind is a string [truststore|keystore]
Source
# File lib/logstash/filters/elasticsearch.rb, line 500
# When the current client reports serverless, flags the memoized client
# options as serverless, replaces the shared client with a new one and calls
# `info` on it. Does nothing otherwise.
#
# @raise [LogStash::ConfigurationError] when any step raises an error
def setup_serverless
  return unless get_client.serverless?

  @client_options[:serverless] = true
  @shared_client = new_client
  get_client.info
rescue => e
  @logger.error("Failed to retrieve Elasticsearch info", message: e.message, exception: e.class, backtrace: e.backtrace)
  raise LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch"
end
Source
# File lib/logstash/filters/elasticsearch.rb, line 511
# Infers `ssl_enabled` when it was not explicitly set: unless @ssl_enabled is
# already truthy, it becomes true only if every host starts with "https". The
# result is mirrored into params.
def setup_ssl_params!
  # an explicitly configured `ssl_enabled` always wins
  return if original_params.include?('ssl_enabled')

  @ssl_enabled ||= Array(@hosts).all? { |host| host && host.to_s.start_with?("https") }
  params['ssl_enabled'] = @ssl_enabled
end
Source
# File lib/logstash/filters/elasticsearch.rb, line 492
# Pings Elasticsearch through the shared client.
#
# @raise [LogStash::ConfigurationError] when the ping raises
#   Elasticsearch::UnsupportedProductError
def test_connection!
  get_client.client.ping
rescue Elasticsearch::UnsupportedProductError
  raise LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch"
end
Source
# File lib/logstash/filters/elasticsearch.rb, line 426
# Checks that at most one authentication mechanism (cloud_auth, api_key or
# user/password) is configured, and that api_key is only used when SSL is
# enabled.
#
# @raise [LogStash::ConfigurationError] when several mechanisms are set, or
#   when api_key is used while @ssl_enabled is not true
def validate_authentication
  api_key_given = @api_key && @api_key.value

  configured_mechanisms = [
    @cloud_auth,
    api_key_given,
    @user || (@password && @password.value)
  ].count { |mechanism| mechanism }

  if configured_mechanisms > 1
    raise LogStash::ConfigurationError, 'Multiple authentication options are specified, please only use one of user/password, cloud_auth or api_key'
  end

  # Compared against `true` on purpose: an unset (nil) @ssl_enabled also fails.
  # The message names `ssl_enabled`, the setting that feeds @ssl_enabled, rather
  # than the deprecated `ssl` option.
  if api_key_given && @ssl_enabled != true
    raise(LogStash::ConfigurationError, "Using api_key authentication requires SSL/TLS secured communication using the `ssl_enabled => true` option")
  end
end
Source
# File lib/logstash/filters/elasticsearch.rb, line 416
# Ensures that exactly one of `query` and `query_template` is configured.
#
# @raise [LogStash::ConfigurationError] when neither or both are set
def validate_query_settings
  if !@query && !@query_template
    raise LogStash::ConfigurationError, "Both `query` and `query_template` are empty. Require either `query` or `query_template`."
  end

  return unless @query && @query_template

  raise LogStash::ConfigurationError, "Both `query` and `query_template` are set. Use either `query` or `query_template`."
end