class S3DataPacker::Packer

Attributes

output[R]
source[R]
target[R]

Public Class Methods

new(opts = {}) click to toggle source
# File lib/s3_data_packer/packer.rb, line 9
# Builds a packer from an options hash.
#
# @param opts [Hash] recognised keys are :source, :target and :output;
#   a missing or nil :output is replaced by a new S3DataPacker::JSONBatch
def initialize(opts = {})
  @source, @target = opts.values_at(:source, :target)
  @output = opts[:output] || S3DataPacker::JSONBatch.new
end

Public Instance Methods

logger() click to toggle source
# File lib/s3_data_packer/packer.rb, line 19
# Returns the logger used by this packer, caching S3DataPacker.logger on
# first use.
def logger
  return @logger if @logger

  @logger = S3DataPacker.logger
end
pack!() click to toggle source
# File lib/s3_data_packer/packer.rb, line 27
# Runs the whole packing job: boots the workers, enqueues every item from
# each_item onto the worker queue, then finalizes processing.
#
# Any exception raised while enqueueing or finalizing is logged and
# re-raised unchanged. workers.kill! is always called once the enqueue
# phase has started, whether it succeeded or failed.
#
# @return the value returned by finalize_processing!
def pack!
  log "Packing data from #{source.name} to #{target.name} ..."
  boot_workers!

  @start_time = Time.now
  begin
    each_item do |entry|
      workers.queue.add!(entry)
    end
    finalize_processing!
  rescue Exception => error
    # Deliberately broad: the error is only logged here and then re-raised.
    log "Unexpected error, killing threads", :error
    raise error
  ensure
    workers.kill!
  end
end
process_item(data) click to toggle source
# File lib/s3_data_packer/packer.rb, line 43
# Appends one fetched payload to the current output batch and counts it as
# processed. When the batch reports itself full, it is flushed and a new
# output file is started.
#
# @param data the payload handed to output.append_data!
# @return nil when the batch is not full, otherwise the result of
#   output.new_file!
def process_item(data)
  output.append_data!(data)
  summary.count_processed
  return unless output.full?

  flush_batch!
  output.new_file!
end
summary() click to toggle source
# File lib/s3_data_packer/packer.rb, line 15
# Returns the run summary, creating an S3DataPacker::Summary the first time
# it is requested and reusing it afterwards.
def summary
  @summary || (@summary = S3DataPacker::Summary.new)
end
workers() click to toggle source
# File lib/s3_data_packer/packer.rb, line 23
# Returns the worker thread set, creating an S3DataPacker::ThreadSet on
# first access and reusing it afterwards.
def workers
  @workers = S3DataPacker::ThreadSet.new unless @workers
  @workers
end

Private Instance Methods

boot_workers!() click to toggle source
# File lib/s3_data_packer/packer.rb, line 86
# Opens the first output file and hands the per-item job to
# workers.spawn_threads!.
#
# For each item the job fetches the data from the source, processes it
# while holding workers.lock, and then runs post_process_item. The fetch
# and the post-processing hook happen outside the lock.
#
# @return the value returned by workers.spawn_threads!
def boot_workers!
  output.new_file!

  job = proc do |item|
    fetched = source.fetch(item)
    workers.lock.synchronize { process_item(fetched) }
    post_process_item(item)
  end

  workers.spawn_threads!(&job)
end
each_item() { |item| ... } click to toggle source
# File lib/s3_data_packer/packer.rb, line 65
# Walks every item produced by the source, counting it in the summary and
# yielding it to the caller's block.
#
# Before each item is handled the worker set is checked; if the workers are
# dead the condition is logged at :error level and iteration is aborted.
#
# NOTE(review): Error::DeadWorkers is resolved from the enclosing class
# scope; confirm it is defined where this lookup can find it.
#
# @yield [item] each item from source.each
# @raise [Error::DeadWorkers] when workers.dead? is true
# @return the value returned by source.each
def each_item
  source.each do |item|
    if workers.dead?
      # Log text now matches the raised message (was misspelled "diead").
      log 'Workers died', :error
      raise Error::DeadWorkers, 'Workers died'
    end
    summary.count_item
    yield item
  end
end
finalize_processing!() click to toggle source
# File lib/s3_data_packer/packer.rb, line 54
# Shuts the run down once every item has been enqueued: tells the workers
# to finish, waits for them, kills them, pushes the batch that is still
# open, records the start/end times in the summary and logs the summary.
#
# @return the value returned by the final log call
def finalize_processing!
  log 'No more items found to enqueue, signaling workers to finish'
  pool = workers
  pool.finish!
  pool.wait!
  pool.kill!

  open_batch = output.path
  log "Pushing last open batch #{open_batch}"
  flush_batch!

  summary.set_time(@start_time, Time.now)
  report = summary.flush!
  log "Finished\n#{report}"
end
flush_batch!() click to toggle source
# File lib/s3_data_packer/packer.rb, line 76
# Counts one more batch in the summary, finalizes the current output and
# sends the resulting file to the target.
#
# @return the value returned by send_file!
def flush_batch!
  summary.count_batch
  send_file!(output.finalize!)
end
log(message, level = :info) click to toggle source
# File lib/s3_data_packer/packer.rb, line 100
# Writes a message through the packer's logger, prefixed with
# "Main process: ".
#
# @param message the text to log (interpolated into the prefixed line)
# @param level [Symbol] name of the logger method to invoke; :info by default
# @return whatever the logger method returns
def log(message, level = :info)
  sink = logger
  line = "Main process: #{message}"
  sink.send(level, line)
end
post_process_item(item) click to toggle source
# File lib/s3_data_packer/packer.rb, line 95
# Hook invoked with each item after it has been processed; does nothing and
# returns nil. Presumably an extension point for acting on the processed
# key -- confirm intended use.
#
# @param item the item that was just processed (unused here)
# @return [nil]
def post_process_item(item)
  # Intentionally empty: nothing is done with the item after processing.
end
send_file!(file) click to toggle source
# File lib/s3_data_packer/packer.rb, line 82
# Hands a finalized batch file to the target by calling target.save_file.
#
# @param file the value returned by output.finalize! (passed through as is)
# @return whatever target.save_file returns
def send_file!(file)
  destination = target
  destination.save_file(file)
end