module Roma::Command::Definition::ClassMethods
Constants
- CommandContext
- CommandParams
- StoredData
Public Instance Methods
def_command_with_key(cmd, forward = :one_line, &block)
click to toggle source
# File lib/roma/command/command_definition.rb 168 def def_command_with_key(cmd, forward = :one_line, &block) 169 define_method "ev_#{cmd}" do |s| 170 return send_data("CLIENT_ERROR does not find key\r\n") if s.length < 2 171 begin 172 params = CommandParams.new 173 params.key, params.hash_name = s[1].split("\e") 174 params.hash_name ||= @defhash 175 params.digest = Digest::SHA1.hexdigest(params.key).hex % @rttable.hbits 176 params.vn = @rttable.get_vnode_id(params.digest) 177 params.nodes = @rttable.search_nodes_for_write(params.vn) 178 if params.nodes[0] != @nid 179 if forward == :one_line 180 return forward_and_one_line_receive(params.nodes[0], s) 181 elsif forward == :multi_line 182 return forward_and_multi_line_receive(params.nodes[0], s) 183 end 184 end 185 stored = StoredData.new 186 unless @storages[params.hash_name] 187 return send_data("SERVER_ERROR #{params.hash_name} does not exists.\r\n") 188 end 189 190 stored.vn, stored.last, stored.clk, stored.expt, stored.value = 191 @storages[params.hash_name].get_raw(params.vn, params.key, params.digest) 192 stored = nil if stored.vn == nil || Time.now.to_i > stored.expt 193 ctx = CommandContext.new(s, params, stored) 194 instance_exec(ctx, &block) 195 rescue ClientErrorException => e 196 send_data("CLIENT_ERROR #{e.message}\r\n") 197 rescue ServerErrorException => e 198 send_data("SERVER_ERROR #{e.message}\r\n") 199 rescue LocalJumpError => e 200 @log.warn("#{e} #{$@}") 201 e.exit_value 202 end 203 end 204 end
def_command_with_key_value(cmd, idx_of_val_len, forward = :one_line, &block)
click to toggle source
# File lib/roma/command/command_definition.rb 323 def def_command_with_key_value(cmd, idx_of_val_len, forward = :one_line, &block) 324 define_method "ev_#{cmd}" do |s| 325 return send_data("CLIENT_ERROR does not find key\r\n") if s.length < 2 326 begin 327 params = CommandParams.new 328 params.key, params.hash_name = s[1].split("\e") 329 params.hash_name ||= @defhash 330 params.digest = Digest::SHA1.hexdigest(params.key).hex % @rttable.hbits 331 params.vn = @rttable.get_vnode_id(params.digest) 332 params.nodes = @rttable.search_nodes_for_write(params.vn) 333 params.value = read_bytes(s[idx_of_val_len].to_i) 334 read_bytes(2) 335 if params.nodes[0] != @nid 336 if forward == :one_line 337 return forward_and_one_line_receive(params.nodes[0], s, params.value) 338 elsif forward == :multi_line 339 return forward_and_multi_line_receive(params.nodes[0], s, params.value) 340 end 341 end 342 stored = StoredData.new 343 unless @storages[params.hash_name] 344 return send_data("SERVER_ERROR #{params.hash_name} does not exists.\r\n") 345 end 346 347 stored.vn, stored.last, stored.clk, stored.expt, stored.value = 348 @storages[params.hash_name].get_raw(params.vn, params.key, params.digest) 349 stored = nil if stored.vn == nil || Time.now.to_i > stored.expt 350 ctx = CommandContext.new(s, params, stored) 351 instance_exec(ctx, &block) 352 rescue ClientErrorException => e 353 send_data("CLIENT_ERROR #{e.message}\r\n") 354 rescue ServerErrorException => e 355 send_data("SERVER_ERROR #{e.message}\r\n") 356 rescue LocalJumpError => e 357 @log.warn("#{e} #{$@}") 358 e.exit_value 359 end 360 end 361 end
def_command_with_relay(cmd, &block)
click to toggle source
# File lib/roma/command/command_definition.rb 15 def def_command_with_relay(cmd, &block) 16 # check a duplicated command definition in a same scope 17 if public_method_defined? "ev_#{cmd}".to_sym 18 raise "ev_#{cmd} already defined." 19 end 20 21 # define a command receiver 22 define_method "ev_#{cmd}" do |s| 23 begin 24 res = {} 25 res[@stats.ap_str] = instance_exec(s, &block) 26 # command relay 27 res.merge! broadcast_cmd("r#{cmd} #{s[1..-1].join(' ')}\r\n") 28 send_data("#{res}\r\n") 29 rescue ClientErrorException => e 30 send_data("CLIENT_ERROR #{e.message}\r\n") 31 rescue ServerErrorException => e 32 send_data("SERVER_ERROR #{e.message}\r\n") 33 rescue LocalJumpError => e 34 @log.warn("#{e} #{$@}") 35 e.exit_value 36 end 37 end 38 39 # define a relaid command receiver 40 define_method "ev_r#{cmd}" do |s| 41 begin 42 send_data("#{instance_exec(s, &block)}\r\n") 43 rescue ClientErrorException => e 44 send_data("CLIENT_ERROR #{e.message}\r\n") 45 rescue ServerErrorException => e 46 send_data("SERVER_ERROR #{e.message}\r\n") 47 rescue LocalJumpError => e 48 @log.warn("#{e} #{$@}") 49 e.exit_value 50 end 51 end 52 end
def_read_command_with_key(cmd, forward = :one_line, &block)
click to toggle source
# File lib/roma/command/command_definition.rb 58 def def_read_command_with_key(cmd, forward = :one_line, &block) 59 define_method "ev_#{cmd}" do |s| 60 return send_data("CLIENT_ERROR does not find key\r\n") if s.length < 2 61 begin 62 params = CommandParams.new 63 params.key, params.hash_name = s[1].split("\e") 64 params.hash_name ||= @defhash 65 params.digest = Digest::SHA1.hexdigest(params.key).hex % @rttable.hbits 66 params.vn = @rttable.get_vnode_id(params.digest) 67 params.nodes = @rttable.search_nodes_for_write(params.vn) 68 if params.nodes[0] != @nid 69 if forward == :one_line 70 return forward_and_one_line_receive(params.nodes[0], s) 71 elsif forward == :multi_line 72 return forward_and_multi_line_receive(params.nodes[0], s) 73 end 74 end 75 stored = StoredData.new 76 unless @storages[params.hash_name] 77 return send_data("SERVER_ERROR #{params.hash_name} does not exists.\r\n") 78 end 79 80 stored.vn, stored.last, stored.clk, stored.expt, stored.value = 81 @storages[params.hash_name].get_raw(params.vn, params.key, params.digest) 82 stored = nil if stored.vn == nil || Time.now.to_i > stored.expt 83 ctx = CommandContext.new(s, params, stored) 84 instance_exec(ctx, &block) 85 @stats.read_count += 1 86 rescue ClientErrorException => e 87 send_data("CLIENT_ERROR #{e.message}\r\n") 88 rescue ServerErrorException => e 89 send_data("SERVER_ERROR #{e.message}\r\n") 90 rescue LocalJumpError => e 91 @log.warn("#{e} #{$@}") 92 e.exit_value 93 end 94 end 95 end
def_read_command_with_key_value(cmd, idx_of_val_len, forward = :one_line, &block)
click to toggle source
# File lib/roma/command/command_definition.rb 281 def def_read_command_with_key_value(cmd, idx_of_val_len, forward = :one_line, &block) 282 define_method "ev_#{cmd}" do |s| 283 return send_data("CLIENT_ERROR does not find key\r\n") if s.length < 2 284 begin 285 params = CommandParams.new 286 params.key, params.hash_name = s[1].split("\e") 287 params.hash_name ||= @defhash 288 params.digest = Digest::SHA1.hexdigest(params.key).hex % @rttable.hbits 289 params.vn = @rttable.get_vnode_id(params.digest) 290 params.nodes = @rttable.search_nodes_for_write(params.vn) 291 params.value = read_bytes(s[idx_of_val_len].to_i) 292 read_bytes(2) 293 if params.nodes[0] != @nid 294 if forward == :one_line 295 return forward_and_one_line_receive(params.nodes[0], s, params.value) 296 elsif forward == :multi_line 297 return forward_and_multi_line_receive(params.nodes[0], s, params.value) 298 end 299 end 300 stored = StoredData.new 301 unless @storages[params.hash_name] 302 return send_data("SERVER_ERROR #{params.hash_name} does not exists.\r\n") 303 end 304 305 stored.vn, stored.last, stored.clk, stored.expt, stored.value = 306 @storages[params.hash_name].get_raw(params.vn, params.key, params.digest) 307 stored = nil if stored.vn == nil || Time.now.to_i > stored.expt 308 ctx = CommandContext.new(s, params, stored) 309 310 instance_exec(ctx, &block) 311 @stats.read_count += 1 312 rescue ClientErrorException => e 313 send_data("CLIENT_ERROR #{e.message}\r\n") 314 rescue ServerErrorException => e 315 send_data("SERVER_ERROR #{e.message}\r\n") 316 rescue LocalJumpError => e 317 @log.warn("#{e} #{$@}") 318 e.exit_value 319 end 320 end 321 end
def_write_command_with_key(cmd, forward = :one_line, &block)
click to toggle source
# File lib/roma/command/command_definition.rb 97 def def_write_command_with_key(cmd, forward = :one_line, &block) 98 define_method "ev_#{cmd}" do |s| 99 return send_data("CLIENT_ERROR does not find key\r\n") if s.length < 2 100 begin 101 params = CommandParams.new 102 params.key, params.hash_name = s[1].split("\e") 103 params.hash_name ||= @defhash 104 params.digest = Digest::SHA1.hexdigest(params.key).hex % @rttable.hbits 105 params.vn = @rttable.get_vnode_id(params.digest) 106 params.nodes = @rttable.search_nodes_for_write(params.vn) 107 if params.nodes[0] != @nid 108 if forward == :one_line 109 return forward_and_one_line_receive(params.nodes[0], s) 110 elsif forward == :multi_line 111 return forward_and_multi_line_receive(params.nodes[0], s) 112 end 113 end 114 stored = StoredData.new 115 unless @storages[params.hash_name] 116 return send_data("SERVER_ERROR #{params.hash_name} does not exists.\r\n") 117 end 118 119 stored.vn, stored.last, stored.clk, stored.expt, stored.value = 120 @storages[params.hash_name].get_raw(params.vn, params.key, params.digest) 121 stored = nil if stored.vn == nil || Time.now.to_i > stored.expt 122 if stored && @stats.wb_command_map.key?("#{cmd}__prev".to_sym) 123 Roma::WriteBehindProcess::push(params.hash_name, 124 @stats.wb_command_map["#{cmd}__prev".to_sym], 125 params.key, stored.value) 126 end 127 ctx = CommandContext.new(s, params, stored) 128 129 ret = instance_exec(ctx, &block) 130 if ret.instance_of? Array 131 flg, expt, value, count, msg = ret 132 ret = @storages[ctx.params.hash_name].set(ctx.params.vn, 133 ctx.params.key, 134 ctx.params.digest, 135 expt, 136 value) 137 if count == :write 138 @stats.write_count += 1 139 elsif count == :delete 140 @stats.delete_count += 1 141 end 142 143 if ret 144 if @stats.wb_command_map.key?(cmd.to_sym) 145 Roma::WriteBehindProcess::push(ctx.params.hash_name, 146 @stats.wb_command_map[cmd.to_sym], 147 ctx.params.key, ret[4]) 148 end 149 redundant(ctx.params.nodes[1..-1], ctx.params.hash_name, 150 ctx.params.key, ctx.params.digest, ret[2], 151 expt, ret[4]) 152 send_data("#{msg}\r\n") 153 else 154 send_data("NOT_#{msg}\r\n") 155 end 156 end 157 rescue ClientErrorException => e 158 send_data("CLIENT_ERROR #{e.message}\r\n") 159 rescue ServerErrorException => e 160 send_data("SERVER_ERROR #{e.message}\r\n") 161 rescue LocalJumpError => e 162 @log.warn("#{e} #{$@}") 163 e.exit_value 164 end 165 end 166 end
def_write_command_with_key_value(cmd, idx_of_val_len, forward = :one_line, &block)
click to toggle source
# File lib/roma/command/command_definition.rb 206 def def_write_command_with_key_value(cmd, idx_of_val_len, forward = :one_line, &block) 207 define_method "ev_#{cmd}" do |s| 208 return send_data("CLIENT_ERROR does not find key\r\n") if s.length < 2 209 begin 210 params = CommandParams.new 211 params.key, params.hash_name = s[1].split("\e") 212 params.hash_name ||= @defhash 213 params.digest = Digest::SHA1.hexdigest(params.key).hex % @rttable.hbits 214 params.vn = @rttable.get_vnode_id(params.digest) 215 params.nodes = @rttable.search_nodes_for_write(params.vn) 216 params.value = read_bytes(s[idx_of_val_len].to_i) 217 read_bytes(2) 218 if params.nodes[0] != @nid 219 if forward == :one_line 220 return forward_and_one_line_receive(params.nodes[0], s, params.value) 221 elsif forward == :multi_line 222 return forward_and_multi_line_receive(params.nodes[0], s, params.value) 223 end 224 end 225 stored = StoredData.new 226 unless @storages[params.hash_name] 227 return send_data("SERVER_ERROR #{params.hash_name} does not exists.\r\n") 228 end 229 230 stored.vn, stored.last, stored.clk, stored.expt, stored.value = 231 @storages[params.hash_name].get_raw(params.vn, params.key, params.digest) 232 stored = nil if stored.vn == nil || Time.now.to_i > stored.expt 233 234 if stored && @stats.wb_command_map.key?("#{cmd}__prev".to_sym) 235 Roma::WriteBehindProcess::push(params.hash_name, 236 @stats.wb_command_map["#{cmd}__prev".to_sym], 237 params.key, stored.value) 238 end 239 240 ctx = CommandContext.new(s, params, stored) 241 242 ret = instance_exec(ctx, &block) 243 if ret.instance_of? Array 244 flg, expt, value, count, msg = ret 245 ret = @storages[ctx.params.hash_name].set(ctx.params.vn, 246 ctx.params.key, 247 ctx.params.digest, 248 expt, 249 value) 250 if count == :write 251 @stats.write_count += 1 252 elsif count == :delete 253 @stats.delete_count += 1 254 end 255 256 if ret 257 if @stats.wb_command_map.key?(cmd.to_sym) 258 Roma::WriteBehindProcess::push(ctx.params.hash_name, 259 @stats.wb_command_map[cmd.to_sym], 260 ctx.params.key, ret[4]) 261 end 262 redundant(ctx.params.nodes[1..-1], ctx.params.hash_name, 263 ctx.params.key, ctx.params.digest, ret[2], 264 expt, ret[4]) 265 send_data("#{msg}\r\n") 266 else 267 send_data("NOT_#{msg}\r\n") 268 end 269 end 270 rescue ClientErrorException => e 271 send_data("CLIENT_ERROR #{e.message}\r\n") 272 rescue ServerErrorException => e 273 send_data("SERVER_ERROR #{e.message}\r\n") 274 rescue LocalJumpError => e 275 @log.warn("#{e} #{$@}") 276 e.exit_value 277 end 278 end 279 end