class BulkInsert::Worker
Attributes
adapter_name[RW]
after_save_callback[RW]
before_save_callback[RW]
connection[R]
ignore[R]
result_sets[R]
set_size[RW]
update_duplicates[R]
Public Class Methods
new(connection, table_name, primary_key, column_names, set_size=500, ignore=false, update_duplicates=false, return_primary_keys=false)
click to toggle source
# File lib/bulk_insert/worker.rb, line 12 def initialize(connection, table_name, primary_key, column_names, set_size=500, ignore=false, update_duplicates=false, return_primary_keys=false) @statement_adapter = StatementAdapters.adapter_for(connection) @connection = connection @set_size = set_size @adapter_name = connection.adapter_name # INSERT IGNORE only fails inserts with duplicate keys or unallowed nulls not the whole set of inserts @ignore = ignore @update_duplicates = update_duplicates @return_primary_keys = return_primary_keys columns = connection.columns(table_name) column_map = columns.inject({}) { |h, c| h.update(c.name => c) } @primary_key = primary_key @columns = column_names.map { |name| column_map[name.to_s] } @table_name = connection.quote_table_name(table_name) @column_names = column_names.map { |name| connection.quote_column_name(name) }.join(",") @before_save_callback = nil @after_save_callback = nil @result_sets = [] @set = [] end
Public Instance Methods
add(values)
click to toggle source
# File lib/bulk_insert/worker.rb, line 47 def add(values) save! if @set.length >= set_size values = values.with_indifferent_access if values.is_a?(Hash) mapped = @columns.map.with_index do |column, index| value_exists = values.is_a?(Hash) ? values.key?(column.name) : (index < values.length) if !value_exists if column.default.present? column.default elsif column.name == "created_at" || column.name == "updated_at" :__timestamp_placeholder else nil end else values.is_a?(Hash) ? values[column.name] : values[index] end end @set.push(mapped) self end
add_all(rows)
click to toggle source
# File lib/bulk_insert/worker.rb, line 70 def add_all(rows) rows.each { |row| add(row) } self end
after_save(&block)
click to toggle source
# File lib/bulk_insert/worker.rb, line 79 def after_save(&block) @after_save_callback = block end
before_save(&block)
click to toggle source
# File lib/bulk_insert/worker.rb, line 75 def before_save(&block) @before_save_callback = block end
compose_insert_query()
click to toggle source
# File lib/bulk_insert/worker.rb, line 112 def compose_insert_query sql = insert_sql_statement @now = Time.now rows = [] @set.each do |row| values = [] @columns.zip(row) do |column, value| value = @now if value == :__timestamp_placeholder if ActiveRecord::VERSION::STRING >= "5.0.0" if column type = @connection.lookup_cast_type_from_column(column) value = type.serialize(value) end values << @connection.quote(value) else values << @connection.quote(value, column) end end rows << "(#{values.join(',')})" end if !rows.empty? sql << rows.join(",") sql << @statement_adapter.on_conflict_statement(@columns, ignore, update_duplicates) sql << @statement_adapter.primary_key_return_statement(@primary_key) if @return_primary_keys sql else false end end
execute_query()
click to toggle source
# File lib/bulk_insert/worker.rb, line 94 def execute_query if query = compose_insert_query # Return primary key support broke mysql compatibility # with rails < 5 mysql adapter. (see issue #41) if ActiveRecord::VERSION::STRING < "5.0.0" && @statement_adapter.is_a?(StatementAdapters::MySQLAdapter) # raise an exception for unsupported return_primary_keys raise ArgumentError.new("BulkInsert does not support @return_primary_keys for mysql and rails < 5") if @return_primary_keys # restore v1.6 query execution @connection.execute(query) else result_set = @connection.exec_query(query) @result_sets.push(result_set) if @return_primary_keys end end end
insert_sql_statement()
click to toggle source
# File lib/bulk_insert/worker.rb, line 145 def insert_sql_statement insert_ignore = @ignore ? @statement_adapter.insert_ignore_statement : '' "INSERT #{insert_ignore} INTO #{@table_name} (#{@column_names}) VALUES " end
pending?()
click to toggle source
# File lib/bulk_insert/worker.rb, line 39 def pending? @set.any? end
pending_count()
click to toggle source
# File lib/bulk_insert/worker.rb, line 43 def pending_count @set.count end
save!()
click to toggle source
# File lib/bulk_insert/worker.rb, line 83 def save! if pending? @before_save_callback.(@set) if @before_save_callback execute_query @after_save_callback.() if @after_save_callback @set.clear end self end