class Deepstream::EventHandler

Public Class Methods

new(client) click to toggle source
# File lib/deepstream/event_handler.rb, line 8
# Builds an event handler bound to +client+.
#
# Starts with empty callback and listener tables and creates an
# AckTimeoutRegistry for the same client.
#
# @param client the client object this handler sends messages through
def initialize(client)
  @client = client
  @listeners = {}
  @callbacks = {}
  @ack_timeout_registry = AckTimeoutRegistry.new(client)
end

Public Instance Methods

emit(event, *args, timeout: @client.options[:emit_timeout], **kwargs) click to toggle source
# File lib/deepstream/event_handler.rb, line 53
# Sends an EVENT message for +event+ through the client.
#
# The positional and keyword arguments are combined by Helpers.message_data
# and converted with Helpers.to_deepstream_type before sending.
#
# @param event the event name to emit
# @param args positional payload values
# @param timeout forwarded to the client's send_message; defaults to the
#   client's :emit_timeout option
# @param kwargs keyword payload values
# @return the result of send_message, or of the client's on_exception if a
#   StandardError was raised
def emit(event, *args, timeout: @client.options[:emit_timeout], **kwargs)
  payload = Helpers.to_deepstream_type(Helpers.message_data(*args, **kwargs))
  @client.send_message(TOPIC::EVENT, ACTION::EVENT, event, payload, timeout: timeout)
rescue StandardError => e
  # Failures are handed to the client rather than raised to the caller.
  @client.on_exception(e)
end
listen(pattern, &block) click to toggle source
# File lib/deepstream/event_handler.rb, line 26
# Registers +block+ as the listener for +pattern+ and sends a LISTEN message.
#
# A Regexp pattern is replaced by its source string; any other value is used
# as given. After sending, an entry for the pattern is added to the ack
# timeout registry.
#
# @param pattern [Regexp, Object] the pattern to listen for
# @param block the listener stored for the pattern
# @return the result of the registry's add, or of the client's on_exception
#   if a StandardError was raised
def listen(pattern, &block)
  key = pattern
  key = pattern.source if pattern.is_a?(Regexp)
  @listeners[key] = block
  @client.send_message(TOPIC::EVENT, ACTION::LISTEN, key)
  @ack_timeout_registry.add(key, "No ACK message received in time for #{key}")
rescue StandardError => e
  @client.on_exception(e)
end
on(event, &block) click to toggle source
# File lib/deepstream/event_handler.rb, line 15
# Stores +block+ as the callback for +event+.
#
# When no callback is stored yet for the event, a SUBSCRIBE message is sent
# (only if the client state is OPEN) and an entry for the event is added to
# the ack timeout registry. The callback is then stored, replacing any
# previous one.
#
# @param event the event name
# @param block the callback stored for the event
# @return the stored block, or the result of the client's on_exception if a
#   StandardError was raised
def on(event, &block)
  first_registration = !@callbacks[event]
  if first_registration
    connection_open = @client.state == CONNECTION_STATE::OPEN
    @client.send_message(TOPIC::EVENT, ACTION::SUBSCRIBE, event) if connection_open
    # NOTE(review): the timeout entry is added even when nothing was sent
    # because the connection is not open — confirm this is intended.
    @ack_timeout_registry.add(event, "No ACK message received in time for #{event}")
  end
  @callbacks[event] = block
rescue StandardError => e
  @client.on_exception(e)
end
Also aliased as: subscribe
on_message(message) click to toggle source
# File lib/deepstream/event_handler.rb, line 43
# Dispatches an incoming +message+ according to its action.
#
# - ACK cancels the ack timeout entry named by the last data element.
# - EVENT is passed to fire_event_callback.
# - SUBSCRIPTION_FOR_PATTERN_FOUND / _REMOVED are passed to
#   fire_listen_callback.
#
# @param message an object responding to +action+ and +data+
# @raise [UnknownAction] for any other action
def on_message(message)
  case message.action
  when ACTION::ACK
    @ack_timeout_registry.cancel(message.data.last)
  when ACTION::EVENT
    fire_event_callback(message)
  when ACTION::SUBSCRIPTION_FOR_PATTERN_FOUND, ACTION::SUBSCRIPTION_FOR_PATTERN_REMOVED
    fire_listen_callback(message)
  else
    raise UnknownAction, message
  end
end
resubscribe() click to toggle source
# File lib/deepstream/event_handler.rb, line 67
# Re-sends a SUBSCRIBE message for every stored callback key, then a LISTEN
# message for every stored listener key.
#
# A StandardError raised along the way stops the remaining sends and is
# handed to the client's on_exception.
#
# @return the listener keys, or the result of on_exception on failure
def resubscribe
  @callbacks.keys.each do |event|
    @client.send_message(TOPIC::EVENT, ACTION::SUBSCRIBE, event)
  end
  @listeners.keys.each do |pattern|
    @client.send_message(TOPIC::EVENT, ACTION::LISTEN, pattern)
  end
rescue StandardError => e
  @client.on_exception(e)
end
subscribe(event, &block)
Alias for: on
unlisten(pattern) click to toggle source
# File lib/deepstream/event_handler.rb, line 35
# Removes the listener stored for +pattern+ and sends an UNLISTEN message.
#
# A Regexp pattern is replaced by its source string; any other value is used
# as given.
#
# @param pattern [Regexp, Object] the pattern to stop listening for
# @return the result of send_message, or of the client's on_exception if a
#   StandardError was raised
def unlisten(pattern)
  key = pattern
  key = pattern.source if pattern.is_a?(Regexp)
  @listeners.delete(key)
  @client.send_message(TOPIC::EVENT, ACTION::UNLISTEN, key)
rescue StandardError => e
  @client.on_exception(e)
end
unsubscribe(event) click to toggle source
# File lib/deepstream/event_handler.rb, line 60
# Drops the callback stored for +event+ and sends an UNSUBSCRIBE message.
#
# @param event the event name to unsubscribe from
# @return the result of send_message, or of the client's on_exception if a
#   StandardError was raised
def unsubscribe(event)
  @callbacks.delete(event)
  @client.send_message(
    TOPIC::EVENT,
    ACTION::UNSUBSCRIBE,
    event
  )
rescue StandardError => e
  @client.on_exception(e)
end

Private Instance Methods

fire_event_callback(message) click to toggle source
# File lib/deepstream/event_handler.rb, line 76
# Invokes the callback stored for the event named in +message+.
#
# The message data is destructured as (event, data); the data is converted
# with Helpers.to_type before being passed to the callback.
#
# If no callback is stored for the event (for example it was removed by
# unsubscribe before this message was handled), the event name is reported
# through the client's on_error instead of calling nil, mirroring how
# fire_listen_callback treats a missing listener.
#
# @param message an object whose +data+ destructures to (event, data)
# @return the callback's result, or the result of on_error when no callback
#   is stored
def fire_event_callback(message)
  event, data = message.data
  callback = @callbacks[event]
  return @client.on_error(event) unless callback

  callback.call(Helpers.to_type(data))
end
fire_listen_callback(message) click to toggle source
# File lib/deepstream/event_handler.rb, line 81
# Invokes the listener stored for the pattern named in +message+.
#
# The message data is destructured as (pattern, event). The listener is
# called with a boolean that is true when the action is
# SUBSCRIPTION_FOR_PATTERN_FOUND, followed by the event. If no listener is
# stored for the pattern, the pattern is reported through the client's
# on_error instead.
#
# @param message an object responding to +action+ and +data+
# @return the listener's result, or the result of on_error when no listener
#   is stored
def fire_listen_callback(message)
  found = message.action == ACTION::SUBSCRIPTION_FOR_PATTERN_FOUND
  pattern, event = message.data
  listener = @listeners[pattern]
  if listener
    listener.call(found, event)
  else
    @client.on_error(pattern)
  end
end