Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't start event source until Rails is ready. #117

Merged
merged 5 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/rspec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ jobs:
strategy:
fail-fast: false
matrix:
ruby_version: ['2.6.3', '2.7.5', '3.0.5', '3.1.4', '3.2.2']
ruby_version: ['2.7.5', '3.0.5', '3.1.4', '3.2.2']
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
Expand Down
1 change: 1 addition & 0 deletions .rspec
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
--format documentation
--color
--require spec_helper
--exclude-pattern "spec/rails_app/**/*"
3 changes: 3 additions & 0 deletions .rspec_rails_specs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
--format documentation
--color
--exclude-pattern "**/*"
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ gemspec
group :development, :test do
gem "rails", '>= 6.1.4'
gem "rspec-rails"
gem "parallel_tests"
gem "pry", platform: :mri, require: false
gem "pry-byebug", platform: :mri, require: false
gem 'rubocop'
Expand Down
28 changes: 23 additions & 5 deletions lib/event_source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
require 'event_source/operations/fetch_session'
require 'event_source/operations/build_message_options'
require 'event_source/operations/build_message'
require 'event_source/boot_registry'

# Event source provides ability to compose, publish and subscribe to events
module EventSource
Expand All @@ -64,14 +65,23 @@ class << self
:async_api_schemas=

def configure
@configured = true
yield(config)
end

def initialize!
load_protocols
create_connections
load_async_api_resources
load_components
def initialize!(force = false)
# Don't boot if I was never configured.
return unless @configured
boot_registry.boot!(force) do
load_protocols
create_connections
load_async_api_resources
load_components
end
end

def boot_registry
@boot_registry ||= EventSource::BootRegistry.new
end

def config
Expand All @@ -89,6 +99,14 @@ def build_async_api_resource(resource)
.call(resource)
.success
end

def register_subscriber(subscriber_klass)
boot_registry.register_subscriber(subscriber_klass)
end

def register_publisher(subscriber_klass)
boot_registry.register_publisher(subscriber_klass)
end
end

class EventSourceLogger
Expand Down
96 changes: 96 additions & 0 deletions lib/event_source/boot_registry.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# frozen_string_literal: true

require 'set'
require 'monitor'

module EventSource
# This class manages correct/loading of subscribers and publishers
# based on the current stage of the EventSource lifecycle.
#
# Depending on both the time the initialization of EventSource is invoked
# and when subscriber/publisher code is loaded, this can become complicated.
# This is largely caused by two confounding factors:
# 1. We want to delay initialization of EventSource until Rails is fully
# 'ready'
# 2. Based on the Rails environment, such as production, development, or
# test (primarily how those different environments treat lazy vs. eager
# loading of classes in a Rails application), subscriber and publisher
# code can be loaded before, after, or sometimes even DURING the
# EventSource boot process - we need to support all models
class BootRegistry
def initialize
@unbooted_publishers = Set.new
@unbooted_subscribers = Set.new
@booted_publishers = Set.new
@booted_subscribers = Set.new
# This is our re-entrant mutex. We're going to use it to make sure that
# registration and boot methods aren't allowed to simultaneously alter
# our state. You'll notice most methods on this class are wrapped in
# synchronize calls against this.
@bootex = Monitor.new
@booted = false
end

def boot!(force = false)
@bootex.synchronize do
return if @booted && !force
yield
boot_publishers!
boot_subscribers!
@booted = true
end
end

# Register a publisher for EventSource.
#
# If the EventSource hasn't been booted, save publisher for later.
# Otherwise, boot it now.
def register_publisher(publisher_klass)
@bootex.synchronize do
if @booted
publisher_klass.validate
@booted_publishers << publisher_klass
else
@unbooted_publishers << publisher_klass
end
end
end

# Register a subscriber for EventSource.
#
# If the EventSource hasn't been booted, save the subscriber for later.
# Otherwise, boot it now.
def register_subscriber(subscriber_klass)
@bootex.synchronize do
if @booted
subscriber_klass.create_subscription
@booted_subscribers << subscriber_klass
else
@unbooted_subscribers << subscriber_klass
end
end
end

# Boot the publishers.
def boot_publishers!
@bootex.synchronize do
@unbooted_publishers.each do |pk|
pk.validate
@booted_publishers << pk
end
@unbooted_publishers = Set.new
end
end

# Boot the subscribers.
def boot_subscribers!
@bootex.synchronize do
@unbooted_subscribers.each do |sk|
sk.create_subscription
@booted_subscribers << sk
end
@unbooted_subscribers = Set.new
end
end
end
end
2 changes: 1 addition & 1 deletion lib/event_source/protocols/amqp_protocol.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
require_relative 'amqp/contracts/contract'

Gem
.find_files('event_source/protocols/amqp/contracts/**/*.rb')
.find_files('event_source/protocols/amqp/contracts/**/*.rb', false)
.sort
.each { |f| require(f) }

Expand Down
10 changes: 6 additions & 4 deletions lib/event_source/protocols/http/faraday_queue_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ def actions
# @return [Queue] Queue instance
def subscribe(subscriber_klass, _options)
unique_key = [app_name, formatted_exchange_name].join(delimiter)
logger.debug "FaradayQueueProxy#register_subscription Subscriber Class #{subscriber_klass}"
logger.debug "FaradayQueueProxy#register_subscription Unique_key #{unique_key}"
executable = subscriber_klass.executable_for(unique_key)
@subject.actions.push(executable)
subscription_key = [app_name, formatted_exchange_name].join(delimiter)
subscriber_suffix = subscriber_klass.name.downcase.gsub('::', '_')
unique_key = subscription_key + "_#{subscriber_suffix}"
logger.info "FaradayQueueProxy#register_subscription Subscriber Class #{subscriber_klass}"
logger.info "FaradayQueueProxy#register_subscription Unique_key #{unique_key}"
@subject.register_action(subscriber_klass, unique_key)
end

def consumer_proxy_for(operation_bindings)
Expand Down
17 changes: 11 additions & 6 deletions lib/event_source/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ def self.publisher_container
@publisher_container ||= Concurrent::Map.new
end

def self.initialization_registry
@initialization_registry ||= Concurrent::Array.new
end

def self.initialize_publishers
self.initialization_registry.each do |pub|
pub.validate
end
end

def self.[](exchange_ref)
# TODO: validate publisher already exists
# raise EventSource::Error::PublisherAlreadyRegisteredError.new(id) if registry.key?(id)
Expand All @@ -46,12 +56,7 @@ def included(base)
}
base.extend(ClassMethods)

TracePoint.trace(:end) do |t|
if base == t.self
base.validate
t.disable
end
end
EventSource.register_publisher(base)
end

# methods to register events
Expand Down
16 changes: 13 additions & 3 deletions lib/event_source/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ class Queue
# @attr_reader [Object] queue_proxy the protocol-specific class supporting this DSL
# @attr_reader [String] name
# @attr_reader [Hash] bindings
# @attr_reader [Hash] actions
attr_reader :queue_proxy, :name, :bindings, :actions
attr_reader :queue_proxy, :name, :bindings

def initialize(queue_proxy, name, bindings = {})
@queue_proxy = queue_proxy
@name = name
@bindings = bindings
@subject = ::Queue.new
@actions = []
@registered_actions = []
end

# def subscribe(subscriber_klass, &block)
Expand Down Expand Up @@ -49,5 +48,16 @@ def close
def closed?
@subject.closed?
end

# Register an action to be performed, with a resolver class and key.
def register_action(resolver, key)
@registered_actions << [resolver, key]
end

def actions
@registered_actions.map do |ra|
ra.first.executable_for(ra.last)
end
end
end
end
8 changes: 5 additions & 3 deletions lib/event_source/railtie.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

module EventSource
# :nodoc:
class Railtie < Rails::Railtie

module Railtie
Rails::Application::Finisher.initializer "event_source.boot", after: :finisher_hook do
EventSource.initialize!
end
end
end
end
7 changes: 1 addition & 6 deletions lib/event_source/subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,7 @@ def included(base)
base.extend ClassMethods
base.include InstanceMethods

TracePoint.trace(:end) do |t|
if base == t.self
base.create_subscription
t.disable
end
end
EventSource.register_subscriber(base)
end

module InstanceMethods
Expand Down
5 changes: 5 additions & 0 deletions spec/event_source/command_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ def call
# binding.pry
end

before :each do
EventSource.initialize!(true)
end

context '.event' do

let(:organization_params) do
{
hbx_id: '553234',
Expand Down
25 changes: 25 additions & 0 deletions spec/event_source/rails_application_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
require "spec_helper"
require "parallel_tests"
require "parallel_tests/rspec/runner"

RSpec.describe EventSource, "rails specs" do
it "runs the rails tests in the rails application context" do
ParallelTests.with_pid_file do
specs_run_result = ParallelTests::RSpec::Runner.run_tests(
[
"spec/rails_app/spec/railtie_spec.rb",
"spec/rails_app/spec/http_service_integration_spec.rb"
],
1,
1,
{
serialize_stdout: true,
test_options: ["-O", ".rspec_rails_specs", "--format", "documentation"]
}
)
if specs_run_result[:exit_status] != 0
fail(specs_run_result[:stdout] + "\n\n")
end
end
end
end
11 changes: 11 additions & 0 deletions spec/rails_app/app/event_source/events/determinations/eval.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# frozen_string_literal: true

module Events
module Determinations
# Eval will register event publisher for MiTC
class Eval < EventSource::Event
publisher_path 'publishers.mitc_publisher'

end
end
end
9 changes: 9 additions & 0 deletions spec/rails_app/app/event_source/publishers/mitc_publisher.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# frozen_string_literal: true

module Publishers
class MitcPublisher
# Publisher will send request payload to MiTC for determinations
include ::EventSource::Publisher[http: '/determinations/eval']
register_event '/determinations/eval'
end
end

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# frozen_string_literal: true

module Subscribers
class MitcResponseSubscriber
include ::EventSource::Subscriber[http: '/determinations/eval']
extend EventSource::Logging

subscribe(:on_determinations_eval) do |body, status, headers|
$GLOBAL_TEST_FLAG = true
end
end
end

This file was deleted.

Loading
Loading