class RubyEventStore::Client
Constants
- EMPTY_HASH
Attributes
Public Class Methods
Source
# File lib/ruby_event_store/client.rb, line 7 def initialize( repository: InMemoryRepository.new, mapper: Mappers::BatchMapper.new, subscriptions: nil, dispatcher: nil, message_broker: nil, clock: default_clock, correlation_id_generator: default_correlation_id_generator, event_type_resolver: EventTypeResolver.new ) @repository = repository @mapper = batch_mapper?(mapper) ? mapper : Mappers::BatchMapper.new(mapper) @broker = message_broker || Broker.new(subscriptions: subscriptions || Subscriptions.new, dispatcher: dispatcher || Dispatcher.new) @clock = clock @metadata = Concurrent::ThreadLocalVar.new @correlation_id_generator = correlation_id_generator @event_type_resolver = event_type_resolver if (subscriptions || dispatcher) warn <<~EOW Passing subscriptions and dispatcher to #{self.class} has been deprecated. Pass it using message_broker argument. For example: event_store = RubyEventStore::Client.new( message_broker: RubyEventStore::Broker.new( subscriptions: RubyEventStore::Subscriptions.new, dispatcher: RubyEventStore::Dispatcher.new ) ) EOW warn <<~EOW if (message_broker) Because message_broker has been provided, arguments passed by subscriptions or dispatcher will be ignored. EOW end end
Public Instance Methods
Source
# File lib/ruby_event_store/client.rb, line 84
# Enriches the event(s) with metadata, maps them to records and appends the
# records to the stream. The broker is not involved, so no handler is notified.
#
# @param events event or list of events to persist
# @param stream_name name of the target stream (defaults to GLOBAL_STREAM)
# @param expected_version expected stream version (defaults to :any)
# @return [self]
def append(events, stream_name: GLOBAL_STREAM, expected_version: :any)
  records = transform(enrich_events_metadata(events))
  append_records_to_stream(records, stream_name: stream_name, expected_version: expected_version)
  self
end
Persists new event(s) without notifying any subscribed handlers
@param (see #publish) @return [self]
Source
# File lib/ruby_event_store/client.rb, line 111
# Asks the repository to delete the stream with the given name.
#
# @param stream_name [String] name of the stream to delete
# @return [self]
def delete_stream(stream_name)
  stream = Stream.new(stream_name)
  repository.delete_stream(stream)
  self
end
Deletes a stream. All events from the stream remain intact but they are no longer linked to the stream.
@param stream_name [String] name of the stream to be cleared. @return [self]
Source
# File lib/ruby_event_store/client.rb, line 310
# Rebuilds an event from its serialized parts (as handed to async handlers).
#
# When +timestamp+ is omitted it is taken from the loaded metadata: the
# +:timestamp+ entry if present, otherwise the parsed +"timestamp"+ string entry.
# When +valid_at+ is omitted it falls back to the resolved timestamp. This also
# covers the case where +timestamp+ was passed explicitly; previously +valid_at+
# was left nil in that case.
#
# @param serializer [#load] serializer used to load the metadata and deserialize the record
# @param event_type serialized event type
# @param event_id serialized event id
# @param data serialized event data
# @param metadata serialized event metadata
# @param timestamp [String, nil] optional timestamp; skips loading metadata when given
# @param valid_at [String, nil] optional validity time; defaults to the resolved timestamp
# @return [Event] deserialized event
# @raise [KeyError] when no timestamp is given and the loaded metadata has neither timestamp key
def deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil)
  extract_timestamp = lambda { |m| (m[:timestamp] || Time.parse(m.fetch("timestamp"))).iso8601 }
  # Metadata is only loaded when the caller did not supply a timestamp.
  resolved_timestamp = timestamp || extract_timestamp.call(serializer.load(metadata))

  record =
    SerializedRecord.new(
      event_type: event_type,
      event_id: event_id,
      data: data,
      metadata: metadata,
      timestamp: resolved_timestamp,
      valid_at: valid_at || resolved_timestamp,
    )

  mapper.records_to_events([record.deserialize(serializer)]).first
end
Deserialize event which was serialized for async event handlers {railseventstore.org/docs/subscribe/#async-handlers Read more}
@return [Event] deserialized event
Source
# File lib/ruby_event_store/client.rb, line 163
# Tells whether the event is present in the given stream. For the global
# stream the repository is asked whether the event exists at all.
#
# @param event_id [String]
# @param stream_name [String]
# @return [Boolean]
def event_in_stream?(event_id, stream_name)
  stream = Stream.new(stream_name)
  return repository.has_event?(event_id) if stream.global?

  repository.event_in_stream?(event_id, stream)
end
Checks whether event is linked in given stream
@param event_id [String] @param stream_name [String] @return [Boolean] true if event is linked to given stream, false otherwise
Source
# File lib/ruby_event_store/client.rb, line 154
# Delegates to the repository to look up the event's position in the global stream.
#
# @param event_id [String]
# @return whatever the repository reports for the event
def global_position(event_id)
  store = repository
  store.global_position(event_id)
end
Gets position of the event in global stream
The position is always nonnegative. Global position may have gaps, meaning, there may be event at position 40, but no event at position 39.
@param event_id [String] @raise [EventNotFound] @return [Integer] nonnegative integer position of event in global stream
Source
# File lib/ruby_event_store/client.rb, line 369
# Short representation: class name plus the object id in hexadecimal,
# without dumping any instance variables.
#
# @return [String]
def inspect
  hex_id = __id__.to_s(16)
  "#<#{self.class}:0x#{hex_id}>"
end
Source
# File lib/ruby_event_store/client.rb, line 100
# Links event(s) identified by id to the given stream through the repository.
# The broker is not involved, so no handler is notified.
#
# @param event_ids [String, Array<String>] a single id or a list of ids
# @param stream_name name of the stream to link the events to
# @param expected_version expected stream version (defaults to :any)
# @return [self]
def link(event_ids, stream_name:, expected_version: :any)
  ids = Array(event_ids)
  target_stream = Stream.new(stream_name)
  version = ExpectedVersion.new(expected_version)
  repository.link_to_stream(ids, target_stream, version)
  self
end
Links already persisted event(s) to a different stream. Does not notify any subscribed handlers.
@param event_ids [String, Array<String>] ids of events @param stream_name (see #publish) @param expected_version (see #publish) @return [self]
Source
# File lib/ruby_event_store/client.rb, line 331
# Returns the metadata currently stored in @metadata, or EMPTY_HASH when
# nothing has been set.
#
# @return [Hash]
def metadata
  current = @metadata.value
  return EMPTY_HASH unless current

  current
end
Read additional metadata which will be added for published events {railseventstore.org/docs/request_metadata#passing-your-own-metadata-using-with_metadata-method Read more}
@return [Hash]
Source
# File lib/ruby_event_store/client.rb, line 364
# Maps the given event(s) to records and asks the repository to update the
# stored messages. No metadata enrichment and no broker call happen here.
#
# @param events_or_event [Array<Event>, Event] event(s) to overwrite
# @return [self]
def overwrite(events_or_event)
  records = transform(Array(events_or_event))
  repository.update_messages(records)
  self
end
Overwrite existing event(s) with the same ID.
Does not notify any subscribed handlers. Does not enrich with additional current metadata. Does not allow changing which streams these events are in. {railseventstore.org/docs/migrating_messages Read more}
@example Add data and metadata to existing events
events = event_store.read.limit(10).to_a events.each do |ev| ev.data[:tenant_id] = 1 ev.metadata[:server_id] = "eu-west-2" end event_store.overwrite(events)
@example Change event type
events = event_store.read.limit(10).each.select{|ev| OldType === ev }.map do |ev| NewType.new( event_id: ev.event_id, data: ev.data, metadata: ev.metadata, ) end event_store.overwrite(events)
@param events [Array<Event>, Event] event(s) to serialize and overwrite again @return [self]
Source
# File lib/ruby_event_store/client.rb, line 141
# Delegates to the repository to look up the event's position in the named stream.
#
# @param event_id [String]
# @param stream_name [String]
# @return whatever the repository reports for the event in that stream
def position_in_stream(event_id, stream_name)
  stream = Stream.new(stream_name)
  repository.position_in_stream(event_id, stream)
end
Gets position of the event in given stream
The position is always nonnegative. Returns nil if the event has no specific position in stream. Raise error if event is not present in stream.
@param event_id [String] @param stream_name [String] @return [Integer, nil] nonnegative integer position of event in stream, or nil if the event has no specific position in the stream @raise [EventNotInStream]
Source
# File lib/ruby_event_store/client.rb, line 59
# Enriches and persists the event(s), then hands each event together with its
# record to the broker.
#
# Each broker call runs inside with_metadata, with the event's correlation id
# and the event's own id as causation id.
#
# @param events event or list of events to publish
# @param topic topic passed to the broker; the event's type is used when nil
# @param stream_name name of the target stream (defaults to GLOBAL_STREAM)
# @param expected_version expected stream version (defaults to :any)
# @return [self]
def publish(events, topic: nil, stream_name: GLOBAL_STREAM, expected_version: :any)
  enriched_events = enrich_events_metadata(events)
  records = transform(enriched_events)
  append_records_to_stream(records, stream_name: stream_name, expected_version: expected_version)

  enriched_events.zip(records).each do |event, record|
    causal_metadata = { correlation_id: event.metadata.fetch(:correlation_id), causation_id: event.event_id }
    with_metadata(causal_metadata) do
      # Brokers whose #call does not take three arguments get the legacy two-argument call.
      if broker.public_method(:call).arity != 3
        warn <<~EOW
          Message broker shall support topics.
          Topic WILL BE IGNORED in the current broker.
          Modify the broker implementation to pass topic as an argument to broker.call method.
        EOW
        broker.call(event, record)
      else
        broker.call(topic || event.event_type, event, record)
      end
    end
  end

  self
end
Persists events and notifies subscribed handlers about them
@param events [Array<Event>, Event] event(s) @param topic [String, nil] topic passed to the message broker; when nil, each event's type is used instead @param stream_name [String] name of the stream for persisting events. @param expected_version [:any, :auto, :none, Integer] controls optimistic locking strategy. {railseventstore.org/docs/expected_version/ Read more} @return [self]
Source
# File lib/ruby_event_store/client.rb, line 120
# Creates a fresh Specification backed by a SpecificationReader built from
# this client's repository and mapper.
#
# @return [Specification]
def read
  reader = SpecificationReader.new(repository, mapper)
  Specification.new(reader)
end
Starts building a query specification for reading events. {railseventstore.org/docs/read/ More info.}
@return [Specification]
Source
# File lib/ruby_event_store/client.rb, line 48
# Forwards the call to the repository when the repository supports it;
# otherwise does nothing and returns nil.
def rescue_from_double_json_serialization!
  if repository.respond_to?(:rescue_from_double_json_serialization!)
    repository.rescue_from_double_json_serialization!
  end
end
Source
# File lib/ruby_event_store/client.rb, line 127
# Delegates to the repository to list the streams of the given event.
#
# @param event_id id of the event
# @return whatever the repository reports for the event
def streams_of(event_id)
  store = repository
  store.streams_of(event_id)
end
Gets list of streams where event is stored or linked
@return [Array<Stream>] where event is stored or linked
Source
# File lib/ruby_event_store/client.rb, line 180
# Registers a handler with the broker for the given event types. The handler
# is either the first argument or the block, never both.
#
# @param subscriber handler object (optional when a block is given)
# @param to collection of event types; each is passed through event_type_resolver
# @return the result of broker.add_subscription
# @raise [ArgumentError] when both a subscriber and a block are given
def subscribe(subscriber = nil, to:, &proc)
  if subscriber && proc
    raise ArgumentError, "subscriber must be first argument or block, cannot be both"
  end

  handler = subscriber || proc
  event_types = to.map { |event_klass| event_type_resolver.call(event_klass) }
  broker.add_subscription(handler, event_types)
end
Subscribes a handler (subscriber) that will be invoked for published events of provided type.
@overload subscribe(subscriber, to:)
@param to [Array<Class>] types of events to subscribe @param subscriber [Object, Class] handler @return [Proc] - unsubscribe proc. Call to unsubscribe. @raise [ArgumentError, SubscriberNotExist]
@overload subscribe(to:, &subscriber)
@param to [Array<Class>] types of events to subscribe @param subscriber [Proc] handler @return [Proc] - unsubscribe proc. Call to unsubscribe. @raise [ArgumentError, SubscriberNotExist]
Source
# File lib/ruby_event_store/client.rb, line 196
# Registers a global handler with the broker. The handler is either the first
# argument or the block, never both.
#
# @param subscriber handler object (optional when a block is given)
# @return the result of broker.add_global_subscription
# @raise [ArgumentError] when both a subscriber and a block are given
def subscribe_to_all_events(subscriber = nil, &proc)
  if subscriber && proc
    raise ArgumentError, "subscriber must be first argument or block, cannot be both"
  end

  handler = subscriber || proc
  broker.add_global_subscription(handler)
end
Subscribes a handler (subscriber) that will be invoked for all published events
@overload subscribe_to_all_events(subscriber)
@param subscriber [Object, Class] handler @return [Proc] - unsubscribe proc. Call to unsubscribe. @raise [ArgumentError, SubscriberNotExist]
@overload subscribe_to_all_events(&subscriber)
@param subscriber [Proc] handler @return [Proc] - unsubscribe proc. Call to unsubscribe. @raise [ArgumentError, SubscriberNotExist]
Source
# File lib/ruby_event_store/client.rb, line 205
# Resolves the event class to its event type and asks the broker for all
# subscriptions registered for that type.
#
# @param event_class event class (or whatever event_type_resolver accepts)
# @return the result of broker.all_subscriptions_for
def subscribers_for(event_class)
  event_type = event_type_resolver.call(event_class)
  broker.all_subscriptions_for(event_type)
end
Get list of handlers subscribed to an event
@param event_class [Class, String] type of events to get list of subscribed handlers @return [Array<Object, Class>]
Source
# File lib/ruby_event_store/client.rb, line 298
# Runs the block with the current metadata merged with +metadata_for_block+,
# then restores the metadata that was in effect before, even if the block raises.
#
# @param metadata_for_block [Hash] entries merged over the current metadata
# @param block [Proc] code to run with the merged metadata (optional)
# @return [Object, nil] the block's result, or nil when no block is given
def with_metadata(metadata_for_block, &block)
  outer_metadata = metadata
  self.metadata = outer_metadata.merge(metadata_for_block)
  block&.call
ensure
  # Method-level ensure: runs on every exit path, including exceptions.
  self.metadata = outer_metadata
end
Set additional metadata for all events published within the provided block {railseventstore.org/docs/request_metadata#passing-your-own-metadata-using-with_metadata-method Read more}
@param metadata_for_block [Hash] metadata to set for events @param block [Proc] block of code during which the metadata will be added @return [Object] last value returned by the provided block
Source
# File lib/ruby_event_store/client.rb, line 287
# Wraps the block in a Within builder together with the broker and the
# event type resolver.
#
# @param block [Proc] required block
# @return [Within]
# @raise [ArgumentError] when no block is given
def within(&block)
  raise ArgumentError unless block

  Within.new(block, broker, event_type_resolver)
end
Use for starting temporary subscriptions. {railseventstore.org/docs/subscribe/#temporary-subscriptions Read more}
@param block [Proc] block of code during which the temporary subscriptions will be active @return [Within] builder object which collects temporary subscriptions
Protected Instance Methods
Source
# File lib/ruby_event_store/client.rb, line 400
# Checks whether the mapper exposes both batch methods,
# +events_to_records+ and +records_to_events+.
#
# @param mapper object to inspect
# @return [Boolean]
def batch_mapper?(mapper)
  required = %i[events_to_records records_to_events]
  required.each { |method_name| return false unless mapper.respond_to?(method_name) }
  true
end
Source
# File lib/ruby_event_store/client.rb, line 408
# Default clock: a lambda returning the current UTC time rounded to
# TIMESTAMP_PRECISION.
#
# @return [Proc]
def default_clock
  lambda { Time.now.utc.round(TIMESTAMP_PRECISION) }
end
Source
# File lib/ruby_event_store/client.rb, line 412
# Default correlation id generator: a lambda returning a fresh
# SecureRandom UUID on every call.
#
# @return [Proc]
def default_correlation_id_generator
  lambda { SecureRandom.uuid }
end
Source
# File lib/ruby_event_store/client.rb, line 404
# Stores the given value as the current metadata in @metadata.
#
# @param value new metadata
def metadata=(value)
  holder = @metadata
  holder.value = value
end
Private Instance Methods
Source
# File lib/ruby_event_store/client.rb, line 394
# Wraps the stream name and expected version in their value objects and
# appends the records through the repository.
#
# @param records records to append
# @param stream_name name of the target stream
# @param expected_version expected stream version
# @return the result of repository.append_to_stream
def append_records_to_stream(records, stream_name:, expected_version:)
  stream = Stream.new(stream_name)
  version = ExpectedVersion.new(expected_version)
  repository.append_to_stream(records, stream, version)
end
Source
# File lib/ruby_event_store/client.rb, line 387
# Fills in missing metadata on the event without overwriting values that are
# already set: entries from the current client metadata, then +:timestamp+
# (from the clock), +:valid_at+ (defaults to the timestamp) and
# +:correlation_id+ (from the generator).
#
# @param event event whose metadata is completed in place
def enrich_event_metadata(event)
  metadata.each do |key, value|
    event.metadata[key] ||= value
  end

  event.metadata[:timestamp] ||= clock.call
  # valid_at is derived from the timestamp, which is guaranteed to be set by now.
  event.metadata[:valid_at] ||= event.metadata.fetch(:timestamp)
  event.metadata[:correlation_id] ||= correlation_id_generator.call
end
Source
# File lib/ruby_event_store/client.rb, line 382
# Coerces the argument to an array and enriches the metadata of every event in it.
#
# @param events a single event or a list of events
# @return [Array] the events as an array
def enrich_events_metadata(events)
  Array(events).each { |event| enrich_event_metadata(event) }
end
Source
# File lib/ruby_event_store/client.rb, line 378
# Converts events to records using the configured mapper.
#
# @param events events to convert
# @return the result of mapper.events_to_records
def transform(events)
  event_mapper = mapper
  event_mapper.events_to_records(events)
end