class Riaq::Queue

Attributes

bucket[R]

Public Class Methods

new(name)
# File lib/riaq.rb, line 10
# Builds a queue backed by the Riak bucket named "riaq:<name>".
#
# name - value interpolated into the bucket name after the "riaq:" prefix.
#
# The bucket handle is obtained from the client returned by #riak and kept
# in @bucket.
def initialize(name)
  bucket_name = "riaq:#{name}"
  @bucket = riak.bucket(bucket_name)
end

Public Instance Methods

<<(value)
Alias for: push
each(&block)
# File lib/riaq.rb, line 22
# Continuously consumes pending items, passing each item's data to the block.
#
# Resets the @stopping flag, then loops until that flag becomes true. On every
# pass it asks the bucket for at most one key whose "status_int" index equals
# PENDING, re-indexes that item as PROCESSING and stores it, calls the block
# with the item's data, and finally deletes the item.
#
# When called without a block, returns an Enumerator for this method instead
# of marking an item PROCESSING and then failing on a nil block.
#
# NOTE(review): when no pending key is found the loop retries immediately with
# no delay, and an exception raised by the block leaves the item stored as
# PROCESSING; confirm both are intended.
def each(&block)
  return enum_for(:each) unless block

  @stopping = false

  loop do
    break if @stopping

    key = bucket.get_index("status_int", PENDING, max_results: 1).first

    # Nothing pending right now: go round again (and re-check the stop flag).
    next unless key

    item = @bucket.get(key)
    # Claim the item before handing it to the caller.
    item.indexes = { status_int: PROCESSING }
    item.store

    block.call(item.data)

    item.delete
  end
end
empty?()
# File lib/riaq.rb, line 60
# Returns true when #size reports zero, false otherwise.
def empty?
  size.zero?
end
flush()
# File lib/riaq.rb, line 42
# Deletes every key the bucket reports under the "$bucket" index for its own
# name, one delete call per key.
#
# Returns the key collection that was iterated.
def flush
  all_keys = bucket.get_index("$bucket", @bucket.name)
  all_keys.each { |key| @bucket.delete(key) }
end
items()
# File lib/riaq.rb, line 52
# Runs a MapReduce job over this bucket's entries whose "status_int" index
# equals PENDING, using the "Riak.mapValues" map phase with keep: true.
#
# Returns whatever the job's #run returns.
def items
  # Each step reuses the previous call's return value, exactly as a chain would.
  job = Riak::MapReduce.new(riak)
  job = job.index(@bucket.name, "status_int", PENDING)
  job = job.map("Riak.mapValues", keep: true)
  job.run
end
push(value)
# File lib/riaq.rb, line 14
# Stores value in the bucket as a new "text/plain" entry indexed with
# status_int = PENDING.
#
# value - the data assigned to the new entry.
#
# Returns the result of the entry's #store call.
#
# NOTE(review): the key is the current time as a Float, so two pushes at the
# same instant would presumably share a key; confirm this cannot happen in
# practice.
def push(value)
  entry_key = Time.now.to_f

  entry = @bucket.new(entry_key)
  entry.content_type = "text/plain"
  entry.data = value
  entry.indexes = { status_int: PENDING }

  entry.store
end
Also aliased as: <<
riak()
# File lib/riaq.rb, line 66
# Returns the Riak client for this queue, creating it from Riaq.options on
# first use and reusing the same instance afterwards.
def riak
  return @riak if @riak

  @riak = Riak::Client.new(Riaq.options)
end
size()
# File lib/riaq.rb, line 56
# Returns how many keys the bucket reports with a "status_int" index equal to
# PENDING.
def size
  pending_keys = bucket.get_index("status_int", PENDING)
  pending_keys.size
end
stop()
# File lib/riaq.rb, line 48
# Sets the @stopping flag to true. #each checks this flag at the top of every
# loop iteration and breaks out when it is set.
def stop
  @stopping = true
end