module Roma::Command::Definition::ClassMethods

Constants

CommandContext
CommandParams
StoredData

Public Instance Methods

def_command_with_key(cmd, forward = :one_line, &block)
    # File lib/roma/command/command_definition.rb
# Defines the instance method "ev_<cmd>" for a command addressed by a key.
#
# The generated method receives the tokenized request +s+ and reads s[1] as
# "key" or "key<ESC>hash_name" (hash_name falls back to @defhash). It then
#
# 1. replies "CLIENT_ERROR does not find key" when no usable key is present;
# 2. resolves the vnode and its write nodes through @rttable and, when this
#    node (@nid) is not first in that list, forwards the request as selected
#    by +forward+;
# 3. replies SERVER_ERROR when @storages has no entry for the hash name;
# 4. loads the stored record (nil when absent or expired) and runs +block+
#    through instance_exec with a CommandContext.
#
# cmd::     command name; the receiver is named "ev_#{cmd}"
# forward:: :one_line or :multi_line picks the forwarding helper; any other
#           value makes the command run locally even on a non-primary node
# block::   command body, called with the CommandContext
def def_command_with_key(cmd, forward = :one_line, &block)
  define_method "ev_#{cmd}" do |s|
    return send_data("CLIENT_ERROR does not find key\r\n") if s.length < 2
    begin
      params = CommandParams.new
      params.key, params.hash_name = s[1].split("\e")
      # String#split drops trailing empty fields, so a key token of "" or a
      # lone "\e" yields no key at all. Hashing nil would raise a TypeError
      # that none of the rescue clauses below handles.
      return send_data("CLIENT_ERROR does not find key\r\n") if params.key.nil?
      params.hash_name ||= @defhash
      params.digest = Digest::SHA1.hexdigest(params.key).hex % @rttable.hbits
      params.vn = @rttable.get_vnode_id(params.digest)
      params.nodes = @rttable.search_nodes_for_write(params.vn)
      if params.nodes[0] != @nid
        case forward
        when :one_line
          return forward_and_one_line_receive(params.nodes[0], s)
        when :multi_line
          return forward_and_multi_line_receive(params.nodes[0], s)
        end
      end
      stored = StoredData.new
      unless @storages[params.hash_name]
        return send_data("SERVER_ERROR #{params.hash_name} does not exists.\r\n")
      end

      stored.vn, stored.last, stored.clk, stored.expt, stored.value =
        @storages[params.hash_name].get_raw(params.vn, params.key, params.digest)
      # A missing or expired record is handed to the block as nil.
      stored = nil if stored.vn.nil? || Time.now.to_i > stored.expt
      ctx = CommandContext.new(s, params, stored)
      instance_exec(ctx, &block)
    rescue ClientErrorException => e
      send_data("CLIENT_ERROR #{e.message}\r\n")
    rescue ServerErrorException => e
      send_data("SERVER_ERROR #{e.message}\r\n")
    rescue LocalJumpError => e
      # Raised when the command block uses `return`; the returned value is
      # recovered from the exception after logging a warning.
      @log.warn("#{e} #{$@}")
      e.exit_value
    end
  end
end
def_command_with_key_value(cmd, idx_of_val_len, forward = :one_line, &block)
    # File lib/roma/command/command_definition.rb
# Defines the instance method "ev_<cmd>" for a command that carries a key
# and a value payload.
#
# The generated method receives the tokenized request +s+, reads s[1] as
# "key" or "key<ESC>hash_name" (hash_name falls back to @defhash) and pulls
# the payload with read_bytes, using s[idx_of_val_len] as its byte length;
# two further bytes are read and discarded (presumably the trailing CRLF —
# confirm against the wire protocol). It then forwards the request, value
# included, when this node (@nid) is not first in the write-node list, or
# loads the stored record and runs +block+ through instance_exec with a
# CommandContext.
#
# cmd::            command name; the receiver is named "ev_#{cmd}"
# idx_of_val_len:: index in +s+ of the token holding the payload length
# forward::        :one_line or :multi_line picks the forwarding helper; any
#                  other value makes the command run locally
# block::          command body, called with the CommandContext
def def_command_with_key_value(cmd, idx_of_val_len, forward = :one_line, &block)
  define_method "ev_#{cmd}" do |s|
    return send_data("CLIENT_ERROR does not find key\r\n") if s.length < 2
    begin
      params = CommandParams.new
      params.key, params.hash_name = s[1].split("\e")
      if params.key.nil?
        # String#split drops trailing empty fields, so a key token of "" or
        # a lone "\e" yields no key; hashing nil would raise an unrescued
        # TypeError. Consume the announced payload first so its bytes are
        # not interpreted as the next command.
        read_bytes(s[idx_of_val_len].to_i)
        read_bytes(2)
        return send_data("CLIENT_ERROR does not find key\r\n")
      end
      params.hash_name ||= @defhash
      params.digest = Digest::SHA1.hexdigest(params.key).hex % @rttable.hbits
      params.vn = @rttable.get_vnode_id(params.digest)
      params.nodes = @rttable.search_nodes_for_write(params.vn)
      params.value = read_bytes(s[idx_of_val_len].to_i)
      read_bytes(2)
      if params.nodes[0] != @nid
        case forward
        when :one_line
          return forward_and_one_line_receive(params.nodes[0], s, params.value)
        when :multi_line
          return forward_and_multi_line_receive(params.nodes[0], s, params.value)
        end
      end
      stored = StoredData.new
      unless @storages[params.hash_name]
        return send_data("SERVER_ERROR #{params.hash_name} does not exists.\r\n")
      end

      stored.vn, stored.last, stored.clk, stored.expt, stored.value =
        @storages[params.hash_name].get_raw(params.vn, params.key, params.digest)
      # A missing or expired record is handed to the block as nil.
      stored = nil if stored.vn.nil? || Time.now.to_i > stored.expt
      ctx = CommandContext.new(s, params, stored)
      instance_exec(ctx, &block)
    rescue ClientErrorException => e
      send_data("CLIENT_ERROR #{e.message}\r\n")
    rescue ServerErrorException => e
      send_data("SERVER_ERROR #{e.message}\r\n")
    rescue LocalJumpError => e
      # Raised when the command block uses `return`; the returned value is
      # recovered from the exception after logging a warning.
      @log.warn("#{e} #{$@}")
      e.exit_value
    end
  end
end
def_command_with_relay(cmd, &block)
   # File lib/roma/command/command_definition.rb
# Defines two receivers for a command whose result is collected from every
# node:
#
# * "ev_<cmd>" runs +block+ locally, relays "r<cmd> <args>" with
#   broadcast_cmd, merges the replies into a hash keyed by @stats.ap_str for
#   the local result, and sends that hash to the client;
# * "ev_r<cmd>" is the relayed form: it runs +block+ and sends only the
#   local result.
#
# Raises a RuntimeError when "ev_<cmd>" is already a public method.
#
# cmd::   command name
# block:: command body, called through instance_exec with the tokenized
#         request
def def_command_with_relay(cmd, &block)
  receiver = "ev_#{cmd}"

  # refuse to redefine a command that already exists in this scope
  raise "ev_#{cmd} already defined." if public_method_defined?(receiver.to_sym)

  # receiver for the command as issued by a client
  define_method receiver do |s|
    begin
      replies = { @stats.ap_str => instance_exec(s, &block) }
      # pass the command on to the other nodes and collect their answers
      relayed_line = "r#{cmd} #{s[1..-1].join(' ')}\r\n"
      replies.merge!(broadcast_cmd(relayed_line))
      send_data("#{replies}\r\n")
    rescue ClientErrorException => e
      send_data("CLIENT_ERROR #{e.message}\r\n")
    rescue ServerErrorException => e
      send_data("SERVER_ERROR #{e.message}\r\n")
    rescue LocalJumpError => e
      @log.warn("#{e} #{$@}")
      e.exit_value
    end
  end

  # receiver for the command as relayed by another node
  define_method "ev_r#{cmd}" do |s|
    begin
      local_result = instance_exec(s, &block)
      send_data("#{local_result}\r\n")
    rescue ClientErrorException => e
      send_data("CLIENT_ERROR #{e.message}\r\n")
    rescue ServerErrorException => e
      send_data("SERVER_ERROR #{e.message}\r\n")
    rescue LocalJumpError => e
      @log.warn("#{e} #{$@}")
      e.exit_value
    end
  end
end
def_read_command_with_key(cmd, forward = :one_line, &block)
   # File lib/roma/command/command_definition.rb
# Defines the instance method "ev_<cmd>" for a read command addressed by a
# key.
#
# The generated method receives the tokenized request +s+ and reads s[1] as
# "key" or "key<ESC>hash_name" (hash_name falls back to @defhash). It
# forwards the request when this node (@nid) is not first in the node list
# returned by @rttable.search_nodes_for_write; otherwise it loads the stored
# record (nil when absent or expired), runs +block+ through instance_exec
# with a CommandContext and then increments @stats.read_count. The counter
# is not touched when the block raises or leaves with `return`.
#
# cmd::     command name; the receiver is named "ev_#{cmd}"
# forward:: :one_line or :multi_line picks the forwarding helper; any other
#           value makes the command run locally
# block::   command body, called with the CommandContext
def def_read_command_with_key(cmd, forward = :one_line, &block)
  define_method "ev_#{cmd}" do |s|
    return send_data("CLIENT_ERROR does not find key\r\n") if s.length < 2
    begin
      params = CommandParams.new
      params.key, params.hash_name = s[1].split("\e")
      # String#split drops trailing empty fields, so a key token of "" or a
      # lone "\e" yields no key at all. Hashing nil would raise a TypeError
      # that none of the rescue clauses below handles.
      return send_data("CLIENT_ERROR does not find key\r\n") if params.key.nil?
      params.hash_name ||= @defhash
      params.digest = Digest::SHA1.hexdigest(params.key).hex % @rttable.hbits
      params.vn = @rttable.get_vnode_id(params.digest)
      params.nodes = @rttable.search_nodes_for_write(params.vn)
      if params.nodes[0] != @nid
        case forward
        when :one_line
          return forward_and_one_line_receive(params.nodes[0], s)
        when :multi_line
          return forward_and_multi_line_receive(params.nodes[0], s)
        end
      end
      stored = StoredData.new
      unless @storages[params.hash_name]
        return send_data("SERVER_ERROR #{params.hash_name} does not exists.\r\n")
      end

      stored.vn, stored.last, stored.clk, stored.expt, stored.value =
        @storages[params.hash_name].get_raw(params.vn, params.key, params.digest)
      # A missing or expired record is handed to the block as nil.
      stored = nil if stored.vn.nil? || Time.now.to_i > stored.expt
      ctx = CommandContext.new(s, params, stored)
      instance_exec(ctx, &block)
      @stats.read_count += 1
    rescue ClientErrorException => e
      send_data("CLIENT_ERROR #{e.message}\r\n")
    rescue ServerErrorException => e
      send_data("SERVER_ERROR #{e.message}\r\n")
    rescue LocalJumpError => e
      # Raised when the command block uses `return`; the returned value is
      # recovered from the exception after logging a warning.
      @log.warn("#{e} #{$@}")
      e.exit_value
    end
  end
end
def_read_command_with_key_value(cmd, idx_of_val_len, forward = :one_line, &block)
    # File lib/roma/command/command_definition.rb
# Defines the instance method "ev_<cmd>" for a read command that carries a
# key and a value payload.
#
# The generated method receives the tokenized request +s+, reads s[1] as
# "key" or "key<ESC>hash_name" (hash_name falls back to @defhash) and pulls
# the payload with read_bytes, using s[idx_of_val_len] as its byte length;
# two further bytes are read and discarded (presumably the trailing CRLF —
# confirm against the wire protocol). It forwards the request, value
# included, when this node (@nid) is not first in the node list; otherwise
# it loads the stored record (nil when absent or expired), runs +block+
# through instance_exec with a CommandContext and increments
# @stats.read_count. The counter is not touched when the block raises or
# leaves with `return`.
#
# cmd::            command name; the receiver is named "ev_#{cmd}"
# idx_of_val_len:: index in +s+ of the token holding the payload length
# forward::        :one_line or :multi_line picks the forwarding helper; any
#                  other value makes the command run locally
# block::          command body, called with the CommandContext
def def_read_command_with_key_value(cmd, idx_of_val_len, forward = :one_line, &block)
  define_method "ev_#{cmd}" do |s|
    return send_data("CLIENT_ERROR does not find key\r\n") if s.length < 2
    begin
      params = CommandParams.new
      params.key, params.hash_name = s[1].split("\e")
      if params.key.nil?
        # String#split drops trailing empty fields, so a key token of "" or
        # a lone "\e" yields no key; hashing nil would raise an unrescued
        # TypeError. Consume the announced payload first so its bytes are
        # not interpreted as the next command.
        read_bytes(s[idx_of_val_len].to_i)
        read_bytes(2)
        return send_data("CLIENT_ERROR does not find key\r\n")
      end
      params.hash_name ||= @defhash
      params.digest = Digest::SHA1.hexdigest(params.key).hex % @rttable.hbits
      params.vn = @rttable.get_vnode_id(params.digest)
      params.nodes = @rttable.search_nodes_for_write(params.vn)
      params.value = read_bytes(s[idx_of_val_len].to_i)
      read_bytes(2)
      if params.nodes[0] != @nid
        case forward
        when :one_line
          return forward_and_one_line_receive(params.nodes[0], s, params.value)
        when :multi_line
          return forward_and_multi_line_receive(params.nodes[0], s, params.value)
        end
      end
      stored = StoredData.new
      unless @storages[params.hash_name]
        return send_data("SERVER_ERROR #{params.hash_name} does not exists.\r\n")
      end

      stored.vn, stored.last, stored.clk, stored.expt, stored.value =
        @storages[params.hash_name].get_raw(params.vn, params.key, params.digest)
      # A missing or expired record is handed to the block as nil.
      stored = nil if stored.vn.nil? || Time.now.to_i > stored.expt
      ctx = CommandContext.new(s, params, stored)

      instance_exec(ctx, &block)
      @stats.read_count += 1
    rescue ClientErrorException => e
      send_data("CLIENT_ERROR #{e.message}\r\n")
    rescue ServerErrorException => e
      send_data("SERVER_ERROR #{e.message}\r\n")
    rescue LocalJumpError => e
      # Raised when the command block uses `return`; the returned value is
      # recovered from the exception after logging a warning.
      @log.warn("#{e} #{$@}")
      e.exit_value
    end
  end
end
def_write_command_with_key(cmd, forward = :one_line, &block)
    # File lib/roma/command/command_definition.rb
 97 def def_write_command_with_key(cmd, forward = :one_line, &block)
 98   define_method "ev_#{cmd}" do |s|
 99     return send_data("CLIENT_ERROR does not find key\r\n") if s.length < 2
100     begin
101       params = CommandParams.new
102       params.key, params.hash_name = s[1].split("\e")
103       params.hash_name ||= @defhash
104       params.digest = Digest::SHA1.hexdigest(params.key).hex % @rttable.hbits
105       params.vn = @rttable.get_vnode_id(params.digest)
106       params.nodes = @rttable.search_nodes_for_write(params.vn)
107       if params.nodes[0] != @nid
108         if forward == :one_line
109           return forward_and_one_line_receive(params.nodes[0], s)
110         elsif  forward == :multi_line
111           return forward_and_multi_line_receive(params.nodes[0], s)
112         end
113       end
114       stored = StoredData.new
115       unless @storages[params.hash_name]
116         return send_data("SERVER_ERROR #{params.hash_name} does not exists.\r\n")
117       end
118 
119       stored.vn, stored.last, stored.clk, stored.expt, stored.value =
120         @storages[params.hash_name].get_raw(params.vn, params.key, params.digest)
121       stored = nil if stored.vn == nil || Time.now.to_i > stored.expt
122       if stored && @stats.wb_command_map.key?("#{cmd}__prev".to_sym)
123         Roma::WriteBehindProcess::push(params.hash_name,
124                                        @stats.wb_command_map["#{cmd}__prev".to_sym],
125                                        params.key, stored.value)
126       end
127       ctx = CommandContext.new(s, params, stored)
128 
129       ret = instance_exec(ctx, &block)
130       if ret.instance_of? Array
131         flg, expt, value, count, msg = ret
132         ret = @storages[ctx.params.hash_name].set(ctx.params.vn, 
133                                                   ctx.params.key,
134                                                   ctx.params.digest,
135                                                   expt,
136                                                   value)
137         if count == :write
138           @stats.write_count += 1
139         elsif count == :delete
140           @stats.delete_count += 1
141         end
142 
143         if ret
144           if @stats.wb_command_map.key?(cmd.to_sym)
145             Roma::WriteBehindProcess::push(ctx.params.hash_name,
146                                            @stats.wb_command_map[cmd.to_sym],
147                                            ctx.params.key, ret[4])
148           end
149           redundant(ctx.params.nodes[1..-1], ctx.params.hash_name,
150                     ctx.params.key, ctx.params.digest, ret[2],
151                     expt, ret[4])
152           send_data("#{msg}\r\n")
153         else
154           send_data("NOT_#{msg}\r\n")
155         end
156       end
157     rescue ClientErrorException => e
158       send_data("CLIENT_ERROR #{e.message}\r\n")
159     rescue ServerErrorException => e
160       send_data("SERVER_ERROR #{e.message}\r\n")
161     rescue LocalJumpError => e
162       @log.warn("#{e} #{$@}")
163       e.exit_value
164     end
165   end
166 end
def_write_command_with_key_value(cmd, idx_of_val_len, forward = :one_line, &block)
    # File lib/roma/command/command_definition.rb
# Defines the instance method "ev_<cmd>" for a write command that carries a
# key and a value payload.
#
# The generated method receives the tokenized request +s+, reads s[1] as
# "key" or "key<ESC>hash_name" (hash_name falls back to @defhash) and pulls
# the payload with read_bytes, using s[idx_of_val_len] as its byte length;
# two further bytes are read and discarded (presumably the trailing CRLF —
# confirm against the wire protocol). It forwards the request, value
# included, when this node (@nid) is not first in the write-node list;
# otherwise it loads the stored record (nil when absent or expired) and
# runs +block+ through instance_exec with a CommandContext.
#
# When the block returns an Array it is read as
# [flag, expt, value, count, msg] (the first element is not used here):
# the value is written with the storage's #set, @stats.write_count or
# @stats.delete_count is incremented for a count of :write or :delete, and
# on a truthy #set result the record is pushed to the write-behind process
# (if +cmd+ is registered in @stats.wb_command_map), handed to #redundant
# for the remaining nodes, and "<msg>" is sent; otherwise "NOT_<msg>" is
# sent. Any other block result sends nothing from here.
#
# If "<cmd>__prev" is registered in @stats.wb_command_map, the previously
# stored value is pushed to the write-behind process before the block runs.
#
# cmd::            command name; the receiver is named "ev_#{cmd}"
# idx_of_val_len:: index in +s+ of the token holding the payload length
# forward::        :one_line or :multi_line picks the forwarding helper; any
#                  other value makes the command run locally
# block::          command body, called with the CommandContext
def def_write_command_with_key_value(cmd, idx_of_val_len, forward = :one_line, &block)
  define_method "ev_#{cmd}" do |s|
    return send_data("CLIENT_ERROR does not find key\r\n") if s.length < 2
    begin
      params = CommandParams.new
      params.key, params.hash_name = s[1].split("\e")
      if params.key.nil?
        # String#split drops trailing empty fields, so a key token of "" or
        # a lone "\e" yields no key; hashing nil would raise an unrescued
        # TypeError. Consume the announced payload first so its bytes are
        # not interpreted as the next command.
        read_bytes(s[idx_of_val_len].to_i)
        read_bytes(2)
        return send_data("CLIENT_ERROR does not find key\r\n")
      end
      params.hash_name ||= @defhash
      params.digest = Digest::SHA1.hexdigest(params.key).hex % @rttable.hbits
      params.vn = @rttable.get_vnode_id(params.digest)
      params.nodes = @rttable.search_nodes_for_write(params.vn)
      params.value = read_bytes(s[idx_of_val_len].to_i)
      read_bytes(2)
      if params.nodes[0] != @nid
        case forward
        when :one_line
          return forward_and_one_line_receive(params.nodes[0], s, params.value)
        when :multi_line
          return forward_and_multi_line_receive(params.nodes[0], s, params.value)
        end
      end
      stored = StoredData.new
      unless @storages[params.hash_name]
        return send_data("SERVER_ERROR #{params.hash_name} does not exists.\r\n")
      end

      stored.vn, stored.last, stored.clk, stored.expt, stored.value =
        @storages[params.hash_name].get_raw(params.vn, params.key, params.digest)
      # A missing or expired record is handed to the block as nil.
      stored = nil if stored.vn.nil? || Time.now.to_i > stored.expt

      if stored && @stats.wb_command_map.key?("#{cmd}__prev".to_sym)
        Roma::WriteBehindProcess::push(params.hash_name,
                                       @stats.wb_command_map["#{cmd}__prev".to_sym],
                                       params.key, stored.value)
      end

      ctx = CommandContext.new(s, params, stored)

      ret = instance_exec(ctx, &block)
      if ret.instance_of? Array
        _flg, expt, value, count, msg = ret
        ret = @storages[ctx.params.hash_name].set(ctx.params.vn,
                                                  ctx.params.key,
                                                  ctx.params.digest,
                                                  expt,
                                                  value)
        # Counters are bumped whether or not the storage accepted the write.
        case count
        when :write
          @stats.write_count += 1
        when :delete
          @stats.delete_count += 1
        end

        if ret
          if @stats.wb_command_map.key?(cmd.to_sym)
            Roma::WriteBehindProcess::push(ctx.params.hash_name,
                                           @stats.wb_command_map[cmd.to_sym],
                                           ctx.params.key, ret[4])
          end
          redundant(ctx.params.nodes[1..-1], ctx.params.hash_name,
                    ctx.params.key, ctx.params.digest, ret[2],
                    expt, ret[4])
          send_data("#{msg}\r\n")
        else
          send_data("NOT_#{msg}\r\n")
        end
      end
    rescue ClientErrorException => e
      send_data("CLIENT_ERROR #{e.message}\r\n")
    rescue ServerErrorException => e
      send_data("SERVER_ERROR #{e.message}\r\n")
    rescue LocalJumpError => e
      # Raised when the command block uses `return`; the returned value is
      # recovered from the exception after logging a warning.
      @log.warn("#{e} #{$@}")
      e.exit_value
    end
  end
end