module Roma::CommandPlugin::PluginAshiatoList
Public Instance Methods
alist_at <key> <index> [forward]\r\n
( [VALUE <key> 0 <value length>\r\n <value>\r\n] END\r\n |SERVER_ERROR <error message>\r\n)
# File lib/roma/plugin/plugin_alist.rb, line 18
#
# Handles "alist_at <key> <index>".
# Replies with the element at <index> of the list stored under <key>, or
# with a bare END when the key has no data or the index is not below the
# list length. A nil element is sent as an empty string.
def ev_alist_at(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  # The first node returned by calc_hash is not this node: forward.
  return forward1(nodes[0], s) unless nodes[0] == @nid

  raw = @storages[hname].get(vn, k, d)
  @stats.read_count += 1
  return send_data("END\r\n") unless raw

  list = Marshal.load(raw)[0]
  pos = s[2].to_i
  return send_data("END\r\n") if list.length <= pos

  item = list.at(pos) || ""
  send_data("VALUE #{s[1]} 0 #{item.length}\r\n#{item}\r\nEND\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n", " ") + "\r\n")
  @log.error(detail)
end
alist_clear <key> [forward]\r\n
(CLEARED|NOT_CLEARED|NOT_FOUND|SERVER_ERROR <error message>)\r\n
# File lib/roma/plugin/plugin_alist.rb, line 42
#
# Handles "alist_clear <key>".
# Overwrites the stored value with an empty list pair ([[], []]) and
# replies CLEARED, NOT_CLEARED (storage set failed) or NOT_FOUND.
def ev_alist_clear(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  return forward2(nodes[0], s) unless nodes[0] == @nid

  previous = @storages[hname].get(vn, k, d)
  return send_data("NOT_FOUND\r\n") unless previous

  expt = 0x7fffffff
  stored = @storages[hname].set(vn, k, d, expt, Marshal.dump([[], []]))
  @stats.delete_count += 1
  return send_data("NOT_CLEARED\r\n") unless stored

  # The write-behind hook receives the serialized value that was replaced.
  if @stats.wb_command_map.key?(:alist_clear)
    Roma::WriteBehindProcess.push(hname, @stats.wb_command_map[:alist_clear], k, previous)
  end
  redundant(nodes[1..-1], hname, k, d, stored[2], expt, stored[4])
  send_data("CLEARED\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n", " ") + "\r\n")
  @log.error(detail)
end
alist_delete <key> <bytes> [forward]\r\n <data block>\r\n
(DELETED|NOT_DELETED|NOT_FOUND|SERVER_ERROR <error message>)\r\n
# File lib/roma/plugin/plugin_alist.rb, line 73
#
# Handles "alist_delete <key> <bytes>" followed by a data block.
# Removes every element equal to the data block, together with the entry at
# the same position in the parallel array v[1]. Replies DELETED, NOT_DELETED
# (value not in the list or storage set failed) or NOT_FOUND.
def ev_alist_delete(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[2].to_i)
  read_bytes(2) # consume the 2 bytes that follow the data block
  return forward2(nodes[0], s, data) unless nodes[0] == @nid

  raw = @storages[hname].get(vn, k, d)
  return send_data("NOT_FOUND\r\n") unless raw

  v = Marshal.load(raw)
  return send_data("NOT_DELETED\r\n") unless v[0].include?(data)

  loop do
    pos = v[0].index(data)
    break unless pos

    v[0].delete_at(pos)
    v[1].delete_at(pos)
  end

  expt = 0x7fffffff
  stored = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
  @stats.delete_count += 1
  return send_data("NOT_DELETED\r\n") unless stored

  if @stats.wb_command_map.key?(:alist_delete)
    Roma::WriteBehindProcess.push(hname, @stats.wb_command_map[:alist_delete], k, data)
  end
  redundant(nodes[1..-1], hname, k, d, stored[2], expt, stored[4])
  send_data("DELETED\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n", " ") + "\r\n")
  @log.error(detail)
end
# File lib/roma/plugin/plugin_alist.rb, line 424
#
# Alternate command name: delegates unchanged to ev_alist_swap_and_insert.
def ev_alist_delete_and_prepend(s)
  ev_alist_swap_and_insert(s)
end
alist_delete_at <key> <index> [forward]\r\n
(DELETED|NOT_DELETED|NOT_FOUND|SERVER_ERROR <error message>)\r\n
# File lib/roma/plugin/plugin_alist.rb, line 111
#
# Handles "alist_delete_at <key> <index>".
# Removes the element at <index> and the entry at the same index of the
# parallel array v[1]. Replies DELETED, NOT_DELETED (nothing at that index,
# or the storage set failed), NOT_FOUND, or SERVER_ERROR on an exception.
def ev_alist_delete_at(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  return forward2(nodes[0], s) if nodes[0] != @nid

  ddata = @storages[hname].get(vn, k, d)
  return send_data("NOT_FOUND\r\n") unless ddata

  v = Marshal.load(ddata)
  idx = s[2].to_i
  dret = v[0].delete_at(idx)
  return send_data("NOT_DELETED\r\n") unless dret
  v[1].delete_at(idx)

  expt = 0x7fffffff
  ret = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
  @stats.delete_count += 1

  if ret
    # The write-behind hook receives the element that was removed.
    if @stats.wb_command_map.key?(:alist_delete_at)
      Roma::WriteBehindProcess.push(hname, @stats.wb_command_map[:alist_delete_at], k, dret)
    end
    redundant(nodes[1..-1], hname, k, d, ret[2], expt, ret[4])
    send_data("DELETED\r\n")
  else
    send_data("NOT_DELETED\r\n")
  end
rescue => e
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n", " ")
  send_data("#{msg}\r\n")
  # Interpolate the exception; the former "${e}" logged the literal text.
  @log.error("#{e} #{$@}")
end
alist_empty? <key> [forward]\r\n
(true|false|NOT_FOUND|SERVER_ERROR <error message>)\r\n
# File lib/roma/plugin/plugin_alist.rb, line 145
#
# Handles "alist_empty? <key>".
# Replies "true" or "false" depending on whether the stored list has no
# elements, or NOT_FOUND when the key has no data.
def ev_alist_empty?(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  return forward2(nodes[0], s) unless nodes[0] == @nid

  raw = @storages[hname].get(vn, k, d)
  @stats.read_count += 1
  return send_data("NOT_FOUND\r\n") unless raw

  values = Marshal.load(raw)[0]
  send_data("#{values.empty?}\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n", " ") + "\r\n")
  @log.error(detail)
end
alist_expired_swap_and_insert <key> <expire-time> <bytes> [forward]\r\n <data block>\r\n
Data older than <expire-time> will be deleted. The unit of <expire-time> is seconds unless a suffix is given:
the suffix 'h' means hours and
the suffix 'd' means days.
(STORED|NOT_STORED|SERVER_ERROR <error message>)\r\n
# File lib/roma/plugin/plugin_alist.rb, line 528
#
# Handles "alist_expired_swap_and_insert <key> <expire-time> <bytes>"
# followed by a data block.
# Passes the list through expired_swap, then puts the data block and the
# current time at the head of the value and time arrays.
# Replies STORED, NOT_STORED or SERVER_ERROR.
def ev_alist_expired_swap_and_insert(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[3].to_i)
  read_bytes(2) # consume the 2 bytes that follow the data block
  return forward2(nodes[0], s, data) unless nodes[0] == @nid

  et = expired_str_to_i(s[2])
  return send_data("SERVER_ERROR format error in expire-time.\r\n") unless et

  v = to_alist_value_for_write(hname, vn, k, d)
  return send_data("SERVER_ERROR data other than alist's format already exist.\r\n") unless v

  v = expired_swap(v, data, et)
  v[0].unshift(data)
  v[1].unshift(Time.now.to_i)

  expt = 0x7fffffff
  stored = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
  @stats.write_count += 1
  return send_data("NOT_STORED\r\n") unless stored

  if @stats.wb_command_map.key?(:alist_expired_swap_and_insert)
    Roma::WriteBehindProcess.push(hname, @stats.wb_command_map[:alist_expired_swap_and_insert], k, data)
  end
  redundant(nodes[1..-1], hname, k, d, stored[2], expt, stored[4])
  send_data("STORED\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n", " ") + "\r\n")
  @log.error(detail)
end
alist_expired_swap_and_push <key> <expire-time> <bytes> [forward]\r\n <data block>\r\n
Data older than <expire-time> will be deleted. The unit of <expire-time> is seconds unless a suffix is given:
the suffix 'h' means hours and
the suffix 'd' means days.
(STORED|NOT_STORED|SERVER_ERROR <error message>)\r\n
# File lib/roma/plugin/plugin_alist.rb, line 1030
#
# Handles "alist_expired_swap_and_push <key> <expire-time> <bytes>"
# followed by a data block.
# Passes the list through expired_swap, then appends the data block and the
# current time to the value and time arrays.
# Replies STORED, NOT_STORED or SERVER_ERROR.
def ev_alist_expired_swap_and_push(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[3].to_i)
  read_bytes(2) # consume the 2 bytes that follow the data block
  return forward2(nodes[0], s, data) unless nodes[0] == @nid

  et = expired_str_to_i(s[2])
  return send_data("SERVER_ERROR format error in expire-time.\r\n") unless et

  v = to_alist_value_for_write(hname, vn, k, d)
  return send_data("SERVER_ERROR data other than alist's format already exist.\r\n") unless v

  v = expired_swap(v, data, et)
  v[0] << data
  v[1] << Time.now.to_i

  expt = 0x7fffffff
  stored = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
  @stats.write_count += 1
  return send_data("NOT_STORED\r\n") unless stored

  if @stats.wb_command_map.key?(:alist_expired_swap_and_push)
    Roma::WriteBehindProcess.push(hname, @stats.wb_command_map[:alist_expired_swap_and_push], k, data)
  end
  redundant(nodes[1..-1], hname, k, d, stored[2], expt, stored[4])
  send_data("STORED\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n", " ") + "\r\n")
  @log.error(detail)
end
alist_expired_swap_and_sized_insert <key> <expire-time> <array-size> <bytes> [forward]\r\n <data block>\r\n
Data older than <expire-time> will be deleted. The unit of <expire-time> is seconds unless a suffix is given:
the suffix 'h' means hours and
the suffix 'd' means days.
(STORED|NOT_STORED|SERVER_ERROR <error message>)\r\n
# File lib/roma/plugin/plugin_alist.rb, line 577
#
# Handles "alist_expired_swap_and_sized_insert <key> <expire-time>
# <array-size> <bytes>" followed by a data block.
# Passes the list through expired_swap, puts the data block and the current
# time at the head of the value and time arrays, then keeps only the first
# <array-size> entries of each.
# Replies STORED, NOT_STORED or SERVER_ERROR.
def ev_alist_expired_swap_and_sized_insert(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[4].to_i)
  read_bytes(2) # consume the 2 bytes that follow the data block
  return forward2(nodes[0], s, data) if nodes[0] != @nid

  et = expired_str_to_i(s[2])
  return send_data("SERVER_ERROR format error in expire-time.\r\n") unless et

  v = to_alist_value_for_write(hname, vn, k, d)
  unless v
    return send_data("SERVER_ERROR data other than alist's format already exist.\r\n")
  end

  v = expired_swap(v, data, et)

  max = s[3].to_i
  v[0].insert(0, data)
  v[1].insert(0, Time.now.to_i)
  # Array#first(max) keeps at most max entries. The former [0..(max - 1)]
  # became [0..-1] for max == 0 and so kept the whole list. A negative max
  # raises ArgumentError, which the rescue below reports as SERVER_ERROR.
  v[0] = v[0].first(max)
  v[1] = v[1].first(max)

  expt = 0x7fffffff
  ret = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
  @stats.write_count += 1

  if ret
    if @stats.wb_command_map.key?(:alist_expired_swap_and_sized_insert)
      Roma::WriteBehindProcess.push(hname, @stats.wb_command_map[:alist_expired_swap_and_sized_insert], k, data)
    end
    redundant(nodes[1..-1], hname, k, d, ret[2], expt, ret[4])
    send_data("STORED\r\n")
  else
    send_data("NOT_STORED\r\n")
  end
rescue => e
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n", " ")
  send_data("#{msg}\r\n")
  @log.error("#{e} #{$@}")
end
alist_expired_swap_and_sized_push <key> <expire-time> <array-size> <bytes> [forward]\r\n <data block>\r\n
Data older than <expire-time> will be deleted. The unit of <expire-time> is seconds unless a suffix is given:
the suffix 'h' means hours and
the suffix 'd' means days.
(STORED|NOT_PUSHED|NOT_STORED|SERVER_ERROR <error message>)\r\n
# File lib/roma/plugin/plugin_alist.rb, line 1079
#
# Handles "alist_expired_swap_and_sized_push <key> <expire-time>
# <array-size> <bytes>" followed by a data block.
# Passes the list through expired_swap; replies NOT_PUSHED when the list
# already holds <array-size> or more elements, otherwise appends the data
# block and the current time. Replies STORED, NOT_PUSHED, NOT_STORED or
# SERVER_ERROR.
def ev_alist_expired_swap_and_sized_push(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[4].to_i)
  read_bytes(2) # consume the 2 bytes that follow the data block
  return forward2(nodes[0], s, data) unless nodes[0] == @nid

  et = expired_str_to_i(s[2])
  return send_data("SERVER_ERROR format error in expire-time.\r\n") unless et

  v = to_alist_value_for_write(hname, vn, k, d)
  return send_data("SERVER_ERROR data other than alist's format already exist.\r\n") unless v

  v = expired_swap(v, data, et)

  max = s[3].to_i
  return send_data("NOT_PUSHED\r\n") if v[0].length >= max

  keep = 0..(max - 1)
  v[0] = (v[0] << data)[keep]
  v[1] = (v[1] << Time.now.to_i)[keep]

  expt = 0x7fffffff
  stored = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
  @stats.write_count += 1
  return send_data("NOT_STORED\r\n") unless stored

  if @stats.wb_command_map.key?(:alist_expired_swap_and_sized_push)
    Roma::WriteBehindProcess.push(hname, @stats.wb_command_map[:alist_expired_swap_and_sized_push], k, data)
  end
  redundant(nodes[1..-1], hname, k, d, stored[2], expt, stored[4])
  send_data("STORED\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n", " ") + "\r\n")
  @log.error(detail)
end
alist_first <key> [forward]\r\n
( [VALUE <key> 0 <value length>\r\n <value>\r\n] END\r\n |SERVER_ERROR <error message>\r\n)
# File lib/roma/plugin/plugin_alist.rb, line 171
#
# Handles "alist_first <key>".
# Replies with the first element of the stored list, or with a bare END
# when the key has no data or the list is empty.
def ev_alist_first(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  return forward1(nodes[0], s) unless nodes[0] == @nid

  raw = @storages[hname].get(vn, k, d)
  @stats.read_count += 1
  return send_data("END\r\n") unless raw

  list = Marshal.load(raw)[0]
  return send_data("END\r\n") if list.length == 0

  head = list.first
  send_data("VALUE #{s[1]} 0 #{head.length}\r\n#{head}\r\nEND\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n", " ") + "\r\n")
  @log.error(detail)
end
alist_gets <key> [index|range] [forward]\r\n
( [VALUE <key> 0 <length of length string>\r\n <length string>\r\n (VALUE <key> 0 <value length>\r\n <value>\r\n)* ] END\r\n |SERVER_ERROR <error message>\r\n)
# File lib/roma/plugin/plugin_alist.rb, line 202
#
# Handles "alist_gets <key> [index|range]".
# Sends the total list length as a first VALUE, then one VALUE per selected
# element, then END. The selector s[2] is either "<n>" (one element; bare
# END if n is not below the list length) or "<from>..<to>" (<to> may be
# negative). Any other text, including no selector, selects the whole list.
# A missing key yields a bare END.
def ev_alist_gets(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  return forward1(nodes[0], s) if nodes[0] != @nid

  ddata = @storages[hname].get(vn, k, 0)
  @stats.read_count += 1
  return send_data("END\r\n") unless ddata

  v = Marshal.load(ddata)[0]
  # The dots are escaped so that only a literal ".." separates a range; the
  # former pattern accepted any two characters (e.g. "10-5" became 1..5).
  m = /\A(?:(\d+)|(\d+)\.\.(-?\d+))\z/.match(s[2].to_s)
  if m.nil?
    buf = v
  elsif m[1]
    idx = m[1].to_i
    return send_data("END\r\n") if v.length <= idx
    buf = v[Range.new(idx, idx)]
  else
    # Array#[] returns nil when the range starts beyond the end of the
    # list; treat that as an empty selection instead of raising later.
    buf = v[Range.new(m[2].to_i, m[3].to_i)] || []
  end

  len = v.length
  send_data("VALUE #{s[1]} 0 #{len.to_s.length}\r\n#{len}\r\n")
  buf.each do |val|
    send_data("VALUE #{s[1]} 0 #{val.length}\r\n#{val}\r\n")
  end
  send_data("END\r\n")
rescue => e
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n", " ")
  send_data("#{msg}\r\n")
  @log.error("#{e} #{$@}")
end
alist_gets_with_time <key> [index|range] [forward]\r\n
( [VALUE <key> 0 <length of length string>\r\n <length string>\r\n (VALUE <key> 0 <value length>\r\n <value string>\r\n VALUE <key> 0 <value length>\r\n <time string>\r\n)* ] END\r\n |SERVER_ERROR <error message>\r\n)
# File lib/roma/plugin/plugin_alist.rb, line 250
#
# Handles "alist_gets_with_time <key> [index|range]".
# Sends the total list length as a first VALUE, then for each selected
# element a VALUE with the element followed by a VALUE with the entry at
# the same position of the time array v[1], then END. The selector s[2] is
# "<n>" or "<from>..<to>"; any other text selects the whole list.
# A missing key yields a bare END.
def ev_alist_gets_with_time(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  return forward1(nodes[0], s) if nodes[0] != @nid

  ddata = @storages[hname].get(vn, k, 0)
  @stats.read_count += 1
  return send_data("END\r\n") unless ddata

  v = Marshal.load(ddata)
  # The dots are escaped so that only a literal ".." separates a range; the
  # former pattern accepted any two characters (e.g. "10-5" became 1..5).
  m = /\A(?:(\d+)|(\d+)\.\.(-?\d+))\z/.match(s[2].to_s)
  if m.nil?
    v_buf = v[0]
    t_buf = v[1]
  else
    if m[1]
      idx = m[1].to_i
      return send_data("END\r\n") if v[0].length <= idx
      range = Range.new(idx, idx)
    else
      range = Range.new(m[2].to_i, m[3].to_i)
    end
    # Array#[] returns nil when the range starts beyond the end of the
    # list; treat that as an empty selection instead of raising later.
    v_buf = v[0][range] || []
    t_buf = v[1][range] || []
  end

  len = v[0].length
  send_data("VALUE #{s[1]} 0 #{len.to_s.length}\r\n#{len}\r\n")
  v_buf.each_with_index do |val, idx|
    send_data("VALUE #{s[1]} 0 #{val.length}\r\n#{val}\r\n")
    send_data("VALUE #{s[1]} 0 #{t_buf[idx].to_s.length}\r\n#{t_buf[idx]}\r\n")
  end
  send_data("END\r\n")
rescue => e
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n", " ")
  send_data("#{msg}\r\n")
  @log.error("#{e} #{$@}")
end
alist_include? <key> <bytes> [forward]\r\n <data block>\r\n
(true|false|NOT_FOUND|SERVER_ERROR <error message>)\r\n
# File lib/roma/plugin/plugin_alist.rb, line 295
#
# Handles "alist_include? <key> <bytes>" followed by a data block.
# Replies "true" or "false" depending on whether the stored list contains
# an element equal to the data block, NOT_FOUND when the key has no data,
# or SERVER_ERROR on an exception.
def ev_alist_include?(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[2].to_i)
  read_bytes(2) # consume the 2 bytes that follow the data block
  return forward2(nodes[0], s, data) if nodes[0] != @nid

  ddata = @storages[hname].get(vn, k, d)
  @stats.read_count += 1

  return send_data("NOT_FOUND\r\n") unless ddata

  v = Marshal.load(ddata)[0]
  send_data("#{v.include?(data)}\r\n")
rescue => e
  # Replace CR/LF in the message so the reply stays on a single line, as
  # the other alist handlers do; this one used to send the text unaltered.
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n", " ")
  send_data("#{msg}\r\n")
  @log.error("#{e} #{$@}")
end
alist_index <key> <bytes> [forward]\r\n <data block>\r\n
(<index>|nil|NOT_FOUND|SERVER_ERROR <error message>)\r\n
# File lib/roma/plugin/plugin_alist.rb, line 319
#
# Handles "alist_index <key> <bytes>" followed by a data block.
# Replies with the position of the first element equal to the data block,
# with "nil" when there is none, or NOT_FOUND when the key has no data.
def ev_alist_index(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[2].to_i)
  read_bytes(2) # consume the 2 bytes that follow the data block
  return forward2(nodes[0], s, data) unless nodes[0] == @nid

  raw = @storages[hname].get(vn, k, d)
  @stats.read_count += 1
  return send_data("NOT_FOUND\r\n") unless raw

  pos = Marshal.load(raw)[0].index(data)
  send_data(pos ? "#{pos}\r\n" : "nil\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n", " ") + "\r\n")
  @log.error(detail)
end
alist_insert <key> <index> <bytes> [forward]\r\n <data block>\r\n
(STORED|NOT_STORED|SERVER_ERROR <error message>)\r\n
# File lib/roma/plugin/plugin_alist.rb, line 347
#
# Handles "alist_insert <key> <index> <bytes>" followed by a data block.
# Inserts the data block at <index> of the value array and the current time
# at the same index of the time array, starting from an empty list pair when
# the key has no data. Replies STORED, NOT_STORED or SERVER_ERROR.
def ev_alist_insert(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[3].to_i)
  read_bytes(2) # consume the 2 bytes that follow the data block
  return forward2(nodes[0], s, data) unless nodes[0] == @nid

  raw = @storages[hname].get(vn, k, d)
  v = raw ? Marshal.load(raw) : [[], []]

  at = s[2].to_i
  v[0].insert(at, data)
  v[1].insert(at, Time.now.to_i)

  expt = 0x7fffffff
  stored = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
  @stats.write_count += 1
  return send_data("NOT_STORED\r\n") unless stored

  if @stats.wb_command_map.key?(:alist_insert)
    Roma::WriteBehindProcess.push(hname, @stats.wb_command_map[:alist_insert], k, data)
  end
  redundant(nodes[1..-1], hname, k, d, stored[2], expt, stored[4])
  send_data("STORED\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n", " ") + "\r\n")
  @log.error(detail)
end
alist_join <key> <bytes> [index|range] [forward]\r\n <separator block>\r\n
( [VALUE <key> 0 <length of length string>\r\n <length string>\r\n VALUE <key> 0 <value length>\r\n <value>\r\n] END\r\n |SERVER_ERROR <error message>\r\n)
# File lib/roma/plugin/plugin_alist.rb, line 681
#
# Handles "alist_join <key> <bytes> [index|range]" followed by a separator
# block.
# Sends the total list length as a first VALUE, then one VALUE holding the
# selected elements joined with the separator, then END. The selector s[3]
# is "<n>" or "<from>..<to>"; any other text selects the whole list.
# A missing key yields a bare END.
def ev_alist_join(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[2].to_i)
  read_bytes(2) # consume the 2 bytes that follow the separator block
  return forward1(nodes[0], s, data) if nodes[0] != @nid

  ddata = @storages[hname].get(vn, k, 0)
  @stats.read_count += 1
  return send_data("END\r\n") unless ddata

  v = Marshal.load(ddata)[0]
  # The dots are escaped so that only a literal ".." separates a range; the
  # former pattern accepted any two characters (e.g. "10-5" became 1..5).
  m = /\A(?:(\d+)|(\d+)\.\.(-?\d+))\z/.match(s[3].to_s)
  if m.nil?
    buf = v
  elsif m[1]
    idx = m[1].to_i
    return send_data("END\r\n") if v.length <= idx
    buf = v[Range.new(idx, idx)]
  else
    # Array#[] returns nil when the range starts beyond the end of the
    # list; treat that as an empty selection instead of raising later.
    buf = v[Range.new(m[2].to_i, m[3].to_i)] || []
  end

  len = v.length
  ret = buf.join(data)
  send_data("VALUE #{s[1]} 0 #{len.to_s.length}\r\n#{len}\r\n")
  send_data("VALUE #{s[1]} 0 #{ret.length}\r\n#{ret}\r\nEND\r\n")
rescue => e
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n", " ")
  send_data("#{msg}\r\n")
  @log.error("#{e} #{$@}")
end
alist_join_with_time <key> <bytes> [index|range] [forward]\r\n <separator block>\r\n
( [VALUE <key> 0 <length of length string>\r\n <length string>\r\n VALUE <key> 0 <value length>\r\n <value string>\r\n VALUE <key> 0 <value length>\r\n <time string>\r\n] END\r\n |SERVER_ERROR <error message>\r\n)
# File lib/roma/plugin/plugin_alist.rb, line 631
#
# Handles "alist_join_with_time <key> <bytes> [index|range]" followed by a
# separator block.
# Sends the total list length as a first VALUE, then one VALUE with the
# selected elements joined by the separator, then one VALUE with the
# matching entries of the time array v[1] joined the same way, then END.
# The selector s[3] is "<n>" or "<from>..<to>"; any other text selects the
# whole list. A missing key yields a bare END.
def ev_alist_join_with_time(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[2].to_i)
  read_bytes(2) # consume the 2 bytes that follow the separator block
  return forward1(nodes[0], s, data) if nodes[0] != @nid

  ddata = @storages[hname].get(vn, k, 0)
  @stats.read_count += 1
  return send_data("END\r\n") unless ddata

  v = Marshal.load(ddata)
  # The dots are escaped so that only a literal ".." separates a range; the
  # former pattern accepted any two characters (e.g. "10-5" became 1..5).
  m = /\A(?:(\d+)|(\d+)\.\.(-?\d+))\z/.match(s[3].to_s)
  if m.nil?
    v_buf = v[0]
    t_buf = v[1]
  else
    if m[1]
      idx = m[1].to_i
      return send_data("END\r\n") if v[0].length <= idx
      range = Range.new(idx, idx)
    else
      range = Range.new(m[2].to_i, m[3].to_i)
    end
    # Array#[] returns nil when the range starts beyond the end of the
    # list; treat that as an empty selection instead of raising later.
    v_buf = v[0][range] || []
    t_buf = v[1][range] || []
  end

  len = v[0].length
  v_ret = v_buf.join(data)
  t_ret = t_buf.join(data)
  send_data("VALUE #{s[1]} 0 #{len.to_s.length}\r\n#{len}\r\n")
  send_data("VALUE #{s[1]} 0 #{v_ret.length}\r\n#{v_ret}\r\n")
  send_data("VALUE #{s[1]} 0 #{t_ret.length}\r\n#{t_ret}\r\nEND\r\n")
rescue => e
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n", " ")
  send_data("#{msg}\r\n")
  @log.error("#{e} #{$@}")
end
alist_last <key> [forward]\r\n
( [VALUE <key> 0 <value length>\r\n <value>\r\n] END\r\n |SERVER_ERROR <error message>\r\n)
# File lib/roma/plugin/plugin_alist.rb, line 763
#
# Handles "alist_last <key>".
# Replies with the last element of the stored list, or with a bare END when
# the key has no data or the list is empty.
def ev_alist_last(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  return forward1(nodes[0], s) unless nodes[0] == @nid

  raw = @storages[hname].get(vn, k, d)
  @stats.read_count += 1
  return send_data("END\r\n") unless raw

  list = Marshal.load(raw)[0]
  return send_data("END\r\n") if list.length == 0

  tail = list.last
  send_data("VALUE #{s[1]} 0 #{tail.length}\r\n#{tail}\r\nEND\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n", " ") + "\r\n")
  @log.error(detail)
end
alist_length <key> [forward]\r\n
(<length>|NOT_FOUND|SERVER_ERROR <error message>)\r\n
# File lib/roma/plugin/plugin_alist.rb, line 787
#
# Handles "alist_length <key>".
# Replies with the number of elements in the stored list, or NOT_FOUND when
# the key has no data.
def ev_alist_length(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  return forward2(nodes[0], s) unless nodes[0] == @nid

  raw = @storages[hname].get(vn, k, d)
  @stats.read_count += 1
  return send_data("NOT_FOUND\r\n") unless raw

  count = Marshal.load(raw)[0].length
  send_data("#{count}\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n", " ") + "\r\n")
  @log.error(detail)
end
alist_pop <key> [forward]\r\n
( [VALUE <key> 0 <value length>\r\n <value>\r\n] END |NOT_STORED|SERVER_ERROR <error message>)\r\n
# File lib/roma/plugin/plugin_alist.rb, line 810
#
# Handles "alist_pop <key>".
# Removes the last element (and the last entry of the time array), stores
# the shortened list and replies with the removed element. Replies with a
# bare END when the key has no data or the list is empty, and NOT_STORED
# when the storage set fails.
def ev_alist_pop(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  return forward1(nodes[0], s) unless nodes[0] == @nid

  raw = @storages[hname].get(vn, k, d)
  return send_data("END\r\n") unless raw

  v = Marshal.load(raw)
  return send_data("END\r\n") if v[0].length == 0

  taken = v[0].pop
  v[1].pop
  expt = 0x7fffffff
  stored = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
  # Both counters are only updated once a write has been attempted.
  @stats.read_count += 1
  @stats.write_count += 1
  return send_data("NOT_STORED\r\n") unless stored

  redundant(nodes[1..-1], hname, k, d, stored[2], expt, stored[4])
  send_data("VALUE #{s[1]} 0 #{taken.length}\r\n#{taken}\r\nEND\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n", " ") + "\r\n")
  @log.error(detail)
end
alist_push <key> <bytes> [forward]\r\n <data block>\r\n
(STORED|NOT_STORED|SERVER_ERROR <error message>)\r\n
# File lib/roma/plugin/plugin_alist.rb, line 845
#
# Handles "alist_push <key> <bytes>" followed by a data block.
# Appends the data block to the value array and the current time to the
# time array, starting from an empty list pair when the key has no data.
# Replies STORED, NOT_STORED or SERVER_ERROR.
def ev_alist_push(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[2].to_i)
  read_bytes(2) # consume the 2 bytes that follow the data block
  return forward2(nodes[0], s, data) unless nodes[0] == @nid

  raw = @storages[hname].get(vn, k, d)
  v = raw ? Marshal.load(raw) : [[], []]

  v[0] << data
  v[1] << Time.now.to_i

  expt = 0x7fffffff
  stored = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
  @stats.write_count += 1
  return send_data("NOT_STORED\r\n") unless stored

  if @stats.wb_command_map.key?(:alist_push)
    Roma::WriteBehindProcess.push(hname, @stats.wb_command_map[:alist_push], k, data)
  end
  redundant(nodes[1..-1], hname, k, d, stored[2], expt, stored[4])
  send_data("STORED\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n", " ") + "\r\n")
  @log.error(detail)
end
alist_shift <key> [forward]\r\n
( [VALUE <key> 0 <value length>\r\n <value>\r\n] END |NOT_STORED|SERVER_ERROR <error message>)\r\n
# File lib/roma/plugin/plugin_alist.rb, line 1169
#
# Handles "alist_shift <key>".
# Removes the first element (and the first entry of the time array), stores
# the shortened list and replies with the removed element. Replies with a
# bare END when the key has no data or the list is empty, and NOT_STORED
# when the storage set fails.
def ev_alist_shift(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  return forward1(nodes[0], s) unless nodes[0] == @nid

  raw = @storages[hname].get(vn, k, d)
  return send_data("END\r\n") unless raw

  v = Marshal.load(raw)
  return send_data("END\r\n") if v[0].length == 0

  taken = v[0].shift
  v[1].shift
  expt = 0x7fffffff
  stored = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
  # Both counters are only updated once a write has been attempted.
  @stats.read_count += 1
  @stats.write_count += 1
  return send_data("NOT_STORED\r\n") unless stored

  redundant(nodes[1..-1], hname, k, d, stored[2], expt, stored[4])
  send_data("VALUE #{s[1]} 0 #{taken.length}\r\n#{taken}\r\nEND\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n", " ") + "\r\n")
  @log.error(detail)
end
# File lib/roma/plugin/plugin_alist.rb, line 470
#
# Alternate command name: delegates unchanged to
# ev_alist_swap_and_sized_insert.
def ev_alist_sized_delete_and_prepend(s)
  ev_alist_swap_and_sized_insert(s)
end
alist_sized_insert <key> <array-size> <bytes> [forward]\r\n <data block>\r\n
(STORED|NOT_STORED|SERVER_ERROR <error message>)\r\n
# File lib/roma/plugin/plugin_alist.rb, line 387
#
# Handles "alist_sized_insert <key> <array-size> <bytes>" followed by a
# data block.
# Puts the data block and the current time at the head of the value and
# time arrays, then keeps only the first <array-size> entries of each.
# Starts from an empty list pair when the key has no data.
# Replies STORED, NOT_STORED or SERVER_ERROR.
def ev_alist_sized_insert(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[3].to_i)
  read_bytes(2) # consume the 2 bytes that follow the data block
  return forward2(nodes[0], s, data) if nodes[0] != @nid

  ddata = @storages[hname].get(vn, k, d)
  v = ddata ? Marshal.load(ddata) : [[], []]

  max = s[2].to_i
  v[0].insert(0, data)
  v[1].insert(0, Time.now.to_i)
  # Array#first(max) keeps at most max entries. The former [0..(max - 1)]
  # became [0..-1] for max == 0 and so kept the whole list. A negative max
  # raises ArgumentError, which the rescue below reports as SERVER_ERROR.
  v[0] = v[0].first(max)
  v[1] = v[1].first(max)

  expt = 0x7fffffff
  ret = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
  @stats.write_count += 1

  if ret
    if @stats.wb_command_map.key?(:alist_sized_insert)
      Roma::WriteBehindProcess.push(hname, @stats.wb_command_map[:alist_sized_insert], k, data)
    end
    redundant(nodes[1..-1], hname, k, d, ret[2], expt, ret[4])
    send_data("STORED\r\n")
  else
    send_data("NOT_STORED\r\n")
  end
rescue => e
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n", " ")
  send_data("#{msg}\r\n")
  @log.error("#{e} #{$@}")
end
# File lib/roma/plugin/plugin_alist.rb, line 381
#
# Alternate command name: delegates unchanged to ev_alist_sized_insert.
def ev_alist_sized_prepend(s)
  ev_alist_sized_insert(s)
end
alist_sized_push <key> <array-size> <bytes> [forward]\r\n <data block>\r\n
(STORED|NOT_PUSHED|NOT_STORED|SERVER_ERROR <error message>)\r\n
# File lib/roma/plugin/plugin_alist.rb, line 885
#
# Handles "alist_sized_push <key> <array-size> <bytes>" followed by a data
# block.
# Replies NOT_PUSHED when the list already holds <array-size> or more
# elements; otherwise appends the data block and the current time.
# Replies STORED, NOT_PUSHED, NOT_STORED or SERVER_ERROR.
def ev_alist_sized_push(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[3].to_i)
  read_bytes(2) # consume the 2 bytes that follow the data block
  return forward2(nodes[0], s, data) unless nodes[0] == @nid

  raw = @storages[hname].get(vn, k, d)
  v = raw ? Marshal.load(raw) : [[], []]

  max = s[2].to_i
  return send_data("NOT_PUSHED\r\n") if v[0].length >= max

  keep = 0..(max - 1)
  v[0] = (v[0] << data)[keep]
  v[1] = (v[1] << Time.now.to_i)[keep]

  expt = 0x7fffffff
  stored = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
  @stats.write_count += 1
  return send_data("NOT_STORED\r\n") unless stored

  if @stats.wb_command_map.key?(:alist_sized_push)
    Roma::WriteBehindProcess.push(hname, @stats.wb_command_map[:alist_sized_push], k, data)
  end
  redundant(nodes[1..-1], hname, k, d, stored[2], expt, stored[4])
  send_data("STORED\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n", " ") + "\r\n")
  @log.error(detail)
end
alist_spushv <hash-name> <vnode-id>
 src                                        dst
  |  ['alist_spushv' <hname> <vn>\r\n]->     |
  |<-['READY'\r\n]                           |
  |                 [<dumpdata>]->           |
  |                       :                  |
  |                       :                  |
  |               [<end of dump>]->          |
  |<-['STORED'\r\n]                          |
# File lib/roma/plugin/plugin_alist.rb, line 1251
#
# Handles "alist_spushv <hash-name> <vnode-id>": receives a stream of dumped
# records for one vnode.
# After replying READY it reads records until one whose key length is 0.
# Each record is a 20-byte header (five 32-bit big-endian integers: vn,
# last, clk, expt, key length), the key, a 4-byte value length and the
# value. Values that to_alist_value recognises are merged with merge_list;
# all others go to the storage's load_stream_dump.
# Replies STORED, or CANCELED when spushv_protection is set after the loop.
def ev_alist_spushv(s)
  unless s.length == 3
    @log.error("#{__method__}:wrong number of arguments(#{s})")
    return send_data("CLIENT_ERROR Wrong number of arguments.\r\n")
  end
  if @stats.spushv_protection
    @log.info("#{__method__}:In spushv_protection")
    return send_data("SERVER_ERROR In spushv_protection.\r\n")
  end
  @stats.run_receive_a_vnode["#{s[1]}_#{s[2]}"] = true

  $roma.stop_clean_up

  send_data("READY\r\n")

  count = 0
  loop do
    header = read_bytes(20, 100)
    vn, last, clk, expt, klen = header.unpack('NNNNN')
    break if klen == 0 # end of dump

    k = read_bytes(klen)
    vlen = read_bytes(4, 100).unpack('N').first
    v = read_bytes(vlen, 100)
    val = to_alist_value(v)
    loaded =
      if val
        merge_list(s[1], vn, last, clk, expt, k, v, val)
      else
        @storages[s[1]].load_stream_dump(vn, last, clk, expt, k, v)
      end
    count += 1 if loaded
  end

  if @stats.spushv_protection
    @log.info("#{__method__}:Canceled because of spushv_protection")
    send_data("CANCELED\r\n")
  else
    send_data("STORED\r\n")
  end
  @log.debug("alist #{count} keys loaded.")
rescue Storage::StorageException => e
  @log.error("#{e.inspect} #{$@}")
  close_connection
  if Config.const_defined?(:STORAGE_EXCEPTION_ACTION) &&
     Config::STORAGE_EXCEPTION_ACTION == :shutdown
    @log.error("#{__method__}:Romad will stop")
    @stop_event_loop = true
  end
rescue => e
  @log.error("#{e}\n#{$@}")
ensure
  # Runs on every exit path, including the early returns above.
  @stats.run_receive_a_vnode.delete("#{s[1]}_#{s[2]}") if s.length == 3
end
alist_swap_and_insert <key> <bytes> [forward]\r\n <data block>\r\n
(STORED|NOT_STORED|SERVER_ERROR <error message>)\r\n
# File lib/roma/plugin/plugin_alist.rb, line 430
#
# Handles "alist_swap_and_insert <key> <bytes>" followed by a data block.
# Removes the first existing element equal to the data block (with its time
# entry), then puts the data block and the current time at the head of the
# arrays. Replies STORED, NOT_STORED or SERVER_ERROR.
def ev_alist_swap_and_insert(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[2].to_i)
  read_bytes(2) # consume the 2 bytes that follow the data block
  return forward2(nodes[0], s, data) unless nodes[0] == @nid

  raw = @storages[hname].get(vn, k, d)
  v = raw ? Marshal.load(raw) : [[], []]

  pos = v[0].index(data)
  if pos
    v[0].delete_at(pos)
    v[1].delete_at(pos)
  end
  v[0].unshift(data)
  v[1].unshift(Time.now.to_i)

  expt = 0x7fffffff
  stored = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
  @stats.write_count += 1
  return send_data("NOT_STORED\r\n") unless stored

  if @stats.wb_command_map.key?(:alist_swap_and_insert)
    Roma::WriteBehindProcess.push(hname, @stats.wb_command_map[:alist_swap_and_insert], k, data)
  end
  redundant(nodes[1..-1], hname, k, d, stored[2], expt, stored[4])
  send_data("STORED\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n", " ") + "\r\n")
  @log.error(detail)
end
alist_swap_and_push <key> <bytes> [forward]\r\n <data block>\r\n
(STORED|NOT_STORED|SERVER_ERROR <error message>)\r\n
# File lib/roma/plugin/plugin_alist.rb, line 930
#
# Handles "alist_swap_and_push <key> <bytes>" followed by a data block.
# Removes the first existing element equal to the data block (with its time
# entry), then appends the data block and the current time to the arrays.
# Replies STORED, NOT_STORED or SERVER_ERROR.
def ev_alist_swap_and_push(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[2].to_i)
  read_bytes(2) # consume the 2 bytes that follow the data block
  return forward2(nodes[0], s, data) unless nodes[0] == @nid

  raw = @storages[hname].get(vn, k, d)
  v = raw ? Marshal.load(raw) : [[], []]

  pos = v[0].index(data)
  if pos
    v[0].delete_at(pos)
    v[1].delete_at(pos)
  end
  v[0] << data
  v[1] << Time.now.to_i

  expt = 0x7fffffff
  stored = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
  @stats.write_count += 1
  return send_data("NOT_STORED\r\n") unless stored

  if @stats.wb_command_map.key?(:alist_swap_and_push)
    Roma::WriteBehindProcess.push(hname, @stats.wb_command_map[:alist_swap_and_push], k, data)
  end
  redundant(nodes[1..-1], hname, k, d, stored[2], expt, stored[4])
  send_data("STORED\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n", " ") + "\r\n")
  @log.error(detail)
end
alist_swap_and_sized_insert <key> <array-size> <bytes> [forward]\r\n <data block>\r\n
(STORED|NOT_STORED|SERVER_ERROR <error message>)\r\n
# File lib/roma/plugin/plugin_alist.rb, line 476
#
# Handles "alist_swap_and_sized_insert <key> <array-size> <bytes>" followed
# by a data block.
# Removes the first existing element equal to the data block (with its time
# entry), puts the data block and the current time at the head of the
# arrays, then keeps only the first <array-size> entries of each.
# Replies STORED, NOT_STORED or SERVER_ERROR.
def ev_alist_swap_and_sized_insert(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[3].to_i)
  read_bytes(2) # consume the 2 bytes that follow the data block
  return forward2(nodes[0], s, data) if nodes[0] != @nid

  ddata = @storages[hname].get(vn, k, d)
  v = ddata ? Marshal.load(ddata) : [[], []]

  idx = v[0].index(data)
  if idx
    v[0].delete_at(idx)
    v[1].delete_at(idx)
  end
  v[0].insert(0, data)
  v[1].insert(0, Time.now.to_i)

  max = s[2].to_i
  # Array#first(max) keeps at most max entries. The former [0..(max - 1)]
  # became [0..-1] for max == 0 and so kept the whole list. A negative max
  # raises ArgumentError, which the rescue below reports as SERVER_ERROR.
  v[0] = v[0].first(max)
  v[1] = v[1].first(max)

  expt = 0x7fffffff
  ret = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
  @stats.write_count += 1

  if ret
    if @stats.wb_command_map.key?(:alist_swap_and_sized_insert)
      Roma::WriteBehindProcess.push(hname, @stats.wb_command_map[:alist_swap_and_sized_insert], k, data)
    end
    redundant(nodes[1..-1], hname, k, d, ret[2], expt, ret[4])
    send_data("STORED\r\n")
  else
    send_data("NOT_STORED\r\n")
  end
rescue => e
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n", " ")
  send_data("#{msg}\r\n")
  @log.error("#{e} #{$@}")
end
alist_swap_and_sized_push <key> <array-size> <bytes> [forward]\r\n <data block>\r\n
(STORED|NOT_PUSHED|NOT_STORED|SERVER_ERROR <error message>)\r\n
# File lib/roma/plugin/plugin_alist.rb, line 974
#
# Handles "alist_swap_and_sized_push <key> <array-size> <bytes> [forward]\r\n<data block>\r\n".
#
# If the received value is already in the list, its first occurrence is
# removed and the value is appended to the tail with the current time.
# If it is not in the list, it is appended only while the list holds fewer
# than <array-size> entries; otherwise NOT_PUSHED is returned and nothing is
# written. After appending, only the leading <array-size> entries are kept.
# A missing key starts from an empty list.
#
# s - the tokenized command line: s[1] is the key, s[2] the list size and
#     s[3] the data length.
#
# Replies STORED, NOT_PUSHED or NOT_STORED, or a SERVER_ERROR line when the
# stored bytes are not an alist or when any exception is raised.
def ev_alist_swap_and_sized_push(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[3].to_i)
  read_bytes(2) # consume the "\r\n" that terminates the data block
  return forward2(nodes[0], s, data) if nodes[0] != @nid

  # Use the shared validating loader rather than a bare Marshal.load so that
  # a value which is not in alist format is rejected explicitly (same reply
  # as ev_alist_to_s) instead of failing somewhere inside the list handling.
  v = to_alist_value_for_write(hname, vn, k, d)
  unless v
    return send_data("SERVER_ERROR data other than alist's format already exist.\r\n")
  end

  max = s[2].to_i

  idx = v[0].index(data)
  if idx
    v[0].delete_at(idx)
    v[1].delete_at(idx)
  else
    return send_data("NOT_PUSHED\r\n") if v[0].length >= max
  end

  # NOTE(review): when max is 0 the range below is 0..-1, which keeps the
  # whole list rather than truncating it -- confirm whether that is intended.
  v[0].push(data)
  v[0] = v[0][0..(max - 1)]
  v[1].push(Time.now.to_i)
  v[1] = v[1][0..(max - 1)]

  expt = 0x7fffffff
  ret = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
  @stats.write_count += 1

  if ret
    if @stats.wb_command_map.key?(:alist_swap_and_sized_push)
      Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[:alist_swap_and_sized_push], k, data)
    end
    redundant(nodes[1..-1], hname, k, d, ret[2], expt, ret[4])
    send_data("STORED\r\n")
  else
    send_data("NOT_STORED\r\n")
  end
rescue => e
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n"," ")
  send_data("#{msg}\r\n")
  @log.error("#{e} #{$@}")
end
alist_to_json <key> [index|range] [forward]\r\n
( [VALUE <key> 0 <length of json string>\r\n <json string>\r\n] END\r\n |SERVER_ERROR <error message>\r\n)
# File lib/roma/plugin/plugin_alist.rb, line 724
#
# Handles "alist_to_json <key> [index|range] [forward]\r\n".
#
# Sends the list stored under the key, or a slice of it, as a JSON array.
# The optional third token is either a single index ("3") or a range
# ("2..5", "0..-1"); any other token (including "forward" or none at all)
# selects the whole list.
#
# s - the tokenized command line: s[1] is the key, s[2] the optional
#     index or range.
#
# Replies with one VALUE block followed by END, with a bare END when the key
# does not exist or a single index is past the end of the list, or with a
# SERVER_ERROR line when the stored bytes are not an alist or an exception
# is raised.
def ev_alist_to_json(s)
  hname, k, _d, vn, nodes = calc_hash(s[1])
  return forward1(nodes[0], s) if nodes[0] != @nid

  ddata = @storages[hname].get(vn, k, 0)
  @stats.read_count += 1
  return send_data("END\r\n") unless ddata

  # Validate the stored bytes the same way ev_alist_to_s does.
  alist = to_alist_value(ddata)
  unless alist
    return send_data("SERVER_ERROR data other than alist's format already exist.\r\n")
  end
  v = alist[0]

  # The dots of the range separator are escaped so that only "a..b" is
  # accepted as a range; unescaped they would match any two characters.
  m = /\A(?:(\d+)|(\d+)\.\.(-?\d+))\z/.match(s[2].to_s)
  ret =
    if m.nil?
      JSON.generate(v)
    elsif m[1]
      index = m[1].to_i
      return send_data("END\r\n") if v.length <= index
      JSON.generate(v[index, 1])
    else
      JSON.generate(v[m[2].to_i..m[3].to_i])
    end

  # The length field counts bytes, not characters, so that multi-byte
  # content is framed correctly.
  send_data("VALUE #{s[1]} 0 #{ret.bytesize}\r\n#{ret}\r\nEND\r\n")
rescue => e
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n"," ")
  send_data("#{msg}\r\n")
  @log.error("#{e} #{$@}")
end
alist_to_s <key> [index|range] [forward]\r\n
( [VALUE <key> 0 <length of length string>\r\n <length string>\r\n VALUE <key> 0 <value length>\r\n <value>\r\n] END\r\n |SERVER_ERROR <error message>\r\n)
# File lib/roma/plugin/plugin_alist.rb, line 1209
#
# Handles "alist_to_s <key> [index|range] [forward]\r\n".
#
# Sends two VALUE blocks followed by END: first the total number of entries
# in the list, then the string form (Array#to_s) of the list or of the
# requested slice. The optional third token is either a single index ("3")
# or a range ("2..5", "0..-1"); any other token (including "forward" or
# none at all) selects the whole list.
#
# s - the tokenized command line: s[1] is the key, s[2] the optional
#     index or range.
#
# Replies with a bare END when the key does not exist, or with a
# SERVER_ERROR line when the stored bytes are not an alist or an exception
# is raised.
def ev_alist_to_s(s)
  hname, k, _d, vn, nodes = calc_hash(s[1])
  return forward1(nodes[0], s) if nodes[0] != @nid

  ddata = @storages[hname].get(vn, k, 0)
  @stats.read_count += 1

  return send_data("END\r\n") unless ddata

  v = to_alist_value(ddata)
  unless v
    return send_data("SERVER_ERROR data other than alist's format already exist.\r\n")
  end

  # The dots of the range separator are escaped so that only "a..b" is
  # accepted as a range; unescaped they would match any two characters.
  m = /\A(?:(\d+)|(\d+)\.\.(-?\d+))\z/.match(s[2].to_s)
  ret =
    if m.nil?
      v[0].to_s
    elsif m[1]
      # An index past the end yields nil here, which becomes "".
      v[0][m[1].to_i, 1].to_s
    else
      v[0][m[2].to_i..m[3].to_i].to_s
    end

  len = v[0].length.to_s
  send_data("VALUE #{s[1]} 0 #{len.bytesize}\r\n#{len}\r\n")
  # The length field counts bytes, not characters, so that multi-byte
  # content is framed correctly.
  send_data("VALUE #{s[1]} 0 #{ret.bytesize}\r\n#{ret}\r\nEND\r\n")
rescue => e
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n"," ")
  send_data("#{msg}\r\n")
  @log.error("#{e} #{$@}")
end
alist_update_at <key> <index> <bytes> [forward]\r\n <data block>\r\n
(STORED|NOT_STORED|NOT_FOUND|SERVER_ERROR <error message>)\r\n
# File lib/roma/plugin/plugin_alist.rb, line 1127
#
# Handles "alist_update_at <key> <index> <bytes> [forward]\r\n<data block>\r\n".
#
# Overwrites the entry at <index> of the list stored under the key with the
# received value and stamps it with the current time.
#
# s - the tokenized command line: s[1] is the key, s[2] the index and
#     s[3] the data length.
#
# Replies STORED or NOT_STORED; NOT_FOUND when the key does not exist or the
# index is negative or past the end of the list; or a SERVER_ERROR line when
# the stored bytes are not an alist or when any exception is raised.
def ev_alist_update_at(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[3].to_i)
  read_bytes(2) # consume the "\r\n" that terminates the data block
  return forward2(nodes[0], s, data) if nodes[0] != @nid

  ddata = @storages[hname].get(vn, k, d)
  return send_data("NOT_FOUND\r\n") unless ddata

  # Use the shared validator rather than a bare Marshal.load so that a value
  # which is not in alist format is rejected explicitly (same reply as
  # ev_alist_to_s).
  v = to_alist_value(ddata)
  unless v
    return send_data("SERVER_ERROR data other than alist's format already exist.\r\n")
  end

  idx = s[2].to_i
  return send_data("NOT_FOUND\r\n") if idx < 0 || v[0].length <= idx
  v[0][idx] = data
  v[1][idx] = Time.now.to_i

  expt = 0x7fffffff
  ret = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
  @stats.write_count += 1

  if ret
    if @stats.wb_command_map.key?(:alist_update_at)
      Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[:alist_update_at], k, data)
    end
    redundant(nodes[1..-1], hname, k, d, ret[2], expt, ret[4])
    send_data("STORED\r\n")
  else
    send_data("NOT_STORED\r\n")
  end
rescue => e
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n"," ")
  send_data("#{msg}\r\n")
  @log.error("#{e} #{$@}")
end
Private Instance Methods
# File lib/roma/plugin/plugin_alist.rb, line 1383
#
# Resolves a received key token into everything a command needs to locate
# the value.
#
# key - the key token; an optional hash name may follow the key, separated
#       from it by an ESC ("\e") character. Without one, @defhash is used.
#
# Returns [hash name, key, digest, vnode id, nodes], where the digest is the
# SHA1 of the key reduced modulo @rttable.hbits and the last two elements
# come from @rttable.get_vnode_id and @rttable.search_nodes_for_write.
def calc_hash(key)
  parts = key.split("\e")
  real_key = parts[0]
  hash_name = parts[1] || @defhash

  digest = Digest::SHA1.hexdigest(real_key).to_i(16) % @rttable.hbits
  vnode = @rttable.get_vnode_id(digest)
  write_nodes = @rttable.search_nodes_for_write(vnode)

  [hash_name, real_key, digest, vnode, write_nodes]
end
# File lib/roma/plugin/plugin_alist.rb, line 1324
#
# Converts an expiry string into a number of seconds.
#
# s - a string containing a run of digits, optionally followed directly by
#     a unit letter: "H" for hours or "D" for days (case-insensitive).
#     Without a unit the number is taken as seconds.
#
# Returns the number of seconds as an Integer, or nil when the string
# contains no digits.
def expired_str_to_i(s)
  # The unit class is [HD]; the former [H|D] also accepted a literal "|".
  m = /(\d+)([HD])?/.match(s.upcase)
  return nil unless m

  t = m[1].to_i
  case m[2]
  when 'D' then t * 86_400
  when 'H' then t * 3_600
  else t
  end
end
# File lib/roma/plugin/plugin_alist.rb, line 1307
#
# Removes from an alist every entry that has expired, plus the first
# occurrence of the value that is about to be re-added.
#
# v       - the alist as [values, timestamps]; it is modified in place.
# rcv_val - the received value whose first occurrence is removed.
# et      - the lifetime in seconds; an entry is expired when its timestamp
#           is at or before (now - et).
#
# Entries are removed by position, not by value, so that when the same value
# is stored more than once it is the expired copy that goes and not whichever
# copy happens to come first in the list.
#
# Returns v.
def expired_swap(v, rcv_val, et)
  expt = Time.now.to_i - et

  doomed = []
  v[1].each_with_index { |t, i| doomed << i if expt >= t }

  swap_idx = v[0].index(rcv_val)
  doomed << swap_idx if swap_idx

  # Delete from the back so that earlier positions stay valid.
  doomed.uniq.sort.reverse_each do |i|
    v[0].delete_at(i)
    v[1].delete_at(i)
  end
  v
end
For a command whose response spans several lines.
# File lib/roma/plugin/plugin_alist.rb, line 1393
#
# Forwards a command to another node and relays that node's reply, which
# may consist of several "VALUE ..." blocks terminated by "END\r\n".
#
# nid  - the id of the node to forward to.
# rs   - the tokenized command line; "forward" is appended before sending.
# data - an optional data block sent after the command line.
#
# A command that already ends in "forward" is not forwarded again; the
# client gets "SERVER_ERROR Routing table is inconsistent." instead.
# Any failure while talking to the other node is reported to the client as
# "SERVER_ERROR Message forward failed.".
def forward1(nid, rs, data=nil)
  if rs.last == "forward"
    return send_data("SERVER_ERROR Routing table is inconsistent.\r\n")
  end

  @log.warn("forward #{rs} to #{nid}")

  request = rs.inject('') { |acc, token| acc << "#{token} " }
  request << "forward\r\n"
  request << data << "\r\n" if data

  con = get_connection(nid)
  con.send(request)

  line = con.gets
  if line.nil?
    # No reply at all: count it as a failure of the other node.
    @rttable.proc_failed(nid)
    @log.error("forward get failed:nid=#{nid} rs=#{rs} #{$@}")
    return send_data("SERVER_ERROR Message forward failed.\r\n")
  end

  if line.start_with?("ERROR")
    @rttable.proc_succeed(nid)
    con.close_connection
    @log.error("forward get failed:nid=#{nid} rs=#{rs} #{$@}")
    return send_data("SERVER_ERROR Message forward failed.\r\n")
  end

  unless line.start_with?("VALUE")
    # A single-line reply is relayed as is.
    return_connection(nid, con)
    @rttable.proc_succeed(nid)
    return send_data(line)
  end

  # Collect "VALUE ..." header lines and their data blocks until END.
  reply = ''
  until line == "END\r\n"
    reply << line
    fields = line.split(/ /)
    if fields[0] != 'VALUE'
      return_connection(nid, con)
      @rttable.proc_succeed(nid)
      return send_data(line)
    end
    # fields[3] is the data length; two more bytes cover the trailing "\r\n".
    reply << con.read_bytes(fields[3].to_i + 2)
    line = con.gets
  end
  reply << "END\r\n"

  return_connection(nid, con)
  @rttable.proc_succeed(nid)

  send_data(reply)
rescue => e
  @rttable.proc_failed(nid) unless e.message == "no connection"
  @log.error("forward get failed:nid=#{nid} rs=#{rs} #{e} #{$@}")
  send_data("SERVER_ERROR Message forward failed.\r\n")
end
For a command whose response is a single line.
# File lib/roma/plugin/plugin_alist.rb, line 1452
#
# Forwards a command to another node through send_cmd and relays the
# single response it returns.
#
# nid  - the id of the node to forward to.
# rs   - the tokenized command line; "forward" is appended before sending.
# data - an optional data block sent after the command line.
#
# A command that already ends in "forward" is not forwarded again; the
# client gets "SERVER_ERROR Routing table is inconsistent." instead.
# A nil response, or one starting with "ERROR", is reported to the client
# as "SERVER_ERROR Message forward failed.".
def forward2(nid, rs, data=nil)
  if rs.last == "forward"
    return send_data("SERVER_ERROR Routing table is inconsistent.\r\n")
  end

  @log.warn("forward #{rs} to #{nid}")

  request = rs.inject('') { |acc, token| acc << "#{token} " }
  request << "forward\r\n"
  request << data << "\r\n" if data

  response = send_cmd(nid, request)
  failed = response.nil? || response.start_with?("ERROR")
  return send_data("SERVER_ERROR Message forward failed.\r\n") if failed

  send_data("#{response}\r\n")
end
# File lib/roma/plugin/plugin_alist.rb, line 1364
#
# Merges a received alist into the one already stored under the key.
#
# When nothing is stored yet, the arguments are handed unchanged to the
# storage's load_stream_dump. Otherwise every value that is already stored
# is dropped (first occurrence only) from the received list, what remains is
# appended to the stored list, and the result is written back with set.
#
# hname - the hash name selecting the storage in @storages.
# vn, last, clk, expt, k, v - passed through to load_stream_dump; vn, k and
#         expt are also used for get/set.
# val   - the received alist as [values, timestamps]; it is modified in
#         place while duplicates are removed.
#
# Returns whatever the storage call (set or load_stream_dump) returns.
def merge_list(hname, vn, last, clk, expt, k, v, val)
  store = @storages[hname]
  stored = store.get(vn, k, 0)
  return store.load_stream_dump(vn, last, clk, expt, k, v) unless stored

  local = Marshal.load(stored)
  local[0].each do |item|
    pos = val[0].index(item)
    next unless pos

    val[0].delete_at(pos)
    val[1].delete_at(pos)
  end

  merged = [local[0] + val[0], local[1] + val[1]]
  store.set(vn, k, 0, expt, Marshal.dump(merged))
end
# File lib/roma/plugin/plugin_alist.rb, line 1347
#
# Decodes stored bytes into an alist, rejecting anything that is not one.
#
# v - the marshalled bytes read from storage, or nil.
#
# The bytes must start with the header that Marshal.dump([[],[]]) produces:
#   Marshal.dump([[],[]])[0..3].unpack("cc a c")  # => [4, 8, "[", 7]
#   - marshal format version 4.8
#   - array object marker "["
#   - array length 2 in fixnum format (7 - 5 = 2)
# and the two elements of the decoded array must both be Arrays.
#
# Returns the decoded [values, timestamps] array, or nil when the input is
# nil, has a different header, does not hold two Arrays, or cannot be
# decoded at all.
def to_alist_value(v)
  return nil if v.nil?
  return nil unless v[0..3] == "\x04\b[\a"

  decoded = Marshal.load(v)
  both_arrays = decoded[0].instance_of?(Array) && decoded[1].instance_of?(Array)
  both_arrays ? decoded : nil
rescue
  nil
end
# File lib/roma/plugin/plugin_alist.rb, line 1338
#
# Loads the alist stored under a key for a command that is going to
# modify it.
#
# hname - the hash name selecting the storage in @storages.
# vn, k, d - passed to the storage's get.
#
# Returns a fresh empty alist ([[], []]) when nothing is stored, otherwise
# the result of to_alist_value for the stored bytes, which is nil when they
# are not in alist format.
def to_alist_value_for_write(hname, vn, k, d)
  ddata = @storages[hname].get(vn, k, d)
  return [[], []] unless ddata

  to_alist_value(ddata)
end