class Roma::WriteBehind::StreamWriter
Attributes
replica_mklhash[RW]
replica_nodelist[RW]
replica_rttable[RW]
run_existing_data_replication[RW]
run_replication[RW]
Public Class Methods
new(log)
click to toggle source
# File lib/roma/write_behind.rb, line 136
#
# Creates a stream writer with both replication switches off and no
# replica-cluster state (mklhash, node list, routing table) loaded yet.
#
# log:: logger object stored in @log for later use by this writer.
def initialize(log)
  @log = log

  # Replication of new writes and of already-stored data both start disabled.
  @run_replication, @run_existing_data_replication = false, false

  # Replica-cluster state is empty until it is filled in later.
  @replica_mklhash, @replica_nodelist, @replica_rttable = nil, [], nil

  # Set to true only while a transmit is in progress.
  @do_transmit = false
end
Public Instance Methods
change_mklhash?()
click to toggle source
# File lib/roma/write_behind.rb, line 155
#
# Sends "mklhash 0" to the first node in @replica_nodelist and compares the
# answer with the cached @replica_mklhash.
#
# Returns true when the two values differ and false when they are equal.
#
# If anything raises a StandardError, the first node is dropped from
# @replica_nodelist and the check is retried against the next one. Once the
# list is empty, @run_replication is set to false, an error is logged, and
# the return value is whatever @log.error returned.
def change_mklhash?
  pool = Roma::Messaging::ConPool.instance
  node = @replica_nodelist[0]

  con = pool.get_connection(node)
  con.write("mklhash 0\r\n")
  current_mklhash = con.gets.chomp
  pool.return_connection(node, con)

  current_mklhash != @replica_mklhash
rescue
  # The node at the head of the list failed; discard it and try the next one.
  @replica_nodelist.shift
  retry unless @replica_nodelist.empty?

  @run_replication = false
  @log.error("Replicate Cluster was down.")
end
close_all()
click to toggle source
# File lib/roma/write_behind.rb, line 252
#
# Asks the connection pool to close its connections for every node id in
# @replica_nodelist. Returns @replica_nodelist.
def close_all
  @replica_nodelist.each do |node_id|
    Roma::Messaging::ConPool.instance.close_at(node_id)
  end
end
get_stat()
click to toggle source
# File lib/roma/write_behind.rb, line 146
#
# Returns a new Hash snapshot of the replication settings, keyed by
# 'write-behind.*' stat names.
def get_stat
  {
    'write-behind.run_replication' => @run_replication,
    'write-behind.run_existing_data_replication' => @run_existing_data_replication,
    'write-behind.replica_mklhash' => @replica_mklhash,
    'write-behind.replica_nodelist' => @replica_nodelist
  }
end
search_replica_primary_node(key)
click to toggle source
# File lib/roma/write_behind.rb, line 227
#
# Looks up the replica-cluster node list for +key+ in @replica_rttable and
# returns its first entry.
#
# The SHA1 digest of the key is reduced modulo 2**dgst_bits, masked with
# search_mask, and the result is used as the index into v_idx.
#
# Returns nil, after logging the exception, if any StandardError is raised
# (for example when @replica_rttable has not been loaded yet).
def search_replica_primary_node(key)
  digest = Digest::SHA1.hexdigest(key).hex
  table = @replica_rttable
  slot = digest % (2**table.dgst_bits)
  candidates = table.v_idx[slot & table.search_mask]
  candidates[0] # first entry is the primary node of the replica cluster
rescue => e
  @log.error("#{e}\n#{$@}")
  nil
end
transmit(cmd, key, value)
click to toggle source
# File lib/roma/write_behind.rb, line 236
#
# Sends +cmd+ to the replica-cluster node that search_replica_primary_node
# returns for +key+, reads one response line, and hands the connection back
# to the pool. The whole exchange is limited to 5 seconds.
#
# cmd::   protocol command written to the connection as-is.
# key::   key used to choose the destination node.
# value:: only used in the error log.
#
# No exception is propagated: any StandardError, including a timeout, is
# logged together with the key and value. @do_transmit is true while the
# exchange is running and is always reset to false afterwards.
def transmit(cmd, key, value)
  # Timeout.timeout replaces the bare Kernel#timeout, which is deprecated
  # and missing from newer Rubies; there the call raised NoMethodError and
  # every transmit ended up in the rescue below.
  Timeout.timeout(5) do
    @do_transmit = true
    nid = search_replica_primary_node(key)
    con = Roma::Messaging::ConPool.instance.get_connection(nid)
    con.write(cmd)
    con.gets # consume the reply so the connection can be returned to the pool
    Roma::Messaging::ConPool.instance.return_connection(nid, con)
  end
rescue => e
  @log.error("#{e}\n#{$@}")
  @log.error("replication error: key=#{key} value=#{value}\r\n")
ensure
  @do_transmit = false
end
update_mklhash(nid)
click to toggle source
# File lib/roma/write_behind.rb, line 176
#
# Asks node +nid+ for its mklhash ("mklhash 0") and caches the answer in
# @replica_mklhash. The exchange is limited to 1 second.
#
# nid:: node id passed to the connection pool.
#
# No exception is propagated: any StandardError, including a timeout, is
# logged and @replica_mklhash keeps its previous value.
def update_mklhash(nid)
  # Timeout.timeout replaces the bare Kernel#timeout, which is deprecated
  # and missing from newer Rubies; there the call raised NoMethodError and
  # the hash was never refreshed.
  Timeout.timeout(1) do
    con = Roma::Messaging::ConPool.instance.get_connection(nid)
    con.write("mklhash 0\r\n")
    @replica_mklhash = con.gets.chomp
    Roma::Messaging::ConPool.instance.return_connection(nid, con)
    @log.debug("replica_mklhash has updated: [#{@replica_mklhash}]")
  end
rescue => e
  @log.error("#{e}\n#{$@}")
end
update_nodelist(nid)
click to toggle source
# File lib/roma/write_behind.rb, line 188
#
# Asks node +nid+ for the cluster's node list ("nodelist") and stores the
# whitespace-separated answer as an Array in @replica_nodelist. The exchange
# is limited to 1 second.
#
# nid:: node id passed to the connection pool.
#
# No exception is propagated: any StandardError, including a timeout, is
# logged and @replica_nodelist keeps its previous value.
def update_nodelist(nid)
  # Timeout.timeout replaces the bare Kernel#timeout, which is deprecated
  # and missing from newer Rubies; there the call raised NoMethodError and
  # the list was never refreshed.
  Timeout.timeout(1) do
    con = Roma::Messaging::ConPool.instance.get_connection(nid)
    con.write("nodelist\r\n")
    # ' ' is the same single-space separator the former "\s" escape produced;
    # String#split treats it as "split on runs of whitespace".
    @replica_nodelist = con.gets.chomp.split(' ')
    Roma::Messaging::ConPool.instance.return_connection(nid, con)
    @log.debug("replica_nodelist has updated: #{@replica_nodelist}")
  end
rescue => e
  @log.error("#{e}\n#{$@}")
end
update_rttable(nid)
click to toggle source
# File lib/roma/write_behind.rb, line 200
#
# Downloads the routing dump from node +nid+ ("routingdump") and stores the
# unmarshalled object in @replica_rttable. The exchange is limited to
# 1 second.
#
# Wire format as read here: one line holding the payload size in bytes, the
# payload itself, a "\r\n" terminator, then one more line.
#
# nid:: node id passed to the connection pool.
#
# Returns nil when the announced size is not positive (the connection is
# closed; an error is logged only for a negative size) and when any
# StandardError, including a timeout or a premature end of stream, is
# raised. No exception is propagated.
def update_rttable(nid)
  # Timeout.timeout replaces the bare Kernel#timeout, which is deprecated
  # and missing from newer Rubies; there the call raised NoMethodError and
  # the table was never refreshed.
  Timeout.timeout(1) do
    con = Roma::Messaging::ConPool.instance.get_connection(nid)
    con.write("routingdump\r\n")
    routes_length = con.gets.to_i
    if routes_length <= 0
      con.close
      @log.error("#{__method__} process was failed.\r\n") if routes_length < 0
      return nil
    end

    # The announced size is a byte count, so accumulate into a binary buffer
    # and compare byte sizes; read may hand back fewer bytes than requested.
    routes = String.new
    while routes.bytesize < routes_length
      chunk = con.read(routes_length - routes.bytesize)
      if chunk.nil? || chunk.empty?
        raise EOFError, "routing dump ended after #{routes.bytesize} of #{routes_length} bytes"
      end
      routes << chunk
    end
    con.read(2) # "\r\n"
    con.gets

    # NOTE(review): Marshal.load can instantiate arbitrary objects; this is
    # only safe while the replica cluster and the network path are trusted.
    rd = Marshal.load(routes)
    Roma::Messaging::ConPool.instance.return_connection(nid, con)
    @replica_rttable = rd
    @log.debug("replica_rttable has updated: [#{@replica_rttable}]")
  end
rescue => e
  @log.error("#{e}\n#{$@}")
  nil
end