class S3DataPacker::Packer
Attributes
output[R]
source[R]
target[R]
Public Class Methods
new(opts = {})
click to toggle source
# File lib/s3_data_packer/packer.rb, line 9
# Builds a packer that reads items from +source+ and ships finished batches
# to +target+.
#
# @param opts [Hash] construction options
# @option opts :source item source; this class calls +each+, +fetch+ and +name+ on it
# @option opts :target destination; this class calls +save_file+ and +name+ on it
# @option opts :output batch writer; when missing or falsy a fresh
#   S3DataPacker::JSONBatch is created
def initialize(opts = {})
  @source, @target = opts.values_at(:source, :target)
  @output = opts[:output] || S3DataPacker::JSONBatch.new
end
Public Instance Methods
logger()
click to toggle source
# File lib/s3_data_packer/packer.rb, line 19
# Logger used by this packer. It is looked up from S3DataPacker.logger on
# first use and cached in @logger; the lookup only repeats while the cached
# value is nil/false.
def logger
  @logger = S3DataPacker.logger unless @logger
  @logger
end
pack!()
click to toggle source
# File lib/s3_data_packer/packer.rb, line 27
# Runs the whole packing job: boots the worker threads, feeds every item
# from +source+ into the workers' queue, then calls +finalize_processing+.
#
# +boot_workers!+ runs inside the begin/ensure so that worker threads are
# killed even when booting them fails part-way through.
#
# @raise [Exception] any error raised while booting, enqueuing or finalizing
#   is logged at :error level and re-raised unchanged.
def pack!
  log "Packing data from #{source.name} to #{target.name} ..."
  begin
    boot_workers!
    @start_time = Time.now
    each_item { |item| workers.queue.add!(item) }
    finalize_processing!
  rescue Exception # rubocop:disable Lint/RescueException
    # Deliberately broad: this only logs and immediately re-raises, so
    # interrupts/exits still propagate; thread cleanup happens in ensure.
    log "Unexpected error, killing threads", :error
    raise
  ensure
    workers.kill!
  end
end
process_item(data)
click to toggle source
# File lib/s3_data_packer/packer.rb, line 43
# Appends one item's fetched data to the current output batch and counts it
# as processed. When the batch reports itself full, it is flushed and a new
# output file is started; in that case the result of +output.new_file!+ is
# returned, otherwise nil.
def process_item(data)
  output.append_data!(data)
  summary.count_processed
  return unless output.full?

  flush_batch!
  output.new_file!
end
summary()
click to toggle source
# File lib/s3_data_packer/packer.rb, line 15
# Run statistics for this packer. Other methods record items, processed
# items, batches and timing here. It is created on first access and reused
# afterwards.
def summary
  return @summary if @summary

  @summary = S3DataPacker::Summary.new
end
workers()
click to toggle source
# File lib/s3_data_packer/packer.rb, line 23
# The worker thread set that fetches and processes items. It is built lazily
# and the same instance is returned on every later call.
def workers
  @workers || (@workers = S3DataPacker::ThreadSet.new)
end
Private Instance Methods
boot_workers!()
click to toggle source
# File lib/s3_data_packer/packer.rb, line 86
# Opens the first output file, then starts the worker threads.
#
# The block handed to +spawn_threads!+ is the per-item job (presumably run on
# each worker thread — confirm in ThreadSet). It:
# 1. fetches the item's data from +source+;
# 2. calls +process_item+ while holding +workers.lock+, which serializes
#    access to the shared output batch;
# 3. calls the +post_process_item+ hook outside the lock.
def boot_workers!
  output.new_file!
  workers.spawn_threads! do |key|
    payload = source.fetch(key)
    workers.lock.synchronize do
      process_item(payload)
    end
    post_process_item(key)
  end
end
each_item() { |item| ... }
click to toggle source
# File lib/s3_data_packer/packer.rb, line 65
# Yields every item produced by +source.each+ and counts each one in
# +summary+ before yielding it.
#
# Before each item the worker set is checked. If the workers are dead, the
# error is logged and enqueuing stops by raising.
#
# @yieldparam item an item produced by +source.each+
# @raise [Error::DeadWorkers] when +workers.dead?+ is true
def each_item
  source.each do |item|
    if workers.dead?
      log "Workers died", :error
      raise Error::DeadWorkers, 'Workers died'
    end

    summary.count_item
    yield item
  end
end
finalize_processing!()
click to toggle source
# File lib/s3_data_packer/packer.rb, line 54
# Called once every item has been enqueued. It:
# 1. tells the workers to finish, waits for them, then kills them;
# 2. sends the batch that is still open via +flush_batch!+;
# 3. records the run time (from @start_time to now) and logs the summary.
#
# NOTE(review): the final batch is flushed unconditionally. If +output+ can be
# empty here (e.g. the last item just filled and rotated a batch), an empty
# file may be sent — confirm against the output implementation.
def finalize_processing!
  log "No more items found to enqueue, signaling workers to finish"
  pool = workers
  pool.finish!
  pool.wait!
  pool.kill!

  log "Pushing last open batch #{output.path}"
  flush_batch!

  finished_at = Time.now
  summary.set_time(@start_time, finished_at)
  log "Finished\n#{summary.flush!}"
end
flush_batch!()
click to toggle source
# File lib/s3_data_packer/packer.rb, line 76
# Closes the current batch and sends it to the target. The batch is counted
# in +summary+ first; then +output+ is finalized and the filename it returns
# is passed to +send_file!+. Returns whatever +send_file!+ returns.
def flush_batch!
  summary.count_batch
  send_file!(output.finalize!)
end
log(message, level = :info)
click to toggle source
# File lib/s3_data_packer/packer.rb, line 100
# Writes +message+ to +logger+ at the given severity, prefixed with
# "Main process: ".
#
# @param message [String] text to log
# @param level [Symbol] name of the logger method to call (e.g. :info, :error)
# @raise [NoMethodError] if +level+ is not a public method of the logger
def log(message, level = :info)
  # public_send, not send: a bad level must not reach private methods
  # such as Kernel#puts on the logger object.
  logger.public_send(level, "Main process: #{message}")
end
post_process_item(item)
click to toggle source
# File lib/s3_data_packer/packer.rb, line 95
# Hook called from the worker job after +process_item+ has handled an item's
# data. It is a placeholder for doing something with the item (key) once it
# has been processed. Currently a no-op that returns nil.
def post_process_item(item)
  nil # intentionally does nothing yet
end
send_file!(file)
click to toggle source
# File lib/s3_data_packer/packer.rb, line 82
# Hands a finished batch file to +target+ through its +save_file+ method and
# returns whatever +save_file+ returns.
def send_file!(file)
  destination = target
  destination.save_file(file)
end