Skip to content

Commit

Permalink
Delay initialization and subscription until ES is booted.
Browse files Browse the repository at this point in the history
  • Loading branch information
TreyE committed Sep 10, 2024
1 parent 866d913 commit 7271182
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 12 deletions.
2 changes: 2 additions & 0 deletions lib/event_source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ def initialize!(force = false)
create_connections
load_async_api_resources
load_components
EventSource::Subscriber.initialize_subscribers
EventSource::Publisher.initialize_publishers
end

def config
Expand Down
24 changes: 18 additions & 6 deletions lib/event_source/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ def self.publisher_container
@publisher_container ||= Concurrent::Map.new
end

def self.initialization_registry
@initialization_registry ||= Concurrent::Array.new
end

def self.initialize_publishers
self.initialization_registry.each do |pub|
pub.validate
end
end

def self.[](exchange_ref)
# TODO: validate publisher already exists
# raise EventSource::Error::PublisherAlreadyRegisteredError.new(id) if registry.key?(id)
Expand All @@ -46,12 +56,14 @@ def included(base)
}
base.extend(ClassMethods)

TracePoint.trace(:end) do |t|
if base == t.self
base.validate
t.disable
end
end

EventSource::Publisher.initialization_registry << base
# TracePoint.trace(:end) do |t|
# if base == t.self
# base.validate
# t.disable
# end
# end
end

# methods to register events
Expand Down
25 changes: 19 additions & 6 deletions lib/event_source/subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ def self.executable_container
@executable_container ||= Concurrent::Map.new
end

def self.initialization_registry
@initialization_registry ||= Concurrent::Array.new
end

def self.initialize_subscribers
self.initialization_registry.each do |sub|
sub.create_subscription
end
end

def self.[](exchange_ref)
# TODO: validate publisher already exists
# raise EventSource::Error::PublisherAlreadyRegisteredError.new(id) if registry.key?(id)
Expand All @@ -49,12 +59,15 @@ def included(base)
base.extend ClassMethods
base.include InstanceMethods

TracePoint.trace(:end) do |t|
if base == t.self
base.create_subscription
t.disable
end
end
# TODO: Wrap this back in a delayed tracepoint
EventSource::Subscriber.initialization_registry << base

# TracePoint.trace(:end) do |t|
# if base == t.self
# base.create_subscription
# t.disable
# end
# end
end

module InstanceMethods
Expand Down

0 comments on commit 7271182

Please sign in to comment.