module MockRedis::StreamMethods
Public Instance Methods
Source
# File lib/mock_redis/stream_methods.rb, line 32
# Appends +entry+ to the stream stored at +key+.
#
# @param key   the key holding the stream
# @param entry the entry handed to the stream's +add+
# @param opts  [Hash] optional settings:
#   :id     - id passed to +add+; '*' is used when absent or falsy
#   :maxlen - when truthy, the stream is trimmed with this value after adding
# @return the stream's +last_id+ after the entry has been added
# @raise re-raises any Redis::CommandError from the stream through
#   Error.command_error
def xadd(key, entry, opts = {})
  entry_id = opts[:id] || '*'
  maxlen = opts[:maxlen]

  with_stream_at(key) do |stream|
    begin
      stream.add(entry_id, entry)
      stream.trim(maxlen) if maxlen
      # Non-local return: leaves xadd directly with the id.
      return stream.last_id
    rescue Redis::CommandError => e
      raise Error.command_error(e.message, self)
    end
  end
end
Source
# File lib/mock_redis/stream_methods.rb, line 49
# Reports the size of the stream stored at +key+.
#
# @param key the key holding the stream
# @return whatever the stream's +count+ returns
def xlen(key)
  # Non-local return: leaves xlen directly with the count.
  with_stream_at(key) { |stream| return stream.count }
end
Source
# File lib/mock_redis/stream_methods.rb, line 55
# Queries the stream at +key+ for the entries between +first+ and +last+.
#
# @param key   the key holding the stream
# @param first lower bound handed to the stream's +range+ (default '-')
# @param last  upper bound handed to the stream's +range+ (default '+')
# @param count when truthy, forwarded to +range+ as 'COUNT', count
# @return the result of the stream's +range+
# @raise re-raises any Redis::CommandError from the stream through
#   Error.command_error
def xrange(key, first = '-', last = '+', count: nil)
  # Third argument is false here and true in xrevrange -- presumably the
  # "reverse" flag of Stream#range; confirm against Stream.
  range_args = [first, last, false]
  range_args.push('COUNT', count) if count

  with_stream_at(key) do |stream|
    begin
      return stream.range(*range_args)
    rescue Redis::CommandError => e
      raise Error.command_error(e.message, self)
    end
  end
end
Source
# File lib/mock_redis/stream_methods.rb, line 75
# Reads entries from one or more streams.
#
# @param keys  a single key or an Array of keys
# @param ids   a single id or an Array of ids, paired with +keys+ by position
# @param count when truthy, forwarded to each stream's +read+ as 'COUNT', count
# @param block when truthy, forwarded to each stream's +read+ as
#   'BLOCK', block.to_i
# @return [Hash] key => result of the stream's +read+, containing only the
#   keys whose result was not empty
# @raise the error built by Error.command_error when the number of keys and
#   the number of ids differ
def xread(keys, ids, count: nil, block: nil)
  keys = keys.is_a?(Array) ? keys : [keys]
  ids = ids.is_a?(Array) ? ids : [ids]

  # Every key needs exactly one id. Without this check a missing id would
  # reach the stream as nil and surplus ids would be dropped silently.
  unless keys.size == ids.size
    raise Error.command_error(
      "ERR Unbalanced XREAD list of streams: for each stream key an ID or '$' must be specified.",
      self
    )
  end

  args = []
  args.push('COUNT', count) if count
  args.push('BLOCK', block.to_i) if block

  keys.zip(ids).each_with_object({}) do |(key, id), result|
    with_stream_at(key) do |stream|
      # Named +entries+ rather than +data+ so it does not shadow the +data+
      # accessor used by other methods of this module.
      entries = stream.read(id, *args)
      result[key] = entries unless entries.empty?
    end
  end
end
Source
# File lib/mock_redis/stream_methods.rb, line 65
# Queries the stream at +key+ like xrange, but takes the bounds in the
# opposite order (+last+ first) and passes true as the third +range+ argument.
#
# @param key   the key holding the stream
# @param last  upper bound handed to the stream's +range+ (default '+')
# @param first lower bound handed to the stream's +range+ (default '-')
# @param count when truthy, forwarded to +range+ as 'COUNT', count
# @return the result of the stream's +range+
# @raise re-raises any Redis::CommandError from the stream through
#   Error.command_error
def xrevrange(key, last = '+', first = '-', count: nil)
  # Third argument is true here and false in xrange -- presumably the
  # "reverse" flag of Stream#range; confirm against Stream.
  range_args = [first, last, true]
  range_args.push('COUNT', count) if count

  with_stream_at(key) do |stream|
    begin
      return stream.range(*range_args)
    rescue Redis::CommandError => e
      raise Error.command_error(e.message, self)
    end
  end
end
Source
# File lib/mock_redis/stream_methods.rb, line 43
# Trims the stream stored at +key+.
#
# @param key   the key holding the stream
# @param count value handed to the stream's +trim+
# @return whatever with_stream_at returns for the block's +trim+ result
def xtrim(key, count)
  with_stream_at(key) { |stream| stream.trim(count) }
end
Private Instance Methods
Source
# File lib/mock_redis/stream_methods.rb, line 101
# Guards stream commands against keys that hold a non-stream value.
#
# @param key the key to check
# @return [nil] when the key is unset or holds a Stream
# @raise the error built by Error.wrong_type_error otherwise
def assert_streamy(key)
  return if streamy?(key)

  raise Error.wrong_type_error(self)
end
Source
# File lib/mock_redis/stream_methods.rb, line 97
# Tells whether +key+ can be used as a stream: it is either unset (nil) or
# already holds a Stream.
#
# @param key the key to check
# @return [Boolean]
def streamy?(key)
  value = data[key]
  value.nil? || value.is_a?(Stream)
end
Source
# File lib/mock_redis/stream_methods.rb, line 93
# Delegates to with_thing_at for +key+, using :assert_streamy as the type
# assertion and a proc that builds a fresh Stream as the empty-value factory.
# The given block is forwarded unchanged.
#
# @param key the key holding the stream
# @return whatever with_thing_at returns
def with_stream_at(key, &block)
  empty_stream_factory = proc { Stream.new }
  with_thing_at(key, :assert_streamy, empty_stream_factory, &block)
end