class Fluent::Counter::Client
Constants
- DEFAULT_ADDR
- DEFAULT_PORT
- DEFAULT_TIMEOUT
- ID_LIMIT_COUNT
Public Class Methods
new(loop = nil, opt = {})
click to toggle source
# File lib/fluent/counter/client.rb, line 30 def initialize(loop = nil, opt = {}) @loop = loop || Coolio::Loop.new @port = opt[:port] || DEFAULT_PORT @host = opt[:host] || DEFAULT_ADDR @log = opt[:log] || $log @timeout = opt[:timeout] || DEFAULT_TIMEOUT @conn = Connection.connect(@host, @port, method(:on_message)) @responses = {} @id = 0 @id_mutex = Mutex.new @loop_mutex = Mutex.new end
Public Instance Methods
delete(*params, options: {}) { |get| ... }
click to toggle source
# File lib/fluent/counter/client.rb, line 95 def delete(*params, options: {}) exist_scope! res = send_request('delete', @scope, params, options) if block_given? Thread.start do yield res.get end else res end end
establish(scope)
click to toggle source
# File lib/fluent/counter/client.rb, line 60 def establish(scope) scope = Timeout.timeout(@timeout) { response = send_request('establish', nil, [scope]) Fluent::Counter.raise_error(response.errors.first) if response.errors? data = response.data data.first } @scope = scope rescue Timeout::Error raise "Can't establish the connection to counter server due to timeout" end
get(*params, options: {}) { |get| ... }
click to toggle source
# File lib/fluent/counter/client.rb, line 129 def get(*params, options: {}) exist_scope! res = send_request('get', @scope, params, options) if block_given? Thread.start do yield res.get end else res end end
inc(params, options: {}) { |get| ... }
click to toggle source
Example¶ ↑
‘inc` receives various arguments.
-
inc(name: ‘name’)
-
inc({ name: ‘name’,value: 20 }, options: {})
-
inc([{ name: ‘name1’,value: 20 }, { name: ‘name2’,value: 20 }])
-
inc([{ name: ‘name1’,value: 20 }, { name: ‘name2’,value: 20 }], options: {})
# File lib/fluent/counter/client.rb, line 115 def inc(params, options: {}) exist_scope! params = [params] unless params.is_a?(Array) res = send_request('inc', @scope, params, options) if block_given? Thread.start do yield res.get end else res end end
init(params, options: {}) { |get| ... }
click to toggle source
Example¶ ↑
‘init` receives various arguments.
-
init(name: ‘name’)
-
init({ name: ‘name’,reset_interval: 20 }, options: {})
-
init([{ name: ‘name1’,reset_interval: 20 }, { name: ‘name2’,reset_interval: 20 }])
-
init([{ name: ‘name1’,reset_interval: 20 }, { name: ‘name2’,reset_interval: 20 }], options: {})
-
init([{ name: ‘name1’,reset_interval: 20 }, { name: ‘name2’,reset_interval: 20 }]) { |res| … }
# File lib/fluent/counter/client.rb, line 80 def init(params, options: {}) exist_scope! params = [params] unless params.is_a?(Array) res = send_request('init', @scope, params, options) # if `async` is false or missing, block at this method and return a Future::Result object. if block_given? Thread.start do yield res.get end else res end end
reset(*params, options: {}) { |get| ... }
click to toggle source
# File lib/fluent/counter/client.rb, line 142 def reset(*params, options: {}) exist_scope! res = send_request('reset', @scope, params, options) if block_given? Thread.start do yield res.get end else res end end
start()
click to toggle source
# File lib/fluent/counter/client.rb, line 43 def start @loop.attach(@conn) @log.debug("starting counter client: #{@host}:#{@port}") self rescue => e if @log @log.error e else STDERR.puts e end end
stop()
click to toggle source
# File lib/fluent/counter/client.rb, line 55 def stop @conn.close @log.debug("calling stop in counter client: #{@host}:#{@port}") end
Private Instance Methods
build_request(method, id, scope = nil, params = nil, options = nil)
click to toggle source
# File lib/fluent/counter/client.rb, line 179 def build_request(method, id, scope = nil, params = nil, options = nil) r = { id: id, method: method } r[:scope] = scope if scope r[:params] = params if params r[:options] = options if options r end
exist_scope!()
click to toggle source
# File lib/fluent/counter/client.rb, line 157 def exist_scope! raise 'Call `establish` method to get a `scope` before calling this method' unless @scope end
generate_id()
click to toggle source
# File lib/fluent/counter/client.rb, line 187 def generate_id id = 0 @id_mutex.synchronize do id = @id @id += 1 @id = 0 if ID_LIMIT_COUNT < @id end id end
on_message(data)
click to toggle source
# File lib/fluent/counter/client.rb, line 161 def on_message(data) if response = @responses.delete(data['id']) response.set(data) else @log.warn("Receiving missing id data: #{data}") end end
send_request(method, scope, params, opt = {})
click to toggle source
# File lib/fluent/counter/client.rb, line 169 def send_request(method, scope, params, opt = {}) id = generate_id res = Future.new(@loop, @loop_mutex) @responses[id] = res # set a response value to this future object at `on_message` request = build_request(method, id, scope, params, opt) @log.debug(request) @conn.send_data request res end