class GHTLoad

Public Instance Methods

go() click to toggle source
# File lib/ghtorrent/commands/ght_load.rb, line 57
def go
  # Num events read
  total_read = 0

  puts "Loading events after #{Time.at(options[:earliest])}" if options[:verbose]
  puts "Loading events before #{Time.at(options[:latest])}" if options[:verbose]
  puts "Loading #{options[:number]} items" if options[:verbose]

  what = case
           when options[:filter].is_a?(Array)
             options[:filter].reduce({}) { |acc,x|
               (k,r) = x.split(/=/)
               acc[k] = Regexp.new(r)
               acc
             }
           when filter == []
             {}
         end

  from = {'_id' => {
      '$gte' => BSON::ObjectId.from_time(Time.at(options[:earliest])),
      '$lte' => BSON::ObjectId.from_time(Time.at(options[:latest]))}
  }

  (puts 'Mongo filter:'; pp what.merge(from)) if options[:verbose]

  conn = Bunny.new(:host => config(:amqp_host),
                   :port => config(:amqp_port),
                   :username => config(:amqp_username),
                   :password => config(:amqp_password))
  conn.start

  channel = conn.create_channel
  puts "Connection to #{config(:amqp_host)} succeded"

  exchange = channel.topic(config(:amqp_exchange),
                           :durable => true, :auto_delete => false)

  stopped = false
  while not stopped
    begin
      persister.get_underlying_connection[:events].find(what.merge(from),
                                                        :snapshot => true).each do |e|
        unq = read_value(e, 'type')
        if unq.class != String or unq.nil? then
          raise Exception.new('Unique value can only be a String')
        end

        exchange.publish e['id'], :persistent => false,
                         :routing_key => "evt.#{e['type']}"

        total_read += 1
        puts "Publish id = #{e['id']} #{e['created_at']} (#{total_read} read)" if options.verbose

        if total_read >= options[:number]
          puts 'Finished reading, exiting'
          return 
        end
      end
      stopped = true
    rescue Interrupt
      puts 'Interrupted'
      stopped = true
    end
  end
end
persister() click to toggle source
# File lib/ghtorrent/commands/ght_load.rb, line 16
def persister
  @persister ||= connect(:mongo, settings)
  @persister
end
prepare_options(options) click to toggle source
# File lib/ghtorrent/commands/ght_load.rb, line 21
  def prepare_options(options)
    options.banner <<-BANNER
Loads object ids from a collection to a queue for further processing.

#{command_name} [options] collection

#{command_name} options:
    BANNER

    options.opt :earliest, 'Seconds since epoch of earliest item to load',
                :short => 'e', :default => 0, :type => :int
    options.opt :latest, 'Seconds since epoch of latest item to load',
                :short => 'x', :default => Time.now.to_i + (60 * 60 * 24 * 360 * 20),
                :type => :int
    options.opt :number, 'Total number of items to load',
                :short => 'n', :type => :int, :default => 2**48
    options.opt :filter,
                'Filter items by regexp on item attributes: item.attr=regexp',
                :short => 'f', :type => String, :multi => true
  end
validate() click to toggle source
Calls superclass method GHTorrent::Command#validate
# File lib/ghtorrent/commands/ght_load.rb, line 42
def validate
  super
  filter = options[:filter]
  case
    when filter.is_a?(Array)
      options[:filter].each { |x|
        Trollop::die "not a valid filter #{x}" unless is_filter_valid?(x)
      }
    when filter == []
      # Noop
    else
      Trollop::die 'A filter can only be a string'
  end
end

Private Instance Methods

is_filter_valid?(filter) click to toggle source
# File lib/ghtorrent/commands/ght_load.rb, line 126
def is_filter_valid?(filter)
  (k, r) = filter.split(/=/)
  return false if r.nil?
  begin
    Regexp.new(r)
    true
  rescue
    false
  end
end