class Acfs::Runner
@api private
Attributes
Public Class Methods
Source
# File lib/acfs/runner.rb, line 14 def initialize(adapter) @adapter = adapter @running = false end
Public Instance Methods
Source
# File lib/acfs/runner.rb, line 75 def clear queue.clear adapter.abort @running = false end
Source
# File lib/acfs/runner.rb, line 45 def enqueue(operation) tracer.in_span('acfs.runner.enqueue') do ::ActiveSupport::Notifications.instrument('acfs.runner.enqueue', operation: operation) do if running? operation_request(operation) {|req| adapter.queue req } else queue << operation end end end end
Enqueue operation to be run later.
Source
# File lib/acfs/runner.rb, line 22 def process(operation) ::ActiveSupport::Notifications.instrument('acfs.operation.before_process', operation: operation) operation.synchronous? ? run(operation) : enqueue(operation) end
Process an operation. Synchronous operations will be run and parallel operations will be queued.
Source
# File lib/acfs/runner.rb, line 39 def queue @queue ||= [] end
List of current queued operations.
Source
# File lib/acfs/runner.rb, line 29 def run(operation) tracer.in_span('acfs.runner.sync_run') do ::ActiveSupport::Notifications.instrument('acfs.runner.sync_run', operation: operation) do operation_request(operation) {|req| adapter.run req } end end end
Run operation right now skipping queue.
Source
# File lib/acfs/runner.rb, line 59 def running? @running end
Return true if queued operations are currently processed.
Source
# File lib/acfs/runner.rb, line 65 def start return if running? enqueue_operations start_all rescue StandardError queue.clear raise end
Start processing queued operations.
Private Instance Methods
Source
# File lib/acfs/runner.rb, line 90 def enqueue_operations while (operation = queue.shift) operation_request(operation) {|req| adapter.queue req } end end
Source
# File lib/acfs/runner.rb, line 96 def operation_request(operation) return if Acfs::Stub.enabled? && Acfs::Stub.stubbed(operation) req = operation.service.prepare(operation.request) return unless req.is_a? Acfs::Request req = prepare(req) return unless req.is_a? Acfs::Request yield req end
Source
# File lib/acfs/runner.rb, line 108 def prepare(request) method = request.method.to_s.upcase template = request.operation.location&.raw_uri.to_s name = "HTTP #{method}" name = "#{method} #{template}" if template attributes = { 'http.request.method' => method, 'server.address' => request.uri.host, 'server.port' => request.uri.port, 'url.full' => request.uri.to_s, 'url.scheme' => request.uri.scheme, 'url.template' => template, } span = tracer.start_span(name, attributes:, kind: :client) OpenTelemetry::Trace.with_span(span) do OpenTelemetry.propagation.inject(request.headers) request.on_complete do |response, nxt| span.set_attribute('http.response.status_code', response.status_code) span.status = OpenTelemetry::Trace::Status.error unless (100..399).cover?(response.status_code) span.finish nxt.call(response) end super end end
Calls superclass method
Acfs::Service::Middleware#prepare
Source
# File lib/acfs/runner.rb, line 83 def start_all @running = true adapter.start ensure @running = false end