Skip to content

Commit

Permalink
build async api message object with session
Browse files Browse the repository at this point in the history
  • Loading branch information
raghuramg committed Dec 21, 2023
1 parent 669a93a commit 06d8409
Show file tree
Hide file tree
Showing 13 changed files with 323 additions and 186 deletions.
1 change: 1 addition & 0 deletions lib/event_source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
require 'event_source/subscriber'
require 'event_source/operations/codec64'
require 'event_source/operations/create_message'
require 'event_source/operations/fetch_session'
require 'event_source/operations/build_message_options'
require 'event_source/operations/build_message'

Expand Down
1 change: 1 addition & 0 deletions lib/event_source/error.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ class Error < StandardError
ConnectionNotFound = Class.new(Error)
ServerConfigurationNotFound = Class.new(Error)
ServerConfigurationInvalid = Class.new(Error)
MessageBuildError = Class.new(Error)
end
end
7 changes: 1 addition & 6 deletions lib/event_source/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def initialize(options = {})
send(:headers=, options[:headers] || {})

@publisher_path = klass_var_for(:publisher_path) || nil
build_message(options)
build_message(options) if headers.delete(:build_message)

if @publisher_path.eql?(nil)
raise EventSource::Error::PublisherKeyMissing,
Expand All @@ -40,11 +40,6 @@ def build_message(options)
payload: options[:attributes],
name: name
)


# result = EventSource::Operations::BuildMessage.new.call(options)
# raise EventSource::Error::MessageBuildError, result.failure unless result.success?
# @message = result.success if result.success?
end

# Set payload
Expand Down
14 changes: 10 additions & 4 deletions lib/event_source/message.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# frozen_string_literal: true

require 'forwardable'
require "forwardable"

module EventSource
# Generate and forwared a notification that something has happened in the system
# Construct async api message object
class Message
extend Forwardable

Expand All @@ -18,13 +18,19 @@ def initialize(options = {})

def build_message(options)
result = EventSource::Operations::BuildMessageOptions.new.call(options)
raise "unable to build message options due to #{result.failure}" unless result.success?
unless result.success?
raise EventSource::Error::MessageBuildError,
"unable to build message options due to #{result.failure}"
end
result.success
end

def create_message(options)
result = EventSource::Operations::CreateMessage.new.call(options)
raise "unable to create message due to #{result.failure}" unless result.success?
unless result.success?
raise EventSource::Error::MessageBuildError,
"unable to create message due to #{result.failure}"
end
result.success
end
end
Expand Down
13 changes: 4 additions & 9 deletions lib/event_source/operations/build_message.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# frozen_string_literal: true

require 'dry/monads'
require 'dry/monads/do'
require "dry/monads"
require "dry/monads/do"

module EventSource
module Operations
Expand All @@ -10,15 +10,15 @@ class BuildMessage
include Dry::Monads[:result, :do]

def call(params)
values = yield build_message(params)
values = yield build_options(params)
message = yield create_message(values)

Success(message)
end

private

def build_message(params)
def build_options(params)
result = BuildMessageOptions.new.call(params)
result.success? ? result : Failure(result.errors.to_h)
end
Expand All @@ -29,8 +29,3 @@ def create_message(values)
end
end
end





88 changes: 29 additions & 59 deletions lib/event_source/operations/build_message_options.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# frozen_string_literal: true

require 'dry/monads'
require 'dry/monads/do'
require 'securerandom'
require "dry/monads"
require "dry/monads/do"
require "securerandom"

module EventSource
module Operations
Expand All @@ -11,78 +11,48 @@ class BuildMessageOptions
include Dry::Monads[:result, :do]

def call(params)
input_options = yield build_options(params)
#
# build_headers
# build_payload
message_options = yield fetch_session_options(input_options)
headers = yield build_headers(params)
payload = yield build_payload(params)
payload = yield append_session_details(payload)

Success(message_options)
Success(headers: headers, payload: payload)
end

private

def build_options(params)
message_attributes = {
payload: params[:attributes]&.symbolize_keys || {},
headers: params[:headers]&.symbolize_keys || {}
}
def build_headers(params)
headers = params[:headers]&.symbolize_keys || {}
headers[:correlation_id] ||= SecureRandom.uuid

message_attributes[:payload][:message_id] ||= SecureRandom.uuid
message_attributes[:headers][:correlation_id] ||= SecureRandom.uuid
Success(headers)
end

def build_payload(params)
payload = params[:payload]&.symbolize_keys || {}
payload[:message_id] ||= SecureRandom.uuid
payload[:event_name] ||= params[:name]

Success(message_attributes)
Success(payload)
end

# TODO: need to be an operaton
# keycloak enroll branch verify for session access outside of controller
# ActionController::Base.helpers
def fetch_session_options(message_attributes)
include_session_concern_if_defined
def append_session_details(payload)
output = FetchSession.new.call

if session_defined? && current_user_defined?
message_attributes[:payload].merge!(
session_details: session,
if output.success?
session, current_user = output.value!
payload.merge!(
session_details: session&.symbolize_keys,
account_id: current_user.id
)
else
message_attributes[:payload].merge!(
account_id: system_account&.id
) if defined?(system_account)
# Create system account user <admin@dc.gov> when session is not available
if defined?(system_account)
payload.merge!(account_id: system_account&.id)
end
end

# Create system account user <admin@dc.gov> when session is not available
Success(message_attributes)
end

def include_session_concern_if_defined
self.class.include(::SessionConcern) if defined?(::SessionConcern)
end

def session_defined?
defined?(session)
end

def current_user_defined?
defined?(current_user)
Success(payload)
end
end
end
end

# belongs_to :account # alex (a.k.a Account) (System User) <coming from devise current_user/current_account>
# subject: david's consumer_role <ConsumerRole/<BsonID> GLOBAL_ID of consumer_role
# subject_identifier: Person ID/Organization ID
# category: 'HC4CC eligibility created'
# event_time: 'time when event occurred'
# ttl <time to live nice to have>
# correlation_id: <guid> <verify, medicaid_gateway may not using this correct way>
# flag hbx_id being used as a correlation ID for correction.

# message_id:
# host_id: <pod id, kubernetes identifiers for devops to locate message origin>
# devise_session_detail:
# session_id:
# portal:
# last_request_at:
# session_user_id:
12 changes: 7 additions & 5 deletions lib/event_source/operations/create_message.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# frozen_string_literal: true

require 'dry/monads'
require 'dry/monads/do'
require "dry/monads"
require "dry/monads/do"

module EventSource
module Operations
Expand All @@ -10,16 +10,18 @@ class CreateMessage
include Dry::Monads[:result, :do]

def call(params)
values = yield build(params)
values = yield build(params)
message = yield create(values)

Success(message)
end

private

def build(params)
result = ::EventSource::AsyncApi::Contracts::MessageContract.new.call(params)
result =
::EventSource::AsyncApi::Contracts::MessageContract.new.call(params)

result.success? ? Success(result.to_h) : Failure(result.errors.to_h)
end

Expand Down
52 changes: 52 additions & 0 deletions lib/event_source/operations/fetch_session.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# frozen_string_literal: true

require "dry/monads"
require "dry/monads/do"

module EventSource
module Operations
# fetch session
class FetchSession
include Dry::Monads[:result, :do]

def call
helper = yield include_session_helper
session = yield fetch_session
current_user = yield fetch_current_user

Success([session, current_user])
end

private

def include_session_helper
if session_concern_defined?
self.class.include(::SessionConcern)
Success(::SessionConcern)
else
Failure("SessionConcern is not defined")
end
end

def fetch_session
if respond_to?(:session)
Success(session)
else
Failure("session is not defined")
end
end

def fetch_current_user
if respond_to?(:current_user)
Success(current_user)
else
Failure("current_user is not defined")
end
end

def session_concern_defined?
Object.const_defined?(:SessionConcern)
end
end
end
end
5 changes: 4 additions & 1 deletion lib/event_source/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ def publish(event)

logger.debug "Publisher#publish publish_operation_name: #{publish_operation_name}"
publish_operation = find_publish_operation_for(publish_operation_name)
publish_operation.call(event.payload, {headers: event.headers})
payload = event.message&.payload || event.payload
headers = event.message&.headers || event.headers

publish_operation.call(payload, {headers: headers})
end

def channel_name
Expand Down
Loading

0 comments on commit 06d8409

Please sign in to comment.