class Capacitor::CommandsFetcher

Public Instance Methods

block_on_incoming_signal_list() click to toggle source
# File lib/capacitor/commands_fetcher.rb, line 25
def block_on_incoming_signal_list
  false until incoming_signal_list
  flush_incoming_signal_list
end
blocking_timeout() click to toggle source
# File lib/capacitor/commands_fetcher.rb, line 9
def blocking_timeout
  60
end
fetch() click to toggle source
# File lib/capacitor/commands_fetcher.rb, line 5
def fetch
  new.retrieve_batch
end
flush_batch() click to toggle source
# File lib/capacitor/commands_fetcher.rb, line 72
def flush_batch
  # Safely processed now, kill the batch in redis
  redis.del "processing_hash", "retry_hash"
end
flush_incoming_signal_list() click to toggle source
# File lib/capacitor/commands_fetcher.rb, line 30
def flush_incoming_signal_list
  redis.del "incoming_signal_list"
end
flush_retried_batch() click to toggle source
# File lib/capacitor/commands_fetcher.rb, line 77
def flush_retried_batch
  if redis.hlen("retry_hash") > 0
    failure = 'failure:' + Time.new.utc.to_f.to_s
    redis.rename "retry_hash", failure
    redis.lpush "failed_hash_keys", failure
    logger.error "retry_hash moved to #{failure}"
  end
end
incoming_signal_list() click to toggle source
# File lib/capacitor/commands_fetcher.rb, line 21
def incoming_signal_list
  redis.blpop "incoming_signal_list", blocking_timeout
end
logger() click to toggle source
# File lib/capacitor/commands_fetcher.rb, line 17
def logger
  Capacitor.logger
end
redis() click to toggle source
# File lib/capacitor/commands_fetcher.rb, line 13
def redis
  Capacitor.redis
end
retrieve_batch() click to toggle source

When things are working well :incoming_hash -> :processing_hash -> flush()

If a batch fails once :processing_hash -> :retry_hash -> flush()

If a batch fails again :retry_hash -> :failed_hash_keys -> flush() then start over with :incoming_hash

# File lib/capacitor/commands_fetcher.rb, line 67
def retrieve_batch
  flush_retried_batch
  return retrieve_existing_batch || retrieve_current_batch
end
retrieve_current_batch() click to toggle source
# File lib/capacitor/commands_fetcher.rb, line 45
def retrieve_current_batch
  begin
    result = redis.rename "incoming_hash", "processing_hash"
  rescue Exception => e
    # This means we got a signal without getting data, which is
    # probably okay due to the harmless race condition, but might
    # warrant investigation later, so let's log it and move on.
    logger.warn "empty incoming_hash in retrieve_batch"
    return {}
  end
  redis.hgetall "processing_hash"
end
retrieve_existing_batch() click to toggle source
# File lib/capacitor/commands_fetcher.rb, line 34
def retrieve_existing_batch
  batch = redis.hgetall "processing_hash"
  if !batch.empty?
    redis.rename "processing_hash", "retry_hash"
    logger.error "processing_hash moved to retry_hash"
    batch
  else
    nil
  end
end