class JobIteration::ActiveRecordBatchEnumerator

Builds Batch Enumerator based on ActiveRecord Relation. @see EnumeratorBuilder

Constants

SQL_DATETIME_WITH_NSEC

Public Class Methods

new(relation, columns: nil, batch_size: 100, cursor: nil) click to toggle source
# File lib/job-iteration/active_record_batch_enumerator.rb, line 11
def initialize(relation, columns: nil, batch_size: 100, cursor: nil)
  @batch_size = batch_size
  @primary_key = "#{relation.table_name}.#{relation.primary_key}"
  @columns = Array(columns&.map(&:to_s) || @primary_key)
  @primary_key_index = @columns.index(@primary_key) || @columns.index(relation.primary_key)
  @pluck_columns = if @primary_key_index
    @columns
  else
    @columns.dup << @primary_key
  end
  @cursor = Array.wrap(cursor)
  @initial_cursor = @cursor
  raise ArgumentError, "Must specify at least one column" if @columns.empty?
  if relation.joins_values.present? && !@columns.all? { |column| column.to_s.include?(".") }
    raise ArgumentError, "You need to specify fully-qualified columns if you join a table"
  end

  if relation.arel.orders.present? || relation.arel.taken.present?
    raise ConditionNotSupportedError
  end

  @base_relation = relation.reorder(@columns.join(","))
end

Public Instance Methods

each() { |relation, cursor_value| ... } click to toggle source
# File lib/job-iteration/active_record_batch_enumerator.rb, line 35
def each
  return to_enum { size } unless block_given?
  while (relation = next_batch)
    yield relation, cursor_value
  end
end
size() click to toggle source
# File lib/job-iteration/active_record_batch_enumerator.rb, line 42
def size
  (@base_relation.count + @batch_size - 1) / @batch_size # ceiling division
end

Private Instance Methods

column_value(value) click to toggle source
# File lib/job-iteration/active_record_batch_enumerator.rb, line 113
def column_value(value)
  value.is_a?(Time) ? value.strftime(SQL_DATETIME_WITH_NSEC) : value
end
conditions() click to toggle source
# File lib/job-iteration/active_record_batch_enumerator.rb, line 91
def conditions
  column_index = @cursor.size - 1
  column = @columns[column_index]
  where_clause = if @columns.size == @cursor.size
    "#{column} > ?"
  else
    "#{column} >= ?"
  end
  while column_index > 0
    column_index -= 1
    column = @columns[column_index]
    where_clause = "#{column} > ? OR (#{column} = ? AND (#{where_clause}))"
  end
  ret = @cursor.reduce([where_clause]) { |params, value| params << value << value }
  ret.pop
  ret
end
cursor_value() click to toggle source
# File lib/job-iteration/active_record_batch_enumerator.rb, line 86
def cursor_value
  return @cursor.first if @cursor.size == 1
  @cursor
end
next_batch() click to toggle source
# File lib/job-iteration/active_record_batch_enumerator.rb, line 48
def next_batch
  relation = @base_relation.limit(@batch_size)
  if conditions.any?
    relation = relation.where(*conditions)
  end

  cursor_values, ids = relation.uncached do
    pluck_columns(relation)
  end

  cursor = cursor_values.last
  unless cursor.present?
    @cursor = @initial_cursor
    return
  end
  # The primary key was plucked, but original cursor did not include it, so we should remove it
  cursor.pop unless @primary_key_index
  @cursor = Array.wrap(cursor)

  # Yields relations by selecting the primary keys of records in the batch.
  # Post.where(published: nil) results in an enumerator of relations like: Post.where(ids: batch_of_ids)
  @base_relation.where(@primary_key => ids)
end
pluck_columns(relation) click to toggle source
# File lib/job-iteration/active_record_batch_enumerator.rb, line 72
def pluck_columns(relation)
  if @pluck_columns.size == 1 # only the primary key
    column_values = relation.pluck(*@pluck_columns)
    return [column_values, column_values]
  end

  column_values = relation.pluck(*@pluck_columns)
  primary_key_index = @primary_key_index || -1
  primary_key_values = column_values.map { |values| values[primary_key_index] }

  serialize_column_values!(column_values)
  [column_values, primary_key_values]
end
serialize_column_values!(column_values) click to toggle source
# File lib/job-iteration/active_record_batch_enumerator.rb, line 109
def serialize_column_values!(column_values)
  column_values.map! { |values| values.map! { |value| column_value(value) } }
end