class Spawnling

Constants

RAILS_1_x
RAILS_2_2
RAILS_3_x
VERSION

Attributes

handle[RW]
type[RW]

Public Class Methods

alive?(pid) click to toggle source
# File lib/spawnling.rb, line 72
def self.alive?(pid)
  begin
    Process::kill 0, pid
    # if the process is alive then kill won't throw an exception
    true
  rescue Errno::ESRCH
    false
  end
end
allow_concurrency?() click to toggle source
# File lib/spawnling.rb, line 126
def self.allow_concurrency?
  return true if RAILS_2_2
  if defined?(ActiveRecord) && ActiveRecord::Base.respond_to?(:allow_concurrency)
    ActiveRecord::Base.allow_concurrency
  elsif defined?(Rails) && Rails.application
    Rails.application.config.allow_concurrency
  else
    true # assume user knows what they are doing
  end
end
close_resources() click to toggle source

close all the resources added by calls to resource_to_close

# File lib/spawnling.rb, line 64
def self.close_resources
  @@resources.each do |resource|
    resource.close if resource && resource.respond_to?(:close) && !resource.closed?
  end
  # in case somebody spawns recursively
  @@resources.clear
end
default_options(options = {}) click to toggle source

Set the options to use every time spawn is called unless specified otherwise. For example, in your environment, do something like this:

Spawnling::default_options = {:nice => 5}

to default to using the :nice option with a value of 5 on every call. Valid options are:

:method => (:thread | :fork | :yield)
:nice   => nice value of the forked process
:kill   => whether or not the parent process will kill the
           spawned child process when the parent exits
:argv   => changes name of the spawned process as seen in ps
:detach => whether or not Process.detach is called for spawned child
           processes.  You must wait for children on your own if you
           set this to false
# File lib/spawnling.rb, line 53
def self.default_options(options = {})
  @@default_options.merge!(options)
  @@logger.info "spawn> default options = #{options.inspect}" if @@logger
end
kill_punks() click to toggle source
# File lib/spawnling.rb, line 82
def self.kill_punks
  @@punks.each do |punk|
    if alive?(punk)
      @@logger.info "spawn> parent(#{Process.pid}) killing child(#{punk})" if @@logger
      begin
        Process.kill("TERM", punk)
      rescue
      end
    end
  end
  @@punks = []
end
logger=(logger) click to toggle source
# File lib/spawnling.rb, line 32
def self.logger=(logger)
  @@logger = logger
end
new(opts = {}, &block) click to toggle source

Spawns a long-running section of code and returns the ID of the spawned process. By default the process will be a forked process. To use threading, pass :method => :thread or override the default behavior in the environment by setting 'Spawnling::method :thread'.

# File lib/spawnling.rb, line 101
def initialize(opts = {}, &block)
  @type, @handle = self.class.run(opts, &block)
end
resources_to_close(*resources) click to toggle source

set the resources to disconnect from in the child process (when forking)

# File lib/spawnling.rb, line 59
def self.resources_to_close(*resources)
  @@resources = resources
end
run(opts = {}) { || ... } click to toggle source
# File lib/spawnling.rb, line 105
def self.run(opts = {}, &block)
  raise "Must give block of code to be spawned" unless block_given?
  options = @@default_options.merge(symbolize_options(opts))
  # setting options[:method] will override configured value in default_options[:method]
  if options[:method] == :yield
    yield
  elsif options[:method].respond_to?(:call)
    options[:method].call(proc { yield })
  elsif options[:method] == :thread
    # for versions before 2.2, check for allow_concurrency
   if allow_concurrency?
     return :thread, thread_it(options) { yield }
    else
      @@logger.error("spawn(:method=>:thread) only allowed when allow_concurrency=true")
      raise "spawn requires config.active_record.allow_concurrency=true when used with :method=>:thread"
    end
  else
    return :fork, fork_it(options) { yield }
  end
end
wait(sids = []) click to toggle source
# File lib/spawnling.rb, line 137
def self.wait(sids = [])
  # wait for all threads and/or forks (if a single sid passed in, convert to array first)
  Array(sids).each do |sid|
    if sid.type == :thread
      sid.handle.join()
    else
      begin
        Process.wait(sid.handle)
      rescue
        # if the process is already done, ignore the error
      end
    end
  end
  # clean up connections from expired threads
  clean_connections
end

Protected Class Methods

clean_connections() click to toggle source
# File lib/spawnling.rb, line 236
def self.clean_connections
  return unless defined? ActiveRecord
  ActiveRecord::Base.verify_active_connections! if ActiveRecord::Base.respond_to?(:verify_active_connections!)
  ActiveRecord::Base.clear_active_connections! if ActiveRecord::Base.respond_to?(:clear_active_connections!)
end
fork_it(options) { || ... } click to toggle source
# File lib/spawnling.rb, line 156
def self.fork_it(options)
  # The problem with rails is that it only has one connection (per class),
  # so when we fork a new process, we need to reconnect.
  @@logger.debug "spawn> parent PID = #{Process.pid}" if @@logger
  child = fork do
    begin
      start = Time.now
      @@logger.debug "spawn> child PID = #{Process.pid}" if @@logger

      # this child has no children of it's own to kill (yet)
      @@punks = []

      # set the nice priority if needed
      Process.setpriority(Process::PRIO_PROCESS, 0, options[:nice]) if options[:nice]

      # disconnect from the listening socket, et al
      Spawnling.close_resources
      if defined?(Rails)
        # get a new database connection so the parent can keep the original one
        ActiveRecord::Base.spawn_reconnect if defined?(ActiveRecord)
        # close the memcache connection so the parent can keep the original one
        Rails.cache.reset if Rails.cache.respond_to?(:reset)
      end

      # set the process name
      $0 = options[:argv] if options[:argv]

      # run the block of code that takes so long
      yield

    rescue => ex
      @@logger.error "spawn> Exception in child[#{Process.pid}] - #{ex.class}: #{ex.message}" if @@logger
      @@logger.error "spawn> " + ex.backtrace.join("\n") if @@logger
    ensure
      begin
        # to be safe, catch errors on closing the connnections too
        ActiveRecord::Base.connection_handler.clear_all_connections! if defined?(ActiveRecord)
      ensure
        @@logger.info "spawn> child[#{Process.pid}] took #{Time.now - start} sec" if @@logger
        # ensure log is flushed since we are using exit!
        @@logger.flush if @@logger && @@logger.respond_to?(:flush)
        # this child might also have children to kill if it called spawn
        Spawnling.kill_punks
        # this form of exit doesn't call at_exit handlers
        exit!(0)
      end
    end
  end

  # detach from child process (parent may still wait for detached process if they wish)
  Process.detach(child) if options[:detach]

  # remove dead children from the target list to avoid memory leaks
  @@punks.delete_if {|punk| !Spawn.alive?(punk)}

  # mark this child for death when this process dies
  if options[:kill]
    @@punks << child
    @@logger.debug "spawn> death row = #{@@punks.inspect}" if @@logger
  end

  # return Spawnling::Id.new(:fork, child)
  return child
end
symbolize_options(hash) click to toggle source

In case we don't have rails, can't call opts.symbolize_keys

# File lib/spawnling.rb, line 243
def self.symbolize_options(hash)
  hash.inject({}) do |new_hash, (key, value)|
    new_hash[key.to_sym] = value
    new_hash
  end
end
thread_it(options) { || ... } click to toggle source
# File lib/spawnling.rb, line 221
def self.thread_it(options)
  # clean up stale connections from previous threads
  clean_connections
  thr = Thread.new do
    # run the long-running code block
    if defined?(ActiveRecord)
      ActiveRecord::Base.connection_pool.with_connection { yield }
    else
      yield
    end
  end
  thr.priority = -options[:nice] if options[:nice]
  return thr
end