class Elasticsearch::Model::TransactionalCallbacks::BulkIndexingJob

Background job that asynchronously indexes, updates, and deletes documents through the Elasticsearch bulk API.

# `document_type` is a placeholder for the snake_case name of the model class
# (e.g. `user:` for `User`). Each action maps to a list of option hashes that
# carry the document `_id`.
Elasticsearch::Model::TransactionalCallbacks::BulkIndexingJob.perform_later(
  document_type: {
    index: [{ _id: document.id }],
    update: [{ _id: document.id }],
    delete: [{ _id: document.id }],
  }
)

Public Instance Methods

perform(indexables)
# File lib/elasticsearch/model/transactional_callbacks/bulk_indexing_job.rb, line 20
# Sends at most one bulk request per document type in +indexables+.
#
# +indexables+ maps a snake_case model name (e.g. +:user+) to an action map
# such as +{ index: [{ _id: 1 }], update: [...], delete: [...] }+.
# If Elasticsearch reports errors, only the failed items are logged.
def perform(indexables)
  indexables.each do |document_type, action_map|
    # The key is the snake_case model name; resolve it to the model class.
    klass = document_type.to_s.camelcase.constantize
    body = transform_batches(klass, action_map)

    # bulk_index returns nil when there was nothing to send.
    response = bulk_index klass, body
    next unless response&.dig('errors')

    # A bulk response has one entry per operation, and most of them usually
    # succeeded. Log only the failures so the message stays readable.
    ::Rails.logger.error "[ELASTICSEARCH] Bulk request failed: #{failed_bulk_items(response)}"
  end
end

# Returns the entries of a bulk response's +items+ that carry an +error+.
# Each entry is keyed by its operation name (index/update/delete).
private def failed_bulk_items(response)
  Array(response['items']).select do |item|
    item.values.any? { |result| result.is_a?(Hash) && result.key?('error') }
  end
end

Private Instance Methods

build_reverse_map(action_map)
# File lib/elasticsearch/model/transactional_callbacks/bulk_indexing_job.rb, line 46
# Inverts an action map into a lookup keyed by document id.
#
# Each +_id+ queued under a non-delete action maps to +[action, option]+.
# Delete entries are left out because they are rendered without loading a
# record. If an id appears under several actions, the last one wins.
def build_reverse_map(action_map)
  lookup = {}

  action_map.each do |action, options|
    next if action == :delete

    options.each { |option| lookup[option[:_id]] = [action, option] }
  end

  lookup
end
bulk_index(klass, body)
# File lib/elasticsearch/model/transactional_callbacks/bulk_indexing_job.rb, line 84
# Sends +body+ to Elasticsearch as one bulk request against the model's index
# and document type, and returns the client's response.
# Returns nil without contacting Elasticsearch when +body+ is blank.
# NOTE(review): the +type:+ argument relies on mapping types, which newer
# Elasticsearch versions deprecate or remove. Confirm the target cluster version.
def bulk_index(klass, body)
  unless body.blank?
    es_client = klass.__elasticsearch__.client
    es_client.bulk(index: klass.index_name, type: klass.document_type, body: body)
  end
end
preload(resources)
# File lib/elasticsearch/model/transactional_callbacks/bulk_indexing_job.rb, line 58
# Lets a model hook in eager loading before its records are serialized.
# If +resources+ responds to +preload_for_import+, returns its result;
# otherwise returns +resources+ unchanged.
def preload(resources)
  return resources unless resources.respond_to?(:preload_for_import)

  resources.preload_for_import
end
to_indexed_json(resource)
# File lib/elasticsearch/model/transactional_callbacks/bulk_indexing_job.rb, line 78
# Returns the document body to index for +resource+.
# A model-level +as_indexed_json+ takes precedence; otherwise the method is
# called on the resource's +__elasticsearch__+ proxy.
def to_indexed_json(resource)
  serializer = resource.respond_to?(:as_indexed_json) ? resource : resource.__elasticsearch__
  serializer.as_indexed_json
end
transform_batches(klass, action_map)
# File lib/elasticsearch/model/transactional_callbacks/bulk_indexing_job.rb, line 33
# Builds the bulk request body for a single model class.
#
# Index/update entries are rendered from records loaded from the database.
# Delete entries are rendered directly from their option hashes.
#
# +action_map+ looks like +{ index: [{ _id: 1 }], delete: [{ _id: 2 }] }+.
# Returns the index/update actions followed by the delete actions.
def transform_batches(klass, action_map)
  reverse_map = build_reverse_map(action_map)
  deletes = action_map.fetch(:delete, []).map { |option| transform_delete(option) }

  # Only deletes were queued, so skip the database query entirely.
  return deletes if reverse_map.empty?

  # Look ids up by their string form. An _id given as "42" then still matches
  # a record whose id is the Integer 42: the query presumably casts the value,
  # but a plain hash lookup would not.
  actions_by_id = reverse_map.transform_keys(&:to_s)

  upserts = preload(klass.where(id: reverse_map.keys)).find_each.map do |resource|
    action, option = actions_by_id.fetch(resource.id.to_s)
    send "transform_#{action}", resource, option
  end

  upserts + deletes
end
transform_delete(option)
# File lib/elasticsearch/model/transactional_callbacks/bulk_indexing_job.rb, line 74
# Wraps a queued delete +option+ (e.g. +{ _id: 1 }+) as a bulk delete
# action. No database record is needed for this.
def transform_delete(option)
  { delete: option }
end
transform_index(resource, option)
# File lib/elasticsearch/model/transactional_callbacks/bulk_indexing_job.rb, line 62
# Builds a bulk index action for +resource+. The queued +option+ (e.g.
# +{ _id: 1 }+) is copied, not mutated, and the serialized document is
# attached under +:data+.
def transform_index(resource, option)
  payload = option.merge(data: to_indexed_json(resource))
  { index: payload }
end
Also aliased as: transform_update
transform_update(resource, option)

Elasticsearch does support an update operation in its bulk API, but that operation fails when the target document does not exist. The index operation works for both new and existing documents.

Because of this, updates are sent as index operations. This avoids a race condition in which a document is updated immediately after it is created, before Elasticsearch is aware of it.

Alias for: transform_index