class GHTDataRetrieval

Public Instance Methods

ghtorrent() click to toggle source
# File lib/ghtorrent/commands/ght_data_retrieval.rb, line 50
# Returns the GHTorrent instance used by this command, building it from
# +settings+ on first use and reusing the same object afterwards.
#
# A TransactedGHTorrent is used here instead of a plain GHTorrent::Mirror.
def ghtorrent
  @gh ||= TransactedGHTorrent.new(settings)
end
go() click to toggle source
# File lib/ghtorrent/commands/ght_data_retrieval.rb, line 64
# Runs the command.
#
# When an event id is given as the first command line argument, only that
# event is looked up and dispatched to the method named after its type.
#
# Otherwise one durable queue per entry in #handlers is bound to the
# configured AMQP topic exchange (routing key "evt.<handler>"). Each incoming
# message is treated as an event id, resolved through #retrieve_event and
# passed to the handler method. Processing continues until the process
# receives an Interrupt; the channel and connection are closed on the way out,
# whether the loop ended normally or because of an error.
def go
  unless ARGV[0].nil?
    event = retrieve_event(ARGV[0])

    if event.nil?
      warn "No event with id: #{ARGV[0]}"
    else
      # NOTE(review): the method name is taken verbatim from the stored
      # event; confirm event types can only be the names listed in #handlers.
      send(event['type'], event)
    end
    return
  end

  conn = Bunny.new(:host => config(:amqp_host),
                   :port => config(:amqp_port),
                   :username => config(:amqp_username),
                   :password => config(:amqp_password))
  conn.start

  begin
    channel = conn.create_channel
    debug "Setting prefetch to #{config(:amqp_prefetch)}"
    channel.prefetch(config(:amqp_prefetch))
    debug "Connection to #{config(:amqp_host)} succeded"

    exchange = channel.topic(config(:amqp_exchange), :durable => true,
                             :auto_delete => false)

    handlers.each do |h|
      queue = channel.queue("#{h}s", {:durable => true})\
                         .bind(exchange, :routing_key => "evt.#{h}")

      info "Binding handler #{h} to routing key evt.#{h}"

      queue.subscribe(:ack => true) do |headers, _properties, msg|
        start = Time.now
        data = nil
        begin
          data = retrieve_event(msg)
          # A missing event is a processing failure like any other: route it
          # through the reject/requeue logic below instead of the handler.
          raise "No event with id: #{msg}" if data.nil?

          send(h, data)

          channel.acknowledge(headers.delivery_tag, false)
          info "Success processing event. Type: #{data['type']}, ID: #{data['id']}, Time: #{Time.now.to_ms - start.to_ms} ms"
        rescue StandardError => e
          # Give a message a chance to be reprocessed
          if headers.redelivered?
            # The lookup itself may have failed, in which case there is no
            # event to describe; fall back to the handler name and raw message
            # so that logging cannot raise and prevent the reject.
            type = data.nil? ? h : data['type']
            id = data.nil? ? msg : data['id']
            warn "Error processing event. Type: #{type}, ID: #{id}, Time: #{Time.now.to_ms - start.to_ms} ms"
            channel.reject(headers.delivery_tag, false)
          else
            channel.reject(headers.delivery_tag, true)
          end

          STDERR.puts e
          STDERR.puts e.backtrace.join("\n")
        end
      end
    end

    # Keep the main thread parked until the user interrupts the process.
    begin
      loop { sleep(1) }
    rescue Interrupt
      debug 'Exit requested'
    end
  ensure
    debug 'Closing AMQP connection'
    # channel is nil when create_channel itself failed
    channel.close unless channel.nil?
    conn.close
  end
end
handlers() click to toggle source
# File lib/ghtorrent/commands/ght_data_retrieval.rb, line 26
# Names of the event types this command processes. Each name is used both as
# the handler method to invoke and as the suffix of the AMQP routing key.
#
# Tip: shrink the list (e.g. to just ForkEvent) to exercise a single handler.
def handlers
  %w[
    PushEvent
    WatchEvent
    FollowEvent
    MemberEvent
    CreateEvent
    CommitCommentEvent
    PullRequestEvent
    ForkEvent
    PullRequestReviewCommentEvent
    IssuesEvent
    IssueCommentEvent
  ]
end
logger() click to toggle source
# File lib/ghtorrent/commands/ght_data_retrieval.rb, line 46
# Returns the logger of the underlying GHTorrent instance, so log output from
# this command goes through the same logger.
def logger
  mirror = ghtorrent
  mirror.logger
end
parse(msg) click to toggle source
# File lib/ghtorrent/commands/ght_data_retrieval.rb, line 22
# Decodes a JSON document.
#
# msg - String containing JSON text.
#
# Returns the decoded Ruby structure; JSON.parse raises on malformed input.
def parse(msg)
  JSON.parse(msg)
end
persister() click to toggle source
# File lib/ghtorrent/commands/ght_data_retrieval.rb, line 17
# Returns the persistence connection, opening it through
# connect(:mongo, settings) the first time it is needed and caching it in
# @persister for later calls.
def persister
  @persister ||= connect(:mongo, settings)
end
prepare_options(options) click to toggle source
# File lib/ghtorrent/commands/ght_data_retrieval.rb, line 33
  # Installs the usage banner for this command on the given option parser.
  #
  # options - object responding to #banner(text).
  def prepare_options(options)
    usage = <<-BANNER
Retrieves events from queues and processes them through GHTorrent.
If event_id is provided, only this event is processed.
#{command_name} [event_id]
    BANNER

    options.banner usage
  end
retrieve_event(evt_id) click to toggle source
# File lib/ghtorrent/commands/ght_data_retrieval.rb, line 56
# Loads a stored event by its id.
#
# evt_id - value matched against the 'id' field of the events collection.
#
# The document's '_id' key is removed and the rest is round-tripped through
# JSON (to_json, then #parse) before being returned.
#
# Returns the event as returned by #parse, or nil when no event with that id
# exists (callers such as #go check for nil).
def retrieve_event(evt_id)
  event = persister.get_underlying_connection[:events].find_one('id' => evt_id)
  # Nothing stored under this id: report that to the caller instead of
  # failing with NoMethodError on nil below.
  return nil if event.nil?

  event.delete '_id'
  data = parse(event.to_json)
  debug "Processing event: #{data['type']}-#{data['id']}"
  data
end
validate() click to toggle source
Calls superclass method GHTorrent::Command#validate
# File lib/ghtorrent/commands/ght_data_retrieval.rb, line 42
# Validates the command's options. This command adds no checks of its own:
# the bare +super+ forwards whatever arguments and block were received to the
# superclass implementation and returns its result.
def validate
  super
end