class MysqlImport

Constants

VERSION

Public Class Methods

new(config, opts = {}) click to toggle source
# File lib/mysql_import.rb, line 9
def initialize(config, opts = {})
  @stash = []
  @lock = opts.fetch(:lock, false)
  @concurrency = opts.has_key?(:concurrency) ? opts[:concurrency].to_i : 2
  pool = @concurrency.zero? ? 1 : @concurrency
  sql_opts = opts.fetch(:sql_opts, {})

  @client = ConnectionPool.new(size: pool) { LoadDataInfile2::Client.new(config, sql_opts) }
  @result = Result.new
end

Public Instance Methods

add(file_path, opts = {}) click to toggle source
# File lib/mysql_import.rb, line 20
def add(file_path, opts = {})
  @stash.push([file_path, opts])
end
import(filters = nil) click to toggle source
# File lib/mysql_import.rb, line 24
def import(filters = nil)
  Parallel.each(filtered_list(filters), parallel_opts) do |args|
    @client.with do |cli|
      begin
        store[:client] = cli
        import_internal(*args)
      ensure
        clear_store
      end
    end
  end
end

Private Instance Methods

clear_store() click to toggle source
# File lib/mysql_import.rb, line 151
def clear_store
  Thread.current[:store] = nil
end
filtered_list(filters) click to toggle source
# File lib/mysql_import.rb, line 39
def filtered_list(filters)
  return @stash if filters.nil?

  regexps = Array(filters).map{|f| Regexp.new(f) if f.is_a?(String) }.compact
  return [] if regexps.empty?

  @stash.map{|row| row if regexps.any?{|r| r.match(row[0]) } }.compact
end
import_internal(fpath, opts) click to toggle source
# File lib/mysql_import.rb, line 52
def import_internal(fpath, opts)
  sql_opts = opts.reject {|k, _| %i(before after).include?(k) }
  store[:table] = sql_opts[:table] || File.basename(fpath, '.*')

  with_recording do
    with_lock_if_needed(opts.fetch(:lock, @lock)) do
      with_skip_handling do
        run_before_action(opts[:before])

        store[:client].import(fpath, sql_opts)

        run_after_action(opts[:after])
      end
    end
  end
end
parallel_opts() click to toggle source
# File lib/mysql_import.rb, line 48
def parallel_opts
  { in_threads: @concurrency }
end
realtime() { || ... } click to toggle source
# File lib/mysql_import.rb, line 102
def realtime
  Benchmark.realtime { yield }
end
run_action(action) click to toggle source
# File lib/mysql_import.rb, line 106
def run_action(action)
  return unless action

  case action
  when Array
    action.each { |act| run_action(act) }
  when String
    store[:client].query(action)
  else
    action.call(store[:client])
  end
end
run_after_action(action) click to toggle source
# File lib/mysql_import.rb, line 125
def run_after_action(action)
  run_action(action)
rescue Break
  raise AfterBreak
end
run_before_action(action) click to toggle source
# File lib/mysql_import.rb, line 119
def run_before_action(action)
  run_action(action)
rescue Break
  raise BeforeBreak
end
store() click to toggle source
# File lib/mysql_import.rb, line 147
def store
  Thread.current[:store] ||= {}
end
unlock() click to toggle source
# File lib/mysql_import.rb, line 139
def unlock
  [
    'COMMIT;',
    'UNLOCK TABLES;',
    'SET autocommit=@old_autocommit;'
  ].each {|sql| store[:client].query(sql)}
end
with_lock_if_needed(need) { || ... } click to toggle source
# File lib/mysql_import.rb, line 81
def with_lock_if_needed(need)
  if need
    begin
      write_lock
      store[:lock_time] = realtime { yield }
    ensure
      unlock
    end
  else
    yield
  end
end
with_recording() { || ... } click to toggle source
# File lib/mysql_import.rb, line 69
def with_recording
  store[:exec_time] = realtime { yield }
ensure
  if store[:before_break]
    @result.skipped.push(store[:table])
  else
    res = [store[:table], store[:exec_time]]
    res.push(store[:lock_time]) if store[:lock_time]
    @result.imported.push(res)
  end
end
with_skip_handling() { || ... } click to toggle source
# File lib/mysql_import.rb, line 94
def with_skip_handling
  yield
rescue BeforeBreak
  store[:before_break] = true
rescue AfterBreak
  store[:after_break] = true
end
write_lock() click to toggle source
# File lib/mysql_import.rb, line 131
def write_lock
  [
    'SET @old_autocommit=@@autocommit;',
    'SET autocommit=0;',
    "LOCK TABLE `#{store[:table]}` WRITE;"
  ].each {|sql| store[:client].query(sql)}
end