module Roma::CommandPlugin::PluginAshiatoList

Public Instance Methods

ev_alist_at(s) click to toggle source

alist_at <key> <index> [forward]\r\n

( [VALUE <key> 0 <value length>\r\n <value>\r\n] END\r\n |SERVER_ERROR <error message>\r\n)

   # File lib/roma/plugin/plugin_alist.rb
# Handles "alist_at <key> <index>": replies with the element stored at
# <index> of the list kept under <key>.
#
# s - tokenized command line; s[1] is the key and s[2] the index.
#
# Sends "VALUE <key> 0 <bytes>\r\n<value>\r\nEND\r\n" when the element
# exists, a bare "END\r\n" when the key is missing or the index is at or
# past the end of the list, and "SERVER_ERROR ..." if anything raises.
def ev_alist_at(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  return forward1(nodes[0], s) if nodes[0] != @nid

  ddata = @storages[hname].get(vn, k, d)
  @stats.read_count += 1
  return send_data("END\r\n") unless ddata

  idx = s[2].to_i
  v = Marshal.load(ddata)[0]
  return send_data("END\r\n") if v.length <= idx

  # A nil slot (e.g. a negative index before the start) is reported as an
  # empty value.
  ret = v.at(idx) || ""
  # The length field must count bytes, not characters, so a client that
  # reads exactly that many bytes stays in sync on multibyte values.
  send_data("VALUE #{s[1]} 0 #{ret.bytesize}\r\n#{ret}\r\nEND\r\n")
rescue => e
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n"," ")
  send_data("#{msg}\r\n")
  @log.error("#{e} #{$@}")
end
ev_alist_clear(s) click to toggle source

alist_clear <key> [forward]\r\n

(CLEARED|NOT_CLEARED|NOT_FOUND|SERVER_ERROR <error message>)\r\n

   # File lib/roma/plugin/plugin_alist.rb
# Handles "alist_clear <key>": overwrites the list stored under <key>
# with an empty one.
#
# s - tokenized command line; s[1] is the key.
#
# Replies NOT_FOUND when nothing is stored for the key, CLEARED when the
# empty list was written, NOT_CLEARED when the storage write returned a
# falsy result, and SERVER_ERROR if anything raises.
def ev_alist_clear(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  unless nodes[0] == @nid
    return forward2(nodes[0], s)
  end

  previous = @storages[hname].get(vn, k, d)
  unless previous
    return send_data("NOT_FOUND\r\n")
  end

  expt = 0x7fffffff
  empty_list = Marshal.dump([[],[]])
  stored = @storages[hname].set(vn, k, d, expt, empty_list)
  @stats.delete_count += 1

  unless stored
    return send_data("NOT_CLEARED\r\n")
  end

  wb_map = @stats.wb_command_map
  if wb_map.key?(:alist_clear)
    # Write-behind receives the raw value as it was before clearing.
    Roma::WriteBehindProcess::push(hname, wb_map[:alist_clear], k, previous)
  end
  redundant(nodes[1..-1], hname, k, d, stored[2], expt, stored[4])
  send_data("CLEARED\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n"," ") + "\r\n")
  @log.error(detail)
end
ev_alist_delete(s) click to toggle source

alist_delete <key> <bytes> [forward]\r\n <data block>\r\n

(DELETED|NOT_DELETED|NOT_FOUND|SERVER_ERROR <error message>)\r\n

    # File lib/roma/plugin/plugin_alist.rb
 # Handles "alist_delete <key> <bytes>": removes every element equal to
 # the supplied data block from the list stored under <key>.
 #
 # s - tokenized command line; s[1] is the key, s[2] the data length.
 #
 # Replies NOT_FOUND when the key has no value, NOT_DELETED when the
 # data is not in the list or the write-back returned a falsy result,
 # DELETED on success, and SERVER_ERROR if anything raises.
 def ev_alist_delete(s)
   hname, k, d, vn, nodes = calc_hash(s[1])
   data = read_bytes(s[2].to_i)
   read_bytes(2) # the two bytes after the data block ("\r\n" in the documented format)
   unless nodes[0] == @nid
     return forward2(nodes[0], s, data)
   end

   stored = @storages[hname].get(vn, k, d)
   return send_data("NOT_FOUND\r\n") unless stored

   v = Marshal.load(stored)
   values, stamps = v[0], v[1]
   return send_data("NOT_DELETED\r\n") unless values.include?(data)

   # Drop each occurrence together with the entry at the same position
   # in the second (timestamp) array so the two stay aligned.
   until (pos = values.index(data)).nil?
     values.delete_at(pos)
     stamps.delete_at(pos)
   end

   expt = 0x7fffffff
   result = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
   @stats.delete_count += 1

   unless result
     return send_data("NOT_DELETED\r\n")
   end

   wb_map = @stats.wb_command_map
   if wb_map.key?(:alist_delete)
     Roma::WriteBehindProcess::push(hname, wb_map[:alist_delete], k, data)
   end
   redundant(nodes[1..-1], hname, k, d, result[2], expt, result[4])
   send_data("DELETED\r\n")
 rescue => e
   detail = "#{e} #{$@}"
   send_data("SERVER_ERROR #{detail}".tr("\r\n"," ") + "\r\n")
   @log.error(detail)
 end
ev_alist_delete_and_prepend(s) click to toggle source
    # File lib/roma/plugin/plugin_alist.rb
# "alist_delete_and_prepend" command: delegates unchanged to
# ev_alist_swap_and_insert.
def ev_alist_delete_and_prepend(s)
  ev_alist_swap_and_insert(s)
end
ev_alist_delete_at(s) click to toggle source

alist_delete_at <key> <index> [forward]\r\n

(DELETED|NOT_DELETED|NOT_FOUND|SERVER_ERROR <error message>)\r\n

    # File lib/roma/plugin/plugin_alist.rb
# Handles "alist_delete_at <key> <index>": removes the element at
# <index> from the list stored under <key>.
#
# s - tokenized command line; s[1] is the key and s[2] the index.
#
# Replies NOT_FOUND when the key has no value, NOT_DELETED when nothing
# was removed at that index or the write-back returned a falsy result,
# DELETED on success, and SERVER_ERROR if anything raises.
def ev_alist_delete_at(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  return forward2(nodes[0], s) if nodes[0] != @nid

  ddata = @storages[hname].get(vn, k, d)
  return send_data("NOT_FOUND\r\n") unless ddata

  idx = s[2].to_i
  v = Marshal.load(ddata)
  dret = v[0].delete_at(idx)
  return send_data("NOT_DELETED\r\n") unless dret
  # Keep the second (timestamp) array aligned with the values.
  v[1].delete_at(idx)

  expt = 0x7fffffff
  ret = @storages[hname].set(vn, k, d, expt ,Marshal.dump(v))
  @stats.delete_count += 1

  if ret
    if @stats.wb_command_map.key?(:alist_delete_at)
      Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[:alist_delete_at], k, dret)
    end
    redundant(nodes[1..-1], hname, k, d, ret[2], expt, ret[4])
    send_data("DELETED\r\n")
  else
    send_data("NOT_DELETED\r\n")
  end
rescue => e
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n"," ")
  send_data("#{msg}\r\n")
  # Was "${e}", which is not Ruby interpolation and logged the literal text.
  @log.error("#{e} #{$@}")
end
ev_alist_empty?(s) click to toggle source

alist_empty? <key> [forward]\r\n

(true|false|NOT_FOUND|SERVER_ERROR <error message>)\r\n

    # File lib/roma/plugin/plugin_alist.rb
# Handles "alist_empty? <key>": reports whether the list stored under
# <key> has no elements.
#
# s - tokenized command line; s[1] is the key.
#
# Replies "true" or "false", NOT_FOUND when the key has no value, and
# SERVER_ERROR if anything raises.
def ev_alist_empty?(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  unless nodes[0] == @nid
    return forward2(nodes[0], s)
  end

  stored = @storages[hname].get(vn, k, d)
  @stats.read_count += 1
  unless stored
    return send_data("NOT_FOUND\r\n")
  end

  values = Marshal.load(stored)[0]
  send_data("#{values.empty?}\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n"," ") + "\r\n")
  @log.error(detail)
end
ev_alist_expired_swap_and_insert(s) click to toggle source

alist_expired_swap_and_insert <key> <expire-time> <bytes> [forward]\r\n <data block>\r\n

Entries older than <expire-time> are deleted. <expire-time> is given in seconds unless it carries a suffix: 'h' means hours and 'd' means days.

(STORED|NOT_STORED|SERVER_ERROR <error message>)\r\n

    # File lib/roma/plugin/plugin_alist.rb
# Handles "alist_expired_swap_and_insert <key> <expire-time> <bytes>":
# passes the current list through expired_swap, then puts the data block
# at the head of the list with the current time as its timestamp.
#
# s - tokenized command line; s[1] key, s[2] expire-time, s[3] data length.
#
# Replies STORED / NOT_STORED according to the storage write, or a
# SERVER_ERROR line when the expire-time cannot be parsed, the existing
# value is not an alist, or anything raises.
def ev_alist_expired_swap_and_insert(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[3].to_i)
  read_bytes(2) # the two bytes after the data block ("\r\n" in the documented format)
  unless nodes[0] == @nid
    return forward2(nodes[0], s, data)
  end

  et = expired_str_to_i(s[2])
  unless et
    return send_data("SERVER_ERROR format error in expire-time.\r\n")
  end

  current = to_alist_value_for_write(hname, vn, k, d)
  unless current
    return send_data("SERVER_ERROR data other than alist's format already exist.\r\n")
  end

  v = expired_swap(current, data, et)
  v[0].insert(0, data)
  v[1].insert(0, Time.now.to_i)

  expt = 0x7fffffff
  result = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
  @stats.write_count += 1

  unless result
    return send_data("NOT_STORED\r\n")
  end

  wb_map = @stats.wb_command_map
  if wb_map.key?(:alist_expired_swap_and_insert)
    Roma::WriteBehindProcess::push(hname, wb_map[:alist_expired_swap_and_insert], k, data)
  end
  redundant(nodes[1..-1], hname, k, d, result[2], expt, result[4])
  send_data("STORED\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n"," ") + "\r\n")
  @log.error(detail)
end
ev_alist_expired_swap_and_push(s) click to toggle source

alist_expired_swap_and_push <key> <expire-time> <bytes> [forward]\r\n <data block>\r\n

Entries older than <expire-time> are deleted. <expire-time> is given in seconds unless it carries a suffix: 'h' means hours and 'd' means days.

(STORED|NOT_STORED|SERVER_ERROR <error message>)\r\n

     # File lib/roma/plugin/plugin_alist.rb
# Handles "alist_expired_swap_and_push <key> <expire-time> <bytes>":
# passes the current list through expired_swap, then appends the data
# block to the tail with the current time as its timestamp.
#
# s - tokenized command line; s[1] key, s[2] expire-time, s[3] data length.
#
# Replies STORED / NOT_STORED according to the storage write, or a
# SERVER_ERROR line when the expire-time cannot be parsed, the existing
# value is not an alist, or anything raises.
def ev_alist_expired_swap_and_push(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[3].to_i)
  read_bytes(2) # the two bytes after the data block ("\r\n" in the documented format)
  unless nodes[0] == @nid
    return forward2(nodes[0], s, data)
  end

  et = expired_str_to_i(s[2])
  unless et
    return send_data("SERVER_ERROR format error in expire-time.\r\n")
  end

  current = to_alist_value_for_write(hname, vn, k, d)
  unless current
    return send_data("SERVER_ERROR data other than alist's format already exist.\r\n")
  end

  v = expired_swap(current, data, et)
  v[0].push(data)
  v[1].push(Time.now.to_i)

  expt = 0x7fffffff
  result = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
  @stats.write_count += 1

  unless result
    return send_data("NOT_STORED\r\n")
  end

  wb_map = @stats.wb_command_map
  if wb_map.key?(:alist_expired_swap_and_push)
    Roma::WriteBehindProcess::push(hname, wb_map[:alist_expired_swap_and_push], k, data)
  end
  redundant(nodes[1..-1], hname, k, d, result[2], expt, result[4])
  send_data("STORED\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n"," ") + "\r\n")
  @log.error(detail)
end
ev_alist_expired_swap_and_sized_insert(s) click to toggle source

alist_expired_swap_and_sized_insert <key> <expire-time> <array-size> <bytes> [forward]\r\n <data block>\r\n

Entries older than <expire-time> are deleted. <expire-time> is given in seconds unless it carries a suffix: 'h' means hours and 'd' means days.

(STORED|NOT_STORED|SERVER_ERROR <error message>)\r\n

    # File lib/roma/plugin/plugin_alist.rb
# Handles "alist_expired_swap_and_sized_insert <key> <expire-time>
# <array-size> <bytes>": passes the current list through expired_swap,
# puts the data block at the head, then keeps only indexes
# 0..(<array-size> - 1) of both the value and timestamp arrays.
#
# s - tokenized command line; s[1] key, s[2] expire-time, s[3] array
#     size, s[4] data length.
#
# Replies STORED / NOT_STORED according to the storage write, or a
# SERVER_ERROR line when the expire-time cannot be parsed, the existing
# value is not an alist, or anything raises.
def ev_alist_expired_swap_and_sized_insert(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[4].to_i)
  read_bytes(2) # the two bytes after the data block ("\r\n" in the documented format)
  unless nodes[0] == @nid
    return forward2(nodes[0], s, data)
  end

  et = expired_str_to_i(s[2])
  unless et
    return send_data("SERVER_ERROR format error in expire-time.\r\n")
  end

  current = to_alist_value_for_write(hname, vn, k, d)
  unless current
    return send_data("SERVER_ERROR data other than alist's format already exist.\r\n")
  end

  v = expired_swap(current, data, et)

  last_kept = s[3].to_i - 1
  v[0].insert(0, data)
  v[0] = v[0][0..last_kept]
  v[1].insert(0, Time.now.to_i)
  v[1] = v[1][0..last_kept]

  expt = 0x7fffffff
  result = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
  @stats.write_count += 1

  unless result
    return send_data("NOT_STORED\r\n")
  end

  wb_map = @stats.wb_command_map
  if wb_map.key?(:alist_expired_swap_and_sized_insert)
    Roma::WriteBehindProcess::push(hname, wb_map[:alist_expired_swap_and_sized_insert], k, data)
  end
  redundant(nodes[1..-1], hname, k, d, result[2], expt, result[4])
  send_data("STORED\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n"," ") + "\r\n")
  @log.error(detail)
end
ev_alist_expired_swap_and_sized_push(s) click to toggle source

alist_expired_swap_and_sized_push <key> <expire-time> <array-size> <bytes> [forward]\r\n <data block>\r\n

Entries older than <expire-time> are deleted. <expire-time> is given in seconds unless it carries a suffix: 'h' means hours and 'd' means days.

(STORED|NOT_PUSHED|NOT_STORED|SERVER_ERROR <error message>)\r\n

     # File lib/roma/plugin/plugin_alist.rb
# Handles "alist_expired_swap_and_sized_push <key> <expire-time>
# <array-size> <bytes>": passes the current list through expired_swap
# and appends the data block, unless the list already holds
# <array-size> or more elements.
#
# s - tokenized command line; s[1] key, s[2] expire-time, s[3] array
#     size, s[4] data length.
#
# Replies NOT_PUSHED when the list is already at the size limit,
# STORED / NOT_STORED according to the storage write, or a SERVER_ERROR
# line when the expire-time cannot be parsed, the existing value is not
# an alist, or anything raises.
def ev_alist_expired_swap_and_sized_push(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[4].to_i)
  read_bytes(2) # the two bytes after the data block ("\r\n" in the documented format)
  unless nodes[0] == @nid
    return forward2(nodes[0], s, data)
  end

  et = expired_str_to_i(s[2])
  unless et
    return send_data("SERVER_ERROR format error in expire-time.\r\n")
  end

  current = to_alist_value_for_write(hname, vn, k, d)
  unless current
    return send_data("SERVER_ERROR data other than alist's format already exist.\r\n")
  end

  v = expired_swap(current, data, et)

  max = s[3].to_i
  if v[0].length >= max
    return send_data("NOT_PUSHED\r\n")
  end

  last_kept = max - 1
  v[0].push(data)
  v[0] = v[0][0..last_kept]
  v[1].push(Time.now.to_i)
  v[1] = v[1][0..last_kept]

  expt = 0x7fffffff
  result = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
  @stats.write_count += 1

  unless result
    return send_data("NOT_STORED\r\n")
  end

  wb_map = @stats.wb_command_map
  if wb_map.key?(:alist_expired_swap_and_sized_push)
    Roma::WriteBehindProcess::push(hname, wb_map[:alist_expired_swap_and_sized_push], k, data)
  end
  redundant(nodes[1..-1], hname, k, d, result[2], expt, result[4])
  send_data("STORED\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n"," ") + "\r\n")
  @log.error(detail)
end
ev_alist_first(s) click to toggle source

alist_first <key> [forward]\r\n

( [VALUE <key> 0 <value length>\r\n <value>\r\n] END\r\n |SERVER_ERROR <error message>\r\n)

    # File lib/roma/plugin/plugin_alist.rb
# Handles "alist_first <key>": replies with the first element of the
# list stored under <key>.
#
# s - tokenized command line; s[1] is the key.
#
# Sends "VALUE <key> 0 <bytes>\r\n<value>\r\nEND\r\n", a bare "END\r\n"
# when the key is missing or the list is empty, and "SERVER_ERROR ..."
# if anything raises.
def ev_alist_first(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  return forward1(nodes[0], s) if nodes[0] != @nid

  ddata = @storages[hname].get(vn, k, d)
  @stats.read_count += 1
  return send_data("END\r\n") unless ddata

  v = Marshal.load(ddata)[0]
  return send_data("END\r\n") if v.empty?

  ret = v.first
  # The length field must count bytes, not characters, so a client that
  # reads exactly that many bytes stays in sync on multibyte values.
  send_data("VALUE #{s[1]} 0 #{ret.bytesize}\r\n#{ret}\r\nEND\r\n")
rescue => e
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n"," ")
  send_data("#{msg}\r\n")
  @log.error("#{e} #{$@}")
end
ev_alist_gets(s) click to toggle source

alist_gets <key> [index|range] [forward]\r\n

( [VALUE <key> 0 <length of length string>\r\n <length string>\r\n (VALUE <key> 0 <value length>\r\n <value>\r\n)* ] END\r\n |SERVER_ERROR <error message>\r\n)

    # File lib/roma/plugin/plugin_alist.rb
# Handles "alist_gets <key> [index|range]": replies with the total list
# length followed by the selected elements.
#
# s - tokenized command line; s[1] is the key, s[2] an optional index
#     ("3") or range ("0..4", the end may be negative). Anything else
#     selects the whole list.
#
# Sends one VALUE record holding the list length, one VALUE record per
# selected element, then "END\r\n". Sends a bare "END\r\n" when the key
# is missing or a single index is at or past the end, and
# "SERVER_ERROR ..." if anything raises.
def ev_alist_gets(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  return forward1(nodes[0], s) if nodes[0] != @nid

  # NOTE(review): this read passes 0 where most sibling commands pass d;
  # kept as in the original — confirm whether that is intended.
  ddata = @storages[hname].get(vn, k, 0)
  @stats.read_count += 1
  return send_data("END\r\n") unless ddata

  v = Marshal.load(ddata)[0]
  # The dots are escaped so that only a literal ".." separates a range;
  # unescaped they matched any two characters (e.g. "1ab5").
  if /(?:^(\d+)$|^(\d+)\.\.((?:-)?\d+)$)/ =~ s[2]
    if $1
      return send_data("END\r\n") if v.length <= $1.to_i
      buf = v[Range.new($1.to_i,$1.to_i)]
    else
      buf = v[Range.new($2.to_i,$3.to_i)]
    end
  else
    buf = v
  end
  # Array#[] returns nil when the range starts past the end; treat that
  # as an empty selection instead of failing with NoMethodError.
  buf ||= []

  len = v.length
  send_data("VALUE #{s[1]} 0 #{len.to_s.length}\r\n#{len.to_s}\r\n")
  buf.each{|val|
    # Length fields are byte counts, not character counts.
    send_data("VALUE #{s[1]} 0 #{val.bytesize}\r\n#{val}\r\n")
  }
  send_data("END\r\n")
rescue => e
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n"," ")
  send_data("#{msg}\r\n")
  @log.error("#{e} #{$@}")
end
ev_alist_gets_with_time(s) click to toggle source

alist_gets_with_time <key> [index|range] [forward]\r\n

( [VALUE <key> 0 <length of length string>\r\n <length string>\r\n (VALUE <key> 0 <value length>\r\n <value string>\r\n VALUE <key> 0 <value length>\r\n <time string>\r\n)* ] END\r\n |SERVER_ERROR <error message>\r\n)

    # File lib/roma/plugin/plugin_alist.rb
# Handles "alist_gets_with_time <key> [index|range]": like alist_gets,
# but each selected element is followed by its stored timestamp.
#
# s - tokenized command line; s[1] is the key, s[2] an optional index
#     ("3") or range ("0..4", the end may be negative). Anything else
#     selects the whole list.
#
# Sends one VALUE record holding the list length, then a value record
# and a time record per selected element, then "END\r\n". Sends a bare
# "END\r\n" when the key is missing or a single index is at or past the
# end, and "SERVER_ERROR ..." if anything raises.
def ev_alist_gets_with_time(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  return forward1(nodes[0], s) if nodes[0] != @nid

  # NOTE(review): this read passes 0 where most sibling commands pass d;
  # kept as in the original — confirm whether that is intended.
  ddata = @storages[hname].get(vn, k, 0)
  @stats.read_count += 1
  return send_data("END\r\n") unless ddata

  v = Marshal.load(ddata)
  # The dots are escaped so that only a literal ".." separates a range.
  if /(?:^(\d+)$|^(\d+)\.\.((?:-)?\d+)$)/ =~ s[2]
    if $1
      return send_data("END\r\n") if v[0].length <= $1.to_i
      v_buf = v[0][Range.new($1.to_i,$1.to_i)]
      t_buf = v[1][Range.new($1.to_i,$1.to_i)]
    else
      v_buf = v[0][Range.new($2.to_i,$3.to_i)]
      t_buf = v[1][Range.new($2.to_i,$3.to_i)]
    end
  else
    v_buf = v[0]
    t_buf = v[1]
  end
  # Array#[] returns nil when the range starts past the end; treat that
  # as an empty selection instead of failing with NoMethodError.
  v_buf ||= []
  t_buf ||= []

  len = v[0].length
  send_data("VALUE #{s[1]} 0 #{len.to_s.length}\r\n#{len.to_s}\r\n")
  v_buf.each_with_index{|val,idx|
    # Length fields are byte counts, not character counts.
    send_data("VALUE #{s[1]} 0 #{val.bytesize}\r\n#{val}\r\n")
    send_data("VALUE #{s[1]} 0 #{t_buf[idx].to_s.bytesize}\r\n#{t_buf[idx]}\r\n")
  }
  send_data("END\r\n")
rescue => e
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n"," ")
  send_data("#{msg}\r\n")
  @log.error("#{e} #{$@}")
end
ev_alist_include?(s) click to toggle source

alist_include? <key> <bytes> [forward]\r\n <data block>\r\n

(true|false|NOT_FOUND|SERVER_ERROR <error message>)\r\n

    # File lib/roma/plugin/plugin_alist.rb
# Handles "alist_include? <key> <bytes>": reports whether the supplied
# data block is an element of the list stored under <key>.
#
# s - tokenized command line; s[1] is the key, s[2] the data length.
#
# Replies "true" or "false", NOT_FOUND when the key has no value, and
# SERVER_ERROR if anything raises.
def ev_alist_include?(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[2].to_i)
  read_bytes(2) # the two bytes after the data block ("\r\n" in the documented format)
  return forward2(nodes[0], s, data) if nodes[0] != @nid

  ddata = @storages[hname].get(vn, k, d)
  @stats.read_count += 1

  return send_data("NOT_FOUND\r\n") unless ddata

  v = Marshal.load(ddata)[0]
  ret = v.include?(data)

  send_data("#{ret}\r\n")
rescue => e
  # Replace CR/LF inside the message, as the sibling commands do, so the
  # error reply is always a single protocol line.
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n"," ")
  send_data("#{msg}\r\n")
  @log.error("#{e} #{$@}")
end
ev_alist_index(s) click to toggle source

alist_index <key> <bytes> [forward]\r\n <data block>\r\n

(<index>|nil|NOT_FOUND|SERVER_ERROR <error message>)\r\n

    # File lib/roma/plugin/plugin_alist.rb
# Handles "alist_index <key> <bytes>": replies with the position of the
# first element equal to the supplied data block.
#
# s - tokenized command line; s[1] is the key, s[2] the data length.
#
# Replies the index, "nil" when the data is not in the list, NOT_FOUND
# when the key has no value, and SERVER_ERROR if anything raises.
def ev_alist_index(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[2].to_i)
  read_bytes(2) # the two bytes after the data block ("\r\n" in the documented format)
  unless nodes[0] == @nid
    return forward2(nodes[0], s, data)
  end

  stored = @storages[hname].get(vn, k, d)
  @stats.read_count += 1
  unless stored
    return send_data("NOT_FOUND\r\n")
  end

  position = Marshal.load(stored)[0].index(data)
  reply = position ? "#{position}\r\n" : "nil\r\n"
  send_data(reply)
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n"," ") + "\r\n")
  @log.error(detail)
end
ev_alist_insert(s) click to toggle source

alist_insert <key> <index> <bytes> [forward]\r\n <data block>\r\n

(STORED|NOT_STORED|SERVER_ERROR <error message>)\r\n

    # File lib/roma/plugin/plugin_alist.rb
# Handles "alist_insert <key> <index> <bytes>": inserts the data block
# at <index> of the list stored under <key>, starting a new list when
# the key has no value yet. The current time is inserted at the same
# position of the timestamp array.
#
# s - tokenized command line; s[1] key, s[2] index, s[3] data length.
#
# Replies STORED / NOT_STORED according to the storage write, and
# SERVER_ERROR if anything raises.
def ev_alist_insert(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[3].to_i)
  read_bytes(2) # the two bytes after the data block ("\r\n" in the documented format)
  unless nodes[0] == @nid
    return forward2(nodes[0], s, data)
  end

  stored = @storages[hname].get(vn, k, d)
  v = stored ? Marshal.load(stored) : [[],[]]

  position = s[2].to_i
  v[0].insert(position, data)
  v[1].insert(position, Time.now.to_i)

  expt = 0x7fffffff
  result = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
  @stats.write_count += 1

  unless result
    return send_data("NOT_STORED\r\n")
  end

  wb_map = @stats.wb_command_map
  if wb_map.key?(:alist_insert)
    Roma::WriteBehindProcess::push(hname, wb_map[:alist_insert], k, data)
  end
  redundant(nodes[1..-1], hname, k, d, result[2], expt, result[4])
  send_data("STORED\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n"," ") + "\r\n")
  @log.error(detail)
end
ev_alist_join(s) click to toggle source

alist_join <key> <bytes> [index|range] [forward]\r\n <separator block>\r\n

( [VALUE <key> 0 <length of length string>\r\n <length string>\r\n VALUE <key> 0 <value length>\r\n <value>\r\n] END\r\n |SERVER_ERROR <error message>\r\n)

    # File lib/roma/plugin/plugin_alist.rb
# Handles "alist_join <key> <bytes> [index|range]": replies with the
# total list length and the selected elements joined by the supplied
# separator block.
#
# s - tokenized command line; s[1] is the key, s[2] the separator
#     length, s[3] an optional index ("3") or range ("0..4", the end may
#     be negative). Anything else selects the whole list.
#
# Sends a VALUE record holding the list length, a VALUE record holding
# the joined string, then "END\r\n". Sends a bare "END\r\n" when the key
# is missing or a single index is at or past the end, and
# "SERVER_ERROR ..." if anything raises.
def ev_alist_join(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[2].to_i)
  read_bytes(2) # the two bytes after the separator block ("\r\n" in the documented format)
  return forward1(nodes[0], s, data) if nodes[0] != @nid

  # NOTE(review): this read passes 0 where most sibling commands pass d;
  # kept as in the original — confirm whether that is intended.
  ddata = @storages[hname].get(vn, k, 0)
  @stats.read_count += 1
  return send_data("END\r\n") unless ddata

  v = Marshal.load(ddata)[0]
  # The dots are escaped so that only a literal ".." separates a range.
  if /(?:^(\d+)$|^(\d+)\.\.((?:-)?\d+)$)/ =~ s[3]
    if $1
      return send_data("END\r\n") if v.length <= $1.to_i
      buf = v[Range.new($1.to_i,$1.to_i)]
    else
      buf = v[Range.new($2.to_i,$3.to_i)]
    end
  else
    buf = v
  end
  # Array#[] returns nil when the range starts past the end; treat that
  # as an empty selection instead of failing with NoMethodError.
  buf ||= []

  len = v.length
  ret = buf.join(data)
  send_data("VALUE #{s[1]} 0 #{len.to_s.length}\r\n#{len.to_s}\r\n")
  # Length fields are byte counts, not character counts.
  send_data("VALUE #{s[1]} 0 #{ret.bytesize}\r\n#{ret}\r\nEND\r\n")
rescue => e
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n"," ")
  send_data("#{msg}\r\n")
  @log.error("#{e} #{$@}")
end
ev_alist_join_with_time(s) click to toggle source

alist_join_with_time <key> <bytes> [index|range] [forward]\r\n <separator block>\r\n

( [VALUE <key> 0 <length of length string>\r\n <length string>\r\n VALUE <key> 0 <value length>\r\n <value string>\r\n VALUE <key> 0 <value length>\r\n <time string>\r\n] END\r\n |SERVER_ERROR <error message>\r\n)

    # File lib/roma/plugin/plugin_alist.rb
# Handles "alist_join_with_time <key> <bytes> [index|range]": like
# alist_join, but also replies with the matching timestamps joined by
# the same separator.
#
# s - tokenized command line; s[1] is the key, s[2] the separator
#     length, s[3] an optional index ("3") or range ("0..4", the end may
#     be negative). Anything else selects the whole list.
#
# Sends VALUE records for the list length, the joined values and the
# joined timestamps, then "END\r\n". Sends a bare "END\r\n" when the key
# is missing or a single index is at or past the end, and
# "SERVER_ERROR ..." if anything raises.
def ev_alist_join_with_time(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[2].to_i)
  read_bytes(2) # the two bytes after the separator block ("\r\n" in the documented format)
  return forward1(nodes[0], s, data) if nodes[0] != @nid

  # NOTE(review): this read passes 0 where most sibling commands pass d;
  # kept as in the original — confirm whether that is intended.
  ddata = @storages[hname].get(vn, k, 0)
  @stats.read_count += 1
  return send_data("END\r\n") unless ddata

  v = Marshal.load(ddata)
  # The dots are escaped so that only a literal ".." separates a range.
  if /(?:^(\d+)$|^(\d+)\.\.((?:-)?\d+)$)/ =~ s[3]
    if $1
      return send_data("END\r\n") if v[0].length <= $1.to_i
      v_buf = v[0][Range.new($1.to_i,$1.to_i)]
      t_buf = v[1][Range.new($1.to_i,$1.to_i)]
    else
      v_buf = v[0][Range.new($2.to_i,$3.to_i)]
      t_buf = v[1][Range.new($2.to_i,$3.to_i)]
    end
  else
    v_buf = v[0]
    t_buf = v[1]
  end
  # Array#[] returns nil when the range starts past the end; treat that
  # as an empty selection instead of failing with NoMethodError.
  v_buf ||= []
  t_buf ||= []

  len = v[0].length
  v_ret = v_buf.join(data)
  t_ret = t_buf.join(data)
  send_data("VALUE #{s[1]} 0 #{len.to_s.length}\r\n#{len.to_s}\r\n")
  # Length fields are byte counts, not character counts.
  send_data("VALUE #{s[1]} 0 #{v_ret.bytesize}\r\n#{v_ret}\r\n")
  send_data("VALUE #{s[1]} 0 #{t_ret.bytesize}\r\n#{t_ret}\r\nEND\r\n")
rescue => e
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n"," ")
  send_data("#{msg}\r\n")
  @log.error("#{e} #{$@}")
end
ev_alist_last(s) click to toggle source

alist_last <key> [forward]\r\n

( [VALUE <key> 0 <value length>\r\n <value>\r\n] END\r\n |SERVER_ERROR <error message>\r\n)

    # File lib/roma/plugin/plugin_alist.rb
# Handles "alist_last <key>": replies with the last element of the list
# stored under <key>.
#
# s - tokenized command line; s[1] is the key.
#
# Sends "VALUE <key> 0 <bytes>\r\n<value>\r\nEND\r\n", a bare "END\r\n"
# when the key is missing or the list is empty, and "SERVER_ERROR ..."
# if anything raises.
def ev_alist_last(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  return forward1(nodes[0], s) if nodes[0] != @nid

  ddata = @storages[hname].get(vn, k, d)
  @stats.read_count += 1
  return send_data("END\r\n") unless ddata

  v = Marshal.load(ddata)[0]
  return send_data("END\r\n") if v.empty?

  ret = v.last
  # The length field must count bytes, not characters, so a client that
  # reads exactly that many bytes stays in sync on multibyte values.
  send_data("VALUE #{s[1]} 0 #{ret.bytesize}\r\n#{ret}\r\nEND\r\n")
rescue => e
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n"," ")
  send_data("#{msg}\r\n")
  @log.error("#{e} #{$@}")
end
ev_alist_length(s) click to toggle source

alist_length <key> [forward]\r\n

(<length>|NOT_FOUND|SERVER_ERROR <error message>)\r\n

    # File lib/roma/plugin/plugin_alist.rb
# Handles "alist_length <key>": replies with the number of elements in
# the list stored under <key>.
#
# s - tokenized command line; s[1] is the key.
#
# Replies the length, NOT_FOUND when the key has no value, and
# SERVER_ERROR if anything raises.
def ev_alist_length(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  unless nodes[0] == @nid
    return forward2(nodes[0], s)
  end

  stored = @storages[hname].get(vn, k, d)
  @stats.read_count += 1
  unless stored
    return send_data("NOT_FOUND\r\n")
  end

  count = Marshal.load(stored)[0].length
  send_data("#{count}\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n"," ") + "\r\n")
  @log.error(detail)
end
ev_alist_pop(s) click to toggle source

alist_pop <key> [forward]\r\n

( [VALUE <key> 0 <value length>\r\n <value>\r\n] END |NOT_STORED|SERVER_ERROR <error message>)\r\n

    # File lib/roma/plugin/plugin_alist.rb
# Handles "alist_pop <key>": removes the last element of the list stored
# under <key> and replies with it.
#
# s - tokenized command line; s[1] is the key.
#
# Sends "VALUE <key> 0 <bytes>\r\n<value>\r\nEND\r\n" once the shortened
# list is written back, NOT_STORED when that write returns a falsy
# result, a bare "END\r\n" when the key is missing or the list is empty,
# and "SERVER_ERROR ..." if anything raises.
def ev_alist_pop(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  return forward1(nodes[0], s) if nodes[0] != @nid

  ddata = @storages[hname].get(vn, k, d)
  return send_data("END\r\n") unless ddata

  v = Marshal.load(ddata)
  return send_data("END\r\n") if v[0].empty?

  retv = v[0].pop
  v[1].pop # drop the matching timestamp
  expt = 0x7fffffff
  ret = @storages[hname].set(vn, k, d, expt ,Marshal.dump(v))
  @stats.read_count += 1
  @stats.write_count += 1

  if ret
    redundant(nodes[1..-1], hname, k, d, ret[2], expt, ret[4])
    # The length field must count bytes, not characters, so a client that
    # reads exactly that many bytes stays in sync on multibyte values.
    send_data("VALUE #{s[1]} 0 #{retv.bytesize}\r\n#{retv}\r\nEND\r\n")
  else
    send_data("NOT_STORED\r\n")
  end
rescue => e
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n"," ")
  send_data("#{msg}\r\n")
  @log.error("#{e} #{$@}")
end
ev_alist_push(s) click to toggle source

alist_push <key> <bytes> [forward]\r\n <data block>\r\n

(STORED|NOT_STORED|SERVER_ERROR <error message>)\r\n

    # File lib/roma/plugin/plugin_alist.rb
# Handles "alist_push <key> <bytes>": appends the data block to the list
# stored under <key>, starting a new list when the key has no value yet.
# The current time is appended to the timestamp array.
#
# s - tokenized command line; s[1] is the key, s[2] the data length.
#
# Replies STORED / NOT_STORED according to the storage write, and
# SERVER_ERROR if anything raises.
def ev_alist_push(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[2].to_i)
  read_bytes(2) # the two bytes after the data block ("\r\n" in the documented format)
  return forward2(nodes[0], s, data) unless nodes[0] == @nid

  stored = @storages[hname].get(vn, k, d)
  v = stored ? Marshal.load(stored) : [[],[]]

  v[0].push(data)
  v[1].push(Time.now.to_i)

  expt = 0x7fffffff
  result = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
  @stats.write_count += 1

  unless result
    return send_data("NOT_STORED\r\n")
  end

  wb_map = @stats.wb_command_map
  if wb_map.key?(:alist_push)
    Roma::WriteBehindProcess::push(hname, wb_map[:alist_push], k, data)
  end
  redundant(nodes[1..-1], hname, k, d, result[2], expt, result[4])
  send_data("STORED\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n"," ") + "\r\n")
  @log.error(detail)
end
ev_alist_shift(s) click to toggle source

alist_shift <key> [forward]\r\n

( [VALUE <key> 0 <value length>\r\n <value>\r\n] END |NOT_STORED|SERVER_ERROR <error message>)\r\n

     # File lib/roma/plugin/plugin_alist.rb
# Handles "alist_shift <key>": removes the first element of the list
# stored under <key> and replies with it.
#
# s - tokenized command line; s[1] is the key.
#
# Sends "VALUE <key> 0 <bytes>\r\n<value>\r\nEND\r\n" once the shortened
# list is written back, NOT_STORED when that write returns a falsy
# result, a bare "END\r\n" when the key is missing or the list is empty,
# and "SERVER_ERROR ..." if anything raises.
def ev_alist_shift(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  return forward1(nodes[0], s) if nodes[0] != @nid

  ddata = @storages[hname].get(vn, k, d)
  return send_data("END\r\n") unless ddata

  v = Marshal.load(ddata)
  return send_data("END\r\n") if v[0].empty?

  retv = v[0].shift
  v[1].shift # drop the matching timestamp
  expt = 0x7fffffff
  ret = @storages[hname].set(vn, k, d, expt ,Marshal.dump(v))
  @stats.read_count += 1
  @stats.write_count += 1

  if ret
    redundant(nodes[1..-1], hname, k, d, ret[2], expt, ret[4])
    # The length field must count bytes, not characters, so a client that
    # reads exactly that many bytes stays in sync on multibyte values.
    send_data("VALUE #{s[1]} 0 #{retv.bytesize}\r\n#{retv}\r\nEND\r\n")
  else
    send_data("NOT_STORED\r\n")
  end
rescue => e
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n"," ")
  send_data("#{msg}\r\n")
  @log.error("#{e} #{$@}")
end
ev_alist_sized_delete_and_prepend(s) click to toggle source
    # File lib/roma/plugin/plugin_alist.rb
# "alist_sized_delete_and_prepend" command: delegates unchanged to
# ev_alist_swap_and_sized_insert.
def ev_alist_sized_delete_and_prepend(s)
  ev_alist_swap_and_sized_insert(s)
end
ev_alist_sized_insert(s) click to toggle source

alist_sized_insert <key> <array-size> <bytes> [forward]\r\n <data block>\r\n

(STORED|NOT_STORED|SERVER_ERROR <error message>)\r\n

    # File lib/roma/plugin/plugin_alist.rb
# Handles "alist_sized_insert <key> <array-size> <bytes>": puts the data
# block at the head of the list stored under <key> (starting a new list
# when the key has no value), then keeps only indexes
# 0..(<array-size> - 1) of both the value and timestamp arrays.
#
# s - tokenized command line; s[1] key, s[2] array size, s[3] data length.
#
# Replies STORED / NOT_STORED according to the storage write, and
# SERVER_ERROR if anything raises.
def ev_alist_sized_insert(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[3].to_i)
  read_bytes(2) # the two bytes after the data block ("\r\n" in the documented format)
  unless nodes[0] == @nid
    return forward2(nodes[0], s, data)
  end

  stored = @storages[hname].get(vn, k, d)
  v = stored ? Marshal.load(stored) : [[],[]]

  last_kept = s[2].to_i - 1
  v[0].insert(0, data)
  v[0] = v[0][0..last_kept]
  v[1].insert(0, Time.now.to_i)
  v[1] = v[1][0..last_kept]

  expt = 0x7fffffff
  result = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
  @stats.write_count += 1

  unless result
    return send_data("NOT_STORED\r\n")
  end

  wb_map = @stats.wb_command_map
  if wb_map.key?(:alist_sized_insert)
    Roma::WriteBehindProcess::push(hname, wb_map[:alist_sized_insert], k, data)
  end
  redundant(nodes[1..-1], hname, k, d, result[2], expt, result[4])
  send_data("STORED\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n"," ") + "\r\n")
  @log.error(detail)
end
ev_alist_sized_prepend(s) click to toggle source
    # File lib/roma/plugin/plugin_alist.rb
# "alist_sized_prepend" command: delegates unchanged to
# ev_alist_sized_insert.
def ev_alist_sized_prepend(s)
  ev_alist_sized_insert(s)
end
ev_alist_sized_push(s) click to toggle source

alist_sized_push <key> <array-size> <bytes> [forward]\r\n <data block>\r\n

(STORED|NOT_PUSHED|NOT_STORED|SERVER_ERROR <error message>)\r\n

    # File lib/roma/plugin/plugin_alist.rb
# Handles "alist_sized_push <key> <array-size> <bytes>": appends the
# data block to the list stored under <key> (starting a new list when
# the key has no value), unless the list already holds <array-size> or
# more elements.
#
# s - tokenized command line; s[1] key, s[2] array size, s[3] data length.
#
# Replies NOT_PUSHED when the list is already at the size limit,
# STORED / NOT_STORED according to the storage write, and SERVER_ERROR
# if anything raises.
def ev_alist_sized_push(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[3].to_i)
  read_bytes(2) # the two bytes after the data block ("\r\n" in the documented format)
  unless nodes[0] == @nid
    return forward2(nodes[0], s, data)
  end

  stored = @storages[hname].get(vn, k, d)
  v = stored ? Marshal.load(stored) : [[],[]]

  max = s[2].to_i
  if v[0].length >= max
    return send_data("NOT_PUSHED\r\n")
  end

  last_kept = max - 1
  v[0].push(data)
  v[0] = v[0][0..last_kept]
  v[1].push(Time.now.to_i)
  v[1] = v[1][0..last_kept]

  expt = 0x7fffffff
  result = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
  @stats.write_count += 1

  unless result
    return send_data("NOT_STORED\r\n")
  end

  wb_map = @stats.wb_command_map
  if wb_map.key?(:alist_sized_push)
    Roma::WriteBehindProcess::push(hname, wb_map[:alist_sized_push], k, data)
  end
  redundant(nodes[1..-1], hname, k, d, result[2], expt, result[4])
  send_data("STORED\r\n")
rescue => e
  detail = "#{e} #{$@}"
  send_data("SERVER_ERROR #{detail}".tr("\r\n"," ") + "\r\n")
  @log.error(detail)
end
ev_alist_spushv(s) click to toggle source

alist_spushv <hash-name> <vnode-id> src dst

|  ['alist_spushv' <hname> <vn>\r\n]->|
|<-['READY'\r\n]                      |
|                       [<dumpdata>]->|
|                             :       |
|                             :       |
|                    [<end of dump>]->|
|<-['STORED'\r\n]                     |
     # File lib/roma/plugin/plugin_alist.rb
# Handles "alist_spushv <hash-name> <vnode-id>": receives a stream dump
# and loads every record into local storage.
#
# s - tokenized command line; exactly three tokens are required, s[1] is
#     the hash name and s[2] the vnode id.
#
# Replies READY, then reads records until one with a zero key length
# arrives. Each record is a 20-byte header (five 32-bit big-endian
# integers: vn, last, clk, expt, key length), the key, a 32-bit value
# length and the value. Values that to_alist_value accepts go through
# merge_list; all others are handed to the storage's load_stream_dump.
# Finishes with STORED, or CANCELED when spushv_protection is set by
# then.
def ev_alist_spushv(s)
  unless s.length == 3
    @log.error("#{__method__}:wrong number of arguments(#{s})")
    return send_data("CLIENT_ERROR Wrong number of arguments.\r\n")
  end
  if @stats.spushv_protection
    @log.info("#{__method__}:In spushv_protection")
    return send_data("SERVER_ERROR In spushv_protection.\r\n")
  end
  # Mark this vnode as being received; the ensure clause removes the mark.
  @stats.run_receive_a_vnode["#{s[1]}_#{s[2]}"] = true

  $roma.stop_clean_up

  send_data("READY\r\n")

  loaded = 0
  loop do
    header = read_bytes(20, 100)
    vn, last, clk, expt, klen = header.unpack('NNNNN')
    break if klen == 0 # end of dump

    k = read_bytes(klen)
    vlen = read_bytes(4, 100).unpack('N').first
    v = read_bytes(vlen, 100)
    val = to_alist_value(v)
    accepted =
      if val
        merge_list(s[1], vn, last, clk, expt, k, v, val)
      else
        @storages[s[1]].load_stream_dump(vn, last, clk, expt, k, v)
      end
    loaded += 1 if accepted
  end

  if @stats.spushv_protection
    @log.info("#{__method__}:Canceled because of spushv_protection")
    send_data("CANCELED\r\n")
  else
    send_data("STORED\r\n")
  end
  @log.debug("alist #{loaded} keys loaded.")
rescue Storage::StorageException => e
  @log.error("#{e.inspect} #{$@}")
  close_connection
  shutdown_requested = Config.const_defined?(:STORAGE_EXCEPTION_ACTION) &&
    Config::STORAGE_EXCEPTION_ACTION == :shutdown
  if shutdown_requested
    @log.error("#{__method__}:Romad will stop")
    @stop_event_loop = true
  end
rescue => e
  @log.error("#{e}\n#{$@}")
ensure
  @stats.run_receive_a_vnode.delete("#{s[1]}_#{s[2]}") if s.length == 3
end
ev_alist_swap_and_insert(s) click to toggle source

alist_swap_and_insert <key> <bytes> [forward]\r\n <data block>\r\n

(STORED|NOT_STORED|SERVER_ERROR <error message>)\r\n

    # File lib/roma/plugin/plugin_alist.rb
# Moves +data+ to the head of the list stored under the key in s[1].
#
# s is the tokenised command line: s[1] is the key, s[2] the byte length of
# the data block that follows on the connection. If the value is already in
# the list its old entry (value and timestamp) is removed first; the value is
# then inserted at index 0 with the current time. A missing key starts from
# an empty list. Replies STORED / NOT_STORED, or SERVER_ERROR on exception.
def ev_alist_swap_and_insert(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[2].to_i)
  read_bytes(2) # consume the 2-byte terminator after the data block (presumably CRLF)
  return forward2(nodes[0], s, data) if nodes[0] != @nid

  stored = @storages[hname].get(vn, k, d)
  list = stored ? Marshal.load(stored) : [[], []]

  # list[0] holds the values, list[1] the matching timestamps.
  pos = list[0].index(data)
  unless pos.nil?
    list[0].delete_at(pos)
    list[1].delete_at(pos)
  end
  list[0].unshift(data)
  list[1].unshift(Time.now.to_i)

  expt = 0x7fffffff
  ret = @storages[hname].set(vn, k, d, expt, Marshal.dump(list))
  @stats.write_count += 1

  return send_data("NOT_STORED\r\n") unless ret

  wb_map = @stats.wb_command_map
  if wb_map.key?(:alist_swap_and_insert)
    Roma::WriteBehindProcess.push(hname, wb_map[:alist_swap_and_insert], k, data)
  end
  redundant(nodes[1..-1], hname, k, d, ret[2], expt, ret[4])
  send_data("STORED\r\n")
rescue => e
  line = "SERVER_ERROR #{e} #{$@}".tr("\r\n", " ")
  send_data("#{line}\r\n")
  @log.error("#{e} #{$@}")
end
ev_alist_swap_and_push(s) click to toggle source

alist_swap_and_push <key> <bytes> [forward]\r\n <data block>\r\n

(STORED|NOT_STORED|SERVER_ERROR <error message>)\r\n

    # File lib/roma/plugin/plugin_alist.rb
# Moves +data+ to the tail of the list stored under the key in s[1].
#
# s is the tokenised command line: s[1] is the key, s[2] the byte length of
# the data block that follows on the connection. An existing occurrence of
# the value (and its timestamp) is removed before the value is appended with
# the current time. A missing key starts from an empty list.
# Replies STORED / NOT_STORED, or SERVER_ERROR on exception.
def ev_alist_swap_and_push(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[2].to_i)
  read_bytes(2) # consume the 2-byte terminator after the data block (presumably CRLF)
  return forward2(nodes[0], s, data) if nodes[0] != @nid

  raw = @storages[hname].get(vn, k, d)
  list = raw ? Marshal.load(raw) : [[], []]

  # list[0] holds the values, list[1] the matching timestamps.
  found_at = list[0].index(data)
  if found_at
    list.each { |column| column.delete_at(found_at) } if false # (kept explicit below)
    list[0].delete_at(found_at)
    list[1].delete_at(found_at)
  end
  list[0] << data
  list[1] << Time.now.to_i

  expt = 0x7fffffff
  ret = @storages[hname].set(vn, k, d, expt, Marshal.dump(list))
  @stats.write_count += 1

  unless ret
    return send_data("NOT_STORED\r\n")
  end

  wb_map = @stats.wb_command_map
  if wb_map.key?(:alist_swap_and_push)
    Roma::WriteBehindProcess.push(hname, wb_map[:alist_swap_and_push], k, data)
  end
  redundant(nodes[1..-1], hname, k, d, ret[2], expt, ret[4])
  send_data("STORED\r\n")
rescue => e
  line = "SERVER_ERROR #{e} #{$@}".tr("\r\n", " ")
  send_data("#{line}\r\n")
  @log.error("#{e} #{$@}")
end
ev_alist_swap_and_sized_insert(s) click to toggle source

alist_swap_and_sized_insert <key> <array-size> <bytes> [forward]\r\n <data block>\r\n

(STORED|NOT_STORED|SERVER_ERROR <error message>)\r\n

    # File lib/roma/plugin/plugin_alist.rb
# Moves +data+ to the head of the list stored under the key in s[1] and caps
# the list at <array-size> entries.
#
# s is the tokenised command line: s[1] is the key, s[2] the maximum list
# size, s[3] the byte length of the data block that follows on the
# connection. An existing occurrence of the value (and its timestamp) is
# removed, the value is inserted at index 0 with the current time, and both
# columns are truncated to the size limit. A missing key starts from an
# empty list. Replies STORED / NOT_STORED, or SERVER_ERROR on exception.
def ev_alist_swap_and_sized_insert(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[3].to_i)
  read_bytes(2) # consume the 2-byte terminator after the data block (presumably CRLF)
  return forward2(nodes[0], s, data) if nodes[0] != @nid

  ddata = @storages[hname].get(vn, k, d)
  v = ddata ? Marshal.load(ddata) : [[], []]

  idx = v[0].index(data)
  if idx
    v[0].delete_at(idx)
    v[1].delete_at(idx)
  end
  v[0].insert(0, data)
  v[1].insert(0, Time.now.to_i)

  # Array#first(n) is used instead of v[0][0..(n - 1)]: for n == 0 the range
  # form becomes 0..-1, which selects the WHOLE array and silently disables
  # the cap (negative n trimmed from the tail). Non-positive sizes are
  # clamped to 0 so the limit is always honoured.
  limit = [s[2].to_i, 0].max
  v[0] = v[0].first(limit)
  v[1] = v[1].first(limit)

  expt = 0x7fffffff
  ret = @storages[hname].set(vn, k, d, expt, Marshal.dump(v))
  @stats.write_count += 1

  if ret
    if @stats.wb_command_map.key?(:alist_swap_and_sized_insert)
      Roma::WriteBehindProcess.push(hname, @stats.wb_command_map[:alist_swap_and_sized_insert], k, data)
    end
    redundant(nodes[1..-1], hname, k, d, ret[2], expt, ret[4])
    send_data("STORED\r\n")
  else
    send_data("NOT_STORED\r\n")
  end
rescue => e
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n", " ")
  send_data("#{msg}\r\n")
  @log.error("#{e} #{$@}")
end
ev_alist_swap_and_sized_push(s) click to toggle source

alist_swap_and_sized_push <key> <array-size> <bytes> [forward]\r\n <data block>\r\n

(STORED|NOT_PUSHED|NOT_STORED|SERVER_ERROR <error message>)\r\n

     # File lib/roma/plugin/plugin_alist.rb
 # Moves +data+ to the tail of the list stored under the key in s[1], refusing
 # to grow the list beyond <array-size>.
 #
 # s is the tokenised command line: s[1] is the key, s[2] the maximum list
 # size, s[3] the byte length of the data block that follows on the
 # connection. When the value is already present its old entry is removed
 # before it is appended; when it is absent and the list already holds
 # <array-size> or more entries the reply is NOT_PUSHED and nothing is
 # written. After appending, both columns are cut to indexes 0..(size - 1).
 # Replies STORED / NOT_PUSHED / NOT_STORED, or SERVER_ERROR on exception.
 def ev_alist_swap_and_sized_push(s)
   hname, k, d, vn, nodes = calc_hash(s[1])
   data = read_bytes(s[3].to_i)
   read_bytes(2) # consume the 2-byte terminator after the data block (presumably CRLF)
   return forward2(nodes[0], s, data) if nodes[0] != @nid

   raw = @storages[hname].get(vn, k, d)
   list = raw ? Marshal.load(raw) : [[], []]

   max = s[2].to_i
   keep = 0..(max - 1)

   found_at = list[0].index(data)
   if found_at.nil?
     return send_data("NOT_PUSHED\r\n") if list[0].length >= max
   else
     list[0].delete_at(found_at)
     list[1].delete_at(found_at)
   end

   list[0] << data
   list[0] = list[0].slice(keep)
   list[1] << Time.now.to_i
   list[1] = list[1].slice(keep)

   expt = 0x7fffffff
   ret = @storages[hname].set(vn, k, d, expt, Marshal.dump(list))
   @stats.write_count += 1

   return send_data("NOT_STORED\r\n") unless ret

   wb_map = @stats.wb_command_map
   if wb_map.key?(:alist_swap_and_sized_push)
     Roma::WriteBehindProcess.push(hname, wb_map[:alist_swap_and_sized_push], k, data)
   end
   redundant(nodes[1..-1], hname, k, d, ret[2], expt, ret[4])
   send_data("STORED\r\n")
 rescue => e
   line = "SERVER_ERROR #{e} #{$@}".tr("\r\n", " ")
   send_data("#{line}\r\n")
   @log.error("#{e} #{$@}")
 end
ev_alist_to_json(s) click to toggle source

alist_to_json <key> [index|range] [forward]\r\n

( VALUE <key> 0 <length of json string>\r\n <json string>\r\n END\r\n |SERVER_ERROR <error message>\r\n)

    # File lib/roma/plugin/plugin_alist.rb
# Replies with the list stored under the key in s[1], encoded as JSON.
#
# s is the tokenised command line: s[1] is the key and s[2] an optional
# selector, either a single index ("3") or a range ("0..5", "2..-1"). Any
# other s[2] (including none) selects the whole list. A single index past the
# end of the list, or a missing key, yields a bare END.
#
# Replies "VALUE <key> 0 <bytes>\r\n<json>\r\nEND\r\n", "END\r\n", or
# SERVER_ERROR on exception.
def ev_alist_to_json(s)
  hname, k, _d, vn, nodes = calc_hash(s[1])
  return forward1(nodes[0], s) if nodes[0] != @nid

  # NOTE(review): the lookup passes 0 rather than the digest from calc_hash,
  # unlike the write commands - confirm this is intentional.
  ddata = @storages[hname].get(vn, k, 0)
  @stats.read_count += 1
  return send_data("END\r\n") unless ddata

  v = Marshal.load(ddata)[0]

  # The dots are escaped so that only a literal ".." separates a range; the
  # previous pattern used bare "..", which matched ANY two characters and
  # treated input such as "0xx1" as the range 0..1.
  if /\A(?:(\d+)|(\d+)\.\.(-?\d+))\z/ =~ s[2]
    if $1
      idx = $1.to_i
      return send_data("END\r\n") if v.length <= idx
      ret = JSON.generate(v[Range.new(idx, idx)])
    else
      ret = JSON.generate(v[Range.new($2.to_i, $3.to_i)])
    end
  else
    ret = JSON.generate(v)
  end

  # The header announces a byte count (the forwarding reader consumes exactly
  # that many bytes), so bytesize is required: String#length counts
  # characters and is too small for multi-byte JSON text.
  send_data("VALUE #{s[1]} 0 #{ret.bytesize}\r\n#{ret}\r\nEND\r\n")
rescue => e
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n", " ")
  send_data("#{msg}\r\n")
  @log.error("#{e} #{$@}")
end
ev_alist_to_s(s) click to toggle source

alist_to_s <key> [index|range] [forward]\r\n

( [VALUE <key> 0 <length of length string>\r\n <length string>\r\n VALUE <key> 0 <value length>\r\n <value>\r\n] END\r\n |SERVER_ERROR <error message>\r\n)

     # File lib/roma/plugin/plugin_alist.rb
# Replies with the total list length followed by the list (or a slice of it)
# rendered with Array#to_s.
#
# s is the tokenised command line: s[1] is the key and s[2] an optional
# selector, either a single index ("3") or a range ("0..5", "2..-1"). Any
# other s[2] (including none) selects the whole list. Two VALUE blocks are
# sent: first the length of the complete list as a decimal string, then the
# selected text. A missing key yields a bare END; a stored value that is not
# in alist format yields a SERVER_ERROR.
def ev_alist_to_s(s)
  hname, k, _d, vn, nodes = calc_hash(s[1])
  return forward1(nodes[0], s) if nodes[0] != @nid

  # NOTE(review): the lookup passes 0 rather than the digest from calc_hash,
  # unlike the write commands - confirm this is intentional.
  ddata = @storages[hname].get(vn, k, 0)
  @stats.read_count += 1

  return send_data("END\r\n") unless ddata
  v = to_alist_value(ddata)
  unless v
    return send_data("SERVER_ERROR data other than alist's format already exist.\r\n")
  end

  # The dots are escaped so that only a literal ".." separates a range; the
  # previous pattern used bare "..", which matched ANY two characters and
  # treated input such as "0xx1" as the range 0..1.
  if /\A(?:(\d+)|(\d+)\.\.(-?\d+))\z/ =~ s[2]
    ret = if $1
            v[0][Range.new($1.to_i, $1.to_i)].to_s
          else
            v[0][Range.new($2.to_i, $3.to_i)].to_s
          end
  else
    ret = v[0].to_s
  end

  len = v[0].length.to_s
  send_data("VALUE #{s[1]} 0 #{len.bytesize}\r\n#{len}\r\n")
  # bytesize, not length: the header announces a byte count and the rendered
  # text may contain multi-byte characters.
  send_data("VALUE #{s[1]} 0 #{ret.bytesize}\r\n#{ret}\r\nEND\r\n")
rescue => e
  msg = "SERVER_ERROR #{e} #{$@}".tr("\r\n", " ")
  send_data("#{msg}\r\n")
  @log.error("#{e} #{$@}")
end
ev_alist_update_at(s) click to toggle source

alist_update_at <key> <index> <bytes>\r\n <data block>\r\n

(STORED|NOT_STORED|NOT_FOUND|SERVER_ERROR <error message>)\r\n

     # File lib/roma/plugin/plugin_alist.rb
# Overwrites the element at <index> of the list stored under the key in s[1].
#
# s is the tokenised command line: s[1] is the key, s[2] the index, s[3] the
# byte length of the data block that follows on the connection. The element
# and its timestamp are replaced in place. Replies NOT_FOUND when the key
# does not exist or the index is negative or past the end of the list,
# otherwise STORED / NOT_STORED, or SERVER_ERROR on exception.
def ev_alist_update_at(s)
  hname, k, d, vn, nodes = calc_hash(s[1])
  data = read_bytes(s[3].to_i)
  read_bytes(2) # consume the 2-byte terminator after the data block (presumably CRLF)
  return forward2(nodes[0], s, data) if nodes[0] != @nid

  raw = @storages[hname].get(vn, k, d)
  return send_data("NOT_FOUND\r\n") unless raw

  list = Marshal.load(raw)

  pos = s[2].to_i
  in_bounds = pos >= 0 && pos < list[0].length
  return send_data("NOT_FOUND\r\n") unless in_bounds

  # list[0] holds the values, list[1] the matching timestamps.
  list[0][pos] = data
  list[1][pos] = Time.now.to_i

  expt = 0x7fffffff
  ret = @storages[hname].set(vn, k, d, expt, Marshal.dump(list))
  @stats.write_count += 1

  return send_data("NOT_STORED\r\n") unless ret

  wb_map = @stats.wb_command_map
  if wb_map.key?(:alist_update_at)
    Roma::WriteBehindProcess.push(hname, wb_map[:alist_update_at], k, data)
  end
  redundant(nodes[1..-1], hname, k, d, ret[2], expt, ret[4])
  send_data("STORED\r\n")
rescue => e
  line = "SERVER_ERROR #{e} #{$@}".tr("\r\n", " ")
  send_data("#{line}\r\n")
  @log.error("#{e} #{$@}")
end

Private Instance Methods

calc_hash(key) click to toggle source
     # File lib/roma/plugin/plugin_alist.rb
# Resolves a client key into its routing coordinates.
#
# key may carry a hash name after an ESC ("\e") separator; when no hash name
# is present @defhash is used.
#
# Returns [hname, k, d, vn, nodes]: the hash name, the bare key, the SHA1
# digest of the key reduced modulo @rttable.hbits, the vnode id the routing
# table maps that digest to, and the nodes returned by
# @rttable.search_nodes_for_write for that vnode.
def calc_hash(key)
  parts = key.split("\e")
  bare_key = parts[0]
  hash_name = parts[1] || @defhash

  digest = Digest::SHA1.hexdigest(bare_key).to_i(16) % @rttable.hbits
  vnode = @rttable.get_vnode_id(digest)
  [hash_name, bare_key, digest, vnode, @rttable.search_nodes_for_write(vnode)]
end
expired_str_to_i(s) click to toggle source
     # File lib/roma/plugin/plugin_alist.rb
# Converts an expiry specification into a number of seconds.
#
# s is a decimal number optionally followed by a unit, case-insensitive:
# "H" for hours or "D" for days; a bare number is taken as seconds.
#
# Returns the number of seconds as an Integer, or nil when s is not of that
# form. The pattern is anchored to the whole string and the unit class is
# [HD]: the previous unanchored /(\d+)([H|D])?/ accepted a literal "|" as a
# unit and silently misread input such as "1.5H" (as 1) or "12x" (as 12).
def expired_str_to_i(s)
  m = /\A(\d+)([HD])?\z/.match(s.upcase)
  return nil unless m

  amount = m[1].to_i
  case m[2]
  when 'D' then amount * 86400
  when 'H' then amount * 3600
  else amount
  end
end
expired_swap(v, rcv_val, et) click to toggle source
     # File lib/roma/plugin/plugin_alist.rb
# Removes expired entries, and the first occurrence of rcv_val, from an alist
# pair in place.
#
# v is [values, timestamps]; an entry is expired when its timestamp is at or
# before (now - et) seconds. Returns v (the same, mutated object).
#
# Entries are removed by POSITION. The previous implementation collected the
# expired *values* and then deleted the first occurrence of each, so when a
# value appeared twice it could delete the fresh copy and keep the stale one
# together with its old timestamp.
def expired_swap(v, rcv_val, et)
  threshold = Time.now.to_i - et
  rcv_at = v[0].index(rcv_val)

  doomed = []
  v[1].each_with_index do |t, i|
    doomed << i if i == rcv_at || threshold >= t
  end

  # Delete from the back so earlier indexes stay valid.
  doomed.reverse_each do |i|
    v[0].delete_at(i)
    v[1].delete_at(i)
  end
  v
end
forward1(nid, rs, data=nil) click to toggle source

Forwards a command whose response spans several lines.

     # File lib/roma/plugin/plugin_alist.rb
# Forwards a command to node nid and relays the reply to the client; handles
# replies made of one or more "VALUE ..." blocks terminated by "END\r\n".
#
# nid  - id of the node to forward to.
# rs   - the tokenised command line; a trailing "forward" token means the
#        command was already forwarded once and is rejected.
# data - optional data block appended after the command line.
#
# Every exit path ends in send_data: either the relayed reply or a
# SERVER_ERROR line.
1393 def forward1(nid, rs, data=nil)
     # A command that already carries the "forward" marker must not be forwarded again.
1394   if rs.last == "forward"
1395     return send_data("SERVER_ERROR Routing table is inconsistent.\r\n")
1396   end
1397   
1398   @log.warn("forward #{rs} to #{nid}");
1399 
     # Rebuild the command line from its tokens and append the "forward" marker.
1400   buf = ''
1401   rs.each{|ss| buf << "#{ss} " }
1402   buf << "forward\r\n"
1403   if data
1404     buf << data
1405     buf << "\r\n"
1406   end
1407   
1408   con = get_connection(nid)
1409   con.send(buf)
1410 
     # First reply line decides the path: nil (nothing read), an ERROR line, a
     # non-VALUE line (relayed as-is), or the start of a VALUE block.
1411   buf = con.gets
1412   if buf == nil
1413     @rttable.proc_failed(nid)
1414     @log.error("forward get failed:nid=#{nid} rs=#{rs} #{$@}")
1415     return send_data("SERVER_ERROR Message forward failed.\r\n")
1416   elsif buf.start_with?("ERROR")
1417     @rttable.proc_succeed(nid)
1418     con.close_connection
1419     @log.error("forward get failed:nid=#{nid} rs=#{rs} #{$@}")
1420     return send_data("SERVER_ERROR Message forward failed.\r\n")
1421   elsif buf.start_with?("VALUE") == false
1422     return_connection(nid, con)
1423     @rttable.proc_succeed(nid)
1424     return send_data(buf)
1425   end
1426   
     # Accumulate every VALUE header plus its payload until the END line.
1427   res = ''
1428   begin
1429     res << buf
1430     s = buf.split(/ /)
1431     if s[0] != 'VALUE'
1432       return_connection(nid, con)
1433       @rttable.proc_succeed(nid)
1434       return send_data(buf)
1435     end
       # s[3] is the byte count from the VALUE header; +2 covers the two bytes
       # that follow the payload (presumably the trailing CRLF).
1436     res << con.read_bytes(s[3].to_i + 2)          
1437   end while (buf = con.gets)!="END\r\n"
1438 
1439   res << "END\r\n"
1440 
1441   return_connection(nid, con)
1442   @rttable.proc_succeed(nid)
1443 
1444   send_data(res)
1445 rescue => e
     # Any exception other than "no connection" is reported to the routing
     # table as a failure of nid.
1446   @rttable.proc_failed(nid) if e.message != "no connection"
1447   @log.error("forward get failed:nid=#{nid} rs=#{rs} #{e} #{$@}")
1448   send_data("SERVER_ERROR Message forward failed.\r\n")
1449 end
forward2(nid, rs, data=nil) click to toggle source

Forwards a command whose response is a single line.

     # File lib/roma/plugin/plugin_alist.rb
# Forwards a command to node nid and relays its single-line reply.
#
# nid  - id of the node to forward to.
# rs   - the tokenised command line; a trailing "forward" token means the
#        command was already forwarded once and is rejected.
# data - optional data block appended after the command line.
#
# Sends the reply from send_cmd followed by "\r\n", or a SERVER_ERROR line
# when the command was already forwarded, send_cmd returned nil, or the
# reply starts with "ERROR".
def forward2(nid, rs, data=nil)
  already_forwarded = rs.last == "forward"
  return send_data("SERVER_ERROR Routing table is inconsistent.\r\n") if already_forwarded

  @log.warn("forward #{rs} to #{nid}")

  # Every token is followed by one space, then the "forward" marker.
  request = rs.map { |token| "#{token} " }.join
  request << "forward\r\n"
  request << data << "\r\n" if data

  res = send_cmd(nid, request)
  failed = res.nil? || res.start_with?("ERROR")
  return send_data("SERVER_ERROR Message forward failed.\r\n") if failed

  send_data("#{res}\r\n")
end
merge_list(hname, vn, last, clk, expt, k, v, val) click to toggle source
     # File lib/roma/plugin/plugin_alist.rb
# Merges a received alist into the one already stored under key k.
#
# val is the decoded [values, timestamps] pair of the received record and v
# its raw form. When nothing is stored under k the raw record is handed to
# load_stream_dump unchanged. Otherwise, for each locally stored value, its
# first occurrence in val is dropped (value and timestamp; val is mutated);
# the remaining received entries are appended to the local list, which is
# written back with expiry expt.
#
# Returns whatever the storage's set / load_stream_dump call returns.
def merge_list(hname, vn, last, clk, expt, k, v, val)
  storage = @storages[hname]
  current = storage.get(vn, k, 0)
  return storage.load_stream_dump(vn, last, clk, expt, k, v) unless current

  local = Marshal.load(current)
  local[0].each do |item|
    dup_at = val[0].index(item)
    next unless dup_at

    val[0].delete_at(dup_at)
    val[1].delete_at(dup_at)
  end

  local[0] = local[0] + val[0]
  local[1] = local[1] + val[1]
  storage.set(vn, k, 0, expt, Marshal.dump(local))
end
to_alist_value(v) click to toggle source
     # File lib/roma/plugin/plugin_alist.rb
# Decodes a stored value if, and only if, it is in alist format.
#
# The first four bytes must be the Marshal 4.8 header of a two-element
# array:
#   Marshal.dump([[],[]])[0..3].unpack("cc a c")  # => [4, 8, "[", 7]
#   4, 8 = marshal format version; "[" = array; 7 = fixnum encoding of 2
# and both elements of the decoded array must be exactly Array instances.
#
# Returns the decoded [values, timestamps] array, or nil when v is nil, has
# a different header, is not a pair of arrays, or cannot be decoded.
def to_alist_value(v)
  return nil if v.nil?
  return nil unless v[0..3] == "\x04\b[\a"

  # NOTE(review): Marshal.load on data that originated from a peer or client
  # is unsafe if that data can be attacker-controlled.
  pair = Marshal.load(v)
  both_arrays = pair[0].instance_of?(Array) && pair[1].instance_of?(Array)
  both_arrays ? pair : nil
rescue
  nil
end
to_alist_value_for_write(hname, vn, k, d) click to toggle source
     # File lib/roma/plugin/plugin_alist.rb
# Fetches the list stored under key k for a write command.
#
# Returns a fresh empty alist ([[], []]) when nothing is stored under k;
# otherwise the result of to_alist_value on the stored bytes, which is nil
# when the stored value is not in alist format.
def to_alist_value_for_write(hname, vn, k, d)
  stored = @storages[hname].get(vn, k, d)
  stored ? to_alist_value(stored) : [[], []]
end