class Searchyll::Indexer

Constants

BATCH_RESIZE_FACTOR

Grow and shrink the batch size based on how long our bulk calls take relative to the tempo

BATCH_SIZE

Initial size of document batches to send to ES _bulk API

TEMPO

Requests per minute for updates to ES

Attributes

batch_size[RW]
configuration[RW]
indexer_thread[RW]
queue[RW]
timestamp[RW]
uri[RW]
working[RW]

Public Class Methods

new(configuration) click to toggle source

Initialize a basic indexer, with a Jekyll site configuration, waiting to be supplied with documents for indexing.

# File lib/searchyll/indexer.rb, line 26
def initialize(configuration)
  self.configuration = configuration
  self.uri           = URI(configuration.elasticsearch_url)
  self.queue         = Queue.new
  self.working       = true
  self.timestamp     = Time.now
  self.batch_size    = BATCH_SIZE
end

Public Instance Methods

<<(doc) click to toggle source

Public: Add new documents for batch indexing.

# File lib/searchyll/indexer.rb, line 36
# Public: enqueue a document for the next bulk indexing batch.
def <<(doc)
  queue.push(doc)
end
finish() click to toggle source

Public: Indicate to the indexer that no new documents are being added.

# File lib/searchyll/indexer.rb, line 52
# Public: Indicate to the indexer that no new documents are being added,
# then block until indexing completes and the new index is finalized.
def finish
  # working? stays true while the queue is non-empty, so the indexer
  # thread drains any remaining documents before it exits.
  self.working = false
  # Wait for the background thread to flush its final batches ...
  indexer_thread.join
  # ... then refresh, replicate, alias-swap and clean up the new index.
  finalize!
end
start() click to toggle source

Public: start the indexer and wait for documents to index.

# File lib/searchyll/indexer.rb, line 41
def start
  prepare_index

  self.indexer_thread = Thread.new do
    http_start do |http|
      indexer_loop(http)
    end
  end
end

Private Instance Methods

current_batch() click to toggle source

Fetch a batch of documents from the queue. Returns a maximum of BATCH_SIZE documents.

# File lib/searchyll/indexer.rb, line 168
# Drain up to batch_size documents from the queue without blocking.
# Returns an Array (possibly empty) of queued documents.
def current_batch
  batch = []
  batch << queue.pop while batch.size < batch_size && !queue.empty?
  batch
end
elasticsearch_index_name() click to toggle source

A versioned index name, based on the time of the indexing run. Will be later added to an alias for hot reindexing.

# File lib/searchyll/indexer.rb, line 99
# A versioned index name derived from the run's timestamp, later pointed
# at by the canonical alias for hot reindexing.
def elasticsearch_index_name
  base  = configuration.elasticsearch_index_base_name
  stamp = timestamp.strftime('%Y%m%d%H%M%S')
  [base, stamp].join('-')
end
es_bulk_insert!(http, batch) click to toggle source

Given a batch (array) of documents, index them into Elasticsearch using its Bulk API: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html

# File lib/searchyll/indexer.rb, line 158
# Given a batch (array) of documents, index them into Elasticsearch using
# its Bulk API:
# https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
#
# http  - an open Net::HTTP connection
# batch - Array of Hash documents to index
#
# Returns the HTTP response, or nil when the batch is empty.
def es_bulk_insert!(http, batch)
  # The tempo loop ticks even when the queue is idle; an empty bulk body
  # is rejected by Elasticsearch, so skip those ticks entirely.
  return if batch.empty?

  bulk_insert = http_post("/#{elasticsearch_index_name}/#{configuration.elasticsearch_default_type}/_bulk")
  # The bulk endpoint takes newline-delimited JSON, not a JSON document.
  bulk_insert.content_type = 'application/x-ndjson'
  bulk_insert.body = batch.flat_map do |doc|
    [{ index: {} }.to_json, doc.to_json]
  end.join("\n") + "\n"
  http.request(bulk_insert)
end
finalize!() click to toggle source

Once documents are done being indexed, finalize the process by adding the new index into an alias for searching.

# File lib/searchyll/indexer.rb, line 191
# Once documents are done being indexed, make the new index live:
# refresh it, enable replication, swap the alias over, then delete the
# indices it replaced. All four steps share one HTTP connection.
def finalize!
  http_start do |http|
    %i[finalize_refresh finalize_replication finalize_aliases finalize_cleanup].each do |step|
      send(step, http)
    end
  end
end
finalize_aliases(http) click to toggle source

hot swap the index into the canonical alias

# File lib/searchyll/indexer.rb, line 219
# Hot-swap the freshly built index into the canonical alias: atomically
# remove any old indices from the alias and add the new one.
#
# http - an open Net::HTTP connection
#
# Returns the HTTP response.
def finalize_aliases(http)
  # Single lookup: old_indices performs an HTTP request per call.
  stale = old_indices
  actions = []
  # Elasticsearch rejects a remove action whose index name is empty, so
  # only emit one when there actually are old indices.
  unless stale.nil? || stale.empty?
    actions << { remove: {
      index: stale.join(','),
      alias: configuration.elasticsearch_index_base_name
    } }
  end
  actions << { add: {
    index: elasticsearch_index_name,
    alias: configuration.elasticsearch_index_base_name
  } }

  update_aliases = http_post('/_aliases')
  update_aliases.body = { actions: actions }.to_json
  http.request(update_aliases)
end
finalize_cleanup(http) click to toggle source

delete old indices after a successful reindexing run

# File lib/searchyll/indexer.rb, line 237
# Delete old indices after a successful reindexing run.
#
# http - an open Net::HTTP connection
#
# Returns the HTTP response, or nil when there is nothing to delete.
def finalize_cleanup(http)
  # Cache the lookup: each old_indices call issues an HTTP request.
  stale = old_indices
  return if stale.nil? || stale.empty?
  cleanup_indices = http_delete("/#{stale.join(',')}")
  puts %(       Old indices: #{stale.join(', ')})
  http.request(cleanup_indices)
end
finalize_refresh(http) click to toggle source

refresh the index to make it searchable

# File lib/searchyll/indexer.rb, line 202
# Refresh the new index so the documents just written become searchable
# (indexing ran with refreshes disabled for speed).
def finalize_refresh(http)
  http.request(http_post("/#{elasticsearch_index_name}/_refresh"))
end
finalize_replication(http) click to toggle source

add replication to the new index

# File lib/searchyll/indexer.rb, line 208
# Turn replication on for the new index (it was created with zero
# replicas so the initial bulk load would be as fast as possible).
def finalize_replication(http)
  settings = {
    index: { number_of_replicas: configuration.elasticsearch_number_of_replicas }
  }
  add_replication = http_put("/#{elasticsearch_index_name}/_settings")
  add_replication.body = settings.to_json
  http.request(add_replication)
end
http_delete(path) click to toggle source
# File lib/searchyll/indexer.rb, line 142
# Build (but do not send) an HTTP DELETE request for +path+ with JSON
# headers and any configured basic-auth credentials.
def http_delete(path)
  http_request(Net::HTTP::Delete, path)
end
http_get(path) click to toggle source
# File lib/searchyll/indexer.rb, line 138
# Build (but do not send) an HTTP GET request for +path+ with JSON
# headers and any configured basic-auth credentials.
def http_get(path)
  http_request(Net::HTTP::Get, path)
end
http_post(path) click to toggle source
# File lib/searchyll/indexer.rb, line 134
# Build (but do not send) an HTTP POST request for +path+ with JSON
# headers and any configured basic-auth credentials.
def http_post(path)
  http_request(Net::HTTP::Post, path)
end
http_put(path) click to toggle source
# File lib/searchyll/indexer.rb, line 130
# Build (but do not send) an HTTP PUT request for +path+ with JSON
# headers and any configured basic-auth credentials.
def http_put(path)
  http_request(Net::HTTP::Put, path)
end
http_request(klass, path) click to toggle source
# File lib/searchyll/indexer.rb, line 146
# Build a request of the given Net::HTTP class (Get/Post/Put/Delete) for
# +path+, preconfigured for JSON. The request is constructed, not sent.
def http_request(klass, path)
  klass.new(path).tap do |request|
    request.content_type = 'application/json'
    request['Accept']    = 'application/json'
    # Attach basic-auth credentials if they exist in the configured URL
    request.basic_auth(uri.user, uri.password) if uri.user && uri.password
  end
end
http_start() { |http| ... } click to toggle source

Prepare an HTTP connection

# File lib/searchyll/indexer.rb, line 104
# Open an HTTP(S) connection to the configured Elasticsearch host and
# yield it to the block. Using the block form of Net::HTTP.start ensures
# the connection is closed (#finish) when the block returns or raises —
# the non-block form leaked the open connection.
#
# Returns the value of the block.
def http_start(&block)
  Net::HTTP.start(
    uri.hostname, uri.port,
    use_ssl: (uri.scheme == 'https'),
    &block
  )
end
indexer_loop(http) click to toggle source
# File lib/searchyll/indexer.rb, line 60
# Consume queued documents in bulk at the configured tempo, until the
# indexer has been told to stop and the queue has drained (working?).
def indexer_loop(http)
  tempo_loop do
    unless working?
      break
    end
    es_bulk_insert!(http, current_batch)
  end
end
old_indices() click to toggle source

List the indices currently in the cluster, caching the call in an ivar

# File lib/searchyll/indexer.rb, line 179
# List the indices currently in the cluster that share our base name,
# excluding the index being built, memoizing the HTTP lookup in an ivar
# so finalize_aliases and finalize_cleanup reuse one answer.
def old_indices
  return @old_indices if defined?(@old_indices)
  resp = http_start { |h| h.request(http_get('/_cat/indices?h=index')) }
  indices = JSON.parse(resp.body).map { |i| i['index'] }
  # Plain prefix match on the base name (safer than interpolating the
  # configured name into a regex).
  prefix = configuration.elasticsearch_index_base_name
  indices = indices.select { |i| i.start_with?(prefix) }
  indices -= [elasticsearch_index_name]
  @old_indices = indices
end
prepare_index() click to toggle source

Prepare our indexing run by creating a new index.

# File lib/searchyll/indexer.rb, line 113
# Prepare the indexing run by creating a new, write-optimised index:
# no replicas and refreshes disabled until finalization re-enables them.
def prepare_index
  settings = {
    index: {
      number_of_shards:   configuration.elasticsearch_number_of_shards,
      number_of_replicas: 0,
      refresh_interval:   -1
    }
  }

  create_index = http_put("/#{elasticsearch_index_name}")
  create_index.body = settings.to_json # TODO: index settings

  http_start { |http| http.request(create_index) }

  # TODO: mapping?
end
tempo_loop() { || ... } click to toggle source

Run a loop in the tempo specified by TEMPO.

# File lib/searchyll/indexer.rb, line 68
# Run a loop at the tempo specified by TEMPO (beats per minute): do one
# unit of work, adapt the batch size to how long it took, then sleep out
# the remainder of the beat. Never returns on its own — the caller's
# block must `break` to stop it (see indexer_loop).
def tempo_loop
  loop do
    # NOTE(review): Time.now is wall-clock; Process.clock_gettime with
    # CLOCK_MONOTONIC would be immune to system clock adjustments.
    t = Time.now

    # Perform the work required
    yield

    # Adjust the batch size: grow when the work used under half of the
    # beat (60.0 / TEMPO seconds), shrink when it used more than 90%.
    if (Time.now - t) / (60.0 / TEMPO) < 0.5
      self.batch_size = (batch_size * BATCH_RESIZE_FACTOR).round
      puts "Increased batch to #{batch_size}"
    elsif (Time.now - t) / (60.0 / TEMPO) > 0.9
      self.batch_size = (batch_size / BATCH_RESIZE_FACTOR).round
      puts "Decreased batch to #{batch_size}"
    end

    # Tight loop to sleep through any remaining time in the tempo.
    # `break` here exits only this inner while loop (cutting the sleep
    # short); the outer loop then yields once more so the caller's block
    # can observe the stop condition and break out.
    while (60.0 / TEMPO) - (Time.now - t) > 0
      sleep [0.1, (60.0 / TEMPO) - (Time.now - t)].min
      break unless working?
    end
  end
end
working?() click to toggle source

Whether the batch indexing thread still has work to do: true while new documents are being accepted or queued documents remain to be flushed.

# File lib/searchyll/indexer.rb, line 93
# True while the indexer should keep consuming: either new documents are
# still being accepted, or queued documents remain to be flushed.
def working?
  return true if working
  !queue.empty?
end