class Backfiller::Runner
Attributes
batch_size[R]
connection_pool[R]
cursor_threshold[R]
process_method[R]
task[R]
Public Class Methods
new(task_name)
click to toggle source
# File lib/backfiller/runner.rb, line 12 def initialize(task_name) @task = build_task(task_name) @connection_pool = @task.respond_to?(:connection_pool) ? @task.connection_pool : default_connection_pool @batch_size = @task.respond_to?(:batch_size) ? @task.batch_size : Backfiller.batch_size @cursor_threshold = @task.respond_to?(:cursor_threshold) ? @task.cursor_threshold : Backfiller.cursor_threshold @process_method = @task.respond_to?(:process_row) ? @task.method(:process_row) : method(:process_row) end
Public Instance Methods
run()
click to toggle source
It uses two connections from pool:
-
master [M] - reads data using cursor in transaction
-
worker [W] - changes data based on record red from master
@example
[M] BEGIN [M] DECLARE backfill_cursor SCROLL CURSOR WITHOUT HOLD FOR SELECT * FROM users // Start fetch and process loop: [M] FETCH 1000 backfill_cursor [W] UPDATE users SET full_name = '...' where id = 1 [W] ... [W] UPDATE users SET full_name = '...' where id = 1000 [M] FETCH 1000 backfill_cursor [W] UPDATE users SET full_name = '...' where id = 1001 [W] ... [W] UPDATE users SET full_name = '...' where id = 2000 // Records per cursor transaction threshold reached. Reopen transaction. [M] CLOSE backfill_cursor [M] COMMIT [M] BEGIN [M] DECLARE backfill_cursor SCROLL CURSOR WITHOUT HOLD FOR SELECT * FROM users [M] FETCH 1000 backfill_cursor // The end of cursor reached. Break cursor loop and exit. [M] CLOSE backfill_cursor [M] COMMIT
# File lib/backfiller/runner.rb, line 45 def run master_connection = acquire_connection worker_connection = acquire_connection run_cursor_loop(master_connection) do |row| process_method.call(worker_connection, row) end release_connection(master_connection) release_connection(worker_connection) end
Private Instance Methods
acquire_connection()
click to toggle source
# File lib/backfiller/runner.rb, line 71 def acquire_connection connection_pool.checkout end
build_cursor(connection)
click to toggle source
Build cursor object that will use master connection.
# File lib/backfiller/runner.rb, line 124 def build_cursor(connection) Backfiller::Cursor.new(connection, 'backfill_cursor', task.select_sql) end
build_task(task_name)
click to toggle source
# File lib/backfiller/runner.rb, line 59 def build_task(task_name) Backfiller.log "Build #{task_name} task" require File.join(Backfiller.task_directory, task_name) "#{Backfiller.task_namespace}/#{task_name}".classify.constantize.new end
default_connection_pool()
click to toggle source
# File lib/backfiller/runner.rb, line 67 def default_connection_pool defined?(ApplicationRecord) ? ApplicationRecord.connection_pool : ActiveRecord::Base.connection_pool end
process_row(connection, row)
click to toggle source
Process row using worker connection.
# File lib/backfiller/runner.rb, line 129 def process_row(connection, row) Array(task.execute_sql(connection, row)).each do |sql| connection.execute(sql) end end
release_connection(connection)
click to toggle source
# File lib/backfiller/runner.rb, line 75 def release_connection(connection) connection_pool.checkin(connection) end
run_cursor_loop(connection, &block)
click to toggle source
Run loop that re-open cursor transaction on threshold
# File lib/backfiller/runner.rb, line 82 def run_cursor_loop(connection, &block) Backfiller.log 'Start cursor loop' total_count = 0 cursor = build_cursor(connection) loop do finished, count = cursor.transaction do run_fetch_loop(cursor, &block) end total_count += count Backfiller.log "Total processed #{total_count}" break if finished end end
run_fetch_loop(cursor, &block)
click to toggle source
@return [Array<Boolean, Integer>] finished_status/processed_count
# File lib/backfiller/runner.rb, line 101 def run_fetch_loop(cursor, &block) Backfiller.log 'Start fetch loop' count = 0 loop do result = cursor.fetch(batch_size) return [true, count] if result.empty? result.each do |row| block.call(row) count += 1 end Backfiller.log "Processed #{count}" return [false, count] if cursor_threshold && count > cursor_threshold end end