class Fluent::Counter::Client

Constants

DEFAULT_ADDR
DEFAULT_PORT
DEFAULT_TIMEOUT
ID_LIMIT_COUNT

Public Class Methods

new(loop = nil, opt = {}) click to toggle source
# File lib/fluent/counter/client.rb, line 30
def initialize(loop = nil, opt = {})
  @loop = loop || Coolio::Loop.new
  @port = opt[:port] || DEFAULT_PORT
  @host = opt[:host] || DEFAULT_ADDR
  @log = opt[:log] || $log
  @timeout = opt[:timeout] || DEFAULT_TIMEOUT
  @conn = Connection.connect(@host, @port, method(:on_message))
  @responses = {}
  @id = 0
  @id_mutex = Mutex.new
  @loop_mutex = Mutex.new
end

Public Instance Methods

delete(*params, options: {}) { |get| ... } click to toggle source
# File lib/fluent/counter/client.rb, line 95
def delete(*params, options: {})
  exist_scope!
  res = send_request('delete', @scope, params, options)

  if block_given?
    Thread.start do
      yield res.get
    end
  else
    res
  end
end
establish(scope) click to toggle source
# File lib/fluent/counter/client.rb, line 60
def establish(scope)
  scope = Timeout.timeout(@timeout) {
    response = send_request('establish', nil, [scope])
    Fluent::Counter.raise_error(response.errors.first) if response.errors?
    data = response.data
    data.first
  }
  @scope = scope
rescue Timeout::Error
  raise "Can't establish the connection to counter server due to timeout"
end
get(*params, options: {}) { |get| ... } click to toggle source
# File lib/fluent/counter/client.rb, line 129
def get(*params, options: {})
  exist_scope!
  res = send_request('get', @scope, params, options)

  if block_given?
    Thread.start do
      yield res.get
    end
  else
    res
  end
end
inc(params, options: {}) { |get| ... } click to toggle source

Example

‘inc` receives various arguments.

  1. inc(name: ‘name’)

  2. inc({ name: ‘name’,value: 20 }, options: {})

  3. inc([{ name: ‘name1’,value: 20 }, { name: ‘name2’,value: 20 }])

  4. inc([{ name: ‘name1’,value: 20 }, { name: ‘name2’,value: 20 }], options: {})

# File lib/fluent/counter/client.rb, line 115
def inc(params, options: {})
  exist_scope!
  params = [params] unless params.is_a?(Array)
  res = send_request('inc', @scope, params, options)

  if block_given?
    Thread.start do
      yield res.get
    end
  else
    res
  end
end
init(params, options: {}) { |get| ... } click to toggle source

Example

‘init` receives various arguments.

  1. init(name: ‘name’)

  2. init({ name: ‘name’,reset_interval: 20 }, options: {})

  3. init([{ name: ‘name1’,reset_interval: 20 }, { name: ‘name2’,reset_interval: 20 }])

  4. init([{ name: ‘name1’,reset_interval: 20 }, { name: ‘name2’,reset_interval: 20 }], options: {})

  5. init([{ name: ‘name1’,reset_interval: 20 }, { name: ‘name2’,reset_interval: 20 }]) { |res| … }

# File lib/fluent/counter/client.rb, line 80
def init(params, options: {})
  exist_scope!
  params = [params] unless params.is_a?(Array)
  res = send_request('init', @scope, params, options)

  # if `async` is false or missing, block at this method and return a Future::Result object.
  if block_given?
    Thread.start do
      yield res.get
    end
  else
    res
  end
end
reset(*params, options: {}) { |get| ... } click to toggle source
# File lib/fluent/counter/client.rb, line 142
def reset(*params, options: {})
  exist_scope!
  res = send_request('reset', @scope, params, options)

  if block_given?
    Thread.start do
      yield res.get
    end
  else
    res
  end
end
start() click to toggle source
# File lib/fluent/counter/client.rb, line 43
def start
  @loop.attach(@conn)
  @log.debug("starting counter client: #{@host}:#{@port}")
  self
rescue => e
  if @log
    @log.error e
  else
    STDERR.puts e
  end
end
stop() click to toggle source
# File lib/fluent/counter/client.rb, line 55
def stop
  @conn.close
  @log.debug("calling stop in counter client: #{@host}:#{@port}")
end

Private Instance Methods

build_request(method, id, scope = nil, params = nil, options = nil) click to toggle source
# File lib/fluent/counter/client.rb, line 179
def build_request(method, id, scope = nil, params = nil, options = nil)
  r = { id: id, method: method }
  r[:scope] = scope if scope
  r[:params] = params if params
  r[:options] = options if options
  r
end
exist_scope!() click to toggle source
# File lib/fluent/counter/client.rb, line 157
def exist_scope!
  raise 'Call `establish` method to get a `scope` before calling this method' unless @scope
end
generate_id() click to toggle source
# File lib/fluent/counter/client.rb, line 187
def generate_id
  id = 0
  @id_mutex.synchronize do
    id = @id
    @id += 1
    @id = 0 if ID_LIMIT_COUNT < @id
  end
  id
end
on_message(data) click to toggle source
# File lib/fluent/counter/client.rb, line 161
def on_message(data)
  if response = @responses.delete(data['id'])
    response.set(data)
  else
    @log.warn("Receiving missing id data: #{data}")
  end
end
send_request(method, scope, params, opt = {}) click to toggle source
# File lib/fluent/counter/client.rb, line 169
def send_request(method, scope, params, opt = {})
  id = generate_id
  res = Future.new(@loop, @loop_mutex)
  @responses[id] = res # set a response value to this future object at `on_message`
  request = build_request(method, id, scope, params, opt)
  @log.debug(request)
  @conn.send_data request
  res
end