module Roma::CommandPlugin::PluginStorage

Public Instance Methods

ev_add(s) click to toggle source

“add” means “add new data to the store”. Format: <command name> <key> <flags> <exptime> <bytes> [noreply]\r\n <data block>\r\n

    # File lib/roma/plugin/plugin_storage.rb
# Handler for the "add" command: hands the request tokens to #set with the
# :add operation.
def ev_add(s)
  set(:add, s)
end
ev_append(s) click to toggle source

“append” means “append new data to the existing data”. Format: <command name> <key> <flags> <exptime> <bytes> [noreply]\r\n <data block>\r\n

    # File lib/roma/plugin/plugin_storage.rb
# Handler for the "append" command: hands the request tokens to #set with
# the :append operation.
def ev_append(s)
  set(:append, s)
end
ev_cas(s) click to toggle source

“cas” means “store this data, but only if no one else has updated it since I last fetched it”. Format: <command name> <key> <flags> <exptime> <bytes> <cas-id>\r\n <data block>\r\n

    # File lib/roma/plugin/plugin_storage.rb
# Handler for "cas <key> <flags> <exptime> <bytes> <cas-id>".
#
# Reads the data block, then either relays the request as "fcas" to the
# first node in the write list of the key's vnode, or (when this node is
# that first node) stores the value through #store_cas with the remaining
# nodes as replication targets.
#
# s - Array of request tokens: s[1] is "<key>[\e<hash name>]", s[3] the
#     expiry, s[4] the byte count of the data block, s[5] the cas id.
def ev_cas(s)
  # Validate before touching s[1]; a short request would otherwise raise
  # NoMethodError on nil. Mirrors the checks performed by #set.
  if s.length < 6
    @log.error("cas:wrong number of arguments(#{s})")
    return send_data("CLIENT_ERROR Wrong number of arguments.\r\n")
  end

  bytes = s[4].to_i
  if bytes < 0
    @log.error("cas:wrong value size(#{s})")
    return send_data("CLIENT_ERROR Wrong value size.\r\n")
  end

  key, hname = s[1].split("\e")
  hname ||= @defhash
  d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits
  v = read_bytes(bytes)
  read_bytes(2) # consume the two bytes that follow the data block
  vn = @rttable.get_vnode_id(d)
  nodes = @rttable.search_nodes_for_write(vn)

  if nodes[0] != @nid
    @log.warn("forward cas key=#{key} vn=#{vn} to #{nodes[0]}")
    res = send_cmd(nodes[0], "fcas #{key}\e#{hname} #{d} #{s[3]} #{v.length} #{s[5]}\r\n#{v}\r\n")
    if res.nil? || res.start_with?("ERROR")
      return send_data("SERVER_ERROR Message forward failed.\r\n")
    end
    return send_data("#{res}\r\n")
  end

  store_cas(hname, vn, key, d, s[5].to_i, s[3].to_i, v, nodes[1..-1])
end
ev_decr(s) click to toggle source

decr <key> <value> [noreply]\r\n

    # File lib/roma/plugin/plugin_storage.rb
# Handler for the "decr" command: hands the request tokens to #incr_decr
# with the :decr operation.
def ev_decr(s)
  incr_decr(:decr, s)
end
ev_delete(s) click to toggle source

delete <key> [<time>] [noreply]\r\n

    # File lib/roma/plugin/plugin_storage.rb
# Handler for "delete <key> [<time>] [noreply]".
#
# When another node is first in the write list for the key's vnode, the
# request is relayed to it as "fdelete" and its answer is passed back.
# Otherwise the key is deleted from the local storage, the deletion is
# propagated to the remaining write nodes with "rdelete" (queued for an
# asynchronous retry when a node does not answer), and the outcome is
# reported to the client.
#
# s - Array of request tokens; s[1] is "<key>[\e<hash name>]".
def ev_delete(s)
  if s.length < 2
    @log.error("delete:wrong number of arguments(#{s})")
    return send_data("CLIENT_ERROR Wrong number of arguments.\r\n")
  end

  key, hname = s[1].split("\e")
  hname ||= @defhash
  d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits
  vn = @rttable.get_vnode_id(d)
  nodes = @rttable.search_nodes_for_write(vn)

  unless nodes[0] == @nid
    # Relay, carrying over every token that followed the key.
    cmd = (["fdelete #{key}\e#{hname}"] + s[2..-1]).join(' ') << "\r\n"
    @log.warn("forward delete #{s[1]}")
    reply = send_cmd(nodes[0], cmd)
    return send_data("SERVER_ERROR Message forward failed.\r\n") if reply.nil? || reply.start_with?("ERROR")
    return send_data("#{reply}\r\n")
  end

  unless @storages.key?(hname)
    send_data("SERVER_ERROR #{hname} does not exists.\r\n")
    return
  end

  storage = @storages[hname]

  # Hand the value that is about to disappear to the write-behind hook.
  if @stats.wb_command_map.key?(:delete__prev)
    prev = storage.get(vn, key, d)
    Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[:delete__prev], key, prev) if prev
  end

  res = storage.delete(vn, key, d)
  @stats.delete_count += 1

  if !res
    return send_data("NOT_DELETED\r\n")
  elsif res == :deletemark
    return send_data("NOT_FOUND\r\n")
  end

  if @stats.wb_command_map.key?(:delete)
    Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[:delete], key, res[4])
  end

  clk = res[2]
  nodes[1..-1].each do |nid|
    next if send_cmd(nid, "rdelete #{key}\e#{hname} #{clk}\r\n")

    Roma::AsyncProcess::queue.push(Roma::AsyncMessage.new('rdelete', [nid, hname, s[1], clk]))
    @log.warn("rdelete failed:#{s[1]}\e#{hname} #{d} #{clk} -> #{nid}")
  end
  return send_data("NOT_FOUND\r\n") unless res[4]

  if $roma.cr_writer.run_replication
    Roma::WriteBehindProcess::push(nil, "delete #{s[1]}\r\n", s[1], nil)
  end

  send_data("DELETED\r\n")
end
ev_fadd(s) click to toggle source
    # File lib/roma/plugin/plugin_storage.rb
# Handler for the "fadd" command: hands the request tokens to #fset with the
# :add operation.
def ev_fadd(s)
  fset(:add, s)
end
ev_fappend(s) click to toggle source
    # File lib/roma/plugin/plugin_storage.rb
# Handler for the "fappend" command: hands the request tokens to #fset with
# the :append operation.
def ev_fappend(s)
  fset(:append, s)
end
ev_fcas(s) click to toggle source
    # File lib/roma/plugin/plugin_storage.rb
# Handler for "fcas", the command #ev_cas sends when it forwards a request.
#
# Uses the digest supplied in s[2] (recomputed from the key when that token
# converts to 0), reads the data block, verifies that this node appears in
# the write list of the vnode, and stores the value through #store_cas with
# every other write node as a replication target.
#
# s - Array of request tokens: key, digest, expiry, byte count, cas id.
def ev_fcas(s)
  key, hname = s[1].split("\e")
  hname ||= @defhash
  given = s[2].to_i
  d = given.zero? ? Digest::SHA1.hexdigest(key).hex % @rttable.hbits : given
  v = read_bytes(s[4].to_i)
  read_bytes(2) # consume the two bytes that follow the data block
  vn = @rttable.get_vnode_id(d)
  nodes = @rttable.search_nodes_for_write(vn)

  unless nodes.include?(@nid)
    @log.error("fcas failed key = #{s[1]} vn = #{vn}")
    return send_data("SERVER_ERROR Routing table is inconsistent.\r\n")
  end

  nodes.delete(@nid)
  store_cas(hname, vn, key, d, s[5].to_i, s[3].to_i, v, nodes)
end
ev_fdecr(s) click to toggle source
    # File lib/roma/plugin/plugin_storage.rb
# Handler for the "fdecr" command: hands the request tokens to #fincr_fdecr
# with the :decr operation.
def ev_fdecr(s)
  fincr_fdecr(:decr, s)
end
ev_fdelete(s) click to toggle source

fdelete <key> [<time>] [noreply]\r\n

    # File lib/roma/plugin/plugin_storage.rb
# Handler for "fdelete", the command #ev_delete sends when it forwards a
# request.
#
# Verifies that this node appears in the write list of the key's vnode,
# deletes the key from local storage, propagates the deletion to every other
# write node with "rdelete" (queued for an asynchronous retry when a node
# does not answer), and reports the outcome.
#
# s - Array of request tokens; s[1] is "<key>[\e<hash name>]".
def ev_fdelete(s)
  key, hname = s[1].split("\e")
  hname ||= @defhash
  d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits
  vn = @rttable.get_vnode_id(d)
  nodes = @rttable.search_nodes_for_write(vn)

  unless nodes.include?(@nid)
    @log.error("fdelete failed delete key=#{s[1]} vn=#{vn}")
    return send_data("SERVER_ERROR Routing table is inconsistent.\r\n")
  end

  unless @storages.key?(hname)
    send_data("SERVER_ERROR #{hname} does not exists.\r\n")
    return
  end

  storage = @storages[hname]

  # Hand the value that is about to disappear to the write-behind hook.
  if @stats.wb_command_map.key?(:delete__prev)
    prev = storage.get(vn, key, d)
    Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[:delete__prev], key, prev) if prev
  end

  res = storage.delete(vn, key, d)
  @stats.delete_count += 1

  if !res
    return send_data("NOT_DELETED\r\n")
  elsif res == :deletemark
    return send_data("NOT_FOUND\r\n")
  end

  if @stats.wb_command_map.key?(:delete)
    Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[:delete], key, res[4])
  end

  clk = res[2]
  nodes.delete(@nid)
  nodes.each do |nid|
    next if send_cmd(nid, "rdelete #{key}\e#{hname} #{clk}\r\n")

    Roma::AsyncProcess::queue.push(Roma::AsyncMessage.new('rdelete', [nid, hname, s[1], clk]))
    @log.warn("rdelete failed:#{s[1]}\e#{hname} #{d} #{clk} -> #{nid}")
  end
  return send_data("NOT_FOUND\r\n") unless res[4]

  if $roma.cr_writer.run_replication
    Roma::WriteBehindProcess::push(nil, "delete #{s[1]}\r\n", s[1], nil)
  end

  send_data("DELETED\r\n")
end
ev_fget(s) click to toggle source

fget <key>

   # File lib/roma/plugin/plugin_storage.rb
# Handler for "fget <key>", the command #forward_get sends to another node.
#
# Answers from local storage only: when this node is not among the nodes
# returned for the key's vnode it reports an inconsistent routing table
# instead of forwarding again.
#
# s - Array of request tokens; s[1] is "<key>[\e<hash name>]".
def ev_fget(s)
  key, hname = s[1].split("\e")
  hname ||= @defhash
  d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits
  vn = @rttable.get_vnode_id(d)
  nodes = @rttable.search_nodes(vn)

  if !nodes.include?(@nid)
    @log.error("fget failed key=#{s[1]} vn=#{vn}")
    return send_data("SERVER_ERROR Routing table is inconsistent.\r\n")
  end

  if !@storages.key?(hname)
    send_data("SERVER_ERROR #{hname} does not exists.\r\n")
    return
  end

  value = @storages[hname].get(vn, key, 0)
  @stats.read_count += 1
  send_data("VALUE #{s[1]} 0 #{value.length}\r\n#{value}\r\n") if value
  send_data("END\r\n")
end
ev_fincr(s) click to toggle source
    # File lib/roma/plugin/plugin_storage.rb
# Handler for the "fincr" command: hands the request tokens to #fincr_fdecr
# with the :incr operation.
def ev_fincr(s)
  fincr_fdecr(:incr, s)
end
ev_fprepend(s) click to toggle source
    # File lib/roma/plugin/plugin_storage.rb
# Handler for the "fprepend" command: hands the request tokens to #fset with
# the :prepend operation.
def ev_fprepend(s)
  fset(:prepend, s)
end
ev_freplace(s) click to toggle source
    # File lib/roma/plugin/plugin_storage.rb
# Handler for the "freplace" command: hands the request tokens to #fset with
# the :replace operation.
def ev_freplace(s)
  fset(:replace, s)
end
ev_fset(s) click to toggle source
   # File lib/roma/plugin/plugin_storage.rb
# Handler for the "fset" command: hands the request tokens to #fset with the
# :set operation.
def ev_fset(s)
  fset(:set, s)
end
ev_fset_expt(s) click to toggle source

fset_expt <key> <expt>

    # File lib/roma/plugin/plugin_storage.rb
# Handler for "fset_expt <key> <expt>", the command #ev_set_expt sends when
# it forwards a request.
#
# Verifies that this node appears in the write list of the key's vnode,
# updates the expiry in local storage and replicates the resulting record to
# every other write node.
#
# s - Array of request tokens; s[1] is "<key>[\e<hash name>]", s[2] the
#     expiry.
def ev_fset_expt(s)
  key, hname = s[1].split("\e")
  hname ||= @defhash
  d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits
  vn = @rttable.get_vnode_id(d)
  nodes = @rttable.search_nodes_for_write(vn)
  unless nodes.include?(@nid)
    @log.error("fset_expt failed key = #{s[1]} vn = #{vn}")
    return send_data("SERVER_ERROR Routing table is inconsistent.\r\n")
  end

  unless @storages.key?(hname)
    send_data("SERVER_ERROR #{hname} does not exists.\r\n")
    return
  end

  if @stats.wb_command_map.key?(:set_expt__prev)
    # data layout: [vn, t, clk, expt, val]
    data = @storages[hname].get_raw(vn, key, d)
    Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[:set_expt__prev], key, data[3].to_s) if data
  end

  expt = chg_time_expt(s[2].to_i)
  ret = @storages[hname].set_expt(vn, key, d, expt)
  return send_data("NOT_STORED\r\n") unless ret

  if @stats.wb_command_map.key?(:set_expt)
    Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[:set_expt], key, expt.to_s)
  end

  # This node is only known to be *in* the write list, not first in it, so
  # replicate to every node except ourselves rather than to nodes[1..-1]
  # (which could skip the first node and target this node instead).
  others = nodes.reject { |nid| nid == @nid }
  redundant(others, hname, key, d, ret[2], ret[3], ret[4])

  if $roma.cr_writer.run_replication
    Roma::WriteBehindProcess::push(nil, "set_expt #{s[1]} #{expt}\r\n", s[1], expt)
  end
  send_data("STORED\r\n")
end
ev_get(s) click to toggle source

get <key>*\r\n

   # File lib/roma/plugin/plugin_storage.rb
# Handler for "get <key>*".
#
# A request naming several keys is delegated to #ev_gets. For a single key
# the value is read from local storage when this node is among the nodes
# returned for the key's vnode; otherwise the request is relayed through
# #forward_get and the relayed answer is sent back verbatim.
#
# s - Array of request tokens; s[1] is "<key>[\e<hash name>]".
def ev_get(s)
  if s.length < 2
    @log.error("get:wrong number of arguments(#{s})")
    return send_data("CLIENT_ERROR Wrong number of arguments.\r\n")
  end
  return ev_gets(s) if s.length > 2

  key, hname = s[1].split("\e")
  hname ||= @defhash
  d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits
  vn = @rttable.get_vnode_id(d)
  nodes = @rttable.search_nodes(vn)

  if !nodes.include?(@nid)
    @log.warn("forward get #{s[1]}")
    relayed = forward_get(nodes[0], s[1], d)
    send_data(relayed || "SERVER_ERROR Message forward failed.\r\n")
    return
  end

  if !@storages.key?(hname)
    send_data("SERVER_ERROR #{hname} does not exists.\r\n")
    return
  end

  value = @storages[hname].get(vn, key, 0)
  @stats.read_count += 1
  send_data("VALUE #{s[1]} 0 #{value.length}\r\n#{value}\r\n") if value
  send_data("END\r\n")
end
ev_get_expt(s) click to toggle source

To get the expiry time as UNIX time, pass 'unix' as the last argument. Otherwise the expiry time is sent back in date format. get_expt <key> [unix]

    # File lib/roma/plugin/plugin_storage.rb
# Handler for "get_expt <key> [unix]".
#
# Replies with the expiry of the key followed by "END". With the literal
# argument 'unix' the stored integer is sent as is; without it the value is
# converted through Time.at. When this node is not among the nodes returned
# for the key's vnode the request is relayed through #forward_get_expt.
#
# s - Array of request tokens; s[1] is "<key>[\e<hash name>]", s[2] is
#     'unix' or absent.
def ev_get_expt(s)
  unless s.length.between?(2, 3)
    @log.error("get_expt: wrong number of arguments(#{s.length-1} to 2-3)")
    return send_data("CLIENT_ERROR Wrong number of arguments.\r\n")
  end

  case s[2]
  when 'unix'
    is_unix = true
  when nil
    is_unix = false
  else
    @log.error("get_expt: wrong format of arguments.")
    return send_data("CLIENT_ERROR Wrong format of arguments.\r\n")
  end

  key, hname = s[1].split("\e")
  hname ||= @defhash
  unless @storages.key?(hname)
    send_data("SERVER_ERROR #{hname} does not exists.\r\n")
    return
  end

  d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits
  vn = @rttable.get_vnode_id(d)

  nodes = @rttable.search_nodes(vn)
  unless nodes.include?(@nid)
    @log.warn("forward get_expt #{s[1]} #{s[2]}")
    res = forward_get_expt(nodes[0], vn, s[1], s[2])
    if res
      # forward_get_expt returns the chomped value line ('' when the remote
      # node had nothing), so restore the line ending and the "END"
      # terminator to give the same framing as the local reply below.
      send_data("#{res}\r\n") unless res.empty?
      send_data("END\r\n")
    else
      send_data("SERVER_ERROR Message forward failed.\r\n")
    end
    return
  end

  data = @storages[hname].db_get(vn, key)
  if data
    # The expiry is the fourth 32-bit big-endian integer of the raw record.
    expt = data.unpack('NNNNa*')[3]
    expt = Time.at(expt) unless is_unix
    send_data("#{expt}\r\n")
  end
  send_data("END\r\n")
end
ev_gets(s) click to toggle source

gets <key>*\r\n

    # File lib/roma/plugin/plugin_storage.rb
 77 def ev_gets(s)
 78   nk = {} # {node-id1=>[key1,key2,..],node-id2=>[key3,key4,..]}
 79   kvn = {} # {key1=>vn1, key2=>vn2, ... }
 80   s[1..-1].each{|kh|
 81     key, = kh.split("\e") # split a hash-name
 82     d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits
 83     kvn[key] = vn = @rttable.get_vnode_id(d)
 84     nodes = @rttable.search_nodes(vn)
 85     unless nodes.empty? # check the node existence
 86       nk[nodes[0]]=[] unless nk.key?(nodes[0])
 87       nk[nodes[0]] << kh
 88     end
 89   }
 90 
 91   res = {} # result data {key1=>val1,key2=>val2,...}
 92   if nk.key?(@nid)
 93     nk[@nid].each{|kh|
 94       key,hname = kh.split("\e")
 95       hname ||= @defhash
 96       if @storages.key?(hname)
 97         vn, t, clk, expt, val = @storages[hname].get_raw(kvn[key], key, 0)
 98         @stats.read_count += 1
 99         res[key] = [clk, val] if val && Time.now.to_i <= expt
100       end
101     }
102     nk.delete(@nid)
103   end
104 
105   nk.each_pair{|nid,keys|
106     res.merge!(forward_gets(nid,keys))
107   }
108 
109   res.each_pair{|key,cv|
110     clk, val = cv
111     send_data("VALUE #{key} 0 #{val.length} #{clk}\r\n#{val}\r\n")
112   }
113   send_data("END\r\n")
114 end
ev_incr(s) click to toggle source

incr <key> <value> [noreply]\r\n

    # File lib/roma/plugin/plugin_storage.rb
# Handler for the "incr" command: hands the request tokens to #incr_decr
# with the :incr operation.
def ev_incr(s)
  incr_decr(:incr, s)
end
ev_prepend(s) click to toggle source

“prepend” means “prepend new data to the existing data”. Format: <command name> <key> <flags> <exptime> <bytes> [noreply]\r\n <data block>\r\n

    # File lib/roma/plugin/plugin_storage.rb
# Handler for the "prepend" command: hands the request tokens to #set with
# the :prepend operation.
def ev_prepend(s)
  set(:prepend, s)
end
ev_rdelete(s) click to toggle source

rdelete <key> <clock>

    # File lib/roma/plugin/plugin_storage.rb
# Handler for "rdelete <key> <clock>", the command sent to the other write
# nodes after a delete.
#
# Calls the storage's rdelete with the given clock and answers "DELETED"
# when it returns a truthy value, "NOT_FOUND" otherwise.
#
# s - Array of request tokens; s[1] is "<key>[\e<hash name>]", s[2] the
#     clock.
def ev_rdelete(s)
  key, hname = s[1].split("\e")
  hname ||= @defhash
  d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits
  vn = @rttable.get_vnode_id(d)

  if !@storages.key?(hname)
    send_data("SERVER_ERROR #{hname} does not exists.\r\n")
    return
  end

  removed = @storages[hname].rdelete(vn, key, d, s[2].to_i)
  send_data(removed ? "DELETED\r\n" : "NOT_FOUND\r\n")
end
ev_replace(s) click to toggle source

“replace” means “replace the existing data with new data”. Format: <command name> <key> <flags> <exptime> <bytes> [noreply]\r\n <data block>\r\n

    # File lib/roma/plugin/plugin_storage.rb
# Handler for the "replace" command: hands the request tokens to #set with
# the :replace operation.
def ev_replace(s)
  set(:replace, s)
end
ev_rset_size_of_zredundant(s) click to toggle source

rset_size_of_zredundant <n>

    # File lib/roma/plugin/plugin_storage.rb
# Handler for "rset_size_of_zredundant <n>", the command broadcast by
# #ev_set_size_of_zredundant.
#
# Stores <n> in @stats.size_of_zredundant and answers "STORED". A missing
# argument, extra arguments, or a value that converts to 0 yields a usage
# line instead.
#
# s - Array of request tokens; s[1] is the new size.
def ev_rset_size_of_zredundant(s)
  size = s[1].to_i
  if s.length != 2 || size == 0
    # The usage line names this command (it used to read "set_set_...").
    return send_data("usage:rset_size_of_zredundant <n>\r\n")
  end

  @stats.size_of_zredundant = size
  send_data("STORED\r\n")
end
ev_set(s) click to toggle source

“set” means “store this data”. Format: <command name> <key> <flags> <exptime> <bytes> [noreply]\r\n <data block>\r\n

   # File lib/roma/plugin/plugin_storage.rb
# Handler for the "set" command: hands the request tokens to #set with the
# :set operation.
def ev_set(s)
  set(:set, s)
end
ev_set_expt(s) click to toggle source

set_expt <key> <expt>

    # File lib/roma/plugin/plugin_storage.rb
# Handler for "set_expt <key> <expt>".
#
# When another node is first in the write list for the key's vnode the
# request is relayed to it as "fset_expt". Otherwise the expiry is updated
# in local storage and the resulting record is replicated to the remaining
# write nodes.
#
# s - Array of request tokens; s[1] is "<key>[\e<hash name>]", s[2] the
#     expiry.
def ev_set_expt(s)
  # Validate before touching s[1]; without a key the split below would
  # raise NoMethodError on nil.
  if s.length < 3
    @log.error("set_expt:wrong number of arguments(#{s})")
    return send_data("CLIENT_ERROR Wrong number of arguments.\r\n")
  end

  key, hname = s[1].split("\e")
  hname ||= @defhash
  d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits
  vn = @rttable.get_vnode_id(d)
  nodes = @rttable.search_nodes_for_write(vn)
  if nodes[0] != @nid
    @log.warn("forward set_expt key=#{key} vn=#{vn} to #{nodes[0]}")
    res = send_cmd(nodes[0], "fset_expt #{s[1]} #{s[2]}\r\n")
    return send_data("#{res}\r\n") if res
    return send_data("SERVER_ERROR Message forward failed.\r\n")
  end

  unless @storages.key?(hname)
    send_data("SERVER_ERROR #{hname} does not exists.\r\n")
    return
  end

  if @stats.wb_command_map.key?(:set_expt__prev)
    @log.debug(":set_expt__prev")
    # data layout: [vn, t, clk, expt, val]
    data = @storages[hname].get_raw(vn, key, d)
    Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[:set_expt__prev], key, data[3].to_s) if data
  end

  expt = chg_time_expt(s[2].to_i)
  ret = @storages[hname].set_expt(vn, key, d, expt)
  return send_data("NOT_STORED\r\n") unless ret

  if @stats.wb_command_map.key?(:set_expt)
    @log.debug(":set_expt")
    Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[:set_expt], key, expt.to_s)
  end
  redundant(nodes[1..-1], hname, key, d, ret[2], ret[3], ret[4])
  if $roma.cr_writer.run_replication
    Roma::WriteBehindProcess::push(nil, "set_expt #{s[1]} #{expt}\r\n", s[1], expt)
  end
  send_data("STORED\r\n")
end
ev_set_size_of_zredundant(s) click to toggle source

set_size_of_zredundant <n>

    # File lib/roma/plugin/plugin_storage.rb
# Handler for "set_size_of_zredundant <n>".
#
# Broadcasts "rset_size_of_zredundant <n>", stores <n> in
# @stats.size_of_zredundant on this node, records "STORED" for this node
# (keyed by @stats.ap_str) in the broadcast result and sends that result to
# the client. A missing argument, extra arguments, or a value that converts
# to 0 yields a usage line instead.
#
# s - Array of request tokens; s[1] is the new size.
def ev_set_size_of_zredundant(s)
  size = s[1].to_i
  if s.length != 2 || size == 0
    # The usage line names this command (it used to read "set_set_...").
    return send_data("usage:set_size_of_zredundant <n>\r\n")
  end

  res = broadcast_cmd("rset_size_of_zredundant #{s[1]}\r\n")
  @stats.size_of_zredundant = size
  res[@stats.ap_str] = "STORED"
  send_data("#{res}\r\n")
end

Private Instance Methods

fincr_fdecr(fnc,s) click to toggle source
    # File lib/roma/plugin/plugin_storage.rb
# Shared body of the "fincr"/"fdecr" handlers (the forwarded forms produced
# by #incr_decr).
#
# Uses the digest supplied in s[2] (recomputed from the key when that token
# converts to 0), verifies that this node appears in the write list of the
# vnode, and applies the operation through #store_incr_decr with every other
# write node as a replication target.
#
# fnc - :incr or :decr.
# s   - Array of request tokens: key, digest, amount.
def fincr_fdecr(fnc, s)
  key, hname = s[1].split("\e")
  hname ||= @defhash
  given = s[2].to_i
  d = given.zero? ? Digest::SHA1.hexdigest(key).hex % @rttable.hbits : given
  v = s[3].to_i
  vn = @rttable.get_vnode_id(d)
  nodes = @rttable.search_nodes_for_write(vn)

  unless nodes.include?(@nid)
    @log.debug("f#{fnc} failed key = #{s[1]} vn = #{vn}")
    return send_data("SERVER_ERROR Routing table is inconsistent.\r\n")
  end

  nodes.delete(@nid)
  store_incr_decr(fnc, hname, vn, key, d, v, nodes)
end
forward_get(nid, k, d) click to toggle source
    # File lib/roma/plugin/plugin_storage.rb
# Relays a single-key read to another node with "fget" and returns the raw
# reply text.
#
# nid - id of the node to ask.
# k   - key token as received from the client.
# d   - digest of the key (not used by this method).
#
# Returns the reply as a String ("END\r\n" alone when the remote node has no
# value, otherwise the VALUE header, the data block and the line that
# follows it), or nil when nothing was read, the reply started with "ERROR",
# or an exception was raised.
def forward_get(nid, k, d)
  con = get_connection(nid)
  con.send("fget #{k}\r\n")
  res = con.gets

  if res.nil?
    @rttable.proc_failed(nid)
    @log.error("forward get failed:nid=#{nid} key=#{k}")
    return nil
  end

  if res != "END\r\n"
    if res.start_with?("ERROR")
      @rttable.proc_succeed(nid)
      con.close_connection
      return nil
    end

    # Header is "VALUE <key> <flags> <bytes>": read the data block plus its
    # trailing two bytes, then the following line.
    header = res.split(/ /)
    res << con.read_bytes(header[3].to_i + 2)
    res << con.gets
  end

  return_connection(nid, con)
  @rttable.proc_succeed(nid)
  res
rescue => e
  @rttable.proc_failed(nid) if e.message != "no connection"
  @log.error("#{e.inspect}/#{$@}")
  @log.error("forward get failed:nid=#{nid} key=#{k}")
  nil
end
forward_get_expt(nid, vn, key, is_unix=nil) click to toggle source
    # File lib/roma/plugin/plugin_storage.rb
# Relays a "get_expt" request to another node.
#
# nid     - id of the node to ask.
# vn      - vnode id (not used by this method).
# key     - key token as received from the client.
# is_unix - optional trailing argument, interpolated into the command as is.
#
# Returns the last line received before "END" with its line ending removed
# ('' when "END" arrives first), or nil when an exception is raised while
# talking to the node.
def forward_get_expt(nid, vn, key, is_unix=nil)
  con = get_connection(nid)
  con.send("get_expt #{key} #{is_unix}\r\n")

  res = ''
  until (line = con.gets) == "END\r\n"
    res = line.chomp
  end

  return_connection(nid, con)
  @rttable.proc_succeed(nid)
  res
rescue => e
  @rttable.proc_failed(nid)
  @log.error("forward get_expt failed:nid=#{nid} key=#{key}")
  nil
end
forward_gets(nid, keys) click to toggle source
    # File lib/roma/plugin/plugin_storage.rb
# Relays a multi-key "gets" request to another node and collects the reply.
#
# nid  - id of the node to ask.
# keys - Array of key tokens, joined with spaces into one command.
#
# Returns a Hash {key => [clk, value]} built from the
# "VALUE <key> <flags> <bytes> <clk>" records received before "END", or nil
# when an exception is raised while talking to the node.
def forward_gets(nid, keys)
  con = get_connection(nid)
  con.send("gets #{keys.join(' ')}\r\n")

  found = {}
  until (line = con.gets) == "END\r\n"
    fields = line.chomp.split(/ /)
    found[fields[1]] = [fields[4], con.read_bytes(fields[3].to_i)]
    con.read_bytes(2) # consume the two bytes that follow the data block
  end

  return_connection(nid, con)
  @rttable.proc_succeed(nid)
  found
rescue => e
  @rttable.proc_failed(nid)
  @log.error("forward gets failed:nid=#{nid} key=#{keys}")
  nil
end
fset(fnc,s) click to toggle source
    # File lib/roma/plugin/plugin_storage.rb
# Shared body of the forwarded storage commands (fset, fadd, freplace,
# fappend, fprepend) produced by #set.
#
# Uses the digest supplied in s[2] (recomputed from the key when that token
# converts to 0), reads the data block, verifies that this node appears in
# the write list of the vnode, and stores the value through #store with
# every other write node as a replication target.
#
# fnc - storage operation, e.g. :set or :add.
# s   - Array of request tokens: key, digest, expiry, byte count.
def fset(fnc, s)
  key, hname = s[1].split("\e")
  hname ||= @defhash
  given = s[2].to_i
  d = given.zero? ? Digest::SHA1.hexdigest(key).hex % @rttable.hbits : given
  v = read_bytes(s[4].to_i)
  read_bytes(2) # consume the two bytes that follow the data block
  vn = @rttable.get_vnode_id(d)
  nodes = @rttable.search_nodes_for_write(vn)

  unless nodes.include?(@nid)
    @log.error("f#{fnc} failed key = #{s[1]} vn = #{vn}")
    return send_data("SERVER_ERROR Routing table is inconsistent.\r\n")
  end

  nodes.delete(@nid)
  store(fnc, hname, vn, key, d, s[3].to_i, v, nodes)
end
incr_decr(fnc,s) click to toggle source
    # File lib/roma/plugin/plugin_storage.rb
# Shared body of the "incr" and "decr" handlers.
#
# When another node is first in the write list for the key's vnode the
# request is relayed to it as "fincr"/"fdecr" together with the digest.
# Otherwise the operation is applied through #store_incr_decr with the
# remaining write nodes as replication targets.
#
# fnc - :incr or :decr.
# s   - Array of request tokens; s[1] is "<key>[\e<hash name>]", s[2] the
#       amount.
def incr_decr(fnc, s)
  key, hname = s[1].split("\e")
  hname ||= @defhash
  d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits
  v = s[2].to_i
  vn = @rttable.get_vnode_id(d)
  nodes = @rttable.search_nodes_for_write(vn)

  if nodes[0] == @nid
    return store_incr_decr(fnc, hname, vn, key, d, v, nodes[1..-1])
  end

  @log.debug("forward #{fnc} key=#{s[1]} vn=#{vn} to #{nodes[0]}")
  reply = send_cmd(nodes[0], "f#{fnc} #{s[1]} #{d} #{s[2]}\r\n")
  if reply.nil? || reply.start_with?("ERROR")
    send_data("SERVER_ERROR Message forward failed.\r\n")
  else
    send_data("#{reply}\r\n")
  end
end
redundant(nodes, hname, k, d, clk, expt, v) click to toggle source
    # File lib/roma/plugin/plugin_storage.rb
# Replicates a stored record to the given nodes with "rset".
#
# When @stats.size_of_zredundant is positive and smaller than the value's
# length, the work is delegated to #zredundant instead. A node that does not
# answer, or answers with a line starting with "ERROR", gets a 'redundant'
# message queued on Roma::AsyncProcess and a warning is logged.
#
# nodes - Array of node ids to replicate to.
# hname - hash (storage) name.
# k, d, clk, expt, v - key, digest, clock, expiry and value of the record.
def redundant(nodes, hname, k, d, clk, expt, v)
  threshold = @stats.size_of_zredundant
  if threshold > 0 && @stats.size_of_zredundant < v.length
    return zredundant(nodes, hname, k, d, clk, expt, v)
  end

  nodes.each do |nid|
    reply = send_cmd(nid, "rset #{k}\e#{hname} #{d} #{clk} #{expt} #{v.length}\r\n#{v}\r\n")
    next unless reply.nil? || reply.start_with?("ERROR")

    Roma::AsyncProcess::queue.push(Roma::AsyncMessage.new('redundant', [nid, hname, k, d, clk, expt, v]))
    @log.warn("redundant failed:#{k}\e#{hname} #{d} #{clk} #{expt} #{v.length} -> #{nid}")
  end
end
set(fnc,s) click to toggle source
    # File lib/roma/plugin/plugin_storage.rb
# Shared body of the storage commands (set, add, replace, append, prepend).
#
# Validates the token count and the byte count, reads the data block, and
# then either relays the request as "f<fnc>" (with the digest) to the first
# node in the write list of the key's vnode, or stores the value through
# #store with the remaining write nodes as replication targets.
#
# fnc - storage operation, e.g. :set or :add.
# s   - Array of exactly five request tokens: command, key, flags, expiry,
#       byte count.
def set(fnc, s)
  unless s.length == 5
    @log.error("set:wrong number of arguments(#{s})")
    return send_data("CLIENT_ERROR Wrong number of arguments.\r\n")
  end

  bytes = s[4].to_i
  if bytes.negative?
    @log.error("set:wrong value size(#{s})")
    return send_data("CLIENT_ERROR Wrong value size.\r\n")
  end

  key, hname = s[1].split("\e")
  hname ||= @defhash
  d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits
  v = read_bytes(bytes)
  read_bytes(2) # consume the two bytes that follow the data block
  vn = @rttable.get_vnode_id(d)
  nodes = @rttable.search_nodes_for_write(vn)

  if nodes[0] == @nid
    return store(fnc, hname, vn, key, d, s[3].to_i, v, nodes[1..-1])
  end

  @log.warn("forward #{fnc} key=#{key} vn=#{vn} to #{nodes[0]}")
  reply = send_cmd(nodes[0], "f#{fnc} #{s[1]} #{d} #{s[3]} #{s[4]}\r\n#{v}\r\n")
  if reply.nil? || reply.start_with?("ERROR")
    send_data("SERVER_ERROR Message forward failed.\r\n")
  else
    send_data("#{reply}\r\n")
  end
end
store(fnc, hname, vn, k, d, expt, v, nodes) click to toggle source
    # File lib/roma/plugin/plugin_storage.rb
# Applies a storage operation locally, replicates the result and answers the
# client with "STORED" or "NOT_STORED".
#
# fnc   - name of the storage method to invoke (e.g. :set, :add).
# hname - hash (storage) name; must be a key of @storages.
# vn, k, d - vnode id, key and digest.
# expt  - expiry, converted through chg_time_expt before use.
# v     - value to store.
# nodes - node ids that receive the replicated record.
def store(fnc, hname, vn, k, d, expt, v, nodes)
  expt = chg_time_expt(expt)
  if !@storages.key?(hname)
    send_data("SERVER_ERROR #{hname} does not exists.\r\n")
    return
  end

  # Hand the current value to the "<fnc>__prev" write-behind hook, if any.
  prev_hook = "#{fnc.to_s}__prev".to_sym
  if @stats.wb_command_map.key?(prev_hook)
    before = @storages[hname].get(vn, k, d)
    Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[prev_hook], k, before) if before
  end

  ret = @storages[hname].send(fnc, vn, k, d, expt, v)
  @stats.write_count += 1

  unless ret
    @log.error("#{fnc} NOT_STORED:#{hname} #{vn} #{k} #{d} #{expt}")
    return send_data("NOT_STORED\r\n")
  end

  if @stats.wb_command_map.key?(fnc)
    Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[fnc], k, ret[4])
  end
  redundant(nodes, hname, k, d, ret[2], expt, ret[4])

  if $roma.cr_writer.run_replication
    # Non-default hashes are addressed as "<key>\e<hash name>".
    rk = hname != @defhash ? "#{k}\e#{hname}" : k
    Roma::WriteBehindProcess::push(nil, "#{fnc} #{rk} 1 #{expt} #{v.length} \r\n#{v}\r\n", rk, v)
  end
  send_data("STORED\r\n")
end
store_cas(hname, vn, k, d, clk, expt, v, nodes) click to toggle source
    # File lib/roma/plugin/plugin_storage.rb
# Applies a compare-and-swap locally, replicates the result and answers the
# client with "STORED", "NOT_STORED", "NOT_FOUND" or "EXISTS".
#
# hname - hash (storage) name; must be a key of @storages.
# vn, k, d - vnode id, key and digest.
# clk   - clock value supplied by the client as the cas id.
# expt  - expiry, converted through chg_time_expt before use.
# v     - value to store.
# nodes - node ids that receive the replicated record.
def store_cas(hname, vn, k, d, clk, expt, v, nodes)
  expt = chg_time_expt(expt)
  unless @storages.key?(hname)
    send_data("SERVER_ERROR #{hname} does not exists.\r\n")
    return
  end

  if @stats.wb_command_map.key?(:cas__prev)
    data = @storages[hname].get(vn, k, d)
    Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[:cas__prev], k, data) if data
  end

  ret = @storages[hname].cas(vn, k, d, clk, expt, v)
  @stats.write_count += 1

  case ret
  when nil
    @log.error("cas NOT_STORED:#{hname} #{vn} #{k} #{d} #{expt} #{clk}")
    send_data("NOT_STORED\r\n")
  when :not_found
    send_data("NOT_FOUND\r\n")
  when :exists
    send_data("EXISTS\r\n")
  else
    if @stats.wb_command_map.key?(:cas)
      Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[:cas], k, ret[4])
    end
    if $roma.cr_writer.run_replication
      # Build the replication key in its own variable: redundant below adds
      # "\e<hash name>" itself and must receive the bare key, otherwise the
      # hash name would be appended twice for non-default hashes.
      rk = hname != @defhash ? "#{k}\e#{hname}" : k
      # Sent as "set" to restrain a difference between the main and replica
      # cluster due to clk.
      Roma::WriteBehindProcess::push(nil, "set #{rk} 0 #{expt} #{v.length} \r\n#{v}\r\n", rk, v)
    end
    redundant(nodes, hname, k, d, ret[2], expt, ret[4])
    send_data("STORED\r\n")
  end
end
store_incr_decr(fnc, hname, vn, k, d, v, nodes) click to toggle source
    # File lib/roma/plugin/plugin_storage.rb
# Applies an increment or decrement locally, replicates the result and
# answers the client with the new value or "NOT_FOUND".
#
# fnc   - name of the storage method to invoke (:incr or :decr).
# hname - hash (storage) name; must be a key of @storages.
# vn, k, d - vnode id, key and digest.
# v     - amount passed to the storage method.
# nodes - node ids that receive the replicated record.
def store_incr_decr(fnc, hname, vn, k, d, v, nodes)
  unless @storages.key?(hname)
    send_data("SERVER_ERROR #{hname} does not exists.\r\n")
    return
  end

  prev_hook = "#{fnc.to_s}__prev".to_sym
  if @stats.wb_command_map.key?(prev_hook)
    data = @storages[hname].get(vn, k, d)
    Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[prev_hook], k, data) if data
  end

  res = @storages[hname].send(fnc, vn, k, d, v)
  @stats.write_count += 1
  return send_data("NOT_FOUND\r\n") unless res

  if @stats.wb_command_map.key?(fnc)
    Roma::WriteBehindProcess::push(hname, @stats.wb_command_map[fnc], k, res[4])
  end
  if $roma.cr_writer.run_replication
    # Build the replication key in its own variable: redundant below adds
    # "\e<hash name>" itself and must receive the bare key, otherwise the
    # hash name would be appended twice for non-default hashes.
    rk = hname != @defhash ? "#{k}\e#{hname}" : k
    Roma::WriteBehindProcess::push(nil, "#{fnc} #{rk} #{v}\r\n", rk, v)
  end
  redundant(nodes, hname, k, d, res[2], res[3], res[4])
  send_data("#{res[4]}\r\n")
end
zredundant(nodes, hname, k, d, clk, expt, v) click to toggle source
    # File lib/roma/plugin/plugin_storage.rb
# Replicates a stored record to the given nodes with "rzset", sending the
# value deflated with Zlib.
#
# A node that does not answer, or answers with a line starting with "ERROR",
# gets a 'zredundant' message (carrying the deflated value) queued on
# Roma::AsyncProcess and a warning is logged.
#
# nodes - Array of node ids to replicate to.
# hname - hash (storage) name.
# k, d, clk, expt, v - key, digest, clock, expiry and value of the record.
def zredundant(nodes, hname, k, d, clk, expt, v)
  packed = Zlib::Deflate.deflate(v)

  nodes.each do |nid|
    reply = send_cmd(nid, "rzset #{k}\e#{hname} #{d} #{clk} #{expt} #{packed.length}\r\n#{packed}\r\n")
    next unless reply.nil? || reply.start_with?("ERROR")

    Roma::AsyncProcess::queue.push(Roma::AsyncMessage.new('zredundant', [nid, hname, k, d, clk, expt, packed]))
    @log.warn("zredundant failed:#{k}\e#{hname} #{d} #{clk} #{expt} #{packed.length} -> #{nid}")
  end
end