class Fluent::Counter::MutexHash
Public Class Methods
new(data_store)
click to toggle source
# File lib/fluent/counter/mutex_hash.rb, line 22
# Sets up an empty per-key mutex table around +data_store+.
#
# @param data_store [Object] kept in @data_store and handed on to the
#   CleanupThread constructor
def initialize(data_store)
  # Table-wide mutex; it is shared with the cleanup thread below.
  @mutex = Mutex.new
  @data_store = data_store
  # key => Mutex, starts out empty.
  @mutex_hash = {}
  @thread = nil
  # The cleanup thread receives the same store, table and table-wide mutex
  # that this object uses.
  @cleanup_thread = CleanupThread.new(data_store, @mutex_hash, @mutex)
end
Public Instance Methods
start()
click to toggle source
# File lib/fluent/counter/mutex_hash.rb, line 30
# Starts the data store first, then the cleanup thread.
#
# @return [Object] whatever @cleanup_thread.start returns
def start
  @data_store.start
  @cleanup_thread.start
end
stop()
click to toggle source
# File lib/fluent/counter/mutex_hash.rb, line 35
# Stops the data store first, then the cleanup thread.
#
# @return [Object] whatever @cleanup_thread.stop returns
def stop
  @data_store.stop
  @cleanup_thread.stop
end
synchronize(*keys) { |data_store, k| ... }
click to toggle source
# File lib/fluent/counter/mutex_hash.rb, line 40
# Locks the mutexes of all +keys+ together, then yields once per key.
#
# Acquisition is all-or-nothing: while holding the table-wide mutex, every
# key's mutex is taken with try_lock (missing ones are created on demand).
# If any of them is busy, the mutexes taken so far are released and the
# whole attempt is retried. Each key's mutex is released right after the
# block returns for that key; if the block raises, every mutex still held
# is released before the exception propagates.
#
# @param keys [Array<Object>] keys to lock; duplicates are collapsed, so the
#   block runs once per distinct key, in first-occurrence order
# @yieldparam data_store [Object] the store passed to the constructor
# @yieldparam k [Object] the key being processed
# @return [nil]
def synchronize(*keys)
  return if keys.empty?

  # Mutex is not reentrant: try_lock on a mutex this thread already holds
  # returns false, so a repeated key would make every attempt fail forever.
  keys = keys.uniq

  locks = {}
  loop do
    @mutex.synchronize do
      keys.each do |key|
        mutex = (@mutex_hash[key] ||= Mutex.new)

        if mutex.try_lock
          locks[key] = mutex
        else
          # Back out completely so we never wait while holding a partial set.
          locks.each_value(&:unlock)
          locks = {}
          break
        end
      end
    end

    break unless locks.empty?

    # Could not get every key: let the current holders run before retrying.
    Thread.pass
  end

  begin
    locks.each do |key, mutex|
      yield @data_store, key
      mutex.unlock
    end
  ensure
    # Only does work when the block raised: release whatever is still ours.
    locks.each_value { |mutex| mutex.unlock if mutex.owned? }
  end

  nil
end
synchronize_keys(*keys) { |data_store, key| ... }
click to toggle source
# File lib/fluent/counter/mutex_hash.rb, line 74
# Yields once per entry of +keys+, holding only that key's mutex during the
# block call.
#
# Keys are taken from the front of a private copy of +keys+. A key whose
# mutex is currently busy is moved to the back of the queue and retried
# later, so the other keys can make progress in the meantime. The key's
# mutex is released when the block finishes, including when it raises.
#
# @param keys [Array<Object>] keys to process; the caller's array is not
#   modified. Processing stops at the first nil/false key.
# @yieldparam data_store [Object] the store passed to the constructor
# @yieldparam key [Object] the key whose mutex is currently held
# @return [nil]
def synchronize_keys(*keys)
  return if keys.empty?

  pending = keys.dup

  while (key = pending.shift)
    # Look up (or create) and try to take the key's mutex under the
    # table-wide mutex; synchronize releases it even if something raises.
    key_mutex = @mutex.synchronize do
      candidate = (@mutex_hash[key] ||= Mutex.new)
      candidate if candidate.try_lock
    end

    if key_mutex
      begin
        yield @data_store, key
      ensure
        key_mutex.unlock
      end
    else
      pending.push(key) # failed lock, retry this key later
      # Give the current holder a chance to run instead of spinning.
      Thread.pass
    end
  end
end