module PubSubTie::Publisher

Public Instance Methods

batch(event_sym, messages, resource) click to toggle source
# File lib/pubsub_tie/publisher.rb, line 39
# Publishes every entry of +messages+ asynchronously to the topic looked up
# from +event_sym+, then stops that topic's async publisher and waits on it.
#
# Each entry goes through +augmented+ and +validate_data+ before being
# serialized with +payload+. A failed publish is reported from the publish
# callback through Rails.logger.error.
#
# event_sym:: event identifier, passed to Events.full_name to find the topic
# messages::  enumerable of per-message data
# resource::  forwarded to +payload+ and included in the failure log line
#
# Anything raised by +validate_data+ propagates to the caller; entries that
# were already handed to +publish_async+ before the failure stay queued.
def batch(event_sym, messages, resource)
  topic = @pubsub.topic(Events.full_name(event_sym))

  messages.each do |data|
    message = augmented(data, event_sym)
    topic.publish_async(payload(validate_data(event_sym, message), resource),
                        publish_time: Time.now.utc) do |result|
      unless result.succeeded?
        Rails.logger.error(
            "Failed to publish #{message} to #{event_sym} on #{resource} due to #{result.error}")
      end
    end
  end

  # The Pub/Sub client only builds the async publisher on the first
  # publish_async call, so it is nil when +messages+ was empty. Nothing was
  # queued in that case, so there is nothing to stop or wait for.
  publisher = topic.async_publisher
  publisher.stop.wait! if publisher
end
configure(config) click to toggle source
# File lib/pubsub_tie/publisher.rb, line 7
# Builds the Pub/Sub client from +config+ via +google_pubsub+ and keeps it in
# @pubsub, where +publish+ and +batch+ read it from.
#
# Returns the client that was stored.
def configure(config)
  client = google_pubsub(config)
  @pubsub = client
end
google_pubsub(config) click to toggle source
# File lib/pubsub_tie/publisher.rb, line 11
# Creates a Google Cloud Pub/Sub client from +config+.
#
# config['keyfile'] is resolved relative to the "config" directory under
# PubSubTie.app_root and loaded as credentials; config['project_id'] selects
# the project.
def google_pubsub(config)
  key_path = File.join(PubSubTie.app_root, 'config', config['keyfile'])
  credentials = Google::Cloud::PubSub::Credentials.new(key_path)

  Google::Cloud::PubSub.new(project_id: config['project_id'], credentials: credentials)
end
publish(event_sym, data, resource) click to toggle source

Publishes event data asynchronously to the topic inferred from event_sym. The data is augmented with event_name and event_time, then validated against the loaded configuration.

# File lib/pubsub_tie/publisher.rb, line 24
# Publishes a single event asynchronously.
#
# +data+ is first passed through +augmented+, the topic is looked up from
# Events.full_name(event_sym), and the validated message is serialized with
# +payload+ before being handed to publish_async. A failed publish is
# reported from the callback through Rails.logger.error.
#
# Returns whatever publish_async returns; anything raised by +validate_data+
# propagates to the caller.
def publish(event_sym, data, resource)
  message = augmented(data, event_sym)
  topic = @pubsub.topic(Events.full_name(event_sym))
  body = payload(validate_data(event_sym, message), resource)

  topic.publish_async(body, publish_time: Time.now.utc) do |outcome|
    next if outcome.succeeded?

    Rails.logger.error(
      "Failed to publish #{message} to #{event_sym} on #{resource} due to #{outcome.error}")
  end
end

Private Instance Methods

augmented(data, event_sym) click to toggle source
# File lib/pubsub_tie/publisher.rb, line 76
# Returns a new hash holding :event_name (from Events.name) and :event_time
# (current time in UTC), merged with the entries of +data+.
#
# +data+ is converted with to_hash.to_options first (presumably
# ActiveSupport's key symbolization; verify), and because it is merged last,
# its own :event_name / :event_time entries override the defaults.
def augmented(data, event_sym)
  defaults = {
    event_name: Events.name(event_sym),
    event_time: Time.current.utc
  }
  defaults.merge(data.to_hash.to_options)
end
bad_type(field, data) click to toggle source
# File lib/pubsub_tie/publisher.rb, line 111
# Always raises ArgumentError, naming the offending +field+ and the event
# +data+ it was found in.
def bad_type(field, data)
  raise ArgumentError, "Bad type for field #{field} in event #{data}"
end
missing_required(sym, data) click to toggle source
# File lib/pubsub_tie/publisher.rb, line 72
# Returns the fields listed by Events.required(sym) that are not among the
# keys of +data+ (an empty array when nothing is missing).
def missing_required(sym, data)
  required_fields = Events.required(sym)
  supplied_fields = data.keys
  required_fields - supplied_fields
end
payload(data, resource) click to toggle source
# File lib/pubsub_tie/publisher.rb, line 56
# Serializes +data+ into the message body by calling +to_json+ on it.
#
# +resource+ is accepted but not used yet (see the TODO below).
def payload(data, resource)
  # TODO: embed resource in message
  data.to_json
end
validate_data(sym, data) click to toggle source
# File lib/pubsub_tie/publisher.rb, line 61
# Validates event +data+ for event +sym+.
#
# Raises ArgumentError when any field reported by +missing_required+ is
# absent. Otherwise +data+ is narrowed to the fields declared by
# Events.required and Events.optional (anything else is dropped) and the
# result of +validate_types+ on that narrowed hash is returned.
def validate_data(sym, data)
  absent = missing_required(sym, data)
  raise ArgumentError, "Missing event required args for #{sym}: #{absent}" unless absent.empty?

  declared_fields = Events.required(sym) + Events.optional(sym)
  validate_types(sym, data.slice(*declared_fields))
end
validate_type(field, val, data, sym) click to toggle source
# File lib/pubsub_tie/publisher.rb, line 89
# Checks one value against the type declared for +field+ in event +sym+.
#
# The declared type is read from Events.types(sym) under the field's string
# name. Accepted pairings by Ruby class of +val+:
#   String   -> "STRING"
#   Integer  -> "INT", "FLOAT", "INT64", "SMALLINT", "INTEGER", "BIGINT",
#               "TINYINT", "BYTEINT"
#   Numeric  -> "DECIMAL", "BIGDECIMAL", "FLOAT", "FLOAT64"
#   Time     -> "TIMESTAMP"
#   DateTime -> "DATETIME"
# An Array is accepted only when Events.repeated(sym) includes +field+; each
# element is then checked individually against the same field. Every other
# value (nil, booleans, hashes, ...) is rejected.
#
# Mismatches are reported through +bad_type+.
def validate_type(field, val, data, sym)
  types = Events.types(sym)

  allowed =
    case val
    when String   then ["STRING"]
    when Integer  then ["INT", "FLOAT", "INT64", "SMALLINT", "INTEGER", "BIGINT", "TINYINT", "BYTEINT"]
    when Numeric  then ["DECIMAL", "BIGDECIMAL", "FLOAT", "FLOAT64"]
    when Time     then ["TIMESTAMP"]
    when DateTime then ["DATETIME"]
    when Array
      bad_type(field, data) unless Events.repeated(sym).include? field
      # Elements are validated one by one; the array itself is the result.
      return val.each { |elem| validate_type(field, elem, data, sym) }
    else
      bad_type(field, data)
    end

  bad_type(field, data) unless allowed.include?(types[field.to_s])
end
validate_types(sym, data) click to toggle source
# File lib/pubsub_tie/publisher.rb, line 81
# Runs +validate_type+ on every field/value pair of +data+ for event +sym+
# and returns +data+ itself when none of the checks raised.
def validate_types(sym, data)
  data.each { |name, value| validate_type(name, value, data, sym) }
  data
end