class RedisLocks::Semaphore
Constants
- NAMESPACE
- SETUP_DIGEST
- SETUP_SCRIPT
Removes stale locks, then ensures that all resources which aren’t locked are marked as available.
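The two steps described above can be modelled in plain Ruby. This is only an in-memory sketch: the real logic lives in the Lua source of SETUP_SCRIPT, which is not shown on this page, so the helper name `setup!`, the data structures, and the strict `<` comparison are all assumptions made for illustration.

```ruby
# In-memory sketch of the setup step (assumed behaviour, not the gem's Lua script).
#
# available: Array standing in for the Redis list of free resources
# grabbed:   Hash of token => time grabbed, standing in for the Redis sorted set
def setup!(available, grabbed, resource_count, stale_before)
  # 1. Drop locks that were grabbed before the staleness cutoff.
  grabbed.delete_if { |_token, grabbed_at| grabbed_at < stale_before }

  # 2. Top up the queue so that free + held resources equals resource_count.
  #    A negative count makes `times` a no-op, so nothing is ever removed here.
  missing = resource_count - grabbed.size - available.size
  missing.times { available.push(1) }

  [available, grabbed]
end

available = []
grabbed = { "old" => 100.0, "fresh" => 990.0 }
setup!(available, grabbed, 3, 900.0)
# "old" is dropped as stale; two free resources are added next to "fresh".
```

Under these assumptions the same pass also repairs a semaphore whose keys do not exist yet: with an empty queue and an empty set, every resource is simply marked as available.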
Public Class Methods
`resources` is the number of clients allowed to lock the semaphore concurrently.
`stale_client_timeout` is the time in seconds after which we assume that something has gone terribly wrong with a client and invalidate its lock.
# File lib/redis_locks/semaphore.rb, line 65
def initialize(key, resources: 1, stale_client_timeout: 86400, redis: RedisLocks.redis)
  @key = key
  @resource_count = resources.to_i
  @stale_client_timeout = stale_client_timeout.to_f
  @redis = Connections.ensure_pool(redis)
  @tokens = []
  raise ArgumentError.new("Lock key is required") if @key.nil? || @key.empty?
  raise ArgumentError.new("resources must be > 0") unless @resource_count > 0
  raise ArgumentError.new("stale_client_timeout must be > 0") unless @stale_client_timeout > 0
end
Public Instance Methods
Forcefully clear the lock. Be careful!
# File lib/redis_locks/semaphore.rb, line 78
def delete!
  @redis.with do |conn|
    conn.del(available_key)
    conn.del(grabbed_key)
  end
  @tokens = []
end
Acquire a resource from the semaphore, if available. Returns false if no resources are available.
`timeout` is how long to wait, blocking, until a resource is available. The default is nil, meaning don’t block. A timeout of zero means block forever. (This is a bit weird, but corresponds to how blpop uses timeout values.)
If passed a block and a resource is available, runs the block and then unlocks.
If called without a block and a resource is available, returns a token. The caller is then responsible for unlocking the token.
This isn’t atomic: if the process dies, we could remove something from the available queue without adding it to the grabbed set. That’s OK, because the semaphore will recover just as if this were a stale client that left its token in the grabbed set forever.
# File lib/redis_locks/semaphore.rb, line 104
def lock(timeout: nil, &block)
  ensure_exists_and_release_stale_locks!
  success = @redis.with do |conn|
    if timeout
      !conn.blpop(available_key, timeout.to_i).nil?
    else
      !conn.lpop(available_key).nil?
    end
  end
  return false unless success
  token = SecureRandom.hex(16)
  @tokens.push(token)
  @redis.with do |conn|
    conn.zadd(grabbed_key, epoch_f(conn), token)
  end
  return_or_yield(token, &block)
end
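The timeout branch in `lock` can be summarised with a small stand-alone sketch. `pop_strategy` is a hypothetical helper written for this page, not part of the gem; it only reports which Redis command the branch would pick and for how long it would block.

```ruby
# Hypothetical helper mirroring the `if timeout` branch in `lock`.
# Returns [command, wait], where wait is nil, :forever, or whole seconds.
def pop_strategy(timeout)
  # nil (the default) is falsy, so `lock` falls through to a non-blocking LPOP.
  return [:lpop, nil] unless timeout

  # `lock` calls timeout.to_i, so fractional values are truncated.
  seconds = timeout.to_i

  # BLPOP treats a timeout of 0 as "block until an element arrives".
  seconds.zero? ? [:blpop, :forever] : [:blpop, seconds]
end

pop_strategy(nil)  # non-blocking
pop_strategy(0)    # blocks until a resource is released
pop_strategy(5)    # blocks for up to 5 seconds
pop_strategy(0.5)  # truncated to 0, so this also blocks indefinitely
```

One consequence worth noting: because of the `to_i` call, a sub-second timeout such as `0.5` appears to behave like `0` rather than like a short wait, so whole seconds are the safer choice.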
# File lib/redis_locks/semaphore.rb, line 130
def lock!(timeout: nil, &block)
  token = lock(timeout: timeout)
  raise SemaphoreUnavailable.new(@key, @resource_count) unless token
  return_or_yield(token, &block)
end
Release a resource back to the semaphore. Should normally be called with an explicit token.
This isn’t atomic: if the process dies, we could remove something from the grabbed set without adding it back to the available queue. That’s OK, because the semaphore will recover much as it does from a stale client that left its token in the grabbed set forever.
# File lib/redis_locks/semaphore.rb, line 147
def unlock(token = @tokens.pop)
  return unless token
  removed = false
  @redis.with do |conn|
    removed = conn.zrem grabbed_key, token
    if removed
      conn.lpush available_key, 1
    end
  end
  removed
end
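To make the interplay between the available queue and the grabbed set concrete, here is a toy, single-process model of `lock` and `unlock`. `ToySemaphore` is hypothetical and uses an Array and a Hash in place of the Redis list and sorted set; it leaves out blocking, stale-lock cleanup, and the per-instance token stack.

```ruby
require "securerandom"

# Toy in-memory model of the semaphore's two structures (illustration only).
class ToySemaphore
  attr_reader :available, :grabbed

  def initialize(resources)
    @available = Array.new(resources, 1) # stands in for the AVAILABLE list
    @grabbed = {}                        # stands in for the GRABBED sorted set
  end

  # Returns a token, or false when no resource is free.
  def lock(now: Time.now.to_f)
    return false if @available.shift.nil? # like LPOP: nil when the list is empty

    token = SecureRandom.hex(16)
    @grabbed[token] = now                 # like ZADD grabbed_key now token
    token
  end

  # Returns true only if the token was actually held.
  def unlock(token)
    removed = !@grabbed.delete(token).nil? # like ZREM: reports whether it was present
    @available.unshift(1) if removed       # like LPUSH available_key 1
    removed
  end
end

sem = ToySemaphore.new(1)
token = sem.lock                  # a 32-character hex token
second = sem.lock                 # false: the only resource is taken
first_unlock = sem.unlock(token)  # true: the resource is returned
second_unlock = sem.unlock(token) # false: nothing is pushed a second time
```

Guarding the push on `removed` is what keeps a double unlock from inflating the number of available resources, which is the same check the real `unlock` makes before calling `lpush`.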
# File lib/redis_locks/semaphore.rb, line 126
def wait(timeout: 0, &block)
  lock(timeout: timeout, &block)
end

# File lib/redis_locks/semaphore.rb, line 136
def wait!(timeout: 0, &block)
  lock!(timeout: timeout, &block)
end
Private Instance Methods
# File lib/redis_locks/semaphore.rb, line 193
def available_key
  @available_key ||= namespaced_key('AVAILABLE')
end

# File lib/redis_locks/semaphore.rb, line 177
def ensure_exists_and_release_stale_locks!
  @redis.with do |conn|
    RedisLocks.evalsha_or_eval(
      conn: conn,
      script: SETUP_SCRIPT,
      digest: SETUP_DIGEST,
      keys: [available_key, grabbed_key],
      args: [@resource_count, stale_before(conn)]
    )
  end
end

# File lib/redis_locks/semaphore.rb, line 205
def epoch_f(conn)
  epoch_i, microseconds = conn.time
  epoch_i + microseconds.to_f / 1_000_000
end
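The arithmetic in `epoch_f` and `stale_before` can be checked without a Redis connection. The Redis TIME command replies with a pair of Unix seconds and microseconds; the sketch below feeds in a made-up pair, and `epoch_from_redis_time` is a hypothetical stand-alone version of the conversion.

```ruby
# Combine a Redis TIME reply, [unix_seconds, microseconds], into a Float.
def epoch_from_redis_time(seconds, microseconds)
  seconds + microseconds.to_f / 1_000_000
end

# Made-up TIME reply, used only to show the arithmetic.
now = epoch_from_redis_time(1_700_000_000, 250_000)

# With the default stale_client_timeout of 86400 seconds, locks grabbed
# before this cutoff would be candidates for removal.
stale_before = now - 86_400.0
```

Taking the time from the Redis server rather than from each client presumably means every client judges staleness against the same clock, even if their local clocks drift.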
# File lib/redis_locks/semaphore.rb, line 197
def grabbed_key
  @grabbed_key ||= namespaced_key('GRABBED')
end

# File lib/redis_locks/semaphore.rb, line 189
def namespaced_key(variable)
  "#{NAMESPACE}:#{@key}:#{variable}"
end

# File lib/redis_locks/semaphore.rb, line 165
def return_or_yield(token)
  return_value = token
  if block_given?
    begin
      return_value = yield token
    ensure
      unlock(token)
    end
  end
  return_value
end
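The pattern in `return_or_yield` (yield the token, then always release it) can be tried in isolation. In this sketch `with_token` is a hypothetical stand-in, and appending to the `released` array takes the place of the real `unlock(token)` call.

```ruby
# Stand-alone sketch of "yield, then always release".
# `released` records which tokens were released, in place of unlock(token).
def with_token(token, released)
  # Without a block the caller gets the token and must release it later.
  return token unless block_given?

  begin
    yield token         # the block's value becomes the return value
  ensure
    released << token   # runs whether the block returns or raises
  end
end

released = []

result = with_token("t1", released) { |t| t.upcase }

begin
  with_token("t2", released) { raise "boom" }
rescue RuntimeError
  # The error still propagates to the caller, but "t2" was released first.
end

bare = with_token("t3", released)
```

After this runs, `released` holds the two tokens that were used with a block, while the token obtained without a block has not been released and remains the caller's responsibility.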
# File lib/redis_locks/semaphore.rb, line 201
def stale_before(conn)
  epoch_f(conn) - @stale_client_timeout
end