class NsqCluster

Attributes

nsqadmin[R]
nsqd[R]
nsqlookupd[R]

Public Class Methods

new(opts = {})
# File lib/nsq-cluster.rb, line 24
# Creates the services that make up the cluster and starts all of them.
#
# Recognised keys in +opts+ (all optional):
#   nsqlookupd_count::    how many Nsqlookupd instances to create (default 0)
#   nsqlookupd_options::  options merged into every Nsqlookupd (default {})
#   nsqdlookupd_options:: legacy misspelling of nsqlookupd_options; still
#                         honoured, and takes precedence when both are given
#   nsqd_count::          how many Nsqd instances to create (default 0)
#   nsqd_options::        options merged into every Nsqd (default {})
#   nsqadmin::            when truthy, an Nsqadmin is created too (default false)
#   verbose::             stored in @verbose and handed to every service
#                         (default false)
#   async::               when truthy, return right after starting the
#                         services instead of waiting for them (default false)
#
# If starting or waiting fails, every service is destroyed and the error is
# re-raised to the caller.
def initialize(opts = {})
  opts = {
    nsqlookupd_count: 0,
    nsqd_count: 0,
    nsqadmin: false,
    nsqd_options: {},
    verbose: false,
    async: false
  }.merge(opts)

  # Neither spelling has a default above, so a key is present here only when
  # the caller supplied it. The misspelled key wins so that existing callers
  # keep exactly the behaviour they had before the correct spelling worked.
  lookupd_options = opts.fetch(:nsqdlookupd_options) do
    opts.fetch(:nsqlookupd_options, {})
  end

  @verbose = opts[:verbose]
  @nsqlookupd = create_nsqlookupds(opts[:nsqlookupd_count], lookupd_options)
  @nsqd = create_nsqds(opts[:nsqd_count], opts[:nsqd_options])
  @nsqadmin = create_nsqadmin if opts[:nsqadmin]

  begin
    # start everything!
    all_services.each { |service| service.start(async: true) }

    # by default, block execution until everything is started
    block_until_running unless opts[:async]
  rescue Exception # rubocop:disable Lint/RescueException
    # Deliberately broad: whatever interrupted the launch (including signals),
    # stop everything that was started, then let the error continue upwards.
    destroy
    raise
  end
end

Public Instance Methods

block_until_running(timeout = 10)
# File lib/nsq-cluster.rb, line 97
# Waits for every service returned by all_services to report that it is
# running, allowing +timeout+ seconds for the whole cluster.
#
# Prints progress messages when @verbose is set. Raises a RuntimeError if
# the timeout expires first.
def block_until_running(timeout = 10)
  puts "Waiting for cluster to launch..." if @verbose

  Timeout.timeout(timeout) do
    all_services.each(&:block_until_running)
    puts "Cluster launched." if @verbose
  end
rescue Timeout::Error
  raise "Cluster did not fully launch within #{timeout} seconds."
end
block_until_stopped(timeout = 10)
# File lib/nsq-cluster.rb, line 110
# Waits for every service returned by all_services to report that it has
# stopped, allowing +timeout+ seconds for the whole cluster.
#
# Prints progress messages when @verbose is set. Raises a RuntimeError if
# the timeout expires first.
def block_until_stopped(timeout = 10)
  puts "Waiting for cluster to stop..." if @verbose

  Timeout.timeout(timeout) do
    all_services.each(&:block_until_stopped)
    puts "Cluster stopped." if @verbose
  end
rescue Timeout::Error
  raise "Cluster did not fully stop within #{timeout} seconds."
end
create_nsqadmin()
# File lib/nsq-cluster.rb, line 78
# Builds the Nsqadmin for this cluster, giving it the cluster's nsqlookupd
# instances and the verbose flag.
def create_nsqadmin
  admin_options = { nsqlookupd: @nsqlookupd }
  Nsqadmin.new(admin_options, @verbose)
end
create_nsqds(count, options)
# File lib/nsq-cluster.rb, line 65
# Builds +count+ Nsqd instances with ids 0 through count - 1.
#
# Each instance receives a copy of +options+ in which :id and :nsqlookupd
# are set by the cluster (overriding any values supplied under those keys),
# together with the verbose flag. +options+ itself is not modified.
def create_nsqds(count, options)
  (0...count).map do |index|
    nsqd_options = options.merge(id: index, nsqlookupd: @nsqlookupd)
    Nsqd.new(nsqd_options, @verbose)
  end
end
create_nsqlookupds(count, options)
# File lib/nsq-cluster.rb, line 53
# Builds +count+ Nsqlookupd instances with ids 0 through count - 1.
#
# Each instance receives a copy of +options+ with :id set by the cluster
# (overriding any :id supplied), together with the verbose flag. +options+
# itself is not modified.
def create_nsqlookupds(count, options)
  (0...count).map do |index|
    lookupd_options = options.merge(id: index)
    Nsqlookupd.new(lookupd_options, @verbose)
  end
end
destroy()
# File lib/nsq-cluster.rb, line 86
# Calls destroy on every service in the cluster, in the order all_services
# returns them.
def destroy
  all_services.each(&:destroy)
end
nsqlookupd_http_endpoints()

Returns an array of HTTP endpoint URLs ("http://host:port"), one for each nsqlookupd instance in the cluster.

# File lib/nsq-cluster.rb, line 92
# Returns one "http://host:port" string per nsqlookupd instance, built from
# each instance's host and http_port.
def nsqlookupd_http_endpoints
  @nsqlookupd.map do |daemon|
    host = daemon.host
    port = daemon.http_port
    "http://#{host}:#{port}"
  end
end

Private Instance Methods

all_services()
# File lib/nsq-cluster.rb, line 124
# Returns every service in the cluster as one new array: the nsqlookupd
# instances, then the nsqd instances, then nsqadmin. nil entries (such as
# an nsqadmin that was never created) are dropped.
def all_services
  services = @nsqlookupd + @nsqd
  # nsqadmin responds to /ping as well, even though it is not documented.
  services << @nsqadmin
  services.compact
end