class Strongman::Batch
Attributes
fulfilled[RW]
fulfilling[RW]
lock[RW]
name[RW]
parent[RW]
Public Class Methods
new(loader_block, name: nil, parent: nil, max_batch_size: Float::INFINITY)
click to toggle source
# File lib/strongman.rb, line 17 def initialize(loader_block, name: nil, parent: nil, max_batch_size: Float::INFINITY) @name = name @queue = Concurrent::Array.new @promise = Concurrent::Promises.resolvable_future @loader_block = loader_block @lock = Concurrent::ReadWriteLock.new @parent = parent @children = Concurrent::Array.new @fulfilling = Concurrent::AtomicBoolean.new(false) @fulfilled = Concurrent::AtomicBoolean.new(false) @max_batch_size = max_batch_size @parent.children << self if @parent @root = nil @batch_chain = nil end
Public Instance Methods
batch_chain()
click to toggle source
# File lib/strongman.rb, line 123 def batch_chain if @batch_chain @batch_chain else @batch_chain = Concurrent::Array.new add_children = -> (batch) { @batch_chain << batch if batch.children.size > 0 batch.children.flat_map(&add_children) end } add_children.(root) @batch_chain end end
children()
click to toggle source
# File lib/strongman.rb, line 171 def children @children ||= Concurrent::Array.new end
fulfill!()
click to toggle source
# File lib/strongman.rb, line 155 def fulfill! results = @loader_block.call(@queue) if results.is_a?(Concurrent::Promises::Future) # if the strongman loader block returns a promise (e.g. if the block uses another loader), # make sure to touch it to kick off any delayed effects before chaining results.touch.then do |inner_results| @promise.fulfill(normalize_results(inner_results)) end.flat else @promise.fulfill(normalize_results(results)) end self end
fulfill_hierarchy()
click to toggle source
# File lib/strongman.rb, line 141 def fulfill_hierarchy raise Error.new("Only run #fulfill_hierarchy on root batches") if @parent with_lock do return if fulfilled? mark_fulfilling! batch_chain.reverse.each(&:fulfill!) ensure mark_fulfilled! mark_not_fulfilling! end end
fulfilled?()
click to toggle source
# File lib/strongman.rb, line 35 def fulfilled? root.fulfilled.true? end
fulfilling?()
click to toggle source
# File lib/strongman.rb, line 39 def fulfilling? root.fulfilling.true? end
mark_fulfilled!()
click to toggle source
# File lib/strongman.rb, line 86 def mark_fulfilled! root.fulfilled.make_true self end
mark_fulfilling!()
click to toggle source
# File lib/strongman.rb, line 91 def mark_fulfilling! root.fulfilling.make_true self end
mark_not_fulfilling!()
click to toggle source
# File lib/strongman.rb, line 96 def mark_not_fulfilling! root.fulfilling.make_false self end
needs_fulfilling?()
click to toggle source
# File lib/strongman.rb, line 43 def needs_fulfilling? !fulfilled? && !fulfilling? end
queue(key)
click to toggle source
# File lib/strongman.rb, line 47 def queue(key) @queue << key future = @promise.then do |results| unless results.key?(key) raise StandardError, "Batch loader didn't resolve a key: #{key}. Resolved keys: #{results.keys}" end result = results[key] if result.is_a?(Concurrent::Promises::Future) result else Concurrent::Promises.resolvable_future.fulfill(result) end end.flat # # If our queue is full, fulfill immediately and return the bare future # if @queue.size >= @max_batch_size root.fulfill_hierarchy future else # # If the queue is not full, create a delayed future that fulfills when the value is requested and chains # to the inner future # Concurrent::Promises.delay do # with_lock do root.fulfill_hierarchy if root.needs_fulfilling? # end future end.flat end end
root()
click to toggle source
# File lib/strongman.rb, line 107 def root if @root @root else find_top = -> (batch) { if batch.parent find_top.(batch.parent) else batch end } @root = find_top.(self) end end
with_lock() { || ... }
click to toggle source
# File lib/strongman.rb, line 101 def with_lock root.lock.with_write_lock do yield end end
Private Instance Methods
normalize_results(results)
click to toggle source
# File lib/strongman.rb, line 177 def normalize_results(results) unless results.is_a?(Array) || results.is_a?(Hash) raise TypeError, "Batch loader must return an Array or Hash, but returned: #{results.class.name}" end if @queue.size != results.size raise StandardError, "Batch loader must be instantiated with function that returns Array or Hash " \ "of the same size as provided to it Array of keys" \ "\n\nProvided keys:\n#{@queue}" \ "\n\nReturned values:\n#{results}" end if results.is_a?(Array) Hash[@queue.zip(results)] else results.is_a?(Hash) results end end