class Roma::Event::Handler
Constants
- connection_expire_time
Attributes
addr[R]
connected[R]
last_access[R]
lastcmd[R]
port[R]
Public Class Methods
connection_expire_time()
click to toggle source
# File lib/roma/event/handler.rb, line 51
# Reader for the class-wide connection expire time
# (@@connection_expire_time), as set through connection_expire_time=.
def self.connection_expire_time
  @@connection_expire_time
end
connection_expire_time=(t)
click to toggle source
# File lib/roma/event/handler.rb, line 47
# Writer for the class-wide connection expire time.
#
# t:: the new value; stored in @@connection_expire_time without validation.
def self.connection_expire_time=(t)
  @@connection_expire_time = t
end
connections()
click to toggle source
# File lib/roma/event/handler.rb, line 44
# Returns the class-wide @@connections collection. post_init stores each
# handler in it (keyed by the handler, with its last access time as the
# value) and unbind removes it again.
def self.connections
  @@connections
end
ev_list()
click to toggle source
# File lib/roma/event/handler.rb, line 18
# Returns the class-wide @@ev_list table, which add_event fills with
# command name => handler method entries.
def self.ev_list
  @@ev_list
end
get_ccl()
click to toggle source
# File lib/roma/event/handler.rb, line 26
# Returns the current limit settings as a "start:rate:full" String, the
# same format that set_ccl accepts.
def self.get_ccl
  [@@ccl_start, @@ccl_rate, @@ccl_full].join(':')
end
new(storages, rttable)
click to toggle source
# File lib/roma/event/handler.rb, line 69
# Sets up a handler for one connection.
#
# While the shared event list is still empty (has_event? is false), every
# public method whose name starts with "ev_" is registered through
# add_event, keyed by the method name with that prefix stripped.
#
# storages:: kept in @storages for the command implementations
# rttable::  kept in @rttable for the command implementations
def initialize(storages, rttable)
  @rbuf = ''

  unless has_event?
    public_methods.each do |meth|
      meth_name = meth.to_s
      add_event(meth_name[3..-1], meth) if meth_name.start_with?('ev_')
    end
  end

  @th1 = 100
  @close_rate = 70
  @th2 = 200

  @storages = storages
  @rttable = rttable
  @log = Roma::Logging::RLogger.instance
  @last_access = Time.now
end
set_ccl(ccl)
click to toggle source
# File lib/roma/event/handler.rb, line 30
# Parses and installs the limit settings that get_ccl reports.
#
# ccl:: a String of the form "start:rate:full", three unsigned decimal
#       integers separated by colons.
#
# Returns true after updating @@ccl_start, @@ccl_rate and @@ccl_full when
# the entire string has that form, rate is at most 100 and start does not
# exceed full. Returns false and leaves the settings untouched otherwise.
def self.set_ccl(ccl)
  # \A and \z anchor the whole string. ^ and $ anchor per line, so they
  # would accept multi-line input that merely contains one valid line.
  matched = /\A(\d+):(\d+):(\d+)\z/.match(ccl.to_s)
  return false unless matched

  s, r, f = matched.captures.map(&:to_i)
  # \d+ can never produce a negative number, so only the upper bound of
  # the rate and the start <= full ordering need to be checked.
  return false if r > 100 || s > f

  @@ccl_start = s
  @@ccl_rate = r
  @@ccl_full = f
  true
end
system_commands()
click to toggle source
# File lib/roma/event/handler.rb, line 20
# Returns the class-wide @@system_commands table. The dispatcher looks up
# downcased command names in it with key? and skips its latency and
# connection-limit checks for commands found there.
def self.system_commands
  @@system_commands
end
timeout()
click to toggle source
# File lib/roma/event/handler.rb, line 60
# Reader for the class-wide @@timeout, which read_bytes compares (times its
# multiplier) against elapsed whole seconds.
def self.timeout
  @@timeout
end
timeout=(n)
click to toggle source
# File lib/roma/event/handler.rb, line 56
# Writer for the class-wide @@timeout used by read_bytes.
#
# n:: the new value; stored without validation.
def self.timeout=(n)
  @@timeout = n
end
Public Instance Methods
post_init()
click to toggle source
# File lib/roma/event/handler.rb, line 88
# Connection set-up callback: records the peer's address and port, marks
# the handler as connected, registers it in @@connections and creates the
# Fiber that will run dispatcher. Any exception raised here is logged and
# not propagated.
def post_init
  @port, @addr = Socket.unpack_sockaddr_in(get_peername)
  @log.info("Connected from #{@addr}:#{@port}. I have #{EM.connection_count} connections.")
  @connected = true
  accepted_at = Time.now
  @last_access = accepted_at
  @@connections[self] = accepted_at
  # The fiber is only created here; receive_data resumes it.
  @fiber = Fiber.new do
    dispatcher
  end
rescue Exception => e
  @log.error("#{__FILE__}:#{__LINE__}:#{e.inspect} #{$@}")
end
receive_data(data)
click to toggle source
# File lib/roma/event/handler.rb, line 99
# Incoming-data callback: appends the chunk to the receive buffer,
# refreshes the last access time and resumes the dispatcher fiber so it can
# consume what has arrived. Any exception raised here is logged and not
# propagated.
#
# data:: the received chunk, appended to @rbuf as-is
def receive_data(data)
  @rbuf << data
  @last_access = Time.now
  @fiber.resume
rescue Exception => e
  @log.error("#{__FILE__}:#{__LINE__}:#{@addr}:#{@port} #{e.inspect} #{$@}")
end
unbind()
click to toggle source
# File lib/roma/event/handler.rb, line 107
# Connection teardown callback: marks the handler as disconnected, gives
# the dispatcher fiber one more turn so its loops can see that, stops the
# event loop if the dispatcher requested it, and removes the handler from
# @@connections. If a command was still in progress (@enter_time is set)
# for longer than the configured warning time, a warning is logged.
# Any exception raised here is logged and not propagated.
def unbind
  @connected = false
  begin
    @fiber.resume
  rescue FiberError
    # Deliberately ignored: the fiber could not be resumed (for example
    # because it has already finished).
  end
  EventMachine.stop_event_loop if @stop_event_loop
  @@connections.delete(self)

  if @enter_time
    # hilatency check
    elapsed = Time.now - @enter_time
    @log.warn("#{@lastcmd} has incompleted, passage of #{elapsed} seconds") if elapsed > @stats.hilatency_warn_time
  end
  @log.info("Disconnected from #{@addr}:#{@port}")
rescue Exception => e
  @log.warn("#{__FILE__}:#{__LINE__}:#{@addr}:#{@port} #{e.inspect} #{$@}")
end
Protected Instance Methods
add_event(c,m)
click to toggle source
# File lib/roma/event/handler.rb, line 133
# Registers a command in the shared event list.
#
# c:: the command name used as the lookup key
# m:: the method to invoke for that command
def add_event(c, m)
  @@ev_list[c] = m
end
exit()
click to toggle source
# File lib/roma/event/handler.rb, line 137
# Stops the EventMachine event loop.
def exit
  EventMachine.stop_event_loop
end
has_event?()
click to toggle source
# File lib/roma/event/handler.rb, line 129
# True once at least one command has been registered in the shared event
# list.
def has_event?
  !@@ev_list.empty?
end
Private Instance Methods
conn_get_stat()
click to toggle source
# File lib/roma/event/handler.rb, line 266
# Collects connection-related statistics into a Hash keyed by
# "connection.*" names: EventMachine counters, the Handler class settings
# and the maximum length / expire time of both connection pools.
def conn_get_stat
  {
    "connection.count" => EM.connection_count,
    "connection.descriptor_table_size" => EM.set_descriptor_table_size,
    "connection.continuous_limit" => Handler.get_ccl,
    "connection.accepted_connection_expire_time" => Handler.connection_expire_time,
    "connection.handler_instance_count" => Handler.connections.length,
    "connection.pool_maxlength" => Messaging::ConPool.instance.maxlength,
    "connection.pool_expire_time" => Messaging::ConPool.instance.expire_time,
    "connection.EMpool_maxlength" => Event::EMConPool.instance.maxlength,
    "connection.EMpool_expire_time" => Event::EMConPool.instance.expire_time
  }
end
detach_socket()
click to toggle source
# File lib/roma/event/handler.rb, line 261
# Marks the handler as disconnected, detaches the connection and returns a
# Socket built from the value detach returns.
def detach_socket
  @connected = false
  Socket.for_fd(detach)
end
dispatcher()
click to toggle source
# File lib/roma/event/handler.rb, line 153
# Command loop of a connection; post_init wraps it in the handler's Fiber.
#
# While the connection is open, each pass
# 1. reads one line with gets (nil means the connection went away),
# 2. splits it on single spaces and looks the first word up, downcased, in
#    the shared event list,
# 3. runs the matching handler, replays the previous command for "!!", or
#    answers with an ERROR message for an unknown command,
# 4. for commands not listed in @@system_commands, logs slow commands,
#    queues a latency sample and applies the connection limit.
#
# A Storage::StorageException is reported to the client as SERVER_ERROR and
# closes the connection; when Config::STORAGE_EXCEPTION_ACTION is :shutdown
# it also sets @stop_event_loop, which unbind acts on. Any other exception
# is logged and closes the connection.
def dispatcher
  @stats = Roma::Stats.instance
  #@log.debug("Roma::Event::Handler.dipatcher called")
  while @connected
    @enter_time = nil
    s = gets
    next unless s

    @enter_time = Time.now
    s = s.chomp.split(/ /)
    # Was a command sent at all, and is it one of the registered commands?
    if s[0] && @@ev_list.key?(s[0].downcase)
      send(@@ev_list[s[0].downcase], s)
      @lastcmd = s
      next if @@system_commands.key?(s[0].downcase)
    elsif s.length == 0
      next
    elsif s[0] == '!!' && @lastcmd
      # Replay the previous command. The @lastcmd guard matters: without it
      # a "!!" sent before any other command dereferenced nil and the
      # connection was dropped through the generic rescue below. Such a
      # "!!" is now answered like any other unknown command.
      send(@@ev_list[@lastcmd[0].downcase], @lastcmd)
      next if @@system_commands.key?(@lastcmd[0].downcase)
    else
      distance, similar_cmd = Roma::Event::Distance.check_distance(s[0], @@ev_list)
      if distance < 0.2
        send_data("\r\nERROR: '#{s[0]}' is not roma command.\r\nDid you mean this?\r\n\t#{similar_cmd}\r\n")
      else
        @log.warn("command error:#{s}")
        send_data("ERROR: '#{s[0]}' is not roma command. Please check command.\r\n(closing telnet connection command is 'quit')\r\n")
      end
      next
    end

    # hilatency check
    ps = Time.now - @enter_time
    if ps > @stats.hilatency_warn_time
      @log.warn("hilatency occurred in #{@lastcmd} put in a #{ps} seconds")
    end
    # check latency average
    if @stats.latency_check_cmd.include?(@lastcmd[0])
      Roma::AsyncProcess::queue_latency.push(Roma::AsyncMessage.new('calc_latency_average', [ps, @lastcmd[0]]))
    end

    # Connection limit: once the connection count exceeds @@ccl_start the
    # connection is closed with a probability (in percent) that starts at
    # @@ccl_rate and grows linearly, reaching 100 at @@ccl_full.
    d = EM.connection_count - @@ccl_start
    if d > 0
      span = @@ccl_full - @@ccl_start
      # set_ccl accepts start == full; the ramp then has zero width and the
      # division would raise ZeroDivisionError. Being past start means being
      # past full in that case, so the close probability is 100.
      close_rate = span.zero? ? 100 : @@ccl_rate + (100 - @@ccl_rate) * d / span
      if rand(100) < close_rate
        send_data("ERROR\r\n")
        close_connection_after_writing
        @log.warn("Connection count > #{@@ccl_start}:closed")
      end
    end
  end
rescue Storage::StorageException => e
  @log.error("#{e.inspect} #{s} #{$@}")
  send_data("SERVER_ERROR #{e} in storage engine\r\n")
  close_connection_after_writing
  if Config.const_defined?(:STORAGE_EXCEPTION_ACTION) &&
     Config::STORAGE_EXCEPTION_ACTION == :shutdown
    @log.error("Romad will stop")
    @stop_event_loop = true
  end
rescue Exception => e
  @log.warn("#{__FILE__}:#{__LINE__}:#{@addr}:#{@port} #{e} #{$@}")
  close_connection
end
get_connection(ap)
click to toggle source
# File lib/roma/event/handler.rb, line 143
# Fetches a connection for +ap+ from the EventMachine connection pool and
# attaches this handler's fiber to it before returning it.
#
# ap:: passed through to EMConPool#get_connection
def get_connection(ap)
  Roma::Event::EMConPool.instance.get_connection(ap).tap do |pooled|
    pooled.fiber = @fiber
  end
end
gets()
click to toggle source
# File lib/roma/event/handler.rb, line 250
# Returns the next line from the receive buffer, including its trailing
# "\n", removing it from the buffer. While no complete line is buffered the
# fiber yields (passing the current buffer size) and tries again once
# resumed. Returns nil when the connection is no longer open.
def gets
  while @connected
    newline_at = @rbuf.index("\n")
    return pop(newline_at + 1) if newline_at

    Fiber.yield(@rbuf.size)
  end
  nil
end
pop(size)
click to toggle source
# File lib/roma/event/handler.rb, line 215
# Removes and returns the first +size+ characters of the receive buffer.
#
# size:: number of characters to take from the front of @rbuf
#
# Returns '' when size is 0. Returns nil, leaving the buffer untouched, when
# size is negative or the buffer holds fewer than size characters.
def pop(size)
  return '' if size == 0
  return nil if size < 0
  return nil if @rbuf.size < size

  # slice! consumes the prefix in place, so the rest of the buffer is not
  # copied into a second new string on every read.
  @rbuf.slice!(0, size)
end
read_bytes(size, mult = 1)
click to toggle source
# File lib/roma/event/handler.rb, line 231
# Waits until +size+ characters are buffered and returns them via pop.
#
# size:: amount of data to read from the receive buffer
# mult:: multiplier applied to the class-wide timeout (default 1)
#
# While the buffer is too short the fiber yields, passing the amount still
# missing. After each resume the elapsed whole seconds are compared with
# timeout * mult; once exceeded, a warning is logged, the connection is
# closed and nil is returned. nil is also returned when the connection is
# no longer open.
def read_bytes(size, mult = 1)
  started_at = Time.now.to_i
  while @connected
    chunk = pop(size)
    return chunk if chunk

    Fiber.yield(size - @rbuf.size)
    next unless Time.now.to_i - started_at > @@timeout * mult

    @log.warn("#{__FILE__}:#{__LINE__}:#{@addr}:#{@port} read_bytes time out")
    close_connection
    return nil
  end
  nil
end
return_connection(ap,con)
click to toggle source
# File lib/roma/event/handler.rb, line 149
# Hands a connection obtained through get_connection back to the
# EventMachine connection pool.
#
# ap::  passed through to EMConPool#return_connection
# con:: the connection being returned
def return_connection(ap, con)
  pool = Roma::Event::EMConPool.instance
  pool.return_connection(ap, con)
end