module Roma::CommandPlugin::PluginStorage
Public Instance Methods
“add” means that “add a new data to a store” <command name> <key> <flags> <exptime> <bytes> [noreply]rn <data block>rn
# File lib/roma/plugin/plugin_storage.rb 248 def ev_add(s); set(:add,s); end
“append” means that “append a new data to the previous one” <command name> <key> <flags> <exptime> <bytes> [noreply]rn <data block>rn
# File lib/roma/plugin/plugin_storage.rb 260 def ev_append(s); set(:append,s); end
“cas” means that “store this data but only if no one else has updated since I last fetched it.” <command name> <key> <flags> <exptime> <bytes> <cas-id>rn <data block>rn
# File lib/roma/plugin/plugin_storage.rb 273 def ev_cas(s) 274 key,hname = s[1].split("\e") 275 hname ||= @defhash 276 d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits 277 v = read_bytes(s[4].to_i) 278 read_bytes(2) 279 vn = @rttable.get_vnode_id(d) 280 nodes = @rttable.search_nodes_for_write(vn) 281 if nodes[0] != @nid 282 @log.warn("forward cas key=#{key} vn=#{vn} to #{nodes[0]}") 283 res = send_cmd(nodes[0],"fcas #{key}\e#{hname} #{d} #{s[3]} #{v.length} #{s[5]}\r\n#{v}\r\n") 284 if res == nil || res.start_with?("ERROR") 285 return send_data("SERVER_ERROR Message forward failed.\r\n") 286 end 287 return send_data("#{res}\r\n") 288 end 289 290 store_cas(hname, vn, key, d, s[5].to_i, s[3].to_i, v, nodes[1..-1]) 291 end
decr <key> <value> [noreply]rn
# File lib/roma/plugin/plugin_storage.rb 316 def ev_decr(s); incr_decr(:decr,s); end
delete <key> [<time>] [noreply]rn
# File lib/roma/plugin/plugin_storage.rb 118 def ev_delete(s) 119 if s.length < 2 120 @log.error("delete:wrong number of arguments(#{s})") 121 return send_data("CLIENT_ERROR Wrong number of arguments.\r\n") 122 end 123 124 key,hname = s[1].split("\e") 125 hname ||= @defhash 126 d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits 127 vn = @rttable.get_vnode_id(d) 128 nodes = @rttable.search_nodes_for_write(vn) 129 130 if nodes[0] != @nid 131 cmd = "fdelete #{key}\e#{hname}" 132 s[2..-1].each{|c| cmd << " #{c}"} 133 cmd << "\r\n" 134 @log.warn("forward delete #{s[1]}") 135 res = send_cmd(nodes[0], cmd) 136 if res == nil || res.start_with?("ERROR") 137 return send_data("SERVER_ERROR Message forward failed.\r\n") 138 end 139 return send_data("#{res}\r\n") 140 end 141 142 unless @storages.key?(hname) 143 send_data("SERVER_ERROR #{hname} does not exists.\r\n") 144 return 145 end 146 147 if @stats.wb_command_map.key?(:delete__prev) 148 data = @storages[hname].get(vn, key, d) 149 Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[:delete__prev], key, data) if data 150 end 151 152 res = @storages[hname].delete(vn, key, d) 153 @stats.delete_count += 1 154 155 return send_data("NOT_DELETED\r\n") unless res 156 return send_data("NOT_FOUND\r\n") if res == :deletemark 157 158 if @stats.wb_command_map.key?(:delete) 159 Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[:delete], key, res[4]) 160 end 161 162 nodes[1..-1].each{ |nid| 163 res2 = send_cmd(nid,"rdelete #{key}\e#{hname} #{res[2]}\r\n") 164 unless res2 165 Roma::AsyncProcess::queue.push(Roma::AsyncMessage.new('rdelete',[nid,hname,s[1],res[2]])) 166 @log.warn("rdelete failed:#{s[1]}\e#{hname} #{d} #{res[2]} -> #{nid}") 167 end 168 } 169 return send_data("NOT_FOUND\r\n") unless res[4] 170 171 if $roma.cr_writer.run_replication 172 fnc = 'delete' 173 Roma::WriteBehindProcess::push(nil, "#{fnc} #{s[1]}\r\n", s[1], nil) 174 end 175 176 send_data("DELETED\r\n") 177 end
# File lib/roma/plugin/plugin_storage.rb 249 def ev_fadd(s); fset(:add,s); end
# File lib/roma/plugin/plugin_storage.rb 261 def ev_fappend(s); fset(:append,s); end
# File lib/roma/plugin/plugin_storage.rb 293 def ev_fcas(s) 294 key,hname = s[1].split("\e") 295 hname ||= @defhash 296 d = s[2].to_i 297 d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits if d == 0 298 v = read_bytes(s[4].to_i) 299 read_bytes(2) 300 vn = @rttable.get_vnode_id(d) 301 nodes = @rttable.search_nodes_for_write(vn) 302 if nodes.include?(@nid) == false 303 @log.error("fcas failed key = #{s[1]} vn = #{vn}") 304 return send_data("SERVER_ERROR Routing table is inconsistent.\r\n") 305 end 306 307 nodes.delete(@nid) 308 store_cas(hname, vn, key, d, s[5].to_i, s[3].to_i, v, nodes) 309 end
# File lib/roma/plugin/plugin_storage.rb 317 def ev_fdecr(s); fincr_fdecr(:decr,s); end
fdelete <key> [<time>] [noreply]rn
# File lib/roma/plugin/plugin_storage.rb 180 def ev_fdelete(s) 181 key,hname = s[1].split("\e") 182 hname ||= @defhash 183 d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits 184 vn = @rttable.get_vnode_id(d) 185 nodes = @rttable.search_nodes_for_write(vn) 186 if nodes.include?(@nid) == false 187 @log.error("fdelete failed delete key=#{s[1]} vn=#{vn}") 188 return send_data("SERVER_ERROR Routing table is inconsistent.\r\n") 189 end 190 unless @storages.key?(hname) 191 send_data("SERVER_ERROR #{hname} does not exists.\r\n") 192 return 193 end 194 195 if @stats.wb_command_map.key?(:delete__prev) 196 data = @storages[hname].get(vn, key, d) 197 Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[:delete__prev], key, data) if data 198 end 199 200 res = @storages[hname].delete(vn, key, d) 201 @stats.delete_count += 1 202 203 return send_data("NOT_DELETED\r\n") unless res 204 return send_data("NOT_FOUND\r\n") if res == :deletemark 205 206 if @stats.wb_command_map.key?(:delete) 207 Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[:delete], key, res[4]) 208 end 209 210 nodes.delete(@nid) 211 nodes.each{ |nid| 212 res2 = send_cmd(nid,"rdelete #{key}\e#{hname} #{res[2]}\r\n") 213 unless res2 214 Roma::AsyncProcess::queue.push(Roma::AsyncMessage.new('rdelete',[nid,hname,s[1],res[2]])) 215 @log.warn("rdelete failed:#{s[1]}\e#{hname} #{d} #{res[2]} -> #{nid}") 216 end 217 } 218 return send_data("NOT_FOUND\r\n") unless res[4] 219 220 if $roma.cr_writer.run_replication 221 fnc = 'delete' 222 Roma::WriteBehindProcess::push(nil, "#{fnc} #{s[1]}\r\n", s[1], nil) 223 end 224 225 send_data("DELETED\r\n") 226 end
fget <key>
# File lib/roma/plugin/plugin_storage.rb 54 def ev_fget(s) 55 key,hname = s[1].split("\e") 56 hname ||= @defhash 57 d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits 58 vn = @rttable.get_vnode_id(d) 59 nodes = @rttable.search_nodes(vn) 60 61 unless nodes.include?(@nid) 62 @log.error("fget failed key=#{s[1]} vn=#{vn}") 63 return send_data("SERVER_ERROR Routing table is inconsistent.\r\n") 64 end 65 66 unless @storages.key?(hname) 67 send_data("SERVER_ERROR #{hname} does not exists.\r\n") 68 return 69 end 70 data = @storages[hname].get(vn, key, 0) 71 @stats.read_count += 1 72 send_data("VALUE #{s[1]} 0 #{data.length}\r\n#{data}\r\n") if data 73 send_data("END\r\n") 74 end
# File lib/roma/plugin/plugin_storage.rb 313 def ev_fincr(s); fincr_fdecr(:incr,s); end
# File lib/roma/plugin/plugin_storage.rb 267 def ev_fprepend(s); fset(:prepend,s); end
# File lib/roma/plugin/plugin_storage.rb 255 def ev_freplace(s); fset(:replace,s); end
# File lib/roma/plugin/plugin_storage.rb 15 def ev_fset(s); fset(:set,s); end
fset_expt <key> <expt>
# File lib/roma/plugin/plugin_storage.rb 367 def ev_fset_expt(s) 368 key,hname = s[1].split("\e") 369 hname ||= @defhash 370 d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits 371 vn = @rttable.get_vnode_id(d) 372 nodes = @rttable.search_nodes_for_write(vn) 373 if nodes.include?(@nid) == false 374 @log.error("fset_expt failed key = #{s[1]} vn = #{vn}") 375 return send_data("SERVER_ERROR Routing table is inconsistent.\r\n") 376 end 377 378 unless @storages.key?(hname) 379 send_data("SERVER_ERROR #{hname} does not exists.\r\n") 380 return 381 end 382 383 if @stats.wb_command_map.key?(:set_expt__prev) 384 # [vn, t, clk, expt, val] 385 data = @storages[hname].get_raw(vn, key, d) 386 Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[:set_expt__prev], key, data[3].to_s) if data 387 end 388 389 expt = chg_time_expt(s[2].to_i) 390 ret = @storages[hname].set_expt(vn, key, d, expt) 391 392 if ret 393 if @stats.wb_command_map.key?(:set_expt) 394 Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[:set_expt], key, expt.to_s) 395 end 396 redundant(nodes[1..-1], hname, key, d, ret[2], ret[3], ret[4]) 397 if $roma.cr_writer.run_replication 398 fnc = 'set_expt' 399 Roma::WriteBehindProcess::push(nil, "#{fnc} #{s[1]} #{expt}\r\n", s[1], expt) 400 end 401 send_data("STORED\r\n") 402 else 403 return send_data("NOT_STORED\r\n") 404 end 405 end
get <key>*rn
# File lib/roma/plugin/plugin_storage.rb 18 def ev_get(s) 19 if s.length < 2 20 @log.error("get:wrong number of arguments(#{s})") 21 return send_data("CLIENT_ERROR Wrong number of arguments.\r\n") 22 end 23 24 return ev_gets(s) if s.length > 2 25 26 key,hname = s[1].split("\e") 27 hname ||= @defhash 28 d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits 29 vn = @rttable.get_vnode_id(d) 30 nodes = @rttable.search_nodes(vn) 31 32 unless nodes.include?(@nid) 33 @log.warn("forward get #{s[1]}") 34 res = forward_get(nodes[0], s[1], d) 35 if res 36 send_data(res) 37 else 38 send_data("SERVER_ERROR Message forward failed.\r\n") 39 end 40 return 41 end 42 43 unless @storages.key?(hname) 44 send_data("SERVER_ERROR #{hname} does not exists.\r\n") 45 return 46 end 47 data = @storages[hname].get(vn, key, 0) 48 @stats.read_count += 1 49 send_data("VALUE #{s[1]} 0 #{data.length}\r\n#{data}\r\n") if data 50 send_data("END\r\n") 51 end
If you want to get expired time as UNIXTIME, set the 'unix' in last argument Unless set this, expired time will be sent back as date format. get_expt <key> [unix]
# File lib/roma/plugin/plugin_storage.rb 410 def ev_get_expt(s) 411 unless s.length.between?(2, 3) 412 @log.error("get_expt: wrong number of arguments(#{s.length-1} to 2-3)") 413 return send_data("CLIENT_ERROR Wrong number of arguments.\r\n") 414 end 415 case s[2] 416 when 'unix' 417 is_unix = true 418 when nil 419 is_unix = false 420 else 421 @log.error("get_expt: wrong format of arguments.") 422 return send_data("CLIENT_ERROR Wrong format of arguments.\r\n") 423 end 424 425 key, hname = s[1].split("\e") 426 hname ||= @defhash 427 unless @storages.key?(hname) 428 send_data("SERVER_ERROR #{hname} does not exists.\r\n") 429 return 430 end 431 432 d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits 433 vn = @rttable.get_vnode_id(d) 434 435 nodes = @rttable.search_nodes(vn) 436 unless nodes.include?(@nid) 437 @log.warn("forward get_expt #{s[1]} #{s[2]}") 438 res = forward_get_expt(nodes[0], vn, s[1], s[2]) 439 if res 440 send_data(res) 441 else 442 send_data("SERVER_ERROR Message forward failed.\r\n") 443 end 444 return 445 end 446 447 data = @storages[hname].db_get(vn, key) 448 if data 449 if is_unix 450 expt = data.unpack('NNNNa*')[3] 451 else 452 expt = Time.at(data.unpack('NNNNa*')[3]) 453 end 454 send_data("#{expt}\r\n") 455 end 456 send_data("END\r\n") 457 end
gets <key>*rn
# File lib/roma/plugin/plugin_storage.rb 77 def ev_gets(s) 78 nk = {} # {node-id1=>[key1,key2,..],node-id2=>[key3,key4,..]} 79 kvn = {} # {key1=>vn1, key2=>vn2, ... } 80 s[1..-1].each{|kh| 81 key, = kh.split("\e") # split a hash-name 82 d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits 83 kvn[key] = vn = @rttable.get_vnode_id(d) 84 nodes = @rttable.search_nodes(vn) 85 unless nodes.empty? # check the node existence 86 nk[nodes[0]]=[] unless nk.key?(nodes[0]) 87 nk[nodes[0]] << kh 88 end 89 } 90 91 res = {} # result data {key1=>val1,key2=>val2,...} 92 if nk.key?(@nid) 93 nk[@nid].each{|kh| 94 key,hname = kh.split("\e") 95 hname ||= @defhash 96 if @storages.key?(hname) 97 vn, t, clk, expt, val = @storages[hname].get_raw(kvn[key], key, 0) 98 @stats.read_count += 1 99 res[key] = [clk, val] if val && Time.now.to_i <= expt 100 end 101 } 102 nk.delete(@nid) 103 end 104 105 nk.each_pair{|nid,keys| 106 res.merge!(forward_gets(nid,keys)) 107 } 108 109 res.each_pair{|key,cv| 110 clk, val = cv 111 send_data("VALUE #{key} 0 #{val.length} #{clk}\r\n#{val}\r\n") 112 } 113 send_data("END\r\n") 114 end
incr <key> <value> [noreply]rn
# File lib/roma/plugin/plugin_storage.rb 312 def ev_incr(s); incr_decr(:incr,s); end
“prepend” means that “prepend a new data to the previous one” <command name> <key> <flags> <exptime> <bytes> [noreply]rn <data block>rn
# File lib/roma/plugin/plugin_storage.rb 266 def ev_prepend(s); set(:prepend,s); end
rdelete <key> <clock>
# File lib/roma/plugin/plugin_storage.rb 229 def ev_rdelete(s) 230 key,hname = s[1].split("\e") 231 hname ||= @defhash 232 d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits 233 vn = @rttable.get_vnode_id(d) 234 unless @storages.key?(hname) 235 send_data("SERVER_ERROR #{hname} does not exists.\r\n") 236 return 237 end 238 if @storages[hname].rdelete(vn, key, d, s[2].to_i) 239 send_data("DELETED\r\n") 240 else 241 send_data("NOT_FOUND\r\n") 242 end 243 end
“replace” means that “replace the previous data with a new one” <command name> <key> <flags> <exptime> <bytes> [noreply]rn <data block>rn
# File lib/roma/plugin/plugin_storage.rb 254 def ev_replace(s); set(:replace,s); end
rset_size_of_zredundant <n>
# File lib/roma/plugin/plugin_storage.rb 472 def ev_rset_size_of_zredundant(s) 473 if s.length != 2 || s[1].to_i == 0 474 return send_data("usage:set_set_size_of_zredundant <n>\r\n") 475 end 476 @stats.size_of_zredundant = s[1].to_i 477 send_data("STORED\r\n") 478 end
“set” means “store this data”. <command name> <key> <flags> <exptime> <bytes> [noreply]rn <data block>rn
# File lib/roma/plugin/plugin_storage.rb 14 def ev_set(s); set(:set,s); end
set_expt <key> <expt>
# File lib/roma/plugin/plugin_storage.rb 320 def ev_set_expt(s) 321 key,hname = s[1].split("\e") 322 hname ||= @defhash 323 d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits 324 vn = @rttable.get_vnode_id(d) 325 nodes = @rttable.search_nodes_for_write(vn) 326 if nodes[0] != @nid 327 @log.warn("forward set_expt key=#{key} vn=#{vn} to #{nodes[0]}") 328 res = send_cmd(nodes[0],"fset_expt #{s[1]} #{s[2]}\r\n") 329 if res 330 return send_data("#{res}\r\n") 331 end 332 return send_data("SERVER_ERROR Message forward failed.\r\n") 333 end 334 335 unless @storages.key?(hname) 336 send_data("SERVER_ERROR #{hname} does not exists.\r\n") 337 return 338 end 339 340 if @stats.wb_command_map.key?(:set_expt__prev) 341 @log.debug(":set_export__prev") 342 # [vn, t, clk, expt, val] 343 data = @storages[hname].get_raw(vn, key, d) 344 Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[:set_expt__prev], key, data[3].to_s) if data 345 end 346 347 expt = chg_time_expt(s[2].to_i) 348 ret = @storages[hname].set_expt(vn, key, d, expt) 349 350 if ret 351 if @stats.wb_command_map.key?(:set_expt) 352 @log.debug(":set_export") 353 Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[:set_expt], key, expt.to_s) 354 end 355 redundant(nodes[1..-1], hname, key, d, ret[2], ret[3], ret[4]) 356 if $roma.cr_writer.run_replication 357 fnc = 'set_expt' 358 Roma::WriteBehindProcess::push(nil, "#{fnc} #{s[1]} #{expt}\r\n", s[1], expt) 359 end 360 send_data("STORED\r\n") 361 else 362 return send_data("NOT_STORED\r\n") 363 end 364 end
set_size_of_zredundant <n>
# File lib/roma/plugin/plugin_storage.rb 461 def ev_set_size_of_zredundant(s) 462 if s.length != 2 || s[1].to_i == 0 463 return send_data("usage:set_set_size_of_zredundant <n>\r\n") 464 end 465 res = broadcast_cmd("rset_size_of_zredundant #{s[1]}\r\n") 466 @stats.size_of_zredundant = s[1].to_i 467 res[@stats.ap_str] = "STORED" 468 send_data("#{res}\r\n") 469 end
Private Instance Methods
# File lib/roma/plugin/plugin_storage.rb 736 def fincr_fdecr(fnc,s) 737 key,hname = s[1].split("\e") 738 hname ||= @defhash 739 d = s[2].to_i 740 d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits if d == 0 741 v = s[3].to_i 742 vn = @rttable.get_vnode_id(d) 743 nodes = @rttable.search_nodes_for_write(vn) 744 if nodes.include?(@nid) == false 745 @log.debug("f#{fnc} failed key = #{s[1]} vn = #{vn}") 746 return send_data("SERVER_ERROR Routing table is inconsistent.\r\n") 747 end 748 749 nodes.delete(@nid) 750 store_incr_decr(fnc, hname, vn, key, d, v, nodes) 751 end
# File lib/roma/plugin/plugin_storage.rb 482 def forward_get(nid, k, d) 483 con = get_connection(nid) 484 con.send("fget #{k}\r\n") 485 res = con.gets 486 if res == nil 487 @rttable.proc_failed(nid) 488 @log.error("forward get failed:nid=#{nid} key=#{k}") 489 return nil 490 elsif res == "END\r\n" 491 # value does not found 492 elsif res.start_with?("ERROR") 493 @rttable.proc_succeed(nid) 494 con.close_connection 495 return nil 496 else 497 s = res.split(/ /) 498 res << con.read_bytes(s[3].to_i + 2) 499 res << con.gets 500 end 501 return_connection(nid, con) 502 @rttable.proc_succeed(nid) 503 res 504 rescue => e 505 @rttable.proc_failed(nid) if e.message != "no connection" 506 @log.error("#{e.inspect}/#{$@}") 507 @log.error("forward get failed:nid=#{nid} key=#{k}") 508 nil 509 end
# File lib/roma/plugin/plugin_storage.rb 529 def forward_get_expt(nid, vn, key, is_unix=nil) 530 con = get_connection(nid) 531 con.send("get_expt #{key} #{is_unix}\r\n") 532 res = '' 533 while((line = con.gets)!="END\r\n") 534 res = line.chomp 535 end 536 return_connection(nid, con) 537 @rttable.proc_succeed(nid) 538 res 539 rescue => e 540 @rttable.proc_failed(nid) 541 @log.error("forward get_expt failed:nid=#{nid} key=#{key}") 542 nil 543 end
# File lib/roma/plugin/plugin_storage.rb 511 def forward_gets(nid, keys) 512 con = get_connection(nid) 513 con.send("gets #{keys.join(' ')}\r\n") 514 res = {} 515 while((line = con.gets)!="END\r\n") 516 s = line.chomp.split(/ /) 517 res[s[1]] = [s[4], con.read_bytes(s[3].to_i)] 518 con.read_bytes(2) 519 end 520 return_connection(nid, con) 521 @rttable.proc_succeed(nid) 522 res 523 rescue => e 524 @rttable.proc_failed(nid) 525 @log.error("forward gets failed:nid=#{nid} key=#{keys}") 526 nil 527 end
# File lib/roma/plugin/plugin_storage.rb 670 def fset(fnc,s) 671 key,hname = s[1].split("\e") 672 hname ||= @defhash 673 d = s[2].to_i 674 d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits if d == 0 675 v = read_bytes(s[4].to_i) 676 read_bytes(2) 677 vn = @rttable.get_vnode_id(d) 678 nodes = @rttable.search_nodes_for_write(vn) 679 if nodes.include?(@nid) == false 680 @log.error("f#{fnc} failed key = #{s[1]} vn = #{vn}") 681 return send_data("SERVER_ERROR Routing table is inconsistent.\r\n") 682 end 683 684 nodes.delete(@nid) 685 store(fnc, hname, vn, key, d, s[3].to_i, v, nodes) 686 end
# File lib/roma/plugin/plugin_storage.rb 717 def incr_decr(fnc,s) 718 key,hname = s[1].split("\e") 719 hname ||= @defhash 720 d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits 721 v = s[2].to_i 722 vn = @rttable.get_vnode_id(d) 723 nodes = @rttable.search_nodes_for_write(vn) 724 if nodes[0] != @nid 725 @log.debug("forward #{fnc} key=#{s[1]} vn=#{vn} to #{nodes[0]}") 726 res = send_cmd(nodes[0],"f#{fnc} #{s[1]} #{d} #{s[2]}\r\n") 727 if res == nil || res.start_with?("ERROR") 728 return send_data("SERVER_ERROR Message forward failed.\r\n") 729 end 730 return send_data("#{res}\r\n") 731 end 732 733 store_incr_decr(fnc, hname, vn, key, d, v, nodes[1..-1]) 734 end
# File lib/roma/plugin/plugin_storage.rb 613 def redundant(nodes, hname, k, d, clk, expt, v) 614 if @stats.size_of_zredundant > 0 && @stats.size_of_zredundant < v.length 615 return zredundant(nodes, hname, k, d, clk, expt, v) 616 end 617 618 nodes.each{ |nid| 619 res = send_cmd(nid,"rset #{k}\e#{hname} #{d} #{clk} #{expt} #{v.length}\r\n#{v}\r\n") 620 if res == nil || res.start_with?("ERROR") 621 Roma::AsyncProcess::queue.push(Roma::AsyncMessage.new('redundant',[nid,hname,k,d,clk,expt,v])) 622 @log.warn("redundant failed:#{k}\e#{hname} #{d} #{clk} #{expt} #{v.length} -> #{nid}") 623 end 624 } 625 end
# File lib/roma/plugin/plugin_storage.rb 639 def set(fnc,s) 640 if s.length != 5 641 @log.error("set:wrong number of arguments(#{s})") 642 return send_data("CLIENT_ERROR Wrong number of arguments.\r\n") 643 end 644 645 bytes = s[4].to_i 646 if bytes < 0 647 @log.error("set:wrong value size(#{s})") 648 return send_data("CLIENT_ERROR Wrong value size.\r\n") 649 end 650 651 key,hname = s[1].split("\e") 652 hname ||= @defhash 653 d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits 654 v = read_bytes(bytes) 655 read_bytes(2) 656 vn = @rttable.get_vnode_id(d) 657 nodes = @rttable.search_nodes_for_write(vn) 658 if nodes[0] != @nid 659 @log.warn("forward #{fnc} key=#{key} vn=#{vn} to #{nodes[0]}") 660 res = send_cmd(nodes[0],"f#{fnc} #{s[1]} #{d} #{s[3]} #{s[4]}\r\n#{v}\r\n") 661 if res == nil || res.start_with?("ERROR") 662 return send_data("SERVER_ERROR Message forward failed.\r\n") 663 end 664 return send_data("#{res}\r\n") 665 end 666 667 store(fnc, hname, vn, key, d, s[3].to_i, v, nodes[1..-1]) 668 end
# File lib/roma/plugin/plugin_storage.rb 545 def store(fnc, hname, vn, k, d, expt, v, nodes) 546 expt = chg_time_expt(expt) 547 unless @storages.key?(hname) 548 send_data("SERVER_ERROR #{hname} does not exists.\r\n") 549 return 550 end 551 552 if @stats.wb_command_map.key?("#{fnc.to_s}__prev".to_sym) 553 data = @storages[hname].get(vn, k, d) 554 Roma::WriteBehindProcess::push(hname, @stats.wb_command_map["#{fnc.to_s}__prev".to_sym], k, data) if data 555 end 556 557 ret = @storages[hname].send(fnc, vn, k, d, expt ,v) 558 @stats.write_count += 1 559 560 if ret 561 if @stats.wb_command_map.key?(fnc) 562 Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[fnc], k, ret[4]) 563 end 564 redundant(nodes, hname, k, d, ret[2], expt, ret[4]) 565 if $roma.cr_writer.run_replication 566 k = "#{k}\e#{hname}" if hname != @defhash 567 Roma::WriteBehindProcess::push(nil, "#{fnc} #{k} 1 #{expt} #{v.length} \r\n#{v}\r\n", k, v) 568 end 569 send_data("STORED\r\n") 570 else 571 @log.error("#{fnc} NOT_STORED:#{hname} #{vn} #{k} #{d} #{expt}") 572 send_data("NOT_STORED\r\n") 573 end 574 end
# File lib/roma/plugin/plugin_storage.rb 576 def store_cas(hname, vn, k, d, clk, expt, v, nodes) 577 expt = chg_time_expt(expt) 578 unless @storages.key?(hname) 579 send_data("SERVER_ERROR #{hname} does not exists.\r\n") 580 return 581 end 582 583 if @stats.wb_command_map.key?(:cas__prev) 584 data = @storages[hname].get(vn, k, d) 585 Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[:cas__prev], k, data) if data 586 end 587 588 ret = @storages[hname].cas(vn, k, d, clk, expt ,v) 589 @stats.write_count += 1 590 591 case ret 592 when nil 593 @log.error("cas NOT_STORED:#{hname} #{vn} #{k} #{d} #{expt} #{clk}") 594 send_data("NOT_STORED\r\n") 595 when :not_found 596 send_data("NOT_FOUND\r\n") 597 when :exists 598 send_data("EXISTS\r\n") 599 else 600 if @stats.wb_command_map.key?(:cas) 601 Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[:cas], k, ret[4]) 602 end 603 if $roma.cr_writer.run_replication 604 k = "#{k}\e#{hname}" if hname != @defhash 605 fnc = 'set' # To restrain a defference between main and replica cluster due to clk 606 Roma::WriteBehindProcess::push(nil, "#{fnc} #{k} 0 #{expt} #{v.length} \r\n#{v}\r\n", k, v) 607 end 608 redundant(nodes, hname, k, d, ret[2], expt, ret[4]) 609 send_data("STORED\r\n") 610 end 611 end
# File lib/roma/plugin/plugin_storage.rb 688 def store_incr_decr(fnc, hname, vn, k, d, v, nodes) 689 unless @storages.key?(hname) 690 send_data("SERVER_ERROR #{hname} does not exists.\r\n") 691 return 692 end 693 694 if @stats.wb_command_map.key?("#{fnc.to_s}__prev".to_sym) 695 data = @storages[hname].get(vn, k, d) 696 Roma::WriteBehindProcess::push(hname, @stats.wb_command_map["#{fnc.to_s}__prev".to_sym], k, data) if data 697 end 698 699 res = @storages[hname].send(fnc, vn, k, d, v) 700 @stats.write_count += 1 701 702 if res 703 if @stats.wb_command_map.key?(fnc) 704 Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[fnc], k, res[4]) 705 end 706 if $roma.cr_writer.run_replication 707 k = "#{k}\e#{hname}" if hname != @defhash 708 Roma::WriteBehindProcess::push(nil, "#{fnc} #{k} #{v}\r\n", k, v) 709 end 710 redundant(nodes, hname, k, d, res[2], res[3], res[4]) 711 send_data("#{res[4]}\r\n") 712 else 713 send_data("NOT_FOUND\r\n") 714 end 715 end
# File lib/roma/plugin/plugin_storage.rb 627 def zredundant(nodes, hname, k, d, clk, expt, v) 628 zv = Zlib::Deflate.deflate(v) 629 630 nodes.each{ |nid| 631 res = send_cmd(nid,"rzset #{k}\e#{hname} #{d} #{clk} #{expt} #{zv.length}\r\n#{zv}\r\n") 632 if res == nil || res.start_with?("ERROR") 633 Roma::AsyncProcess::queue.push(Roma::AsyncMessage.new('zredundant',[nid,hname,k,d,clk,expt,zv])) 634 @log.warn("zredundant failed:#{k}\e#{hname} #{d} #{clk} #{expt} #{zv.length} -> #{nid}") 635 end 636 } 637 end