module CPEE::Notifications
Public Class Methods
Source
# File lib/cpee/implementation_notifications.rb, line 21
#
# Builds the resource description for the notification endpoints of one
# instance and returns it as a (non-lambda) Proc.
#
# The proc itself defines none of the DSL words it uses (+on+, +resource+,
# +run+, +get+, +post+, +put+, +delete+, +sse+); they have to be supplied by
# whatever context evaluates it -- presumably the hosting server framework
# (TODO confirm).
#
# id::   instance identifier, handed to the persistence check and to the
#        subscription handlers.
# opts:: options hash, passed through unchanged to the handlers.
#
# Resource tree served when CPEE::Persistence.exists?(id, opts) is truthy:
#
#   notifications                       GET    -> Overview
#   notifications/topics                GET    -> Topics
#   notifications/subscriptions         GET    -> Subscriptions
#                                       POST   -> CreateSubscription
#   notifications/subscriptions/<any>   GET    -> Subscription
#                                       PUT    -> UpdateSubscription
#                                       DELETE -> DeleteSubscription
#   notifications/subscriptions/<any>/sse      -> SSE
#
# Otherwise every request is answered with CPEE::FAIL.
def self.implementation(id, opts)
  proc do
    if CPEE::Persistence.exists?(id, opts)
      on resource "notifications" do
        run CPEE::Notifications::Overview if get

        on resource "topics" do
          run CPEE::Notifications::Topics, opts if get
        end

        on resource "subscriptions" do
          run CPEE::Notifications::Subscriptions, id, opts if get
          run CPEE::Notifications::CreateSubscription, id, opts if post 'create_subscription'

          # Unnamed resource: matches a single subscription below the collection.
          on resource do
            run CPEE::Notifications::Subscription, id, opts if get
            run CPEE::Notifications::UpdateSubscription, id, opts if put 'change_subscription'
            run CPEE::Notifications::DeleteSubscription, id, opts if delete

            on resource 'sse' do
              run CPEE::Notifications::SSE, id, opts if sse
            end
          end
        end
      end
    else
      run CPEE::FAIL
    end
  end
end
Source
# File lib/cpee/implementation_notifications.rb, line 197
#
# Listens on a dedicated Redis connection and distributes published messages
# to the registered SSE connections. Blocks for as long as the pattern
# subscription is active.
#
# opts:: options hash. Reads
#        * opts[:redis_dyn]       - callable; invoked with "Server SSE", must
#                                   return the connection to subscribe on.
#        * opts[:sse_connections] - nested hash, instance id => key => SSE
#                                   connection object.
#
# Handled patterns:
#
# forward:*::          channel is expected to look like "forward:<id>/<key>".
#                      The payload is sent to the SSE connection registered
#                      under id/key; if there is none, the subscription is
#                      dropped through DeleteSubscription::set. Channels that
#                      match the glob but lack the "/<key>" part cannot be
#                      routed and are skipped (they used to raise
#                      NoMethodError and terminate the distributor).
# event:state/change:: everything after the first space of the payload is
#                      parsed as JSON. When content.state is "finished" or
#                      "abandoned", every SSE connection of that instance is
#                      scheduled to be closed 10 seconds later.
#
# The connection is closed in +ensure+, so it is released even when the
# subscription or one of the handlers raises.
def self.sse_distributor(opts) #{{{
  conn = opts[:redis_dyn].call "Server SSE"
  conn.psubscribe('forward:*', 'event:state/change') do |on|
    on.pmessage do |pat, what, message|
      case pat
      when 'forward:*'
        route = what.match(/forward:([^\/]+)\/(.+)/)
        next unless route # glob matched, but no "<id>/<key>" to route to

        id, key = route.captures
        client = opts.dig(:sse_connections, id, key)
        if client
          client.send message
        else
          # Nobody is listening any more: remove the stale subscription.
          DeleteSubscription::set(id, opts, key)
        end
      when 'event:state/change'
        mess = JSON.parse(message[message.index(' ') + 1..-1])
        state = mess.dig('content', 'state')
        if state == 'finished' || state == 'abandoned'
          opts.dig(:sse_connections, mess.dig('instance').to_s)&.each do |_key, sse|
            # Delay the close so messages still in flight can be delivered;
            # 10 seconds is a heuristic, not a guarantee.
            EM.add_timer(10) { sse.close }
          end
        end
      end
    end
  end
ensure
  # conn is nil when opts[:redis_dyn].call itself raised.
  conn&.close
end #}}}
Source
# File lib/cpee/implementation_notifications.rb, line 223
#
# Sends a heartbeat event to every open SSE connection.
#
# opts:: options hash; opts[:sse_connections] is a nested hash of
#        instance id => key => SSE connection object.
#
# Each connection that is present and not closed receives
# send_with_id('heartbeat', '42'). Entries whose connection is nil are
# skipped: the previous `unless sse&.closed?` guard let nil through and then
# crashed on `nil.send_with_id`. A missing :sse_connections registry is
# treated as "no connections".
def self.sse_heartbeat(opts) #{{{
  opts[:sse_connections]&.each_value do |keys|
    keys.each_value do |sse|
      next if sse.nil? || sse.closed?

      sse.send_with_id('heartbeat', '42')
    end
  end
end #}}}