class Riaq::Queue
Attributes
bucket[R]
Public Class Methods
new(name)
click to toggle source
# File lib/riaq.rb, line 10 def initialize(name) @bucket = riak.bucket("riaq:#{name}") end
Public Instance Methods
each(&block)
click to toggle source
# File lib/riaq.rb, line 22 def each(&block) @stopping = false loop do break if @stopping key = bucket.get_index("status_int", PENDING, max_results: 1).first next unless key item = @bucket.get(key) item.indexes = { status_int: PROCESSING } item.store block.call(item.data) item.delete end end
empty?()
click to toggle source
# File lib/riaq.rb, line 60 def empty? size == 0 end
flush()
click to toggle source
# File lib/riaq.rb, line 42 def flush bucket.get_index("$bucket", @bucket.name).each do |key| @bucket.delete(key) end end
items()
click to toggle source
# File lib/riaq.rb, line 52 def items Riak::MapReduce.new(riak).index(@bucket.name, "status_int", PENDING).map("Riak.mapValues", keep: true).run end
push(value)
click to toggle source
# File lib/riaq.rb, line 14 def push(value) item = @bucket.new(Time.now.to_f) item.content_type = "text/plain" item.data = value item.indexes = { status_int: PENDING } item.store end
Also aliased as: <<
riak()
click to toggle source
# File lib/riaq.rb, line 66 def riak @riak ||= Riak::Client.new(Riaq.options) end
size()
click to toggle source
# File lib/riaq.rb, line 56 def size bucket.get_index("status_int", PENDING).size end
stop()
click to toggle source
# File lib/riaq.rb, line 48 def stop @stopping = true end