class StalkClimber::Connection
Constants
- DEFAULT_TUBE
Default tube used when no custom tube in use
- PROBE_TRANSMISSION
Transmission used to probe state of Beanstalkd. Created with lowest possible priority and delay to reduce possibility of interference.
Attributes
Last known maximum job id on the Beanstalkd server
Last known existing minimum job id on the Beanstalkd server
Tube
to use when probing the Beanstalkd server for information
Public Class Methods
Initializes a new Connection
to the beanstalk address
provided and configures the Connection
to only use the configured test_tube
for all transmissions. Optionally yields the instance if a block is given. The instance is yielded prior to test_tube
configuration to allow the test_tube
to be configured.
# File lib/stalk_climber/connection.rb, line 79 def initialize(address, test_tube = DEFAULT_TUBE) super(address) @test_tube = test_tube || DEFAULT_TUBE clear_job_cache yield(self) if block_given? use_test_tube end
Public Instance Methods
Returns or creates a Hash used for caching jobs by ID
# File lib/stalk_climber/connection.rb, line 27 def cached_jobs return @cached_jobs ||= {} end
Resets the job cache and reinitializes the min and max climbed job ids
# File lib/stalk_climber/connection.rb, line 33 def clear_job_cache @cached_jobs = nil @min_climbed_job_id = Float::INFINITY @max_climbed_job_id = 0 return true end
Safe form of fetch_job
!, returns a Job
instance for the specified job_id
. If the job does not exist, the error is caught and nil is passed returned instead.
# File lib/stalk_climber/connection.rb, line 43 def fetch_job(job_id) begin job = fetch_job!(job_id) rescue Beaneater::NotFoundError job = nil end return job end
Returns a Job
instance for the specified job_id
. If the job does not exist, a Beaneater::NotFoundError will bubble up from Beaneater. The job is not cached.
# File lib/stalk_climber/connection.rb, line 55 def fetch_job!(job_id) return Job.new(transmit("peek #{job_id}")) end
Like fetch_job
, but fetches all job ids in job_ids
. Jobs are not cached and nil is returned if any of the jobs don't exist.
# File lib/stalk_climber/connection.rb, line 62 def fetch_jobs(*job_ids) return job_ids.flatten.map { |job_id| fetch_job(job_id) } end
Similar to fetch_job
!, but fetches all job ids in job_ids
. Jobs are not cached and a Beaneater::NotFoundError is raised if any of the listed jobs don't exist.
# File lib/stalk_climber/connection.rb, line 69 def fetch_jobs!(*job_ids) return job_ids.flatten.map { |job_id| fetch_job!(job_id) } end
Returns an Enumerator for crawling all existing jobs for a connection. See Connection#each_job for more information.
# File lib/stalk_climber/connection.rb, line 110 def job_enumerator return Enumerator.new do |yielder| max_id = max_job_id initial_cached_jobs = cached_jobs.values_at(*cached_jobs.keys.sort.reverse) max_id.downto(self.max_climbed_job_id + 1) do |job_id| job = fetch_and_cache_job(job_id) yielder << job unless job.nil? end initial_cached_jobs.each do |job| if job.exists? yielder << job else self.cached_jobs.delete(job.id) end end ([self.min_climbed_job_id - 1, max_id].min).downto(1) do |job_id| job = fetch_and_cache_job(job_id) yielder << job unless job.nil? end nil end end
Determintes the max job ID of the connection by inserting a job into the test tube and immediately deleting it. Before returning the max ID, the max ID is used to update the max_climbed_job_id
(if sequentual) and possibly invalidate the job cache. The cache will be invalidated if the max ID is less than any known IDs since new job IDs should always increment unless there's been a change in server state.
# File lib/stalk_climber/connection.rb, line 93 def max_job_id job = Job.new(transmit(PROBE_TRANSMISSION)) job.delete update_climbed_job_ids_from_max_id(job.id) return job.id end
Set and use the provided test_tube
# File lib/stalk_climber/connection.rb, line 102 def test_tube=(test_tube) @test_tube = test_tube use_test_tube end
Protected Instance Methods
Helper method, similar to fetch_job
, that retrieves the job identified by job_id
, caches it, and updates counters before returning the job. If the job does not exist, nothing is cached, however counters will be updated, and nil is returned
# File lib/stalk_climber/connection.rb, line 144 def fetch_and_cache_job(job_id) job = fetch_job(job_id) self.cached_jobs[job_id] = job unless job.nil? @min_climbed_job_id = job_id if job_id < @min_climbed_job_id @max_climbed_job_id = job_id if job_id > @max_climbed_job_id return job end
Uses new_max_id
to update the max_climbed_job_id
(if sequentual) and possibly invalidate the job cache. The job cache will be invalidated if new_max_id
is less than any known IDs since new job IDs should always increment unless there's been a change in server state.
# File lib/stalk_climber/connection.rb, line 156 def update_climbed_job_ids_from_max_id(new_max_id) if @max_climbed_job_id > 0 && @max_climbed_job_id == new_max_id - 1 @max_climbed_job_id = new_max_id elsif new_max_id < @max_climbed_job_id clear_job_cache end end
Dispatch transmissions notifying Beanstalk to use the configured test_tube
for all commands from this connection and to ignore the default tube
# File lib/stalk_climber/connection.rb, line 167 def use_test_tube [ "use #{self.test_tube}", "watch #{self.test_tube}", 'ignore default', ].each do |transmission| transmit(transmission) end end