class RedisLocks::Semaphore

Constants

NAMESPACE
SETUP_DIGEST
SETUP_SCRIPT

Removes stale locks, then ensures that all resources which aren’t locked are marked as available.

Public Class Methods

new(key, resources: 1, stale_client_timeout: 86400, redis: RedisLocks.redis) click to toggle source

‘resources` is the number of clients allowed to lock the semaphore concurrently.

‘stale_client_timeout` is the threshold of time before we assume that something has gone terribly wrong with a client and we invalidate its lock.

# File lib/redis_locks/semaphore.rb, line 65
def initialize(key, resources: 1, stale_client_timeout: 86400, redis: RedisLocks.redis)
  @key = key
  @resource_count = resources.to_i
  @stale_client_timeout = stale_client_timeout.to_f
  @redis = Connections.ensure_pool(redis)
  @tokens = []

  raise ArgumentError.new("Lock key is required") if @key.nil? || @key.empty?
  raise ArgumentError.new("resources must be > 0") unless @resource_count > 0
  raise ArgumentError.new("stale_client_timeout must be > 0") unless @stale_client_timeout > 0
end

Public Instance Methods

delete!() click to toggle source

Forcefully clear the lock. Be careful!

# File lib/redis_locks/semaphore.rb, line 78
def delete!
  @redis.with do |conn|
    conn.del(available_key)
    conn.del(grabbed_key)
  end

  @tokens = []
end
lock(timeout: nil, &block) click to toggle source

Acquire a resource from the semaphore, if available. Returns false if no resources are available.

‘timeout` is how long to wait, blocking, until a resource is available. The default is nil, meaning don’t block. A timeout of zero means block forever. (This is a bit weird, but corresponds to how blpop uses timeout values.)

If passed a block, if a resource is available, runs the block and then unlocks.

If called without a block, if a resource is available, returns a token. Caller is then responsible for unlocking the token.

This isn’t atomic–if the process dies, we could remove something from the available queue without adding it to the grabbed set–but that’s ok, the semaphore will recover just as if this was a stale client that left its token in the grabbed set forever.

# File lib/redis_locks/semaphore.rb, line 104
def lock(timeout: nil, &block)
  ensure_exists_and_release_stale_locks!

  success = @redis.with do |conn|
    if timeout
      !conn.blpop(available_key, timeout.to_i).nil?
    else
      !conn.lpop(available_key).nil?
    end
  end

  return false unless success

  token = SecureRandom.hex(16)
  @tokens.push(token)
  @redis.with do |conn|
    conn.zadd(grabbed_key, epoch_f(conn), token)
  end

  return_or_yield(token, &block)
end
lock!(timeout: nil, &block) click to toggle source
# File lib/redis_locks/semaphore.rb, line 130
def lock!(timeout: nil, &block)
  token = lock(timeout: timeout)
  raise SemaphoreUnavailable.new(@key, @resource_count) unless token
  return_or_yield(token, &block)
end
signal(token = @tokens.pop)
Alias for: unlock
unlock(token = @tokens.pop) click to toggle source

Release a resource back to the semaphore. Should normally be called with an explicit token.

This isn’t atomic–if the process dies, we could remove something from the blocked set without adding it to the available queue–but that’s ok, the semaphore will recover just as if this was a stale client that left its token in the grabbed set forever.

# File lib/redis_locks/semaphore.rb, line 147
def unlock(token = @tokens.pop)
  return unless token

  removed = false

  @redis.with do |conn|
    removed = conn.zrem grabbed_key, token
    if removed
      conn.lpush available_key, 1
    end
  end

  removed
end
Also aliased as: signal
wait(timeout: 0, &block) click to toggle source
# File lib/redis_locks/semaphore.rb, line 126
def wait(timeout: 0, &block)
  lock(timeout: timeout, &block)
end
wait!(timeout: 0, &block) click to toggle source
# File lib/redis_locks/semaphore.rb, line 136
def wait!(timeout: 0, &block)
  lock!(timeout: timeout, &block)
end

Private Instance Methods

available_key() click to toggle source
# File lib/redis_locks/semaphore.rb, line 193
def available_key
  @available_key ||= namespaced_key('AVAILABLE')
end
ensure_exists_and_release_stale_locks!() click to toggle source
# File lib/redis_locks/semaphore.rb, line 177
def ensure_exists_and_release_stale_locks!
  @redis.with do |conn|
    RedisLocks.evalsha_or_eval(
      conn: conn,
      script: SETUP_SCRIPT,
      digest: SETUP_DIGEST,
      keys: [available_key, grabbed_key],
      args: [@resource_count, stale_before(conn)]
    )
  end
end
epoch_f(conn) click to toggle source
# File lib/redis_locks/semaphore.rb, line 205
def epoch_f(conn)
  epoch_i, microseconds = conn.time
  epoch_i + microseconds.to_f / 1_000_000
end
grabbed_key() click to toggle source
# File lib/redis_locks/semaphore.rb, line 197
def grabbed_key
  @grabbed_key ||= namespaced_key('GRABBED')
end
namespaced_key(variable) click to toggle source
# File lib/redis_locks/semaphore.rb, line 189
def namespaced_key(variable)
  "#{NAMESPACE}:#{@key}:#{variable}"
end
return_or_yield(token) { |token| ... } click to toggle source
# File lib/redis_locks/semaphore.rb, line 165
def return_or_yield(token)
  return_value = token
  if block_given?
    begin
      return_value = yield token
    ensure
      unlock(token)
    end
  end
  return_value
end
stale_before(conn) click to toggle source
# File lib/redis_locks/semaphore.rb, line 201
def stale_before(conn)
  epoch_f(conn) - @stale_client_timeout
end