class Ej::Core

Public Class Methods

new(values) click to toggle source
# File lib/ej/core.rb, line 13
# Builds a core bound to one client/index pair.
#
# @param values [#logger, #index, #client] settings object; its logger, index
#   name and Elasticsearch client are copied into instance variables.
def initialize(values)
  @logger = values.logger # used for progress / retry messages
  @index  = values.index  # default index for searches
  @client = values.client # Elasticsearch client used by most methods
end

Public Instance Methods

aggs(terms, size, query) click to toggle source
# File lib/ej/core.rb, line 81
# Runs nested terms aggregations over +terms+ for documents matching the
# query_string +query+, requesting zero hits ("size" => 0).
#
# Each term's aggregation is named "agg_<term>" and is nested under the
# previous term's aggregation, so the first term is the outermost level.
#
# @param terms [Array<String>] fields to aggregate on, outermost first
# @param size [Integer] bucket count for every terms aggregation
# @param query [String] query_string query restricting the documents
# @return the client's search response
def aggs(terms, size, query)
  body = {
    "size" => 0,
    "query" => { "query_string" => { "query" => query } }
  }

  # Descend the tree by keeping a reference to the current level. The old
  # implementation eval'd a string path with field names spliced into Ruby
  # source, so a name containing a quote raised SyntaxError (or injected code).
  parent = body
  terms.each do |term|
    term_agg = {
      "terms" => {
        "field" => term,
        "size" => size,
        "order" => { "_count" => "desc" }
      }
    }
    parent["aggs"] = { "agg_#{term}" => term_agg }
    parent = term_agg
  end

  @client.search index: @index, body: body
end
bulk(timestamp_key, type, add_timestamp, id_keys, index, data) click to toggle source
# File lib/ej/core.rb, line 132
# Indexes +data+ into +index+ with a single bulk request.
#
# @param timestamp_key [Object, nil] record key whose value (converted via
#   #to_time) becomes '@timestamp'; when nil the current time is used
# @param type [String] value written to each action's +_type+
# @param add_timestamp [Boolean] whether to add an '@timestamp' field
# @param id_keys [Array, nil] record keys handed to Util.generate_id (with a
#   '%s_%s...' template, one '%s' per key) to build each +_id+; nil means no +_id+
# @param index [String] destination index name
# @param data [Enumerable<Hash>] records to index; mutated in place (merge!)
#   when +add_timestamp+ is true
# @return the client's bulk response, or nil when +data+ is empty
def bulk(timestamp_key, type, add_timestamp, id_keys, index, data)
  # One '%s' placeholder per id key, joined with '_'.
  template = id_keys.map { '%s' }.join('_') unless id_keys.nil?
  bulk_message = []
  data.each do |record|
    # Only compute the timestamp when it is written; previously a record lacking
    # +timestamp_key+ crashed on nil.to_time even with add_timestamp false.
    if add_timestamp
      timestamp = if timestamp_key.nil?
                    Time.now.to_datetime.to_s
                  else
                    record[timestamp_key].to_time.to_datetime.to_s
                  end
      record.merge!('@timestamp' => timestamp)
    end
    meta = { index: { _index: index, _type: type } }
    meta[:index][:_id] = Util.generate_id(template, record, id_keys) unless id_keys.nil?
    bulk_message << meta << record
  end
  return if bulk_message.empty?

  connect_with_retry { @client.bulk body: bulk_message }
end
bulk_results(results, dest_client, before_size, total, dest_index, slice_id) click to toggle source
# File lib/ej/core.rb, line 171
# Converts one page of search +results+ into bulk actions, sends them to
# +dest_client+ (with retries) and logs the running progress of this slice.
#
# @param before_size [Integer] documents already copied by this slice
# @param total [Object] total hit count, used only in the log line
# @return [Integer] number of documents in this page (half the action lines)
def bulk_results(results, dest_client, before_size, total, dest_index, slice_id)
  actions = convert_results(results, dest_index)
  connect_with_retry do
    dest_client.bulk(body: actions) unless actions.empty?
    copied_upto = before_size + actions.size / 2
    @logger.info "slice_id[#{slice_id}] copy complete (#{before_size}-#{copied_upto})/#{total}"
  end
  actions.size / 2
end
connect_with_retry(retry_on_failure = 5) { || ... } click to toggle source
# File lib/ej/core.rb, line 206
# Runs the given block, retrying with exponential backoff when it raises.
#
# @param retry_on_failure [Integer] maximum number of retries after the first attempt
# @return [Object] the block's value, or nil when no block is given
# @raise [RuntimeError] once retries are exhausted; the last error is kept as #cause
def connect_with_retry(retry_on_failure = 5)
  retries = 0
  begin
    yield if block_given?
  rescue => e
    raise "Could not connect to Elasticsearch after #{retries} retries. #{e.message}" if retries >= retry_on_failure

    retries += 1
    @logger.warn "Could not connect to Elasticsearch, resetting connection and trying again. #{e.message}"
    # Back off 2, 4, 8, ... seconds. The previous 10**retries reached 100_000 s
    # (~28 h) on the fifth retry, stalling for over a day before giving up.
    sleep 2**retries
    retry
  end
end
convert_results(search_results, dest_index) click to toggle source
# File lib/ej/core.rb, line 189
# Turns a search response into a flat bulk-action list: for every hit, an
# { index: <hit metadata> } line followed by the hit's _source.
#
# _score is dropped from the metadata, and _id/_type/_index are removed from
# the source body. When +dest_index+ is given it replaces the hit's _index.
#
# @return [Array] alternating metadata and source entries
def convert_results(search_results, dest_index)
  hits = HashWrapper.new(search_results).hits.hits
  actions = []
  hits.each do |hit|
    document = hit.delete('_source')
    hit.delete('_score')
    %w[_id _type _index].each { |field| document.delete(field) }
    hit._index = dest_index if dest_index
    actions.push({ index: hit.to_h }, document)
  end
  actions
end
copy(source, dest, query, per_size, scroll, dest_index, slice_max) click to toggle source
# File lib/ej/core.rb, line 150
  # Copies every document matching +query+ from the +source+ cluster to the
  # +dest+ cluster, reading with the scroll API.
  #
  # When +slice_max+ is set, the scroll is split into that many slices and each
  # slice runs in its own process (Parallel, :in_processes); otherwise a single
  # slice with id 0 is used.
  #
  # @param source [String] source hosts (parsed by Util.parse_hosts)
  # @param dest [String] destination hosts (parsed by Util.parse_hosts)
  # @param query [String, nil] optional query_string filter
  # @param per_size [Integer, nil] page size per scroll request
  # @param scroll [String] scroll keep-alive passed to every request
  # @param dest_index [String, nil] index to write to; nil keeps each hit's _index
  # @param slice_max [Integer, nil] number of parallel slices
  def copy(source, dest, query, per_size, scroll, dest_index, slice_max)
    build_client = lambda do |hosts|
      Elasticsearch::Client.new transport: Util.get_transport(Util.parse_hosts(hosts))
    end
    source_client = build_client.call(source)
    dest_client = build_client.call(dest)

    slice_ids = slice_max ? slice_max.times.to_a : [0]
    Parallel.map(slice_ids, in_processes: slice_ids.size) do |slice_id|
      scroll_option = get_scroll_option(@index, query, per_size, scroll, slice_id, slice_max)
      page = connect_with_retry { source_client.search(scroll_option) }
      total = page['hits']['total']
      copied = bulk_results(page, dest_client, 0, total, dest_index, slice_id)

      # Keep scrolling until a page comes back without hits.
      while (page = connect_with_retry { source_client.scroll(scroll_id: page['_scroll_id'], scroll: scroll) }) &&
            !page['hits']['hits'].empty?
        copied += bulk_results(page, dest_client, copied, total, dest_index, slice_id)
      end
    end
  end

  private

  # Converts one page of search +results+ into bulk actions, sends them to
  # +dest_client+ (with retries) and logs the running progress of this slice.
  #
  # @param before_size [Integer] documents already copied by this slice
  # @param total [Object] total hit count, used only in the log line
  # @return [Integer] number of documents in this page (half the action lines)
  def bulk_results(results, dest_client, before_size, total, dest_index, slice_id)
    actions = convert_results(results, dest_index)
    connect_with_retry do
      dest_client.bulk(body: actions) unless actions.empty?
      copied_upto = before_size + actions.size / 2
      @logger.info "slice_id[#{slice_id}] copy complete (#{before_size}-#{copied_upto})/#{total}"
    end
    actions.size / 2
  end

  # Builds the options hash for the initial scroll search.
  #
  # The body gets a query_string query when +query+ is non-nil and a slice
  # spec ({ id:, max: }) when +slice_max+ is set. +size+ falls back to DEFAULT_PER.
  #
  # @return [Hash] keys :index, :scroll, :body, :size
  def get_scroll_option(index, query, size, scroll, slice_id, slice_max)
    request_body = {}
    request_body[:query] = { query_string: { query: query } } unless query.nil?
    request_body[:slice] = { id: slice_id, max: slice_max } if slice_max
    {
      index: index,
      scroll: scroll,
      body: request_body,
      size: size || DEFAULT_PER
    }
  end

  # Turns a search response into a flat bulk-action list: for every hit, an
  # { index: <hit metadata> } line followed by the hit's _source.
  #
  # _score is dropped from the metadata, and _id/_type/_index are removed from
  # the source body. When +dest_index+ is given it replaces the hit's _index.
  #
  # @return [Array] alternating metadata and source entries
  def convert_results(search_results, dest_index)
    hits = HashWrapper.new(search_results).hits.hits
    actions = []
    hits.each do |hit|
      document = hit.delete('_source')
      hit.delete('_score')
      %w[_id _type _index].each { |field| document.delete(field) }
      hit._index = dest_index if dest_index
      actions.push({ index: hit.to_h }, document)
    end
    actions
  end

  # Runs the given block, retrying with exponential backoff when it raises.
  #
  # @param retry_on_failure [Integer] maximum number of retries after the first attempt
  # @return [Object] the block's value, or nil when no block is given
  # @raise [RuntimeError] once retries are exhausted; the last error is kept as #cause
  def connect_with_retry(retry_on_failure = 5)
    retries = 0
    begin
      yield if block_given?
    rescue => e
      raise "Could not connect to Elasticsearch after #{retries} retries. #{e.message}" if retries >= retry_on_failure

      retries += 1
      @logger.warn "Could not connect to Elasticsearch, resetting connection and trying again. #{e.message}"
      # Back off 2, 4, 8, ... seconds. The previous 10**retries reached 100_000 s
      # (~28 h) on the fifth retry, stalling for over a day before giving up.
      sleep 2**retries
      retry
    end
  end
end
distinct(term, type, query) click to toggle source
# File lib/ej/core.rb, line 37
# Requests a cardinality aggregation on +term+, named "<term>_count", with no
# hits (size 0). It is optionally restricted by a query_string +query+ and
# scoped to document +type+.
#
# @return the client's search response
def distinct(term, type, query)
  cardinality = { "cardinality" => { "field" => term } }
  body = { size: 0, "aggs" => { term + "_count" => cardinality } }
  body[:query] = { query_string: { query: query } } unless query.nil?
  @client.search index: @index, type: type, body: body
end
dump(query, per_size) click to toggle source
# File lib/ej/core.rb, line 43
# Prints every document matching +query+ to stdout as bulk-format JSON lines:
# an {"index": <hit metadata without _score>} line, then the _source line.
#
# Pages with from/size, +per_size+ (default DEFAULT_PER) hits at a time,
# until a page comes back empty.
# NOTE(review): Elasticsearch limits from+size by index.max_result_window
# (10,000 by default), so very large dumps may fail here. Consider the
# scroll-based approach used by #copy.
def dump(query, per_size)
  page_size = per_size || DEFAULT_PER
  page = 0
  loop do
    body = { size: page_size, from: page * page_size }
    body[:query] = { query_string: { query: query } } unless query.nil?
    hits = HashWrapper.new(@client.search(index: @index, body: body)).hits.hits
    break if hits.empty?

    lines = []
    hits.each do |hit|
      source = hit.delete('_source')
      hit.delete('_score')
      lines << JSON.dump({ 'index' => hit.to_h }) << JSON.dump(source)
    end
    page += 1
    puts lines.join("\n")
  end
end
facet(term, size, query) click to toggle source
# File lib/ej/core.rb, line 65
# Runs a "terms" facet on +term+ (top +size+ values by count). The facet is
# filtered by a query_string +query+ wrapped in a filtered/bool query, and
# zero hits are requested.
# NOTE(review): `facets` and the `filtered` query are legacy Elasticsearch DSL.
# Confirm that the target cluster version still accepts them.
#
# @return the client's search response
def facet(term, size, query)
  filtered = {
    "filtered" => {
      "query" => { "bool" => { "should" => [{ "query_string" => { "query" => query } }] } },
      "filter" => { "bool" => { "must" => [{ "match_all" => {} }] } }
    }
  }
  terms_facet = {
    "terms" => { "field" => term, "size" => size, "order" => "count", "exclude" => [] },
    "facet_filter" => { "fquery" => { "query" => filtered } }
  }
  body = { "facets" => { "terms" => terms_facet }, "size" => 0 }
  @client.search index: @index, body: body
end
get_scroll_option(index, query, size, scroll, slice_id, slice_max) click to toggle source
# File lib/ej/core.rb, line 181
# Builds the options hash for the initial scroll search.
#
# The body gets a query_string query when +query+ is non-nil and a slice
# spec ({ id:, max: }) when +slice_max+ is set. +size+ falls back to DEFAULT_PER.
#
# @return [Hash] keys :index, :scroll, :body, :size
def get_scroll_option(index, query, size, scroll, slice_id, slice_max)
  request_body = {}
  request_body[:query] = { query_string: { query: query } } unless query.nil?
  request_body[:slice] = { id: slice_id, max: slice_max } if slice_max
  {
    index: index,
    scroll: scroll,
    body: request_body,
    size: size || DEFAULT_PER
  }
end
max(term) click to toggle source
# File lib/ej/core.rb, line 123
# Requests a max aggregation named "max_<term>" over field +term+, with size 0.
#
# @return the client's search response
def max(term)
  agg_name = "max_#{term}"
  body = { aggs: { agg_name => { max: { field: term } } } }
  @client.search index: @index, body: body, size: 0
end
min(term) click to toggle source
# File lib/ej/core.rb, line 114
# Requests a min aggregation named "min_<term>" over field +term+, with size 0.
#
# @return the client's search response
def min(term)
  agg_name = "min_#{term}"
  body = { aggs: { agg_name => { min: { field: term } } } }
  @client.search index: @index, body: body, size: 0
end