class Roma::WriteBehind::StreamWriter

Attributes

replica_mklhash[RW]
replica_nodelist[RW]
replica_rttable[RW]
run_existing_data_replication[RW]
run_replication[RW]

Public Class Methods

new(log) click to toggle source
    # File lib/roma/write_behind.rb
# Sets up a stream writer with replication switched off and no replica
# cluster information loaded yet.
#
# @param log [Object] logger stored in @log; other methods of this class
#   call #error and #debug on it
def initialize(log)
  @log = log

  # Both replication switches start out disabled.
  @run_replication = @run_existing_data_replication = false

  # Nothing is known about the replica cluster until the update_* methods
  # have filled these in.
  @replica_mklhash = @replica_rttable = nil
  @replica_nodelist = []

  # Raised only while #transmit is running.
  @do_transmit = false
end

Public Instance Methods

change_mklhash?() click to toggle source
    # File lib/roma/write_behind.rb
# Reports whether the replica cluster's mklhash differs from the cached
# @replica_mklhash.
#
# The first node in @replica_nodelist is asked with "mklhash 0". If anything
# raises while talking to it, that node is dropped from @replica_nodelist and
# the next one is tried. Once the list is empty, replication is switched off
# (@run_replication = false), an error is logged, and false is returned so
# the caller does not treat a dead cluster as "hash changed".
#
# @return [Boolean] true when the remote hash differs from the cached one,
#   false when it matches or no replica node could be reached
def change_mklhash?
  nid = @replica_nodelist[0]
  pool = Roma::Messaging::ConPool.instance

  con = pool.get_connection(nid)
  con.write("mklhash 0\r\n")
  current_mklhash = con.gets.chomp
  pool.return_connection(nid, con)

  current_mklhash != @replica_mklhash
rescue
  # The head node failed: forget it and try the next one, if any. The retry
  # is bounded because every pass removes one entry from the list.
  @replica_nodelist.shift
  retry unless @replica_nodelist.empty?

  @run_replication = false
  @log.error("Replicate Cluster was down.")
  false
end
close_all() click to toggle source
    # File lib/roma/write_behind.rb
# Asks the connection pool to close the connections of every node currently
# listed in @replica_nodelist.
#
# @return [Array] @replica_nodelist itself (the result of #each)
def close_all
  @replica_nodelist.each do |node_id|
    Roma::Messaging::ConPool.instance.close_at(node_id)
  end
end
get_stat() click to toggle source
    # File lib/roma/write_behind.rb
# Snapshot of the replication settings, keyed by their "write-behind.*"
# stat names.
#
# @return [Hash{String => Object}] the current values of @run_replication,
#   @run_existing_data_replication, @replica_mklhash and @replica_nodelist
def get_stat
  {
    'write-behind.run_replication' => @run_replication,
    'write-behind.run_existing_data_replication' => @run_existing_data_replication,
    'write-behind.replica_mklhash' => @replica_mklhash,
    'write-behind.replica_nodelist' => @replica_nodelist
  }
end
search_replica_primary_node(key) click to toggle source
    # File lib/roma/write_behind.rb
# Looks up the first node listed for +key+ in the replica routing table.
#
# The SHA1 digest of the key is reduced modulo 2**dgst_bits, masked with the
# table's search_mask, and used as the index into v_idx.
#
# @param key [String] key to route
# @return [Object, nil] first entry of the matching v_idx node list, or nil
#   (after logging the error) when the lookup raises
def search_replica_primary_node(key)
  digest = Digest::SHA1.hexdigest(key).hex
  table = @replica_rttable
  hashed = digest % (2**table.dgst_bits)
  candidates = table.v_idx[hashed & table.search_mask]
  candidates[0] # for send primary node of replica cluster
rescue => e
  @log.error("#{e}\n#{e.backtrace}")
  nil
end
transmit(cmd, key, value) click to toggle source
    # File lib/roma/write_behind.rb
# Sends +cmd+ to the node that search_replica_primary_node picks for +key+.
#
# The whole exchange is limited to 5 seconds. @do_transmit is true while the
# exchange is in progress and is always reset to false afterwards. Any
# StandardError (including a timeout) is logged together with the key and
# value and is not re-raised.
#
# @param cmd [String] raw command text written to the connection
# @param key [String] key used to choose the target node
# @param value [Object] only used in the error log line
def transmit(cmd, key, value) # value is for error log
  # Timeout.timeout is used explicitly: the bare Object#timeout helper is not
  # available on modern Ruby, and the resulting NoMethodError would be
  # swallowed by the rescue below, turning every transmit into a logged
  # failure.
  Timeout.timeout(5) do
    @do_transmit = true
    nid = search_replica_primary_node(key)
    con = Roma::Messaging::ConPool.instance.get_connection(nid)
    con.write(cmd)
    con.gets # for return connection
    Roma::Messaging::ConPool.instance.return_connection(nid, con)
  end
rescue => e
  @log.error("#{e}\n#{e.backtrace}")
  @log.error("replication error: key=#{key} value=#{value}\r\n")
ensure
  @do_transmit = false
end
update_mklhash(nid) click to toggle source
    # File lib/roma/write_behind.rb
# Refreshes @replica_mklhash with the answer node +nid+ gives to "mklhash 0".
#
# The exchange is limited to 1 second. Any StandardError (including a
# timeout) is logged and not re-raised; @replica_mklhash then keeps whatever
# value it had when the error occurred.
#
# @param nid [Object] node id handed to the connection pool
def update_mklhash(nid)
  # Timeout.timeout is used explicitly: the bare Object#timeout helper is not
  # available on modern Ruby, and the resulting NoMethodError would be
  # swallowed by the rescue below.
  Timeout.timeout(1) do
    con = Roma::Messaging::ConPool.instance.get_connection(nid)
    con.write("mklhash 0\r\n")
    @replica_mklhash = con.gets.chomp
    Roma::Messaging::ConPool.instance.return_connection(nid, con)
    @log.debug("replica_mklhash has updated: [#{@replica_mklhash}]")
  end
rescue => e
  @log.error("#{e}\n#{e.backtrace}")
end
update_nodelist(nid) click to toggle source
    # File lib/roma/write_behind.rb
# Refreshes @replica_nodelist with the answer node +nid+ gives to "nodelist".
#
# The reply line is split on whitespace into individual entries. The exchange
# is limited to 1 second. Any StandardError (including a timeout) is logged
# and not re-raised; @replica_nodelist then keeps whatever value it had when
# the error occurred.
#
# @param nid [Object] node id handed to the connection pool
def update_nodelist(nid)
  # Timeout.timeout is used explicitly: the bare Object#timeout helper is not
  # available on modern Ruby, and the resulting NoMethodError would be
  # swallowed by the rescue below.
  Timeout.timeout(1) do
    con = Roma::Messaging::ConPool.instance.get_connection(nid)
    con.write("nodelist\r\n")
    # ' ' is the same single-space separator the "\s" escape produced; with
    # it String#split breaks on runs of whitespace.
    @replica_nodelist = con.gets.chomp.split(' ')
    Roma::Messaging::ConPool.instance.return_connection(nid, con)
    @log.debug("replica_nodelist has updated: #{@replica_nodelist}")
  end
rescue => e
  @log.error("#{e}\n#{e.backtrace}")
end
update_rttable(nid) click to toggle source
    # File lib/roma/write_behind.rb
# Refreshes @replica_rttable from the "routingdump" reply of node +nid+.
#
# Reply handling, as implemented below: the first line carries the payload
# length; that many bytes are then read and unmarshalled, followed by a
# two-byte line ending and one more line that is discarded. A length of zero
# or less closes the connection and returns nil (a negative length is also
# logged). The exchange is limited to 1 second. Any StandardError (including
# a timeout) is logged and not re-raised.
#
# @param nid [Object] node id handed to the connection pool
# @return [nil] on every failure path; on success the return value is
#   whatever @log.debug returns
def update_rttable(nid)
  # Timeout.timeout is used explicitly: the bare Object#timeout helper is not
  # available on modern Ruby, and the resulting NoMethodError would be
  # swallowed by the rescue below.
  Timeout.timeout(1) do
    con = Roma::Messaging::ConPool.instance.get_connection(nid)
    con.write "routingdump\r\n"
    routes_length = con.gets.to_i
    if routes_length <= 0
      con.close
      @log.error("#{__method__} process was failed.\r\n") if routes_length < 0
      return nil
    end

    # Accumulate in a binary buffer, appending in place. The announced length
    # is compared by byte count because read(n) is asked for n bytes.
    routes = String.new
    while routes.bytesize != routes_length
      # read returning nil (EOF) raises TypeError here, which ends up in the
      # rescue below.
      routes << con.read(routes_length - routes.bytesize)
    end
    con.read(2) # "\r\n"
    con.gets
    # NOTE(review): Marshal.load on bytes received over the network can
    # instantiate arbitrary objects; this is only safe if the replica
    # cluster is trusted.
    rd = Marshal.load(routes)
    Roma::Messaging::ConPool.instance.return_connection(nid, con)
    @replica_rttable = rd
    @log.debug("replica_rttable has updated: [#{@replica_rttable}]")
  end
rescue => e
  @log.error("#{e}\n#{e.backtrace}")
  nil
end