class Nsqd

Attributes

base_port[R]
host[R]
http_port[R]
id[R]
tcp_port[R]

Public Class Methods

new(opts = {}, verbose = false) click to toggle source
Calls superclass method ProcessWrapper::new
# File lib/nsq-cluster/nsqd.rb, line 19
# Sets up an nsqd wrapper from an options hash.
#
# The keys :id, :host, :base_port, :tcp_port, :http_port, :nsqlookupd and
# :broadcast_address are consumed here; note that they are removed from the
# +opts+ hash that was passed in. Every key left over afterwards is turned
# into an extra "--some-flag=value" command line argument.
#
# Both arguments are handed to the superclass initializer as received.
# Finishes by wiping and re-creating this instance's data directory.
def initialize(opts = {}, verbose = false)
  super

  @id = opts.delete(:id) || 0
  @host = opts.delete(:host) || '127.0.0.1'

  # Default to a non-standard nsqd port so a locally running nsqd does not
  # clash with instances started here (handy when running tests).
  @base_port = opts.delete(:base_port) || 4250

  # Unless given explicitly, each id maps to its own pair of ports:
  # base + 2*id for TCP and the one right after it for HTTP.
  @tcp_port = opts.delete(:tcp_port) || (@base_port + @id * 2)
  @http_port = opts.delete(:http_port) || (@base_port + 1 + @id * 2)
  @lookupd = opts.delete(:nsqlookupd) || []
  @broadcast_address = opts.delete(:broadcast_address) || @host

  # Whatever is still in opts is passed through as a flag, with underscores
  # in the key converted to dashes.
  @extra_args = opts.map { |flag, setting| "--#{flag.to_s.tr('_', '-')}=#{setting}" }

  clear_data_directory
  create_data_directory
end
version_is_pre_1?() click to toggle source

Returns true if nsqd's version is < 1.0

# File lib/nsq-cluster/nsqd.rb, line 12
# Returns true when the output of `nsqd -version` starts with "nsqd v0",
# i.e. the installed nsqd is older than 1.0.
#
# The result is cached after the first call. The cache is guarded with
# `defined?` instead of `||=`: `||=` re-evaluates its right-hand side
# whenever the stored value is false, so for nsqd >= 1.0 it would spawn
# `nsqd -version` again on every single call.
def self.version_is_pre_1?
  return @version_is_pre_1 if defined?(@version_is_pre_1)

  @version_is_pre_1 = `nsqd -version`.start_with?('nsqd v0')
end

Public Instance Methods

args() click to toggle source
# File lib/nsq-cluster/nsqd.rb, line 54
# Assembles the command line flags for this nsqd instance.
#
# The resulting array holds, in order: the address/data-path flags, the
# instance id flag, any extra flags collected at construction time, and one
# --lookupd-tcp-address flag per configured nsqlookupd.
def args
  address_flags = [
    "--tcp-address=#{@host}:#{@tcp_port}",
    "--http-address=#{@host}:#{@http_port}",
    "--data-path=#{data_path}",
    "--broadcast-address=#{@broadcast_address}"
  ]

  # The id flag is spelled --worker-id when Nsqd.version_is_pre_1? is true
  # and --node-id otherwise.
  identity_flag = Nsqd.version_is_pre_1? ? "--worker-id=#{id}" : "--node-id=#{id}"

  lookupd_flags = @lookupd.map do |daemon|
    "--lookupd-tcp-address=#{daemon.host}:#{daemon.tcp_port}"
  end

  address_flags + [identity_flag] + @extra_args + lookupd_flags
end
command() click to toggle source
# File lib/nsq-cluster/nsqd.rb, line 49
# Name of the executable this wrapper stands for; always the string 'nsqd'.
# NOTE(review): presumably read by the superclass when launching the
# process -- confirm against ProcessWrapper.
def command
  'nsqd'
end
create(params = {}) click to toggle source

create a topic or a channel in an existing topic

# File lib/nsq-cluster/nsqd.rb, line 95
# Creates a topic, or a channel inside a topic.
#
# Reads :topic and :channel from +params+ and forwards them, together with
# the 'create' action, to nsqd_post, which decides between the topic and
# the channel endpoint.
def create(params = {})
  topic = params[:topic]
  channel = params[:channel]
  nsqd_post('create', topic: topic, channel: channel)
end
data_path() click to toggle source

returns the path of this instance's temporary data directory (/tmp/nsqd-<id>); it does not create the directory itself

# File lib/nsq-cluster/nsqd.rb, line 77
# Path of the data directory used by this instance: "/tmp/nsqd-" followed
# by the instance id. Only builds the string; nothing is created on disk.
def data_path
  instance_id = id
  "/tmp/nsqd-#{instance_id}"
end
delete(params = {}) click to toggle source

delete a topic or a channel in an existing topic

# File lib/nsq-cluster/nsqd.rb, line 101
# Deletes a topic, or a channel inside a topic.
#
# Reads :topic and :channel from +params+ and forwards them, together with
# the 'delete' action, to nsqd_post, which decides between the topic and
# the channel endpoint.
def delete(params = {})
  topic = params[:topic]
  channel = params[:channel]
  nsqd_post('delete', topic: topic, channel: channel)
end
destroy() click to toggle source
Calls superclass method ProcessWrapper#destroy
# File lib/nsq-cluster/nsqd.rb, line 43
# Runs the superclass destroy logic first, then removes this instance's
# data directory through clear_data_directory.
def destroy
  super
  clear_data_directory
end
empty(params = {}) click to toggle source

empty a topic or a channel in an existing topic

# File lib/nsq-cluster/nsqd.rb, line 107
# Empties a topic, or a channel inside a topic.
#
# Reads :topic and :channel from +params+ and forwards them, together with
# the 'empty' action, to nsqd_post, which decides between the topic and
# the channel endpoint.
def empty(params = {})
  topic = params[:topic]
  channel = params[:channel]
  nsqd_post('empty', topic: topic, channel: channel)
end
info() click to toggle source

returns the response of nsqd's info endpoint, which reports the version number

# File lib/nsq-cluster/nsqd.rb, line 137
# Issues a request for 'info' through the `get` helper (defined outside
# this section) and returns whatever that helper returns.
def info
  get('info')
end
mpub(topic, *messages) click to toggle source

publish multiple messages to a topic

# File lib/nsq-cluster/nsqd.rb, line 89
# Publishes several messages to +topic+ in one request.
#
# The messages are joined with newlines into a single body and sent to
# 'mpub' through the `post` helper (defined outside this section).
def mpub(topic, *messages)
  body = messages.join("\n")
  post('mpub', { topic: topic }, body)
end
pause(params = {}) click to toggle source

pause a topic or a channel in a topic

# File lib/nsq-cluster/nsqd.rb, line 113
# Pauses a topic, or a channel inside a topic.
#
# Reads :topic and :channel from +params+ and forwards them, together with
# the 'pause' action, to nsqd_post, which decides between the topic and
# the channel endpoint.
def pause(params = {})
  topic = params[:topic]
  channel = params[:channel]
  nsqd_post('pause', topic: topic, channel: channel)
end
ping() click to toggle source

monitoring endpoint

# File lib/nsq-cluster/nsqd.rb, line 131
# Monitoring check: issues a request for 'ping' through the `get` helper
# (defined outside this section) and returns whatever that helper returns.
def ping
  get('ping')
end
pub(topic, message) click to toggle source

publish a single message to a topic

# File lib/nsq-cluster/nsqd.rb, line 83
# Publishes a single +message+ to +topic+ by sending it as the body of a
# 'pub' request through the `post` helper (defined outside this section).
def pub(topic, message)
  post('pub', { topic: topic }, message)
end
stats() click to toggle source

return stats in json format

# File lib/nsq-cluster/nsqd.rb, line 125
# Requests 'stats' with format=json through the `get` helper (defined
# outside this section) and returns whatever that helper returns.
def stats
  get('stats', format: 'json')
end
unpause(params = {}) click to toggle source

unpause a topic or a channel in a topic

# File lib/nsq-cluster/nsqd.rb, line 119
# Unpauses a topic, or a channel inside a topic.
#
# Reads :topic and :channel from +params+ and forwards them, together with
# the 'unpause' action, to nsqd_post, which decides between the topic and
# the channel endpoint.
def unpause(params = {})
  topic = params[:topic]
  channel = params[:channel]
  nsqd_post('unpause', topic: topic, channel: channel)
end

Private Instance Methods

clear_data_directory() click to toggle source
# File lib/nsq-cluster/nsqd.rb, line 150
# Recursively removes this instance's data directory. Does nothing (and
# returns nil) when no directory exists at data_path.
def clear_data_directory
  directory = data_path
  return unless Dir.exist?(directory)

  FileUtils.rm_rf(directory)
end
create_data_directory() click to toggle source
# File lib/nsq-cluster/nsqd.rb, line 145
# Creates the directory named by data_path with Dir.mkdir, so the parent
# directory must already exist and the target must not.
def create_data_directory
  directory = data_path
  Dir.mkdir(directory)
end
nsqd_post(action, params) click to toggle source
# File lib/nsq-cluster/nsqd.rb, line 155
# Sends +action+ to either the channel or the topic endpoint.
#
# With both :topic and :channel present in +params+ the request goes to
# "channel/<action>"; with only :topic it goes to "topic/<action>".
# Raises a RuntimeError when no topic is given. Requests are made through
# the `post` helper (defined outside this section).
def nsqd_post(action, params)
  topic = params[:topic]
  raise 'you must specify a topic or topic and channel' unless topic

  channel = params[:channel]
  if channel
    post "channel/#{action}", topic: topic, channel: channel
  else
    post "topic/#{action}", topic: topic
  end
end