class Actn::Jobs::Worker

Constants

CHANNEL

Public Class Methods

start() click to toggle source
# File lib/actn/jobs/worker.rb, line 17
def self.start

  # pg = Actn::DB.pg
  pg =  PG::EM::Client.new(Actn::DB.db_config.dup.tap{|s| s.delete(:size) })
  
  EM.run do
    
    notify_proc = Proc.new { |notify| 
      
      raise JobError.new("Payload Missing") unless payload = (Oj.load(notify[:extra]) rescue nil)
        
      raise JobError.new("Job not found") unless job = Job.find(payload['uuid'])
        
      hook = job.hook
        
      raise JobError.new("Hook name missing") unless hook['name']
          
      raise JobError.new("Hook class missing") unless hook_class = (Object.const_get(hook['name']) rescue nil)
        
      if (run_at = ((Time.parse(hook['run_at']) rescue nil) || eval(hook['run_at']))  ) > Time.now
          
        EM.add_timer(run_at - Time.now, proc{ notify_proc.call(notify) })
          
      else              
  
        hook_class.new(job).test_and_perform
          
      end  
        

    }
    
    error_proc = Proc.new { |ex|
      Jobs.logger.error ex
    }
    
    wait_proc = Proc.new { |notify|
      
      notify_proc.call(notify) if notify
    
      pg.wait_for_notify_defer.callback(&wait_proc).errback(&error_proc)
      
    }
  
    pg.wait_for_notify_defer.callback(&wait_proc).errback(&error_proc)
  
    pg.query_defer("LISTEN #{CHANNEL}")
  
  end


end