class Dataflow::Adapters::MongoDbAdapter

Interface between a data node and mongodb. We use mongodb to perform all the store/retrieve operations.

Constants

SYSTEM_ID

Attributes

client[R]
settings[R]

Public Class Methods

admin_client(settings) click to toggle source
# File lib/dataflow/adapters/mongo_db_adapter.rb, line 19
# Returns the client connected to the 'admin' database for the given settings.
#
# Delegates to `client`, which keeps one cached client per connection URI and
# database name. Settings that point at different connection URIs therefore
# each get their own admin client, instead of all sharing the first one that
# was ever created.
#
# @param settings the adapter settings, forwarded as-is to `client`.
# @return the client bound to the 'admin' database.
def admin_client(settings)
  client(settings, db_name: 'admin')
end
client(settings, db_name: nil) click to toggle source
# File lib/dataflow/adapters/mongo_db_adapter.rb, line 10
# Returns a mongodb client for the given settings, reusing the one created
# earlier when the same connection URI and database were already requested.
#
# Note: this sets `settings.adapter_type` to 'mongodb' before the connection
# URI is read from the settings.
#
# @param settings the adapter settings; must respond to `adapter_type=`,
#   `connection_uri_or_default` and `db_name`.
# @param db_name [String, nil] database to connect to. Falls back to
#   `settings.db_name` when nil.
# @return [Mongo::Client] the cached or newly created client.
def client(settings, db_name: nil)
  @clients ||= {}

  settings.adapter_type = 'mongodb'
  uri = settings.connection_uri_or_default
  database = db_name || settings.db_name
  cache_key = "#{uri}.#{database}"

  cached = @clients[cache_key]
  return cached if cached

  @clients[cache_key] = Mongo::Client.new([uri], database: database)
end
disconnect_clients() click to toggle source

Force the clients to disconnect their connections. Use before forking.

# File lib/dataflow/adapters/mongo_db_adapter.rb, line 26
# Force the clients to disconnect their connections. Use before forking.
#
# Every cached client is closed and the cache is emptied, so the next request
# for a client creates a fresh connection.
#
# @return [Hash] the new, empty client cache.
def disconnect_clients
  @clients ||= {}
  @clients.each_value(&:close)
  # Also forget any admin client memoized in @admin_client, so that a closed
  # client is never handed out again after the cache is reset.
  @admin_client = nil
  @clients = {}
end
new(args) click to toggle source
# File lib/dataflow/adapters/mongo_db_adapter.rb, line 36
# Builds the adapter: stores the settings, then fetches the dataset client and
# the admin client for those settings from MongoDbAdapter.
#
# @param args arguments forwarded to `update_settings`.
def initialize(args)
  update_settings(args)

  current_settings = settings
  @client = MongoDbAdapter.client(current_settings)
  @admin_client = MongoDbAdapter.admin_client(current_settings)
end

Public Instance Methods

all(where: {}, fields: [], sort: {}, offset: 0, limit: 0, include_system_id: false) { |res| ... } click to toggle source

retrieve all elements from a data node

# File lib/dataflow/adapters/mongo_db_adapter.rb, line 52
# Retrieve all elements from a data node.
#
# @param where [Hash] conditions, converted with `transform_to_query`.
# @param fields [Array] fields to select. Each one is projected with 1.
# @param sort [Hash] sort specification, applied whenever it is truthy.
# @param offset [Integer] number of results to skip (applied only if > 0).
# @param limit [Integer] maximum number of results (applied only if > 0).
# @param include_system_id [Boolean] when false, and the system id is not one
#   of the requested fields, the system id is excluded from the projection.
# @yield [view] when a block is given, the prepared query view is yielded and
#   the block's value is returned.
# @return [Array] the matching records when no block is given.
def all(where: {}, fields: [], sort: {}, offset: 0, limit: 0, include_system_id: false)
  projection = fields.each_with_object({}) { |field, selected| selected[field] = 1 }

  # by default, do not select the _id field
  system_id_wanted = include_system_id || fields.any? { |field| field.to_s == SYSTEM_ID }
  projection[SYSTEM_ID] = 0 unless system_id_wanted

  query = transform_to_query(where)
  view = client[read_dataset_name].find(query).projection(projection)

  view = view.sort(sort)   if sort
  view = view.skip(offset) if offset > 0
  view = view.limit(limit) if limit > 0

  return yield(view) if block_given?

  view.to_a
end
all_paginated(where: {}, fields: [], cursor: nil) click to toggle source

Helper that supports paginating through the whole dataset at fixed performance. Unlike offset/skip, which has to read through the skipped content (high CPU usage), it uses the internal mongo cursor to fetch batches of results. @return [Hash] with 2 fields: data and next_cursor for the next call

# File lib/dataflow/adapters/mongo_db_adapter.rb, line 80
# Paginates through the dataset using the mongo cursor instead of offsets.
#
# @param where [Hash] conditions, used for the initial query only.
# @param fields [Array] fields to select, used for the initial query only.
# @param cursor [String, Integer, nil] value of 'next_cursor' from a previous
#   call. It is converted with `to_i`. A zero value starts a new query.
# @return [Hash] with 2 fields: 'data' (the records) and 'next_cursor' (a
#   String, empty when there is nothing more to fetch). When mongo reports an
#   operation failure, the data gathered so far is returned with an empty
#   'next_cursor'.
def all_paginated(where: {}, fields: [], cursor: nil)
  cursor_id = cursor.to_i
  batch = []

  # Without a cursor, run the initial query: keep its first batch of
  # documents and remember the cursor id.
  if cursor_id.zero?
    all(where: where, fields: fields) do |view|
      first_reply = view.initial_query
      batch = first_reply.documents
      cursor_id = view.cursor.id
    end
  end

  # The first query's result batch is a small 101 set of results, so one more
  # batch is fetched. A zero cursor means the results were exhausted already
  # and there is no more data to fetch.
  unless cursor_id.zero?
    # send a getMore command on the cursor id
    get_more = { getMore: cursor_id, collection: read_dataset_name }
    next_page = client.database.command(get_more).documents[0]['cursor']
    cursor_id = next_page['id']
    batch += next_page['nextBatch']
  end

  # The cursor is returned as a string; no cursor (zero) becomes empty.
  next_cursor = cursor_id.zero? ? '' : cursor_id.to_s

  { 'data' => batch, 'next_cursor' => next_cursor }
rescue Mongo::Error::OperationFailure
  { 'data' => batch, 'next_cursor' => '' }
end
count(where: {}) click to toggle source

count the number of records

# File lib/dataflow/adapters/mongo_db_adapter.rb, line 134
# Count the number of records of the read dataset that match the conditions.
#
# @param where [Hash] conditions, converted with `transform_to_query`.
# @return the value returned by the collection's `count`.
def count(where: {})
  collection = client[read_dataset_name]
  collection.count(transform_to_query(where))
end
create_indexes(dataset: nil, type: :all, drop_retry_on_error: true) click to toggle source

Create the indexes on this dataset. @param dataset [String] Specify on which dataset the operation will be performed.

Default: the adapter's settings' dataset.

@param type [Symbol] select which indexes type to create.

Can be :all (default), :unique_only, :non_unique_only
# File lib/dataflow/adapters/mongo_db_adapter.rb, line 191
# Create the indexes on this dataset.
#
# When mongo rejects the creation and `drop_retry_on_error` is set, all the
# indexes of the dataset are dropped and the creation is attempted once more
# on the same dataset with the same `type`. A failure of that second attempt
# is raised.
#
# @param dataset [String] Specify on which dataset the operation will be
#   performed. Default: the adapter's settings' write dataset.
# @param type [Symbol] select which indexes type to create.
#   Can be :all (default), :unique_only, :non_unique_only
# @param drop_retry_on_error [Boolean] whether to drop all indexes and retry
#   once when the creation fails.
# @return the result of `create_many`, or nil when there is nothing to create.
# @raise [Mongo::Error::OperationFailure] when the creation fails and no retry
#   is allowed (or the retry failed too).
def create_indexes(dataset: nil, type: :all, drop_retry_on_error: true)
  dataset ||= write_dataset_name
  indexes = settings.indexes
  return unless indexes.present?

  indexes =
    case type
    when :unique_only     then indexes.select { |idx| idx['unique'] }
    when :non_unique_only then indexes.reject { |idx| idx['unique'] }
    else indexes
    end

  # Nothing left after filtering: do not send an empty creation request, whose
  # failure would needlessly drop every existing index below.
  return if indexes.empty?

  client[dataset].indexes.create_many(indexes.map { |idx| format_index(idx) })
rescue Mongo::Error::OperationFailure => e
  raise e unless drop_retry_on_error
  client[dataset].indexes.drop_all
  # Retry on the very same dataset and index selection.
  create_indexes(dataset: dataset, type: type, drop_retry_on_error: false)
end
delete(where: {}) click to toggle source

Delete records that match the options. @param where query to apply on the delete operation.

# File lib/dataflow/adapters/mongo_db_adapter.rb, line 169
# Delete the records of the read dataset that match the conditions.
#
# @param where [Hash] query to apply on the delete operation, converted with
#   `transform_to_query`.
# @return the value returned by the collection's `delete_many`.
def delete(where: {})
  collection = client[read_dataset_name]
  collection.delete_many(transform_to_query(where))
end
drop_dataset(dataset) click to toggle source
# File lib/dataflow/adapters/mongo_db_adapter.rb, line 181
# Drop the given collection.
#
# @param dataset [String] name of the collection to drop.
# @return the value returned by the collection's `drop`.
def drop_dataset(dataset)
  client[dataset].drop
end
dump(base_folder:, read_dataset_idx:) click to toggle source
# File lib/dataflow/adapters/mongo_db_adapter.rb, line 226
# Dump the read dataset into a gzipped archive using the `mongodump` tool.
#
# The archive is written to
# "<base_folder>/<db_name>/<dataset_name>.<read_dataset_idx>.gz" and its
# folder is created first. Host, port, username and password are only passed
# to the tool when present in the settings.
#
# The commands are started from argument arrays, without going through a
# shell, so paths and credentials containing spaces or shell metacharacters
# reach the tools verbatim.
#
# @param base_folder [String] folder under which the archive is stored.
# @param read_dataset_idx index used in the archive's file name.
# @return [String] the path of the archive.
def dump(base_folder:, read_dataset_idx:)
  db_name = @settings.db_name
  dump_folder = "#{base_folder}/#{db_name}"
  archive_path = "#{dump_folder}/#{@settings.dataset_name}.#{read_dataset_idx}.gz"

  args = ["--archive=#{archive_path}", "--db=#{db_name}", "--collection=#{read_dataset_name}"]
  args << "--host=#{@settings.db_host}" if @settings.db_host.present?
  args << "--port=#{@settings.db_port}" if @settings.db_port.present?
  args << "--username=#{@settings.db_user}" if @settings.db_user.present?
  args << "--password=#{@settings.db_password}" if @settings.db_password.present?
  args << '--gzip'

  # An array command is executed directly (no shell), hence no quoting issue.
  IO.popen(['mkdir', '-p', dump_folder], &:read)
  IO.popen(['mongodump', *args], &:read)
  archive_path
end
find(where: {}, fields: [], sort: {}, offset: 0) click to toggle source

retrieve a single element from a data node

# File lib/dataflow/adapters/mongo_db_adapter.rb, line 47
# Retrieve a single element from a data node.
#
# Runs `all` with a limit of 1 and returns the first result.
#
# @param where [Hash] conditions forwarded to `all`.
# @param fields [Array] fields forwarded to `all`.
# @param sort [Hash] sort specification forwarded to `all`.
# @param offset [Integer] offset forwarded to `all`.
# @return the first matching record, as given by `first` on the result.
def find(where: {}, fields: [], sort: {}, offset: 0)
  matches = all(where: where, fields: fields, sort: sort, offset: offset, limit: 1)
  matches.first
end
ordered_system_id_queries(batch_size:, where: {}) click to toggle source

Create queries that permit processing the whole dataset in parallel without using offsets.

# File lib/dataflow/adapters/mongo_db_adapter.rb, line 117
# Create queries that permit processing the whole dataset in parallel without
# using offsets.
#
# The system ids of the matching records are read in ascending order and cut
# into consecutive ranges of `batch_size` ids. Each returned query selects one
# range: lower bound inclusive, upper bound exclusive, except for the last
# query whose upper bound (the last id) is inclusive.
#
# @param batch_size [Integer] number of ids covered by each query.
# @param where [Hash] conditions restricting the records to consider.
# @return [Array<Hash>] one `where` hash per batch, keyed by the system id.
def ordered_system_id_queries(batch_size:, where: {})
  records = all(fields: [SYSTEM_ID], where: where, sort: { SYSTEM_ID => 1 })
  sorted_ids = records.map { |record| record[SYSTEM_ID].to_s }

  batches_count = (sorted_ids.size / batch_size.to_f).ceil
  last_batch = batches_count - 1

  Array.new(batches_count) do |batch|
    lower_bound = sorted_ids[batch * batch_size]
    # first id of the next batch, or the very last id when there is none
    upper_bound = sorted_ids[(batch + 1) * batch_size] || sorted_ids[-1]
    upper_operator = batch == last_batch ? '<=' : '<'

    { SYSTEM_ID => { '>=' => lower_bound, upper_operator => upper_bound } }
  end
end
recreate_dataset(dataset: nil) click to toggle source

recreate the table/collection

# File lib/dataflow/adapters/mongo_db_adapter.rb, line 174
# Recreate the table/collection: drop it, then create it again.
#
# @param dataset [String, nil] collection to recreate. Defaults to the write
#   dataset name.
# @return the value returned by the collection's `create`.
def recreate_dataset(dataset: nil)
  target = dataset || write_dataset_name
  drop_dataset(target)
  client[target].create
end
restore(filepath:, dataset_name:) click to toggle source
# File lib/dataflow/adapters/mongo_db_adapter.rb, line 239
# Restore a gzipped archive into a collection using the `mongorestore` tool.
# The target collection is dropped before being restored (--drop).
#
# Host, port, username and password are only passed to the tool when present
# in the settings. The command is started from an argument array, without
# going through a shell, so paths and credentials containing spaces or shell
# metacharacters reach the tool verbatim.
#
# @param filepath [String] path of the archive to restore.
# @param dataset_name [String] collection to restore into.
# @return [String] the standard output of `mongorestore`.
def restore(filepath:, dataset_name:)
  args = ["--archive=#{filepath}", "--db=#{@settings.db_name}", "--collection=#{dataset_name}"]
  args << "--host=#{@settings.db_host}" if @settings.db_host.present?
  args << "--port=#{@settings.db_port}" if @settings.db_port.present?
  args << "--username=#{@settings.db_user}" if @settings.db_user.present?
  args << "--password=#{@settings.db_password}" if @settings.db_password.present?
  args.push('--drop', '--gzip')

  # An array command is executed directly (no shell), hence no quoting issue.
  IO.popen(['mongorestore', *args], &:read)
end
retrieve_dataset_indexes(collection) click to toggle source
# File lib/dataflow/adapters/mongo_db_adapter.rb, line 289
# List the indexes of a collection, without the default index on '_id'.
#
# @param collection [String] name of the collection to inspect.
# @return [Array<Hash>] one hash per index with its 'key' (array of the
#   indexed field names) and 'unique' => true for unique indexes. Empty when
#   mongo reports an operation failure.
def retrieve_dataset_indexes(collection)
  client[collection].indexes.each_with_object([]) do |mongo_index, found|
    indexed_fields = mongo_index['key'].keys
    # skip the default index
    next if indexed_fields == ['_id']

    entry = { 'key' => indexed_fields }
    entry['unique'] = true if mongo_index['unique']
    found << entry
  end
rescue Mongo::Error::OperationFailure
  []
end
save(records:, replace_by: nil) click to toggle source

Save the given records. @param replace_by [Array] if the replace_by key is provided,

it will try to replace records with the matching key,
or insert if none is found.
# File lib/dataflow/adapters/mongo_db_adapter.rb, line 142
# Save the given records into the write dataset.
#
# Bulk write failures are ignored only when every reported write error is a
# duplicate key error (code 11000). Any other bulk write failure is raised,
# including one that reports no write error at all.
#
# @param records [Array<Hash>] the records to save.
# @param replace_by [Array, String, nil] if the replace_by key is provided,
#   it will try to replace records with the matching key, or insert if none
#   is found. Otherwise the records are simply inserted.
# @return the result of the write, or nil when duplicate key errors were
#   ignored.
# @raise [Mongo::Error::BulkWriteError] when the write failed for a reason
#   other than duplicate keys.
def save(records:, replace_by: nil)
  if replace_by.present?
    replace_keys = Array(replace_by)
    bulk_ops = records.map do |record|
      filter = replace_keys.map { |key| [key, record[key]] }.to_h
      { replace_one: { filter: filter, replacement: record, upsert: true } }
    end
    client[write_dataset_name].bulk_write(bulk_ops, ordered: false)
  else
    client[write_dataset_name].insert_many(records, ordered: false)
  end
rescue Mongo::Error::BulkWriteError => e
  # 'writeErrors' may be missing (e.g. only a write concern error occurred):
  # that is never a "duplicates only" failure.
  write_errors = (e.result || {})['writeErrors'] || []
  only_duplicate_keys = !write_errors.empty? && write_errors.all? { |err| err['code'] == 11_000 }
  # don't raise if it is errors about duplicated keys
  raise e unless only_duplicate_keys
end
transform_to_query(opts) click to toggle source
# File lib/dataflow/adapters/mongo_db_adapter.rb, line 249
# Convert a `where` hash into a mongodb query.
#
# For each field:
# - an Array becomes an IN clause: { '$in' => [...] }
# - a Hash of operator => value is translated operator by operator:
#   '!=' ('$nin' for an Array, '$ne' otherwise), '<', '<=', '>', '>=',
#   '~*' (case insensitive regex) and '~' (case sensitive regex).
#   Any other operator is left out.
# - any other value is matched as is.
# Values (except regex patterns) go through `try_cast_value`.
#
# @param opts [Hash] the conditions, keyed by field.
# @return [Hash] the mongodb query.
def transform_to_query(opts)
  comparison_operators = { '<' => '$lt', '<=' => '$lte', '>' => '$gt', '>=' => '$gte' }

  opts.each_with_object({}) do |(field, condition), query|
    query[field] =
      case condition
      when Array
        # e.g. { 'id' => [1,2] } transform to mongodb IN clauses
        { '$in' => condition.map { |item| try_cast_value(field, item) } }
      when Hash
        condition.each_with_object({}) do |(operator, operand), clause|
          op = operator.to_s

          if op == '!='
            if operand.is_a?(Array)
              # { '$nin' => [value] }
              clause['$nin'] = operand.map { |item| try_cast_value(field, item) }
            else
              # or { '$ne' => value }
              clause['$ne'] = try_cast_value(field, operand)
            end
          elsif comparison_operators.key?(op)
            clause[comparison_operators[op]] = try_cast_value(field, operand)
          elsif op == '~*' # match regex /regex/i (case insensitive)
            clause['$regex'] = /#{operand}/i
          elsif op == '~'  # match regex /regex/  (case sensitive)
            clause['$regex'] = /#{operand}/
          end
        end
      else
        try_cast_value(field, condition)
      end
  end
end
update_settings(args) click to toggle source
# File lib/dataflow/adapters/mongo_db_adapter.rb, line 42
# Replace the adapter's settings with a new Dataflow::Adapters::Settings
# built from the given arguments.
#
# @param args arguments passed to Dataflow::Adapters::Settings.new.
# @return the new settings object.
def update_settings(args)
  @settings = Dataflow::Adapters::Settings.new(args)
end
usage(dataset:) click to toggle source
# File lib/dataflow/adapters/mongo_db_adapter.rb, line 212
# Report the size of a dataset, as given by the `collstats` command.
#
# @param dataset [String] name of the collection to inspect.
# @return [Hash] :memory (the 'size' statistic) and :storage (the
#   'storageSize' statistic). Both are 0 when mongo reports an operation
#   failure or an invalid collection name.
def usage(dataset:)
  stats = client.database.command({ collstats: dataset }).documents.first
  { memory: stats['size'], storage: stats['storageSize'] }
rescue Mongo::Error::OperationFailure, Mongo::Error::InvalidCollectionName
  { memory: 0, storage: 0 }
end

Private Instance Methods

format_index(dataset_index) click to toggle source

Required index format for mongodb: { :key => { name: 1 }, :unique => true },

# File lib/dataflow/adapters/mongo_db_adapter.rb, line 325
# Convert a dataset index definition to the format required by mongodb:
# { :key => { name: 1 }, :name => '...', :unique => true }
#
# The index name is made of the first two characters of each key, followed by
# 8 random hexadecimal characters, all joined with '_'.
#
# @param dataset_index [Hash] index definition with a 'key' (one field or an
#   array of fields) and an optional 'unique' flag. String or symbol keys.
# @return [Hash] the index with :key, :name and, for unique indexes, :unique.
def format_index(dataset_index)
  definition = dataset_index.with_indifferent_access
  fields = Array(definition[:key])

  ascending_fields = fields.map { |field| [field, 1] }.to_h
  name_parts = fields.map { |field| field[0..1] }
  name_parts << SecureRandom.hex(4)

  formatted = { key: ascending_fields, name: name_parts.join('_') }
  formatted[:unique] = true if definition[:unique]
  formatted
end
read_dataset_name() click to toggle source
# File lib/dataflow/adapters/mongo_db_adapter.rb, line 309
# Name of the dataset used by the read operations, as given by the settings.
#
# @return the settings' read_dataset_name.
def read_dataset_name
  settings.read_dataset_name
end
try_cast_value(field, value) click to toggle source
# File lib/dataflow/adapters/mongo_db_adapter.rb, line 313
# Cast a query value to the type expected for the given field.
#
# - fields whose name contains '_mojaco_updated_at': the value is parsed as a
#   time; it is kept as is when it cannot be parsed.
# - the system id field with a String value: the value is cast to an ObjectId.
# - anything else: the value is returned untouched.
#
# The field name may be given as a String or as a Symbol.
#
# @param field [String, Symbol] name of the field being queried.
# @param value the value to cast.
# @return the cast value, or the value itself when no cast applies.
def try_cast_value(field, value)
  field_name = field.to_s

  # cast to time when querying on _mojaco_updated_at
  return Timeliness.parse(value) || value if field_name.include?('_mojaco_updated_at')
  # cast to ObjectId when querying on _id
  return BSON::ObjectId(value) if field_name == SYSTEM_ID && value.is_a?(String)

  # TODO: add other casts based on the field type
  value
end
write_dataset_name() click to toggle source
# File lib/dataflow/adapters/mongo_db_adapter.rb, line 305
# Name of the dataset used by the write operations, as given by the settings.
#
# @return the settings' write_dataset_name.
def write_dataset_name
  settings.write_dataset_name
end