class Roma::Event::Handler

Constants

connection_expire_time

Attributes

addr[R]
connected[R]
last_access[R]
lastcmd[R]
port[R]

Public Class Methods

connection_expire_time() click to toggle source
   # File lib/roma/event/handler.rb
51 def self.connection_expire_time
52   @@connection_expire_time
53 end
connection_expire_time=(t) click to toggle source
   # File lib/roma/event/handler.rb
47 def self.connection_expire_time=(t)
48   @@connection_expire_time = t
49 end
connections() click to toggle source
   # File lib/roma/event/handler.rb
44 def self.connections; @@connections; end
ev_list() click to toggle source
   # File lib/roma/event/handler.rb
18 def self.ev_list; @@ev_list; end
get_ccl() click to toggle source
   # File lib/roma/event/handler.rb
26 def self.get_ccl
27   "#{@@ccl_start}:#{@@ccl_rate}:#{@@ccl_full}"
28 end
new(storages, rttable) click to toggle source
   # File lib/roma/event/handler.rb
69 def initialize(storages, rttable)
70   @rbuf=''
71   unless has_event?
72     public_methods.each{|m|
73       if m.to_s.start_with?('ev_')
74         add_event(m.to_s[3..-1],m)
75       end
76     }
77   end
78   @th1 = 100
79   @close_rate = 70
80   @th2 = 200
81 
82   @storages = storages
83   @rttable = rttable
84   @log = Roma::Logging::RLogger.instance
85   @last_access = Time.now
86 end
set_ccl(ccl) click to toggle source
   # File lib/roma/event/handler.rb
30 def self.set_ccl(ccl)
31   if ccl =~ /^(\d+):(\d+):(\d+)$/
32     s,r,f = $1.to_i,$2.to_i,$3.to_i
33     return false if(s < 0 || f < 0 || r < 0 || r > 100 || s > f)
34     @@ccl_start = s
35     @@ccl_rate = r
36     @@ccl_full = f
37     return true
38   else
39     return false
40   end
41 end
system_commands() click to toggle source
   # File lib/roma/event/handler.rb
20 def self.system_commands; @@system_commands; end
timeout() click to toggle source
   # File lib/roma/event/handler.rb
60 def self.timeout
61   @@timeout
62 end
timeout=(n) click to toggle source
   # File lib/roma/event/handler.rb
56 def self.timeout=(n)
57   @@timeout = n
58 end

Public Instance Methods

post_init() click to toggle source
   # File lib/roma/event/handler.rb
88 def post_init
89   @port, @addr = Socket.unpack_sockaddr_in(get_peername)
90   @log.info("Connected from #{@addr}:#{@port}. I have #{EM.connection_count} connections.")
91   @connected = true
92   @last_access = Time.now
93   @@connections[self] = @last_access
94   @fiber = Fiber.new { dispatcher }
95 rescue Exception =>e
96   @log.error("#{__FILE__}:#{__LINE__}:#{e.inspect} #{$@}")
97 end
receive_data(data) click to toggle source
    # File lib/roma/event/handler.rb
 99 def receive_data(data)
100   @rbuf << data
101   @last_access = Time.now
102   @fiber.resume
103 rescue Exception =>e
104   @log.error("#{__FILE__}:#{__LINE__}:#{@addr}:#{@port} #{e.inspect} #{$@}")
105 end
unbind() click to toggle source
    # File lib/roma/event/handler.rb
107 def unbind
108   @connected=false
109   begin
110     @fiber.resume
111   rescue FiberError
112   end
113   EventMachine::stop_event_loop if @stop_event_loop
114   @@connections.delete(self)
115   if @enter_time
116     # hilatency check
117     ps = Time.now - @enter_time
118     if ps > @stats.hilatency_warn_time
119       @log.warn("#{@lastcmd} has incompleted, passage of #{ps} seconds")
120     end
121   end
122   @log.info("Disconnected from #{@addr}:#{@port}")
123 rescue Exception =>e
124   @log.warn("#{__FILE__}:#{__LINE__}:#{@addr}:#{@port} #{e.inspect} #{$@}")
125 end

Protected Instance Methods

add_event(c,m) click to toggle source
    # File lib/roma/event/handler.rb
133 def add_event(c,m)
134   @@ev_list[c]=m
135 end
exit() click to toggle source
    # File lib/roma/event/handler.rb
137 def exit
138   EventMachine::stop_event_loop
139 end
has_event?() click to toggle source
    # File lib/roma/event/handler.rb
129 def has_event?
130   @@ev_list.length!=0
131 end

Private Instance Methods

conn_get_stat() click to toggle source
    # File lib/roma/event/handler.rb
266 def conn_get_stat
267   ret = {}
268   ret["connection.count"] = EM.connection_count
269   ret["connection.descriptor_table_size"] = EM.set_descriptor_table_size
270   ret["connection.continuous_limit"] = Handler.get_ccl
271   ret["connection.accepted_connection_expire_time"] = Handler.connection_expire_time
272   ret["connection.handler_instance_count"] = Handler.connections.length
273   ret["connection.pool_maxlength"] = Messaging::ConPool.instance.maxlength
274   ret["connection.pool_expire_time"] = Messaging::ConPool.instance.expire_time
275   ret["connection.EMpool_maxlength"] = Event::EMConPool::instance.maxlength
276   ret["connection.EMpool_expire_time"] = Event::EMConPool.instance.expire_time
277   ret
278 end
detach_socket() click to toggle source
    # File lib/roma/event/handler.rb
261 def detach_socket
262   @connected = false
263   Socket::for_fd(detach)
264 end
dispatcher() click to toggle source
    # File lib/roma/event/handler.rb
153 def dispatcher
154   @stats = Roma::Stats.instance
155   #@log.debug("Roma::Event::Handler.dipatcher called")
156   while(@connected) do
157     @enter_time = nil
158     next unless s=gets
159     @enter_time = Time.now
160     s=s.chomp.split(/ /)
161     # check whether comand was send or not? and check this command listed on ROMA?
162     if s[0] && @@ev_list.key?(s[0].downcase)
163       send(@@ev_list[s[0].downcase],s)
164       @lastcmd=s
165       next if @@system_commands.key?(s[0].downcase)
166     elsif s.length==0
167       next
168     elsif s[0]=='!!'
169       send(@@ev_list[@lastcmd[0].downcase],@lastcmd)
170       next if @@system_commands.key?(@lastcmd[0].downcase)
171     else
172       distance, similar_cmd = Roma::Event::Distance.check_distance(s[0], @@ev_list)
173       if distance < 0.2
174         send_data("\r\nERROR: '#{s[0]}' is not roma command.\r\nDid you mean this?\r\n\t#{similar_cmd}\r\n")
175         next
176       else
177         @log.warn("command error:#{s}")
178         send_data("ERROR: '#{s[0]}' is not roma command. Please check command.\r\n(closing telnet connection command is 'quit')\r\n")
179         next
180       end
181     end
182 
183     # hilatency check
184     ps = Time.now - @enter_time
185     if ps > @stats.hilatency_warn_time
186       @log.warn("hilatency occurred in #{@lastcmd} put in a #{ps} seconds")
187     end
188     # check latency average
189     if @stats.latency_check_cmd.include?(@lastcmd[0])
190       Roma::AsyncProcess::queue_latency.push(Roma::AsyncMessage.new('calc_latency_average', [ps, @lastcmd[0]]))
191     end
192 
193     d = EM.connection_count - @@ccl_start
194     if d > 0 &&
195         rand(100) < @@ccl_rate + (100 - @@ccl_rate) * d / (@@ccl_full - @@ccl_start)
196       send_data("ERROR\r\n")
197       close_connection_after_writing
198       @log.warn("Connection count > #{@@ccl_start}:closed")
199     end
200   end
201 rescue Storage::StorageException => e
202   @log.error("#{e.inspect} #{s} #{$@}")
203   send_data("SERVER_ERROR #{e} in storage engine\r\n")
204   close_connection_after_writing
205   if Config.const_defined?(:STORAGE_EXCEPTION_ACTION) &&
206       Config::STORAGE_EXCEPTION_ACTION == :shutdown
207     @log.error("Romad will stop")
208     @stop_event_loop = true
209   end
210 rescue Exception =>e
211   @log.warn("#{__FILE__}:#{__LINE__}:#{@addr}:#{@port} #{e} #{$@}")
212   close_connection
213 end
get_connection(ap) click to toggle source
    # File lib/roma/event/handler.rb
143 def get_connection(ap)
144   con=Roma::Event::EMConPool::instance.get_connection(ap)
145   con.fiber=@fiber
146   con
147 end
gets() click to toggle source
    # File lib/roma/event/handler.rb
250 def gets
251   while(@connected) do
252     if idx=@rbuf.index("\n")
253       return pop(idx+1)
254     else
255       Fiber.yield(@rbuf.size)
256     end
257   end
258   nil
259 end
pop(size) click to toggle source
    # File lib/roma/event/handler.rb
215 def pop(size)
216   if size == 0
217     return ''
218   elsif size < 0
219     return nil
220   end
221 
222   if @rbuf.size >= size
223     r = @rbuf[0..size-1]
224     @rbuf = @rbuf[size..-1]
225     r
226   else
227     nil
228   end
229 end
read_bytes(size, mult = 1) click to toggle source
    # File lib/roma/event/handler.rb
231 def read_bytes(size, mult = 1)
232   t=Time.now.to_i
233   while(@connected) do
234     d = pop(size)
235     if d
236       return d
237     else
238       remain = size - @rbuf.size
239       Fiber.yield(remain)
240       if Time.now.to_i - t > @@timeout * mult
241         @log.warn("#{__FILE__}:#{__LINE__}:#{@addr}:#{@port} read_bytes time out");
242         close_connection
243         return nil
244       end
245     end
246   end
247   nil
248 end
return_connection(ap,con) click to toggle source
    # File lib/roma/event/handler.rb
149 def return_connection(ap,con)
150   Roma::Event::EMConPool.instance.return_connection(ap,con)
151 end