class ActionManager

This class provides support to handle actions. Class methods, or actions, can be registered in the action manager. The manager will wait for actions to be triggered (thread-safe), and will execute them concurrently. The action manager can be used to synchronize different objects in different threads.

Example

# Example client: owns an ActionManager and registers one of its own
# methods as the "SLEEP" action.
class Sample
    # Gives other threads access to the manager so they can trigger actions.
    def am
        @am
    end

    def initialize
        # At most 15 actions at once; actions run in their own thread by default.
        @am = ActionManager.new(15, true)
        @am.register_action("SLEEP", method("sleep_action"))
    end

    # Body of the "SLEEP" action: blocks the executing thread for +secs+ seconds.
    def sleep_action(secs)
        Kernel.sleep(secs)
    end

    # Prints a farewell message and calls stop_listener on the manager
    # (stop_listener is not part of this listing).
    def finalize_action
        p "Exiting..."
        am.stop_listener
    end
end

s = Sample.new

# Go through the public reader: `s.@am` is not valid Ruby.
# start_listener runs the dispatch loop in the calling thread and only
# returns once the manager has been finalized and no action is running.
s.am.start_listener

# Objects in other threads can trigger actions like this
# (the second argument is the action id, used to cancel the action later):
# s.am.trigger_action("SLEEP", 1, rand(3)+1)
# s.am.trigger_action(:FINALIZE, 0)

Public Class Methods

new(concurrency=10, threaded=true) click to toggle source

Creates a new Action Manager

concurrency: the maximum number of actions that can run at the same time. threaded: if true, actions will be executed by default in a different thread.

# File lib/ActionManager.rb, line 64
# Builds an idle manager: nothing registered, nothing queued, nothing running.
#
# concurrency:: maximum number of actions allowed to run at the same time
# threaded::    default execution mode for actions registered without an
#               explicit mode (true = run each action in its own thread)
def initialize(concurrency=10, threaded=true)
    # Limits and defaults supplied by the caller.
    @concurrency    = concurrency
    @threaded       = threaded

    # Registry of known actions, keyed by action name.
    @actions        = {}

    # Pending actions (FIFO) and the ones currently executing, keyed by id.
    @action_queue   = []
    @action_running = {}
    @num_running    = 0

    # Set once FINALIZE has been triggered.
    @finalize       = false

    # Guards all of the state above; the condition variable wakes the listener.
    @threads_mutex  = Mutex.new
    @threads_cond   = ConditionVariable.new
end

Public Instance Methods

cancel_action(action_id) click to toggle source
# File lib/ActionManager.rb, line 140
# Cancels the action identified by +action_id+.
#
# If the action is running in a thread, the thread is killed, its slot is
# released and the listener is signalled. Otherwise the action is removed
# from the pending queue, if it is there. Unknown ids are ignored.
#
# NOTE(review): Thread#kill runs the killed thread's ensure clauses, and the
# worker started by run_action also updates @num_running in its ensure block.
# Confirm the slot cannot end up being released twice for a cancelled action.
def cancel_action(action_id)
    @threads_mutex.synchronize do
        running = @action_running[action_id]
        worker  = running ? running[:thread] : nil

        if worker
            worker.kill

            @num_running -= 1
            delete_running_action(action_id)

            @threads_cond.signal
        else
            # Not running: drop it from the pending queue instead.
            queued = @action_queue.find { |entry| entry[:id] == action_id }
            @action_queue.delete(queued) if queued
        end
    end
end
register_action(aname, method, threaded=nil) click to toggle source

Registers a new action in the manager. An action is defined by:

aname: name of the action; it identifies the action. method: the code to run; it is invoked with call, so it should be a Proc or Method object. threaded: whether to execute the action in a new thread (when omitted, the manager-wide default is used).

# File lib/ActionManager.rb, line 84
# Registers (or replaces) the action +aname+.
#
# aname::    name that identifies the action when it is triggered
# method::   object invoked with +call+ (a Proc or Method)
# threaded:: true to run the action in its own thread, false to run it
#            inline in the listener; nil (the default) uses the
#            manager-wide setting given to the constructor
#
# Returns the stored action description.
def register_action(aname, method, threaded=nil)
    # Only fall back to the default when no mode was given. `||=` would also
    # replace an explicit false, making it impossible to register a
    # non-threaded action on a manager whose default is threaded.
    threaded = @threaded if threaded.nil?

    @actions[aname] = {
        :method   => method,
        :threaded => threaded
    }
end
start_listener() click to toggle source
# File lib/ActionManager.rb, line 163
# Runs the dispatch loop in the calling thread.
#
# The loop sleeps on the condition variable while there is nothing to run
# (empty queue) or no free slot (@num_running has reached @concurrency), and
# starts one queued action each time both conditions clear.
#
# Returns once the manager has been finalized and no action is running.
def start_listener
    while true
        @threads_mutex.synchronize {
            while ((@concurrency - @num_running)==0) || empty_queue
                # Check before sleeping as well: if FINALIZE was triggered
                # before this thread got here its signal is already gone, and
                # trigger_action ignores every later request, so nothing
                # would ever wake the listener up again.
                return if (@finalize && @num_running == 0)

                @threads_cond.wait(@threads_mutex)

                return if (@finalize && @num_running == 0)
            end

            run_action
        }
    end
end
trigger_action(aname, action_id, *aargs) click to toggle source

Triggers the execution of the action.

aname: name of the action. action_id: an id to identify the action (to cancel it later). aargs: arguments to call the action with.

# File lib/ActionManager.rb, line 98
# Queues the action +aname+ for execution.
#
# aname::     name of a registered action, or :FINALIZE to shut down
# action_id:: id stored with the queued action (cancel_action looks it up)
# aargs::     arguments for the action
#
# The request is silently dropped when the manager is already finalized,
# when +aname+ is not registered, or when the number of arguments does not
# fit the arity of the registered callable.
def trigger_action(aname, action_id, *aargs)
    @threads_mutex.synchronize do
        return if @finalize

        if aname == :FINALIZE
            finalize if respond_to?(:finalize)
            @finalize = true
            # Wake the listener only if it can stop right away.
            @threads_cond.signal if @num_running == 0
            return
        end

        return unless @actions.key?(aname)

        entry    = @actions[aname]
        declared = entry[:method].arity

        if declared >= 0
            # Fixed number of parameters: the count must match exactly.
            return unless declared == aargs.length
        else
            # Negative arity encodes -(required + 1).
            required = -declared - 1

            # Not enough arguments for the required parameters.
            return if required > aargs.length

            # Pack everything after the required arguments into one array.
            aargs[required..-1] = [aargs[required..-1]]
        end

        @action_queue.push(entry.merge(:args => aargs, :id => action_id))

        @threads_cond.signal if @num_running < @concurrency
    end
end

Protected Instance Methods

delete_running_action(action_id) click to toggle source
# File lib/ActionManager.rb, line 179
# Drops the bookkeeping entry of the running action +action_id+.
# Returns the removed entry, or nil when no such entry exists.
def delete_running_action(action_id)
    return nil unless @action_running.key?(action_id)

    @action_running.delete(action_id)
end
empty_queue() click to toggle source
# File lib/ActionManager.rb, line 187
# True when no action is waiting to be executed.
def empty_queue
    @action_queue.empty?
end
get_runable_action() click to toggle source
# File lib/ActionManager.rb, line 183
# Removes and returns the oldest queued action, or nil if the queue is empty.
def get_runable_action
    return nil if @action_queue.empty?

    @action_queue.shift
end
run_action() click to toggle source
# File lib/ActionManager.rb, line 191
# Takes the next queued action, if any, and executes it.
#
# A threaded action runs in a new thread and is recorded in @action_running
# under its id (with the thread stored in :thread) so cancel_action can find
# it. Any other action runs inline in the calling thread.
#
# Must be called with @threads_mutex held, as start_listener does. The worker
# thread needs that mutex for its cleanup, so it cannot clean up before the
# action has been recorded.
def run_action
    action = get_runable_action

    if action
        @num_running += 1

        if action[:threaded]
            thread = Thread.new {
                begin
                    action[:method].call(*action[:args])
                ensure
                    # Also runs when the thread is killed by cancel_action,
                    # which has already released the slot and removed the
                    # entry. Release the slot only if this action still owns
                    # it, otherwise @num_running would be decremented twice.
                    @threads_mutex.synchronize {
                        if @action_running[action[:id]].equal?(action)
                            @num_running -= 1
                            delete_running_action(action[:id])
                        elsif action[:displaced]
                            # A later action with the same id replaced the
                            # entry; this one was never cancelled and still
                            # holds its slot.
                            @num_running -= 1
                        end

                        @threads_cond.signal
                    }
                end
            }

            action[:thread] = thread

            # Remember that an action with the same id is still running, so
            # its cleanup can tell "replaced" apart from "cancelled".
            previous = @action_running[action[:id]]
            previous[:displaced] = true if previous

            @action_running[action[:id]] = action
        else
            begin
                action[:method].call(*action[:args])
            ensure
                # Release the slot even if the action raises; the exception
                # still propagates to the caller.
                @num_running -= 1
            end
        end
    end
end