class MysqlImport
Constants
- VERSION
Public Class Methods
new(config, opts = {})
click to toggle source
# File lib/mysql_import.rb, line 9
# Sets up an importer: an empty stash of files to import, a pool of
# LoadDataInfile2 clients and a fresh Result object.
#
# config - passed unchanged to LoadDataInfile2::Client.new.
# opts   - Hash of options:
#          :lock        - default lock flag used by import_internal (default: false).
#          :concurrency - thread count, coerced with to_i (default: 2).
#          :sql_opts    - second argument for LoadDataInfile2::Client.new (default: {}).
def initialize(config, opts = {})
  @stash = []
  @lock = opts.fetch(:lock, false)
  @concurrency = opts.fetch(:concurrency, 2).to_i

  # A concurrency of zero still needs one connection in the pool.
  pool_size =
    if @concurrency.zero?
      1
    else
      @concurrency
    end

  client_opts = opts.fetch(:sql_opts, {})
  @client = ConnectionPool.new(size: pool_size) do
    LoadDataInfile2::Client.new(config, client_opts)
  end
  @result = Result.new
end
Public Instance Methods
add(file_path, opts = {})
click to toggle source
# File lib/mysql_import.rb, line 20
# Queues a file for a later call to #import.
#
# file_path - path of the file to import.
# opts      - per-file options Hash, stored alongside the path (default: {}).
#
# Returns the stash after appending the [file_path, opts] pair.
def add(file_path, opts = {})
  entry = [file_path, opts]
  @stash << entry
end
import(filters = nil)
click to toggle source
# File lib/mysql_import.rb, line 24
# Imports every stashed file selected by +filters+.
#
# Each selected [path, opts] pair is handed to Parallel.each using
# parallel_opts. For every pair a client is checked out of the pool,
# placed in the thread-local store, and import_internal is run; the
# store is always cleared afterwards, even when the import raises.
#
# filters - forwarded to filtered_list (default: nil).
def import(filters = nil)
  Parallel.each(filtered_list(filters), parallel_opts) do |job|
    @client.with do |connection|
      begin
        store[:client] = connection
        import_internal(*job)
      ensure
        # Drop the per-thread state so the next job starts clean.
        clear_store
      end
    end
  end
end
Private Instance Methods
clear_store()
click to toggle source
# File lib/mysql_import.rb, line 151
# Resets the current thread's :store slot to nil.
def clear_store
  current_thread = Thread.current
  current_thread[:store] = nil
end
filtered_list(filters)
click to toggle source
# File lib/mysql_import.rb, line 39
# Selects the stashed [path, opts] entries whose path matches a filter.
#
# filters - nil, a String, a Regexp, or an Array of Strings/Regexps.
#           Strings are compiled with Regexp.new; Regexp objects are used
#           as given. Values of any other type are ignored.
#
# Returns the whole stash when filters is nil, an empty Array when no
# usable filter remains, otherwise the entries whose path (first element)
# matches at least one pattern.
def filtered_list(filters)
  return @stash if filters.nil?

  patterns = Array(filters).map do |filter|
    case filter
    when Regexp then filter
    when String then Regexp.new(filter)
    end
  end.compact
  return [] if patterns.empty?

  @stash.select do |path, _opts|
    patterns.any? { |pattern| pattern.match(path) }
  end
end
import_internal(fpath, opts)
click to toggle source
# File lib/mysql_import.rb, line 52
# Imports a single file with the client held in the thread-local store.
#
# fpath - path of the file to import.
# opts  - per-file options; :before and :after are hook actions and are
#         stripped before the remaining options go to the client, :lock
#         overrides the instance-wide lock flag, :table overrides the
#         table name (otherwise the file's basename without extension).
def import_internal(fpath, opts)
  hook_keys = %i[before after]
  sql_opts = opts.reject { |key, _value| hook_keys.include?(key) }
  store[:table] = sql_opts[:table] || File.basename(fpath, '.*')

  with_recording do
    with_lock_if_needed(opts.fetch(:lock, @lock)) do
      with_skip_handling do
        run_before_action(opts[:before])
        store[:client].import(fpath, sql_opts)
        run_after_action(opts[:after])
      end
    end
  end
end
parallel_opts()
click to toggle source
# File lib/mysql_import.rb, line 48
# Options Hash handed to Parallel.each by #import: the configured
# concurrency under the :in_threads key.
def parallel_opts
  thread_count = @concurrency
  { in_threads: thread_count }
end
realtime() { || ... }
click to toggle source
# File lib/mysql_import.rb, line 102
# Runs the given block once and returns the elapsed time reported by
# Benchmark.realtime.
def realtime(&work)
  Benchmark.realtime(&work)
end
run_action(action)
click to toggle source
# File lib/mysql_import.rb, line 106
# Executes a hook action against the client in the thread-local store.
#
# action - nil/false (nothing happens), an Array (each element is run
#          recursively, in order), a String (sent to the client's #query),
#          or any other object (its #call receives the client).
def run_action(action)
  return unless action

  if action.is_a?(Array)
    action.each { |step| run_action(step) }
  elsif action.is_a?(String)
    store[:client].query(action)
  else
    action.call(store[:client])
  end
end
run_after_action(action)
click to toggle source
# File lib/mysql_import.rb, line 125
# Runs the :after hook action, translating a Break raised by the hook
# into AfterBreak so callers can tell which hook stopped the import.
def run_after_action(action)
  begin
    run_action(action)
  rescue Break
    raise AfterBreak
  end
end
run_before_action(action)
click to toggle source
# File lib/mysql_import.rb, line 119
# Runs the :before hook action, translating a Break raised by the hook
# into BeforeBreak so callers can tell which hook stopped the import.
def run_before_action(action)
  begin
    run_action(action)
  rescue Break
    raise BeforeBreak
  end
end
store()
click to toggle source
# File lib/mysql_import.rb, line 147
# Returns the current thread's :store Hash, creating an empty one the
# first time it is requested (or after clear_store reset it).
def store
  current_thread = Thread.current
  current_thread[:store] = {} unless current_thread[:store]
  current_thread[:store]
end
unlock()
click to toggle source
# File lib/mysql_import.rb, line 139
# Sends the statements that end a locked import through the stored
# client, in order: commit, release table locks, restore the autocommit
# value saved in @old_autocommit.
def unlock
  statements = ['COMMIT;', 'UNLOCK TABLES;', 'SET autocommit=@old_autocommit;']
  statements.each do |statement|
    store[:client].query(statement)
  end
end
with_lock_if_needed(need) { || ... }
click to toggle source
# File lib/mysql_import.rb, line 81
# Yields to the block, wrapping it in a table write lock when +need+ is
# truthy.
#
# With a lock, the time spent inside the block is saved in store[:lock_time]
# and unlock always runs afterwards, including when write_lock or the block
# raises. Without a lock the block is simply yielded to.
def with_lock_if_needed(need)
  return yield unless need

  begin
    write_lock
    store[:lock_time] = realtime { yield }
  ensure
    unlock
  end
end
with_recording() { || ... }
click to toggle source
# File lib/mysql_import.rb, line 69
# Times the block and records the outcome for the current table in @result.
#
# The elapsed time is saved in store[:exec_time]. Afterwards the table is
# pushed to @result.skipped when store[:before_break] is set; otherwise
# [table, exec_time] (plus lock_time when present) is pushed to
# @result.imported.
#
# NOTE(review): the recording runs in an ensure clause, so a table whose
# import raises is still pushed to @result.imported, with whatever
# exec_time the store holds at that point - confirm this is intended.
def with_recording
  begin
    store[:exec_time] = realtime { yield }
  ensure
    state = store
    if state[:before_break]
      @result.skipped.push(state[:table])
    else
      entry = [state[:table], state[:exec_time]]
      lock_time = state[:lock_time]
      entry << lock_time if lock_time
      @result.imported.push(entry)
    end
  end
end
with_skip_handling() { || ... }
click to toggle source
# File lib/mysql_import.rb, line 94
# Yields to the block and absorbs BeforeBreak / AfterBreak, noting which
# one occurred by setting store[:before_break] or store[:after_break] to
# true. Any other exception propagates to the caller.
def with_skip_handling
  yield
rescue BeforeBreak, AfterBreak => interruption
  # BeforeBreak is tested first, mirroring the order of separate rescue clauses.
  flag = interruption.is_a?(BeforeBreak) ? :before_break : :after_break
  store[flag] = true
end
write_lock()
click to toggle source
# File lib/mysql_import.rb, line 131
# Sends the statements that start a locked import through the stored
# client, in order: save the current autocommit value in @old_autocommit,
# turn autocommit off, then take a WRITE lock on store[:table].
#
# The table name is quoted as a MySQL identifier; embedded backticks are
# doubled so a name containing one cannot end the quoted identifier early.
def write_lock
  quoted_table = "`#{store[:table].to_s.gsub('`', '``')}`"
  [
    'SET @old_autocommit=@@autocommit;',
    'SET autocommit=0;',
    "LOCK TABLE #{quoted_table} WRITE;"
  ].each { |sql| store[:client].query(sql) }
end