class PubControlClient

The PubControlClient class allows consumers to publish either synchronously or asynchronously to an endpoint of their choice. The consumer wraps a Format class instance in an Item class instance and passes that to the publish methods. The async publish method has an optional callback parameter that is called after the publishing is complete to notify the consumer of the result.

Attributes

req_queue[RW]

Public Class Methods

new(uri) click to toggle source

Initialize this class with a URL representing the publishing endpoint.

# File lib/pubcontrolclient.rb, line 26
def initialize(uri)
  @uri = uri
  @lock = Mutex.new
  @thread = nil
  @thread_cond = nil
  @thread_mutex = nil
  @req_queue = Containers::Deque.new
  @auth_basic_user = nil
  @auth_basic_pass = nil
  @auth_jwt_claim = nil
  @auth_jwt_key = nil
  @http = Net::HTTP::Persistent.new @object_id.to_s
  @http.open_timeout = 10
  @http.read_timeout = 10
end
timestamp_utcnow() click to toggle source

A helper method for returning the current UNIX UTC timestamp.

# File lib/pubcontrolclient.rb, line 124
def self.timestamp_utcnow
  return Time.now.utc.to_i
end

Public Instance Methods

close() click to toggle source

This method closes the PubControlClient instance by ensuring all pending data is sent and any open connections are closed.

# File lib/pubcontrolclient.rb, line 118
def close
  wait_all_sent
  @http.shutdown
end
finish() click to toggle source

DEPRECATED: The finish method is now deprecated in favor of the more descriptive wait_all_sent method.

# File lib/pubcontrolclient.rb, line 112
def finish
  wait_all_sent
end
publish(channels, item) click to toggle source

The synchronous publish method for publishing the specified item to the specified channels on the configured endpoint.

# File lib/pubcontrolclient.rb, line 62
def publish(channels, item)
  exports = [channels].flatten.map do |channel|
    export            = item.export
    export['channel'] = channel
    export
  end
  uri   = nil
  auth  = nil
  @lock.synchronize do
    uri   = @uri
    auth  = gen_auth_header
  end
  pubcall(uri, auth, exports)
end
publish_async(channels, item, callback=nil) click to toggle source

The asynchronous publish method for publishing the specified item to the specified channels on the configured endpoint. The callback method is optional and will be passed the publishing results after publishing is complete.

# File lib/pubcontrolclient.rb, line 81
def publish_async(channels, item, callback=nil)
  exports = [channels].flatten.map do |channel|
    export            = item.export
    export['channel'] = channel
    export
  end
  uri   = nil
  auth  = nil
  @lock.synchronize do
    uri   = @uri
    auth  = gen_auth_header
    ensure_thread
  end
  queue_req(['pub', uri, auth, exports, callback])
end
set_auth_basic(username, password) click to toggle source

Call this method and pass a username and password to use basic authentication with the configured endpoint.

# File lib/pubcontrolclient.rb, line 44
def set_auth_basic(username, password)
  @lock.synchronize do
    @auth_basic_user = username
    @auth_basic_pass = password
  end
end
set_auth_jwt(claim, key) click to toggle source

Call this method and pass a claim and key to use JWT authentication with the configured endpoint.

# File lib/pubcontrolclient.rb, line 53
def set_auth_jwt(claim, key)
  @lock.synchronize do
    @auth_jwt_claim = claim
    @auth_jwt_key = key
  end
end
wait_all_sent() click to toggle source

This method is a blocking method that ensures that all asynchronous publishing is complete prior to returning and allowing the consumer to proceed.

# File lib/pubcontrolclient.rb, line 100
def wait_all_sent
  @lock.synchronize do
    if !@thread.nil?
      queue_req(['stop'])
      @thread.join
      @thread = nil
    end
  end
end

Private Instance Methods

ensure_thread() click to toggle source

An internal method that ensures that asynchronous publish calls are properly processed. This method initializes the required class fields, starts the pubworker worker thread, and is meant to execute only when the consumer makes an asynchronous publish call.

# File lib/pubcontrolclient.rb, line 247
def ensure_thread
  if @thread.nil?
    @thread_cond = ConditionVariable.new
    @thread_mutex = Mutex.new
    @thread = Thread.new { pubworker }
  end
end
gen_auth_header() click to toggle source

An internal method used to generate an authorization header. The authorization header is generated based on whether basic or JWT authorization information was provided via the publicly accessible 'set_*_auth' methods defined above.

# File lib/pubcontrolclient.rb, line 226
def gen_auth_header
  if !@auth_basic_user.nil?
    return 'Basic ' + Base64.encode64(
        "#{@auth_basic_user}:#{@auth_basic_pass}")
  elsif !@auth_jwt_claim.nil?
    if !@auth_jwt_claim.key?('exp')
      claim = @auth_jwt_claim.clone
      claim['exp'] = PubControlClient.timestamp_utcnow + 3600
    else
      claim = @auth_jwt_claim
    end
    return 'Bearer ' + JWT.encode(claim, @auth_jwt_key)
  else
    return nil
  end
end
make_http_request(uri, request) click to toggle source

An internal method for making the specified HTTP request to the specified URI.

# File lib/pubcontrolclient.rb, line 152
def make_http_request(uri, request)
  response = @http.request uri, request
  return response
end
pubbatch(reqs) click to toggle source

An internal method for publishing a batch of requests. The requests are parsed for the URI, authorization header, and each request is published to the endpoint. After all publishing is complete, each callback corresponding to each request is called (if a callback was originally provided for that request) and passed a result indicating whether that request was successfully published.

# File lib/pubcontrolclient.rb, line 163
def pubbatch(reqs)
  raise 'reqs length == 0' unless reqs.length > 0
  uri = reqs[0][0]
  auth_header = reqs[0][1]
  items = Array.new
  callbacks = Array.new
  reqs.each do |req|
    if req[2].is_a? Array
      items = items + req[2]
    else
      items.push(req[2])
    end
    callbacks.push(req[3])
  end
  begin
    pubcall(uri, auth_header, items)
    result = [true, '']
  rescue => e
    result = [false, e.message]
  end
  callbacks.each do |callback|
    if !callback.nil?
      callback.call(result[0], result[1])
    end
  end
end
pubcall(uri, auth_header, items) click to toggle source

An internal method for preparing the HTTP POST request for publishing data to the endpoint. This method accepts the URI endpoint, authorization header, and a list of items to publish.

# File lib/pubcontrolclient.rb, line 133
def pubcall(uri, auth_header, items)
  uri = URI(uri + '/publish/')
  content = Hash.new
  content['items'] = items
  request = Net::HTTP::Post.new(uri.request_uri)
  request.body = content.to_json
  if !auth_header.nil?
    request['Authorization'] = auth_header
  end
  request['Content-Type'] = 'application/json'
  response = make_http_request(uri, request)
  if !response.kind_of? Net::HTTPSuccess
    raise 'failed to publish: ' + response.class.to_s + ' ' +
        response.message + ' ' + response.body.dump
  end
end
pubworker() click to toggle source

An internal method that is meant to run as a separate thread and process asynchronous publishing requests. The method runs continously and publishes requests in batches containing a maximum of 10 requests. The method completes and the thread is terminated only when a 'stop' command is provided in the request queue.

# File lib/pubcontrolclient.rb, line 195
def pubworker
  quit = false
  while !quit do
    @thread_mutex.lock
    if @req_queue.length == 0
      @thread_cond.wait(@thread_mutex)
      if @req_queue.length == 0
        @thread_mutex.unlock
        next
      end
    end
    reqs = Array.new
    while @req_queue.length > 0 and reqs.length < 10 do
      m = @req_queue.pop_front
      if m[0] == 'stop'
        quit = true
        break
      end
      reqs.push([m[1], m[2], m[3], m[4]])
    end
    @thread_mutex.unlock
    if reqs.length > 0
      pubbatch(reqs)
    end
  end
end
queue_req(req) click to toggle source

An internal method for adding an asynchronous publish request to the publishing queue. This method will also activate the pubworker worker thread to make sure that it process any and all requests added to the queue.

# File lib/pubcontrolclient.rb, line 259
def queue_req(req)
  @thread_mutex.lock
  @req_queue.push_back(req)
  @thread_cond.signal
  @thread_mutex.unlock
end