class Synapse::ProcessManager::ProcessManager
Represents a mechanism for managing the lifecycle and notification of process instances
@abstract
Attributes
@return [ProcessFactory]
@return [LockManager]
@return [ProcessRepository]
@return [Boolean] Returns true if exceptions will be logged silently
Public Class Methods
@param [ProcessRepository] repository
@param [ProcessFactory] factory
@param [LockManager] lock_manager
@param [Class…] process_types
@return [undefined]
# File lib/synapse/process_manager/process_manager.rb, line 25
def initialize(repository, factory, lock_manager, *process_types)
  @repository = repository
  @factory = factory
  @lock_manager = lock_manager
  @process_types = process_types.flatten

  @logger = Logging.logger[self.class]
  @suppress_exceptions = true
end
Public Instance Methods
@param [EventMessage] event
@return [undefined]
# File lib/synapse/process_manager/process_manager.rb, line 37
def notify(event)
  @process_types.each do |process_type|
    correlation = extract_correlation process_type, event
    if correlation
      current = notify_current_processes process_type, event, correlation
      if should_start_new_process process_type, event, current
        start_new_process process_type, event, correlation
      end
    end
  end
end
Protected Instance Methods
@abstract
@param [Class] process_type
@param [EventMessage] event
@return [Symbol]
# File lib/synapse/process_manager/process_manager.rb, line 55
def creation_policy_for(process_type, event)
  raise NotImplementedError
end
@abstract
@param [Class] process_type
@param [EventMessage] event
@return [Correlation] Returns nil if no correlation could be extracted
# File lib/synapse/process_manager/process_manager.rb, line 63
def extract_correlation(process_type, event)
  raise NotImplementedError
end
Loads and notifies the process with the given identifier of the given event
@param [String] process_id
@param [EventMessage] event
@param [Correlation] correlation
@return [Process]
# File lib/synapse/process_manager/process_manager.rb, line 117
def notify_current_process(process_id, event, correlation)
  process = @repository.load process_id

  unless process && process.active && process.correlations.include?(correlation)
    # Process has changed or was deleted between the time of the selection query and the
    # actual loading and locking of the process
    return
  end

  begin
    notify_process process, event
  ensure
    @repository.commit process
  end

  process
end
Notifies existing processes of the given type and correlation of the given event
@param [Class] process_type
@param [EventMessage] event
@param [Correlation] correlation
@return [Boolean] Returns true if any current processes were found and notified
# File lib/synapse/process_manager/process_manager.rb, line 92
def notify_current_processes(process_type, event, correlation)
  processes = @repository.find process_type, correlation

  process_invoked = false
  processes.each do |process_id|
    @lock_manager.obtain_lock process_id
    begin
      loaded_process = notify_current_process process_id, event, correlation
      if loaded_process
        process_invoked = true
      end
    ensure
      @lock_manager.release_lock process_id
    end
  end

  process_invoked
end
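The listing above takes a lock per process identifier and releases it in an ensure clause. A small sketch of that obtain/ensure/release pattern in isolation follows. SketchLockManager and with_process_lock are hypothetical helpers written for this example; only the obtain_lock and release_lock method names are taken from the listing.

```ruby
# Hypothetical in-memory lock manager that records which ids are held.
class SketchLockManager
  attr_reader :held

  def initialize
    @held = []
  end

  def obtain_lock(id)
    @held << id
  end

  def release_lock(id)
    @held.delete id
  end
end

# Runs the block while holding the lock for id, releasing the lock
# afterwards even if the block raises.
def with_process_lock(lock_manager, id)
  lock_manager.obtain_lock id
  begin
    yield
  ensure
    lock_manager.release_lock id
  end
end
```

Because the release sits in ensure, a failure while notifying one process should not leave that process locked for later events.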
Notifies the given process of the given event
@raise [Exception] If an error occurs while notifying the process and exception
suppression is disabled
@param [Process] process
@param [EventMessage] event
@return [undefined]
# File lib/synapse/process_manager/process_manager.rb, line 168
def notify_process(process, event)
  begin
    process.handle event
  rescue => exception
    if @suppress_exceptions
      backtrace = exception.backtrace.join $/
      @logger.error 'Exception occured while invoking process [%s] [%s] with [%s]: %s %s' %
        [process.class, process.id, event.payload_type, exception.inspect, backtrace]
    else
      raise
    end
  end
end
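The suppress-or-reraise decision can be illustrated on its own. The sketch below is not the library's code: it swaps in Ruby's standard Logger for the Logging gem used in the listing, and FailingProcess and notify_with_policy are hypothetical names.

```ruby
require 'logger'
require 'stringio'

# Hypothetical process whose handler always fails.
class FailingProcess
  def handle(event)
    raise ArgumentError, "cannot handle #{event}"
  end
end

# Logs the error when suppress is true; otherwise re-raises it unchanged.
def notify_with_policy(process, event, logger, suppress)
  process.handle event
rescue => exception
  raise unless suppress
  logger.error "Process #{process.class} failed: #{exception.inspect}"
end

# With suppression on, the failure ends up in the log instead of the caller.
log = StringIO.new
notify_with_policy FailingProcess.new, :event, Logger.new(log), true
```

Note that the constructor listed earlier sets @suppress_exceptions to true, so logging rather than raising is the default behaviour.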
Determines whether a new process should be started, based on whether existing processes were found and on the creation policy for the event and process type
@param [Class] process_type
@param [EventMessage] event
@param [Boolean] current_processes True if there are existing processes
@return [Boolean]
# File lib/synapse/process_manager/process_manager.rb, line 74
def should_start_new_process(process_type, event, current_processes)
  creation_policy = creation_policy_for process_type, event

  if :always == creation_policy
    true
  elsif :if_none_found == creation_policy
    !current_processes
  else
    false
  end
end
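To make the policy rules concrete, the sketch below restates the same decision as a standalone function and prints every combination. start_new? is a hypothetical helper, and :none is just one example of a symbol that is neither :always nor :if_none_found.

```ruby
# Standalone restatement of the decision logic, for illustration only.
def start_new?(creation_policy, current_processes)
  case creation_policy
  when :always then true
  when :if_none_found then !current_processes
  else false
  end
end

# Print the outcome for each policy and each "processes found" value.
[:always, :if_none_found, :none].each do |policy|
  [true, false].each do |found|
    puts format('%-15s found=%-5s start=%s', policy, found, start_new?(policy, found))
  end
end
```

Only two combinations differ by the found flag: :if_none_found starts a process when nothing was found and does not when something was.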
Creates a new process of the given type with the given correlation
After the process has been created, it is notified of the given event and then added to the process repository.
@param [Class] process_type
@param [EventMessage] event
@param [Correlation] correlation
@return [undefined]
# File lib/synapse/process_manager/process_manager.rb, line 144
def start_new_process(process_type, event, correlation)
  process = @factory.create process_type
  process.correlations.add correlation

  @lock_manager.obtain_lock process.id

  begin
    notify_process process, event
  ensure
    begin
      @repository.add process
    ensure
      @lock_manager.release_lock process.id
    end
  end
end
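The nested ensure blocks in this listing fix the order of the steps regardless of failures. The sketch below reproduces only that control flow and records each step in an array; start_sketch and its fail_notify and fail_add flags are hypothetical and stand in for the real factory, repository, and lock manager calls.

```ruby
# Records the order in which the steps run. The symbols stand in for the
# real calls: obtain_lock, notify_process, repository.add, release_lock.
def start_sketch(trace, fail_notify: false, fail_add: false)
  trace << :obtain_lock
  begin
    trace << :notify
    raise 'notify failed' if fail_notify
  ensure
    begin
      # Runs even when notification raised.
      trace << :add
      raise 'add failed' if fail_add
    ensure
      # Runs even when adding to the repository raised.
      trace << :release_lock
    end
  end
end
```

In this sketch the trace is the same four steps in every case, which suggests that the process is still added to the repository when notification raises, and that the lock is released even when the add itself fails.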