class Elasticsearch::Model::TransactionalCallbacks::BulkIndexingJob
Background job which handles requests to index/update/delete documents asynchronously.
Elasticsearch::Model::TransactionalCallbacks::BulkIndexingJob.perform_later(
  document_type: {
    index: [{ _id: document.id }],
    update: [{ _id: document.id }],
    delete: [{ _id: document.id }]
  }
)
Public Instance Methods
perform(indexables)
# File lib/elasticsearch/model/transactional_callbacks/bulk_indexing_job.rb, line 20
def perform(indexables)
  indexables.each do |document_type, action_map|
    klass = document_type.to_s.camelcase.constantize
    body = transform_batches(klass, action_map)

    response = bulk_index klass, body

    ::Rails.logger.error "[ELASTICSEARCH] Bulk request failed: #{response['items']}" if response&.dig('errors')
  end
end
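For illustration, a payload for a hypothetical User model could be enqueued as shown below; the :user key resolves to the User class via camelcase.constantize, and the ids are made up:

Elasticsearch::Model::TransactionalCallbacks::BulkIndexingJob.perform_later(
  user: {
    index:  [{ _id: 1 }],  # records created in the transaction
    update: [{ _id: 2 }],  # records updated in the transaction
    delete: [{ _id: 3 }]   # records destroyed in the transaction
  }
)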
Private Instance Methods
build_reverse_map(action_map)
# File lib/elasticsearch/model/transactional_callbacks/bulk_indexing_job.rb, line 46
def build_reverse_map(action_map)
  action_map.each_with_object({}) { |map, memo|
    action, options = map

    next if action == :delete

    options.each do |option|
      memo[option[:_id]] = [action, option]
    end
  }
end
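As a sketch with made-up data, the method inverts the action map into a lookup keyed by record id; :delete entries are skipped because they need no database record:

action_map = {
  index:  [{ _id: 1 }],
  update: [{ _id: 2 }],
  delete: [{ _id: 3 }]
}

build_reverse_map(action_map)
# => { 1 => [:index, { _id: 1 }], 2 => [:update, { _id: 2 }] }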
bulk_index(klass, body)
# File lib/elasticsearch/model/transactional_callbacks/bulk_indexing_job.rb, line 84
def bulk_index(klass, body)
  return if body.blank?

  klass.__elasticsearch__.client.bulk(
    index: klass.index_name,
    type: klass.document_type,
    body: body
  )
end
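The body is the array of action hashes built by transform_batches. A minimal sketch of the resulting client call, assuming a User model and illustrative field values:

User.__elasticsearch__.client.bulk(
  index: 'users',
  type: 'user',
  body: [
    { index:  { _id: 1, data: { name: 'Jane' } } },
    { delete: { _id: 3 } }
  ]
)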
preload(resources)
# File lib/elasticsearch/model/transactional_callbacks/bulk_indexing_job.rb, line 58
def preload(resources)
  resources.respond_to?(:preload_for_import) ? resources.preload_for_import : resources
end
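preload_for_import is an optional hook: when the model responds to it, the relation is preloaded before serialization. A model could define it as a scope, for example (the associations are hypothetical):

class User < ApplicationRecord
  scope :preload_for_import, -> { preload(:profile, :posts) }
end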
to_indexed_json(resource)
# File lib/elasticsearch/model/transactional_callbacks/bulk_indexing_job.rb, line 78
def to_indexed_json(resource)
  return resource.as_indexed_json if resource.respond_to?(:as_indexed_json)

  resource.__elasticsearch__.as_indexed_json
end
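as_indexed_json is the usual elasticsearch-model hook; a model can override it to control what gets indexed, e.g. (the fields are hypothetical):

class User < ApplicationRecord
  include Elasticsearch::Model

  def as_indexed_json(_options = {})
    { name: name, email: email }
  end
end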
transform_batches(klass, action_map)
# File lib/elasticsearch/model/transactional_callbacks/bulk_indexing_job.rb, line 33
def transform_batches(klass, action_map)
  reverse_map = build_reverse_map(action_map)
  resources = klass.where id: reverse_map.keys

  preload(resources).find_each.map { |resource|
    action, option = reverse_map[resource.id]

    send "transform_#{action}", resource, option
  } + action_map.fetch(:delete, []).map { |option| transform_delete(option) }
end
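Continuing the made-up action_map from build_reverse_map above, the result is a single array of bulk actions; note that the :update entry becomes an index action because transform_update is aliased to transform_index:

transform_batches(User, action_map)
# => [
#      { index:  { _id: 1, data: { name: 'Jane' } } },
#      { index:  { _id: 2, data: { name: 'John' } } },
#      { delete: { _id: 3 } }
#    ]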
transform_delete(option)
# File lib/elasticsearch/model/transactional_callbacks/bulk_indexing_job.rb, line 74
def transform_delete(option)
  { delete: option }
end
transform_index(resource, option)
# File lib/elasticsearch/model/transactional_callbacks/bulk_indexing_job.rb, line 62
def transform_index(resource, option)
  { index: option.merge(data: to_indexed_json(resource)) }
end
Also aliased as: transform_update
transform_update(resource, option)
Elasticsearch does support an update operation in its bulk API, but it fails when the update targets a document that does not exist, whereas index works for both new and existing documents.
Because of this, we use index for updates, avoiding a race condition where a document is updated immediately after it is created, before Elasticsearch is aware of it.
Alias for: transform_index
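For comparison, a minimal sketch of the two bulk actions (field values are illustrative): a bulk update sends a partial doc and is rejected when the target document is missing, while a bulk index writes the full document whether or not it already exists:

# update: partial change, rejected when the document is missing
{ update: { _id: 2, data: { doc: { name: 'John' } } } }

# index: full document, creates or replaces in either case
{ index: { _id: 2, data: { name: 'John' } } }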