Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: call rules during fx quote #361

Draft
wants to merge 1 commit into
base: minor/iso
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/handlers/QuotingHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ class QuotingHandler {

try {
span = await this.createSpan(requestData)
await model.handleFxQuoteRequest(headers, payload, span, originalPayload)
await model.handleFxQuoteRequest(headers, payload, span, originalPayload, this.cache)
this.logger.debug('handlePostFxQuotes is done')
} catch (err) {
this.logger.error(`error in handlePostFxQuotes: ${err?.stack}`)
Expand Down
80 changes: 80 additions & 0 deletions src/model/executeRules.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
const ErrorHandler = require('@mojaloop/central-services-error-handling')

const rules = require('../../config/rules.json')
const RulesEngine = require('./rules.js')

module.exports.executeRules = async function executeRules (headers, quoteRequest, originalPayload, payer, payee, operation) {
if (rules.length === 0) {
return []
}

const facts = {
operation,
payer,
payee,
payload: quoteRequest,
headers
}

const { events } = await RulesEngine.run(rules, facts)

this.writeLog(`Rules engine returned events ${JSON.stringify(events)}`)

return await this.handleRuleEvents(events, headers, quoteRequest, originalPayload)
}

module.exports.handleRuleEvents = async function handleRuleEvents (events, headers, payload, originalPayload) {
const quoteRequest = originalPayload || payload
// todo: pass only originalPayload (added this logic only for passing tests)

// At the time of writing, all events cause the "normal" flow of execution to be interrupted.
// So we'll return false when there have been no events whatsoever.
if (events.length === 0) {
return { terminate: false, quoteRequest, headers }
}

const { INVALID_QUOTE_REQUEST, INTERCEPT_QUOTE } = RulesEngine.events

const unhandledEvents = events.filter(ev => !(ev.type in RulesEngine.events))

if (unhandledEvents.length > 0) {
// The rules configuration contains events not handled in the code
// TODO: validate supplied rules at startup and fail if any invalid rules are discovered.
throw new Error('Unhandled event returned by rules engine')
}

const invalidQuoteRequestEvents = events.filter(ev => ev.type === INVALID_QUOTE_REQUEST)
if (invalidQuoteRequestEvents.length > 0) {
// Use the first event, ignore the others for now. This is ergonomically worse for someone
// developing against this service, as they can't see all reasons their quote was invalid at
// once. But is a valid solution in the short-term.
const { FSPIOPError: code, message } = invalidQuoteRequestEvents[0].params
// Will throw an internal server error if property doesn't exist
throw ErrorHandler.CreateFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes[code],
message, null, headers['fspiop-source'])
}

const interceptQuoteEvents = events.filter(ev => ev.type === INTERCEPT_QUOTE)
if (interceptQuoteEvents.length > 1) {
// TODO: handle priority. Can we stream events?
throw new Error('Multiple intercept quote events received')
}
if (interceptQuoteEvents.length > 0) {
// send the quote request to the recipient in the event
const result = {
terminate: false,
quoteRequest,
headers: {
...headers,
'fspiop-destination': interceptQuoteEvents[0].params.rerouteToFsp
}
}
// if additionalHeaders are present then add the additional non-standard headers (e.g. used by forex)
// Note these headers are not part of the mojaloop specification
if (interceptQuoteEvents[0].params.additionalHeaders) {
result.headers = { ...result.headers, ...interceptQuoteEvents[0].params.additionalHeaders }
result.additionalHeaders = interceptQuoteEvents[0].params.additionalHeaders
}
return result
}
}
30 changes: 28 additions & 2 deletions src/model/fxQuotes.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ const Metrics = require('@mojaloop/central-services-metrics')
const Config = require('../lib/config')
const { logger } = require('../lib')
const { httpRequest } = require('../lib/http')
const { getStackOrInspect, generateRequestHeadersForJWS, generateRequestHeaders, getParticipantEndpoint, calculateRequestHash } = require('../lib/util')
const { getStackOrInspect, generateRequestHeadersForJWS, generateRequestHeaders, getParticipantEndpoint, calculateRequestHash, fetchParticipantInfo } = require('../lib/util')
const LOCAL_ENUM = require('../lib/enum')
const { RESOURCES, ERROR_MESSAGES } = require('../constants')
const { executeRules, handleRuleEvents } = require('./executeRules')

axios.defaults.headers.common = {}

Expand All @@ -48,6 +49,10 @@ class FxQuotesModel {
})
}

executeRules = executeRules
handleRuleEvents = handleRuleEvents
_fetchParticipantInfo = fetchParticipantInfo

/**
* Validates the fxQuote request object
*
Expand Down Expand Up @@ -160,7 +165,7 @@ class FxQuotesModel {
*
* @returns {undefined}
*/
async handleFxQuoteRequest (headers, fxQuoteRequest, span, originalPayload = fxQuoteRequest) {
async handleFxQuoteRequest (headers, fxQuoteRequest, span, originalPayload = fxQuoteRequest, cache) {
// todo: remove default value for originalPayload (added just for passing tests)
const histTimer = Metrics.getHistogram(
'model_fxquote',
Expand Down Expand Up @@ -227,6 +232,17 @@ class FxQuotesModel {
await txn.commit()
}

const { payer, payee } = await this._fetchParticipantInfo(fspiopSource, fspiopDestination, cache)
this.writeLog(`Got payer ${payer} and payee ${payee}`)

// Run the rules engine. If the user does not want to run the rules engine, they need only to
// supply a rules file containing an empty array.
const handledRuleEvents = await this.executeRules(headers, fxQuoteRequest, originalPayload, payer, payee, 'fxQuoteRequest')

if (handledRuleEvents.terminate) {
return
}

await this.forwardFxQuoteRequest(headers, fxQuoteRequest.conversionRequestId, originalPayload, childSpan)
histTimer({ success: true, queryName: 'handleFxQuoteRequest' })
} catch (err) {
Expand Down Expand Up @@ -810,6 +826,16 @@ class FxQuotesModel {
opts.headers['fspiop-signature'] = jwsSigner.getSignature(opts)
}
}

/**
* Writes a formatted message to the console
*
* @returns {undefined}
*/
// eslint-disable-next-line no-unused-vars
writeLog (message) {
Logger.isDebugEnabled && Logger.debug(`(${this.requestId}) [quotesmodel]: ${message}`)
}
}

module.exports = FxQuotesModel
80 changes: 5 additions & 75 deletions src/model/quotes.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ const { httpRequest } = require('../lib/http')
const { getStackOrInspect, generateRequestHeadersForJWS, generateRequestHeaders, calculateRequestHash, fetchParticipantInfo, getParticipantEndpoint } = require('../lib/util')
const { RESOURCES } = require('../constants')
const LOCAL_ENUM = require('../lib/enum')
const rules = require('../../config/rules.json')
const RulesEngine = require('./rules.js')
const { executeRules, handleRuleEvents } = require('./executeRules')

delete axios.defaults.headers.common.Accept
delete axios.defaults.headers.common['Content-Type']
Expand All @@ -75,79 +74,11 @@ class QuotesModel {
})
}

async executeRules (headers, quoteRequest, payer, payee) {
if (rules.length === 0) {
return []
}

const facts = {
payer,
payee,
payload: quoteRequest,
headers
}

const { events } = await RulesEngine.run(rules, facts)
this.log.debug('Rules engine returned events:', { events })

return events
executeRules () {
return executeRules.apply(this, arguments)
}

async handleRuleEvents (events, headers, payload, originalPayload) {
const quoteRequest = originalPayload || payload
// todo: pass only originalPayload (added this logic only for passing tests)

// At the time of writing, all events cause the "normal" flow of execution to be interrupted.
// So we'll return false when there have been no events whatsoever.
if (events.length === 0) {
return { terminate: false, quoteRequest, headers }
}

const { INVALID_QUOTE_REQUEST, INTERCEPT_QUOTE } = RulesEngine.events

const unhandledEvents = events.filter(ev => !(ev.type in RulesEngine.events))

if (unhandledEvents.length > 0) {
// The rules configuration contains events not handled in the code
// TODO: validate supplied rules at startup and fail if any invalid rules are discovered.
throw new Error('Unhandled event returned by rules engine')
}

const invalidQuoteRequestEvents = events.filter(ev => ev.type === INVALID_QUOTE_REQUEST)
if (invalidQuoteRequestEvents.length > 0) {
// Use the first event, ignore the others for now. This is ergonomically worse for someone
// developing against this service, as they can't see all reasons their quote was invalid at
// once. But is a valid solution in the short-term.
const { FSPIOPError: code, message } = invalidQuoteRequestEvents[0].params
// Will throw an internal server error if property doesn't exist
throw ErrorHandler.CreateFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes[code],
message, null, headers['fspiop-source'])
}

const interceptQuoteEvents = events.filter(ev => ev.type === INTERCEPT_QUOTE)
if (interceptQuoteEvents.length > 1) {
// TODO: handle priority. Can we stream events?
throw new Error('Multiple intercept quote events received')
}
if (interceptQuoteEvents.length > 0) {
// send the quote request to the recipient in the event
const result = {
terminate: false,
quoteRequest,
headers: {
...headers,
'fspiop-destination': interceptQuoteEvents[0].params.rerouteToFsp
}
}
// if additionalHeaders are present then add the additional non-standard headers (e.g. used by forex)
// Note these headers are not part of the mojaloop specification
if (interceptQuoteEvents[0].params.additionalHeaders) {
result.headers = { ...result.headers, ...interceptQuoteEvents[0].params.additionalHeaders }
result.additionalHeaders = interceptQuoteEvents[0].params.additionalHeaders
}
return result
}
}
handleRuleEvents = handleRuleEvents

/**
* Validates the quote request object
Expand Down Expand Up @@ -281,9 +212,8 @@ class QuotesModel {

// Run the rules engine. If the user does not want to run the rules engine, they need only to
// supply a rules file containing an empty array.
const events = await this.executeRules(headers, quoteRequest, payer, payee)
handledRuleEvents = await this.executeRules(headers, quoteRequest, originalPayload, payer, payee, 'quoteRequest')

handledRuleEvents = await this.handleRuleEvents(events, headers, quoteRequest, originalPayload)
if (handledRuleEvents.terminate) {
return
}
Expand Down
3 changes: 3 additions & 0 deletions src/model/rules.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ const createEngine = () => {
}
}

engine.addOperator('truthy', (factValue, ruleValue) => {
return !!factValue === ruleValue
})
engine.addOperator('notDeepEqual', (factValue, ruleValue) => {
return !deepEqual(factValue, ruleValue)
})
Expand Down
3 changes: 3 additions & 0 deletions test/unit/model/fxQuotes.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ describe('FxQuotesModel Tests -->', () => {
describe('handleFxQuoteRequest', () => {
test('should handle fx quote request', async () => {
fxQuotesModel = new FxQuotesModel({ db, requestId, proxyClient, log })
fxQuotesModel._fetchParticipantInfo = jest.fn(() => ({ payer: 'payer', payee: 'payee' }))
jest.spyOn(fxQuotesModel, 'forwardFxQuoteRequest').mockResolvedValue()
jest.spyOn(fxQuotesModel, 'validateFxQuoteRequest')

Expand Down Expand Up @@ -175,6 +176,7 @@ describe('FxQuotesModel Tests -->', () => {

test('should handle fx quote request in persistent mode', async () => {
fxQuotesModel = new FxQuotesModel({ db, requestId, proxyClient, log })
fxQuotesModel._fetchParticipantInfo = jest.fn(() => ({ payer: 'payer', payee: 'payee' }))
fxQuotesModel.envConfig.simpleRoutingMode = false

jest.spyOn(fxQuotesModel, 'checkDuplicateFxQuoteRequest').mockResolvedValue({
Expand Down Expand Up @@ -215,6 +217,7 @@ describe('FxQuotesModel Tests -->', () => {

test('should handle error thrown', async () => {
fxQuotesModel = new FxQuotesModel({ db, requestId, proxyClient, log })
fxQuotesModel._fetchParticipantInfo = jest.fn(() => ({ payer: 'payer', payee: 'payee' }))
jest.spyOn(fxQuotesModel, 'forwardFxQuoteRequest').mockRejectedValue(new Error('Forward Error'))
jest.spyOn(fxQuotesModel, 'validateFxQuoteRequest')
jest.spyOn(fxQuotesModel, 'handleException').mockResolvedValue()
Expand Down
14 changes: 11 additions & 3 deletions test/unit/model/quotes.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,8 @@ describe('QuotesModel', () => {
const payer = { accounts: [{ accountId: 1, ledgerAccountType: 'POSITION', isActive: 1 }] }
const payee = { accounts: [{ accountId: 2, ledgerAccountType: 'POSITION', isActive: 1 }] }

await expect(quotesModel.executeRules(mockData.headers, mockData.quoteRequest, payer, payee))
.resolves
.toEqual(expectedEvents)
await quotesModel.executeRules(mockData.headers, mockData.quoteRequest, payer, payee)
expect(quotesModel.handleRuleEvents).toHaveBeenCalledWith(expectedEvents, expect.anything(), expect.anything(), expect.anything())
})
})
})
Expand Down Expand Up @@ -718,6 +717,10 @@ describe('QuotesModel', () => {

describe('Failures:', () => {
describe('Before forwarding the request:', () => {
beforeEach(() => {
quotesModel.executeRules.mockRestore()
})

it('throws an exception if `executeRules` fails', async () => {
expect.assertions(1)

Expand Down Expand Up @@ -1012,6 +1015,7 @@ describe('QuotesModel', () => {
describe('In case environment is configured for simple routing mode', () => {
beforeEach(() => {
mockConfig.simpleRoutingMode = true
quotesModel.executeRules.mockRestore()
})

it('calls `handleException` with the proper arguments if `span.audit` fails', async () => {
Expand Down Expand Up @@ -1058,6 +1062,7 @@ describe('QuotesModel', () => {

beforeEach(() => {
mockConfig.simpleRoutingMode = false
quotesModel.executeRules.mockRestore()

expectedResult = {
amountTypeId: mockData.amountTypeId,
Expand Down Expand Up @@ -1138,6 +1143,7 @@ describe('QuotesModel', () => {
}
}))

quotesModel.executeRules.mockRestore()
const result = await quotesModel.handleQuoteRequest(mockData.headers, mockData.quoteRequest, mockSpan)

expect(quotesModel.db.createQuoteDuplicateCheck.mock.calls.length).toBe(0)
Expand All @@ -1149,6 +1155,7 @@ describe('QuotesModel', () => {
describe('In case environment is configured for simple routing mode', () => {
beforeEach(() => {
mockConfig.simpleRoutingMode = true
quotesModel.executeRules.mockRestore()
})

it('forwards the quote request properly', async () => {
Expand All @@ -1174,6 +1181,7 @@ describe('QuotesModel', () => {

beforeEach(() => {
mockConfig.simpleRoutingMode = false
quotesModel.executeRules.mockRestore()

expectedResult = {
amountTypeId: mockData.amountTypeId,
Expand Down