From 52bb3fbc9dea9e52fb05bbfd694e864f9e624dc5 Mon Sep 17 00:00:00 2001 From: Mark Holtzhausen Date: Mon, 19 Oct 2020 14:22:53 +0200 Subject: [PATCH] PLAT-1037 leaky bucket (#12) Some refactor Packages updated and version-pinned More aggressive management of RabbitFallback --- .editorconfig | 2 +- .eslintrc | 20 ---- .eslintrc.json | 36 +++++++ .gitignore | 4 +- Dockerfile | 4 +- Dockerfile-alpine | 4 +- changelog.md | 5 + package.json | 27 +++-- src/app.js | 222 ++++++++++++++++++---------------------- src/bin/www | 91 ++++++++-------- src/config.js | 38 ++++--- src/lib/ServiceError.js | 8 ++ src/lib/logger.js | 13 +++ src/lib/prometheus.js | 51 +++++++++ src/lib/pubsub.js | 69 +++++++++++++ src/lib/queue.js | 66 ++++++++++++ src/lib/rabbit.js | 67 ++++++++++++ src/lib/validator.js | 82 +++++++++++++++ src/listen.js | 15 --- src/logger.js | 13 --- src/prometheus.js | 48 --------- src/pubsub.js | 67 ------------ src/queue.js | 18 ---- src/rabbit.js | 44 -------- src/validator.js | 81 --------------- 25 files changed, 583 insertions(+), 512 deletions(-) delete mode 100644 .eslintrc create mode 100644 .eslintrc.json create mode 100644 src/lib/ServiceError.js create mode 100644 src/lib/logger.js create mode 100644 src/lib/prometheus.js create mode 100644 src/lib/pubsub.js create mode 100644 src/lib/queue.js create mode 100644 src/lib/rabbit.js create mode 100644 src/lib/validator.js delete mode 100644 src/listen.js delete mode 100644 src/logger.js delete mode 100644 src/prometheus.js delete mode 100644 src/pubsub.js delete mode 100644 src/queue.js delete mode 100644 src/rabbit.js delete mode 100644 src/validator.js diff --git a/.editorconfig b/.editorconfig index 5d12634..e9a9bff 100644 --- a/.editorconfig +++ b/.editorconfig @@ -3,7 +3,7 @@ root = true [*] indent_style = space -indent_size = 2 +indent_size = 4 end_of_line = lf charset = utf-8 trim_trailing_whitespace = true diff --git a/.eslintrc b/.eslintrc deleted file mode 100644 index 8023151..0000000 --- a/.eslintrc +++ /dev/null @@ -1,20 +0,0 @@ -{ - "extends": "google", - "env": { - "node": true, - "es6": true - }, - "parserOptions": { - "ecmaVersion": 2017, - "sourceType": "module" - }, - "rules": { - "max-len": ["error", { - "ignoreComments": true, - "ignoreUrls": true, - "ignoreStrings": true, - "ignoreRegExpLiterals": true, - "code": 120 - }] - } -} diff --git a/.eslintrc.json b/.eslintrc.json new file mode 100644 index 0000000..2ce6f67 --- /dev/null +++ b/.eslintrc.json @@ -0,0 +1,36 @@ +{ + "root": true, + "env": { + "es6": true, + "node": true +}, + "extends": [ + "eslint:recommended" + ], + "parserOptions": { + "ecmaVersion": 8 + }, + "rules": { + "quote-props": ["warn", "consistent-as-needed"], + "arrow-parens": 0, + "generator-star-spacing": 0, + "no-debugger": 0, + "array-element-newline": ["error", "consistent"], + "array-bracket-newline": ["error", "consistent"], + "array-bracket-spacing": ["error", "never"], + "no-console": "error", + "indent": ["error", 4, { "SwitchCase": 1 }], + "space-infix-ops": ["warn"], + "eol-last":["warn"], + "no-multi-spaces": ["warn", { "ignoreEOLComments": true }], + "no-multiple-empty-lines": ["error", { "max": 2, "maxEOF": 1 }], + "comma-dangle": ["warn", "never"], + "eqeqeq": ["warn"], + "semi": ["warn", "never"], + "keyword-spacing": ["warn"], + "space-before-blocks": ["warn"], + "comma-spacing": ["warn", { "before": false, "after": true }] + } +} + + diff --git a/.gitignore b/.gitignore index c1cbef1..6a55d0f 100644 --- a/.gitignore +++ b/.gitignore @@ -62,8 +62,10 @@ typings/ # dotenv environment variables file .env +# vscode editor settings +.vscode/ + # End of https://www.gitignore.io/api/node .idea -lib diff --git a/Dockerfile b/Dockerfile index 876cc0a..88ef724 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,11 +1,11 @@ -FROM node:7.9.0 +FROM node:12.18.3 MAINTAINER Superbalist RUN mkdir -p /usr/src/app WORKDIR /usr/src/app COPY package.json /usr/src/app/ -RUN yarn install +RUN npm install --only=prod COPY src /usr/src/app/src/ diff --git a/Dockerfile-alpine b/Dockerfile-alpine index 79faed3..7ad8437 100644 --- a/Dockerfile-alpine +++ b/Dockerfile-alpine @@ -1,11 +1,11 @@ -FROM node:7.9.0-alpine +FROM node:12.18.3-alpine MAINTAINER Superbalist RUN mkdir -p /usr/src/app WORKDIR /usr/src/app COPY package.json /usr/src/app/ -RUN yarn install +RUN npm install --only=prod COPY src /usr/src/app/src/ diff --git a/changelog.md b/changelog.md index b51d8ea..1f5fc18 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,10 @@ # Changelog +## 1.3.0 2020-09-15 +* Badly formed channel messages result in `400` error +* When this service is shutting down, it won't be receiving any more messages +* When a message fails with `503`, the caller should retry + ## 1.2.0 2018-12-21 * Fallback to rabbitmq if messages fail 3 times. diff --git a/package.json b/package.json index b1cdb5c..8a998dc 100644 --- a/package.json +++ b/package.json @@ -1,11 +1,11 @@ { "name": "@superbalist/js-pubsub-rest-proxy", - "version": "1.2.0", + "version": "1.3.0", "description": "An HTTP server which acts as a gateway for publishing messages via a js-pubsub adapter", "private": true, "scripts": { "start": "node ./src/bin/www", - "start-dev": "nodemon -L ./src/bin/www" + "start-dev": "nodemon -L ./src/bin/www -- --trace-sync-io" }, "repository": { "type": "git", @@ -24,20 +24,19 @@ "dependencies": { "@superbalist/js-event-pubsub": "^3.0.2", "@superbalist/js-pubsub-manager": "^3.0.1", - "ajv": "^5.0.1", - "amqplib": "^0.5.3", - "body-parser": "^1.17.2", - "debug": "~2.6.3", - "express": "^4.15.3", - "morgan": "^1.8.1", - "prom-client": "^10.2.2", - "queue": "^4.2.1", + "ajv": "5.0.1", + "amqplib": "0.6.0", + "body-parser": "1.19.0", + "debug": "4.1.1", + "express": "4.17.1", + "morgan": "1.10.0", + "prom-client": "12.0.0", + "queue": "6.0.1", "raven": "^2.0.1", - "winston": "^2.3.1" + "winston": "3.3.3" }, "devDependencies": { - "eslint": "^6.8.0", - "eslint-config-google": "^0.7.1", - "nodemon": "^1.18.7" + "eslint": "7.9.0", + "nodemon": "2.0.4" } } diff --git a/src/app.js b/src/app.js index e507207..98079e7 100644 --- a/src/app.js +++ b/src/app.js @@ -1,133 +1,107 @@ -'use strict'; - -let config = require('./config'); -let express = require('express'); -let morgan = require('morgan'); -let bodyParser = require('body-parser'); -let Raven = require('raven'); -if (config.SENTRY_DSN) { - Raven.config(config.SENTRY_DSN) - .install(); +'use strict' + +// get configuration +const { + SENTRY_DSN, + MAX_POST_SIZE, + FALLBACK, + RABBIT +} = require('./config') + +let serviceIsAvailable = false +const express = require('express') +const morgan = require('morgan') +const bodyParser = require('body-parser') +const Raven = require('raven') + +//TODO Upgrade to sentry pls -- this is deprecated +if (SENTRY_DSN) { + Raven.config(SENTRY_DSN) + .install() } -if (config.FALLBACK) { - // Use var intentionally so that rabbit doesn't need to be defined - var rabbit = require('./rabbit'); // eslint-disable-line no-var -} +const prom = require('./lib/prometheus') +const logger = require('./lib/logger') +const queue = require('./lib/queue') +const ServiceError = require('./lib/ServiceError') +const rabbitController = require('./lib/rabbit') -let prom = require('./prometheus'); - -let logger = require('./logger'); -let pubsub = require('./pubsub'); -let queue = require('./queue'); - -// bootstrap app -let app = express(); -app.use(morgan('dev')); -app.use(bodyParser.json({ limit: config.MAX_POST_SIZE })); -app.use(bodyParser.urlencoded({extended: false})); - -// register routes -app.get('/', (req, res, next) => { - res.json({ping: 'pong'}); -}); - -app.get('/healthz', (req, res, next) => { - res.json({ping: 'pong'}); -}); - -app.post('/messages/:channel', (req, res, next) => { - let end = prom.requestSummary.startTimer(); - let channel = req.params.channel; - prom.receiveCount.inc({channel}); - let messages = req.body.messages || []; - queue.push(publishJob(channel, messages, end)); - res.json({success: true}); -}); - - - /** - * Return a job - * - * @param {String} channel - * @param {[Object]} messages - * @param {Function} end - * @param {Integer} retries - * - * @return {Function} - */ -function publishJob(channel, messages, end, retries=0) { - return function(cb) { - pubsub.publish(channel, messages).then((result)=>{ - let errors = result.errors; - if(errors.length > 0) { - prom.publishCount.inc({state: 'failed', channel}); - if(config.FALLBACK && retries >= 2) { - end(); - return rabbit.publish(channel, errors); - } - retries++; - queue.push(publishJob(channel, errors.map((error) => error.message), end, retries)); - } else { - prom.publishCount.inc({state: 'success', channel}); - end(); - } - cb(); - }).catch((error) => { - cb(error); - }); - }; -} +if (FALLBACK) rabbitController.start() + +const app = express() + +app.use(morgan('dev')) +app.use(bodyParser.json({ limit: MAX_POST_SIZE })) +app.use(bodyParser.urlencoded({ extended: false })) + +// why is this here? :shrug: +app.get('/', async (req, res) => { + // logger.info('ping') + res.json({ ping: 'pong' }).end() +}) + +// k8s health check endpoint +app.get('/healthz', (req, res) => { + if (!serviceIsAvailable) throw new ServiceError('This service is not currently accepting requests', 400) + res.json({ ping: 'pong' }) +} ) -// bind middleware -app.use((req, res, next) => { - let err = new Error('Not Found'); - err.status = 404; - next(err); -}); +// handle incoming message post requests +app.post('/messages/:channel', async (req, res) => { + const end = prom.requestSummary.startTimer() + const channel = req.params.channel + const messages = req.body.messages + // Early exit if the format is wrong + if (!Array.isArray(messages)) throw new ServiceError('`messages` property is expected to be an array', 400) + + // Count this channel add + prom.receiveCount.inc({ channel }) + queue.addPublishJob(channel, messages, end) + res.json({ success: true }).end() +}) + + +// Anything requests that have not been dealt with by this point is 404 +app.use((req, res, next) => next(new ServiceError('Not Found', 404))) + +// Express Error Handler +// eslint-disable-next-line no-unused-vars app.use((err, req, res, next) => { - // set locals, only providing error in development - res.locals.message = err.message; - res.locals.error = req.app.get('env') === 'development' ? err : {}; - - // return a json error response - let status = err.status || 500; - res.status(status); - res.json({ - status: status, - message: err.message, - }); -}); - -let onExitHandler = () => { - logger.info('Preparing to shutdown application'); - - logger.info('Stopping queue timer'); - clearInterval(queue.timer); - - logger.info('Closing express server socket'); - // When the HTTP server closes we want to empty the job queue. - app.server.close(emptyQueue); -}; - - -/** - * Run the queue and exit if it is empty - */ -function emptyQueue() { - if(queue.length > 0) { - logger.info('Processing remaining jobs on queue'); - queue.start(emptyQueue); - } + // set locals, only providing error in development + res.locals.message = err.message + res.locals.error = req.app.get('env') === 'development' ? err : {} + + // return a json error response + let status = err.status || 500 + res.status(status) + res.json({ + status, + message: err.message + }) +}) + +// Graceful shutdown +let exitHandler = async () => { + serviceIsAvailable = false // Health endpoint will start reporting failure + + logger.info('Shutdown Initiated.') + + logger.info('Express server has shut down.') + + Promise.all([ + queue.end(), //Empty queue + rabbitController.stop(RABBIT.SHUTDOWN_WAIT) //Wait for rabbit connection to close + ]).then(()=>{ + logger.info('PSRP Terminating.') + + }).catch(()=>{}) } -process.on('SIGTERM', () => { - onExitHandler(); -}); - -process.on('SIGINT', () => { - onExitHandler(); -}); +// Make sure we exit cleanly +process.on('SIGTERM', exitHandler) // regular termination signal +process.on('SIGINT', exitHandler) // for ^C +// process.on('SIGUSR2', exitHandler) // for nodemon during dev -module.exports = app; +serviceIsAvailable = true // Health endpoint will start reporting success +module.exports = app diff --git a/src/bin/www b/src/bin/www index 6b26e97..19ec2c0 100644 --- a/src/bin/www +++ b/src/bin/www @@ -1,55 +1,56 @@ #!/usr/bin/env node +/* eslint-disable no-console */ -'use strict'; +'use strict' /** * Module dependencies. */ -let app = require('../app'); -let debug = require('debug')('test:server'); -let http = require('http'); +let app = require('../app') +let debug = require('debug')('test:server') +let http = require('http') /** * Get port from environment and store in Express. */ -let port = normalizePort(process.env.PORT || '3000'); -app.set('port', port); +let port = normalizePort(process.env.PORT || '3000') +app.set('port', port) /** * Create HTTP server. */ -let server = http.createServer(app); -app.server = server; +let server = http.createServer(app) +app.server = server /** * Listen on provided port, on all network interfaces. */ -server.listen(port); -server.on('error', onError); -server.on('listening', onListening); +server.listen(port) +server.on('error', onError) +server.on('listening', onListening) /** * Normalize a port into a number, string, or false. */ function normalizePort(val) { - let port = parseInt(val, 10); + let port = parseInt(val, 10) - if (isNaN(port)) { + if (isNaN(port)) { // named pipe - return val; - } + return val + } - if (port >= 0) { + if (port >= 0) { // port number - return port; - } + return port + } - return false; + return false } /** @@ -57,27 +58,27 @@ function normalizePort(val) { */ function onError(error) { - if (error.syscall !== 'listen') { - throw error; - } - - let bind = typeof port === 'string' - ? 'Pipe ' + port - : 'Port ' + port; - - // handle specific listen errors with friendly messages - switch (error.code) { - case 'EACCES': - console.error(bind + ' requires elevated privileges'); - process.exit(1); - break; - case 'EADDRINUSE': - console.error(bind + ' is already in use'); - process.exit(1); - break; - default: - throw error; - } + if (error.syscall !== 'listen') { + throw error + } + + let bind = typeof port === 'string' + ? 'Pipe ' + port + : 'Port ' + port + + // handle specific listen errors with friendly messages + switch (error.code) { + case 'EACCES': + console.error(bind + ' requires elevated privileges') + process.exit(1) + break + case 'EADDRINUSE': + console.error(bind + ' is already in use') + process.exit(1) + break + default: + throw error + } } /** @@ -85,9 +86,9 @@ function onError(error) { */ function onListening() { - let addr = server.address(); - let bind = typeof addr === 'string' - ? 'pipe ' + addr - : 'port ' + addr.port; - debug('Listening on ' + bind); + let addr = server.address() + let bind = typeof addr === 'string' + ? 'pipe ' + addr + : 'port ' + addr.port + debug('Listening on ' + bind) } diff --git a/src/config.js b/src/config.js index e89d67d..d5602f3 100644 --- a/src/config.js +++ b/src/config.js @@ -1,20 +1,24 @@ -'use strict'; +'use strict' const config = { - LOG_LEVEL: process.env.LOG_LEVEL || 'info', - SENTRY_DSN: process.env.SENTRY_DSN, - MAX_POST_SIZE: process.env.MAX_POST_SIZE || '10mb', - PROMETHEUS_EXPORTER: process.env.PROMETHEUS_EXPORTER_ENABLED || false, - PROMETHEUS_PORT: process.env.PROMETHEUS_EXPORTER_PORT || 5000, - VALIDATION_ERROR_CHANNEL: process.env.VALIDATION_ERROR_CHANNEL || 'validation_error', - VALIDATION_ERROR_SCHEMA_URL: process.env.VALIDATION_ERROR_SCHEMA_URL || false, - PUBLISH_INVALID: !(process.env.PUBLISH_INVALID == 'false'), - RABBIT: { - HOST: process.env.RABBITMQ_URL || 'pubsub-rest-proxy-rabbitmq', - USER: process.env.RABBITMQ_USER || 'guest', - PASSWORD: process.env.RABBITMQ_PASSWORD || 'Z3Vlc3Q=', - }, - FALLBACK: process.env.RABBITMQ_FALLBACK || false, -}; + LOG_LEVEL: process.env.LOG_LEVEL || 'info', + SENTRY_DSN: process.env.SENTRY_DSN, + MAX_POST_SIZE: process.env.MAX_POST_SIZE || '10mb', + PROMETHEUS_EXPORTER: process.env.PROMETHEUS_EXPORTER_ENABLED || false, + PROMETHEUS_PORT: process.env.PROMETHEUS_EXPORTER_PORT || 5000, + VALIDATION_ERROR_CHANNEL: process.env.VALIDATION_ERROR_CHANNEL || 'validation_error', + VALIDATION_ERROR_SCHEMA_URL: process.env.VALIDATION_ERROR_SCHEMA_URL || false, + PUBLISH_INVALID: process.env.PUBLISH_INVALID === 'false' ? false : true, + RABBIT: { + HOST: process.env.RABBITMQ_URL || 'pubsub-rest-proxy-rabbitmq', + USER: process.env.RABBITMQ_USER || 'guest', + PASSWORD: process.env.RABBITMQ_PASSWORD || 'Z3Vlc3Q=', + SHUTDOWN_WAIT: 100 // milliseconds to wait before shutting down rabbit. Just a little space to ensure rabbit exits cleanly + }, + FALLBACK: process.env.RABBITMQ_FALLBACK === 'false' ? false : true, + QUEUE_CONCURRENCY: 3, + QUEUE_RE_ADD_JOB_TIMEOUT: 2000, // how long to wait before re-adding an errored job + QUEUE_RESTART_TIME: 100 // once a queue is done, it won't react to new jobs. This is how frequently it is checked and restarted +} -module.exports = config; +module.exports = config diff --git a/src/lib/ServiceError.js b/src/lib/ServiceError.js new file mode 100644 index 0000000..639db00 --- /dev/null +++ b/src/lib/ServiceError.js @@ -0,0 +1,8 @@ +class ServiceError extends Error { + constructor(message, status = 500) { + super(message) + this.status = status + } +} + +module.exports = ServiceError diff --git a/src/lib/logger.js b/src/lib/logger.js new file mode 100644 index 0000000..8f02f61 --- /dev/null +++ b/src/lib/logger.js @@ -0,0 +1,13 @@ +'use strict' + +const winston = require('winston') +const {LOG_LEVEL} = require('../config') + +const logger = winston.createLogger({ + transports: [ + new (winston.transports.Console)({timestamp: true}) + ] +}) +logger.level = LOG_LEVEL + +module.exports = logger diff --git a/src/lib/prometheus.js b/src/lib/prometheus.js new file mode 100644 index 0000000..7884b0e --- /dev/null +++ b/src/lib/prometheus.js @@ -0,0 +1,51 @@ +'use strict' + +const client = require('prom-client') +const express = require('express') +const logger = require('./logger') +const {PROMETHEUS_PORT, PROMETHEUS_EXPORTER} = require('../config') + +client.collectDefaultMetrics({timeout: 30000}) + +const receiveCount = new client.Counter({ + name: 'pubsub_rest_proxy_receive_total', + help: 'Count of all messages recieved by pubsub-rest-proxy', + labelNames: ['channel'] +}) + +const publishCount = new client.Counter({ + name: 'pubsub_rest_proxy_publish_total', + help: 'Count of all messages published by pubsub-rest-proxy', + labelNames: ['state', 'channel'] +}) + +const requestSummary = new client.Summary({ + name: 'pubsub_rest_proxy_receive_to_publish_percentile', + help: 'Percentiles of time between receive and publish', + maxAgeSeconds: 600, + ageBuckets: 5 +}) + +const messageCount = new client.Counter({ + name: 'pubsub_rest_proxy_message_total', + help: 'Count of all individual messages published by pubsub-rest-proxy', + labelNames: ['state'] +}) + +let app = express() + +app.get('/metrics', (req, res) => { + res.end(client.register.metrics()) +}) + +if ( PROMETHEUS_EXPORTER ) { + logger.info('Prometheus enabled on port: ' + PROMETHEUS_PORT) + app.listen(PROMETHEUS_PORT || 5000) +} + +module.exports = { + messageCount, + publishCount, + receiveCount, + requestSummary +} diff --git a/src/lib/pubsub.js b/src/lib/pubsub.js new file mode 100644 index 0000000..e4e6b7b --- /dev/null +++ b/src/lib/pubsub.js @@ -0,0 +1,69 @@ +'use strict' + +let pubSubManager = require('@superbalist/js-pubsub-manager') +let PubSubManager = pubSubManager.PubSubManager +let ConnectionFactory = pubSubManager.PubSubConnectionFactory +let prom = require('./prometheus') +// create pubsub manager +let connectionFactory = new ConnectionFactory() +let manager = new PubSubManager(connectionFactory, pubSubManager.config) +let connection = manager.connection() +let config = require('../config') +let validator = require('./validator') + +let logger = require('./logger') + + +let publish = async(channel, messages) => { + let errors = [] + let validMessages = [] + let invalidMessages = [] + // Map all messages to validate and publish each individually. + await Promise.all(messages.map((message)=>{ + return validator.validate(message).then(()=>{ + validMessages.push(message) + // If a message is valid then publish it. + prom.messageCount.inc({state: 'valid'}) + return connection.publish(channel, message) + }).catch((error) => { + if (error.name == 'ValidationError') { + logger.warn(`ValidationError: ${channel} - ${error.event.errors}`) + invalidMessages.push(error.event) + // If it is not valid then publish the invalid event to a separate channel + // For now we're going to dual publish the invalid messages. + // Wrap it in a try catch so that if it fails but publish succeeds it doesn't + // publish the same event twice. + prom.messageCount.inc({state: 'invalid'}) + try { + let failPublish = connection.publish(config.VALIDATION_ERROR_CHANNEL, error.event) + if (!config.PUBLISH_INVALID) { + return failPublish + } + } catch (error) { + // Do nothing unless we're only publishing the error + if (!config.PUBLISH_INVALID) { + throw new Error(error) + } + } + return connection.publish(channel, message) + } else { + // Throw an exception if it cannot publish or run validation. + throw new Error(error) + } + }).catch((error) => { + // This could be errors in validation or publishing + // Add messages with errors to the errors array. + errors.push({error: error.message, message}) + logger.error(error) + return error + }) + })) + // Return an object with all messages. errors may contain valid and invalid + // messages that were unable to be published. + return {validMessages, invalidMessages, errors} +} + +module.exports = { + publish, + connection +} diff --git a/src/lib/queue.js b/src/lib/queue.js new file mode 100644 index 0000000..c46f3c4 --- /dev/null +++ b/src/lib/queue.js @@ -0,0 +1,66 @@ +const logger = require('./logger') +const prom = require('./prometheus') +const pubsub = require('./pubsub') +const queueFactory = require('queue') +const rabbitController = require('./rabbit') +const { + FALLBACK, + QUEUE_CONCURRENCY, + QUEUE_RE_ADD_JOB_TIMEOUT, + QUEUE_RESTART_TIME +} = require('../config') + +// Configure a new queue +const q = queueFactory({ concurrency: QUEUE_CONCURRENCY }) + +const createPublishJob = (channel, messages, end, retries = 0) => (() => pubsub.publish(channel, messages) + .then((result) => { + let errors = result.errors + if (errors.length > 0) { + logger.error(errors) + prom.publishCount.inc({ state: 'failed', channel }) + if (FALLBACK && retries >= 2) { + end() + return rabbitController.publish(channel, errors) + } + q.push(createPublishJob(channel, errors.map((error) => error.message), end, retries++)) + } else { + prom.publishCount.inc({ state: 'success', channel }) + end() + } + }) + .catch((error) => { + //This means the call failed alltogether, automatically goes back on queue + logger.error(error.message) + setTimeout(() => q.push(createPublishJob(channel, messages, end)), QUEUE_RE_ADD_JOB_TIMEOUT) + }) +) + + +const queue = { + addPublishJob(channel, messages, end, retries = 0) { + q.push(createPublishJob(channel, messages, end, retries)) + }, + autoRestart(interval) { + q._restart_timer = setInterval(() => q.running || q.start(), interval) + }, + end() { + return new Promise((resolve)=>{ + q.on('success', ()=>{ + logger.info(`${q.length} jobs remaining on the queue.`) + }) + + // when the queue is cleared, delete restart_timer and resolve a promise + q.on('end', ()=>{ + logger.info(`Queue empty.`) + q._restart_timer && clearInterval(q._restart_timer) + logger.info(`Queue restart timer stopped. Queue is stopped.`) + resolve() + }) + }) + } +} + +queue.autoRestart(QUEUE_RESTART_TIME) + +module.exports = queue diff --git a/src/lib/rabbit.js b/src/lib/rabbit.js new file mode 100644 index 0000000..f6fcd97 --- /dev/null +++ b/src/lib/rabbit.js @@ -0,0 +1,67 @@ +#!/usr/bin/env node +const amqp = require('amqplib') +const logger = require('./logger') +const {RABBIT} = require('../config') + +let open + +const rabbitController = { + /** + * Start the rabbit connection + */ + start() { + if (open) return false // don't start it if it has already been started + + open = amqp.connect({ + username: RABBIT.USER, + password: Buffer.from(RABBIT.PASSWORD, 'base64').toString(), + hostname: RABBIT.HOST + }).then((connection)=>{ + logger.info(`Connected to RabbitMQ at ${RABBIT.HOST}`) + return connection + }).catch((err)=>{ + logger.error(`Could not start Rabbit at ${RABBIT.HOST}: ${err.message}`) + }) + }, + + /** + * Close the rabbit connection + * A short delay allows rabbit to finish what it is doing before closing the connection. + * + * @param {Number} waitTime Time to wait before closing rabbit connection + */ + async stop(waitTime = 100) { + if (!open) return Promise.resolve('Rabbit not started') // only stop if it has been started, but don't error out + + const connection = await open //already caught in start + if (!connection) return Promise.resolve('Could not fetch rabbit connection') // only stop if we have a connection, but don't error out if we dont + logger.info(`Waiting ${waitTime}ms for rabbit to finish..`) + setTimeout(()=>{ + connection.close() + logger.info(`..Rabbit connection closed.`) + return Promise.resolve() + }, waitTime) + }, + + /** + * Publish a list of errors to a channel + * @param {String} channel + * @param {Array[Object]} errors an array of errors + */ + async publish(channel, errors) { + if (!open) return Promise.reject() // bail early if it has not been started + return open.then(async(connection) => { + let amqpchannel = await connection.createChannel().catch((error)=>{ + logger.error(error.message) + }) + return amqpchannel.sendToQueue( + 'failed_pubsub_publish', + Buffer.from(JSON.stringify({channel, errors})) + ) + }) + } +} + + +module.exports = rabbitController + diff --git a/src/lib/validator.js b/src/lib/validator.js new file mode 100644 index 0000000..b2a6135 --- /dev/null +++ b/src/lib/validator.js @@ -0,0 +1,82 @@ +const config = require('../config') +const logger = require('./logger') +// By default validation is always happy +let validate = () => { + return Promise.resolve() +} +// If a schema url is supplied then validation will occur +if (config.VALIDATION_ERROR_SCHEMA_URL) { + let uris = {} + let eventPubSub = require('@superbalist/js-event-pubsub') + let request = require('request-promise-native') + let JSONSchemaEventValidator = eventPubSub.validators.JSONSchemaEventValidator + + let Ajv = require('ajv') + let ajv = new Ajv({ + extendRefs: true, + loadSchema: async (uri) => { + // Use previously requested successful URIs + if (!uris[uri] || await uris[uri].catch(()=>{ + return 'Failed' + }) === 'Failed') { + // In memory cache + uris[uri] = request({uri: uri, json: true}) + } + return uris[uri] + }, + allErrors: true + }) + ajv.addMetaSchema(require('ajv/lib/refs/json-schema-draft-04.json')) + + let validator = new JSONSchemaEventValidator(ajv) + let SchemaEvent = eventPubSub.events.SchemaEvent + /** + * ValidationError Class + */ + class ValidationError extends Error { + /** + * Construct a ValidationError + * + * @param {ValidationResult} validationResult + */ + constructor(validationResult, ...params) { + // Pass remaining arguments (including vendor specific ones) to parent constructor + super(...params) + this.name = 'ValidationError' + // Custom debugging information + this.event = { + schema: config.VALIDATION_ERROR_SCHEMA_URL, + meta: validationResult.event.attributes.meta, + event: validationResult.event.schema, + errors: validationResult.errors + } + } + } + + // Override validation with ajv validation. + validate = (message) => { + if (!message.schema) { + logger.error(`No schema: ${JSON.stringify(message.meta)}`) + return Promise.reject(new ValidationError({ + event: { + attributes: {meta: message.meta} + }, + errors: ['No schema Provided'] + })) + } + return validator.validate(new SchemaEvent(message.schema, message)) + .then((validationResult)=>{ + if (validationResult.passes) { + return message + } else { + // Throw an error for validation so that it can be caught higher up. + logger.error(validationResult) + throw new ValidationError(validationResult) + } + }) + } +} + +module.exports = { + validate +} diff --git a/src/listen.js b/src/listen.js deleted file mode 100644 index 796f2e4..0000000 --- a/src/listen.js +++ /dev/null @@ -1,15 +0,0 @@ -'use strict'; - -let pubsub = require('./pubsub'); -let connection = pubsub.connection(); - -let channel = process.argv[2] || 'test'; - -let count = 0; - -console.log(`Listening for all messages on channel "${channel}"`); -connection.subscribe(channel, (message) => { - console.log(count++); - console.log(message); - console.log(typeof(message)); -}); diff --git a/src/logger.js b/src/logger.js deleted file mode 100644 index 029344c..0000000 --- a/src/logger.js +++ /dev/null @@ -1,13 +0,0 @@ -'use strict'; - -let winston = require('winston'); -let config = require('./config'); - -let logger = new (winston.Logger)({ - transports: [ - new (winston.transports.Console)({'timestamp': true}), - ], -}); -logger.level = config.LOG_LEVEL; - -module.exports = logger; diff --git a/src/prometheus.js b/src/prometheus.js deleted file mode 100644 index 6fe2954..0000000 --- a/src/prometheus.js +++ /dev/null @@ -1,48 +0,0 @@ -'use strict'; -let express = require('express'); -let client = require('prom-client'); -let config = require('./config'); -client.collectDefaultMetrics({timeout: 30000}); - -const receiveCount = new client.Counter({ - name: 'pubsub_rest_proxy_receive_total', - help: 'Count of all messages recieved by pubsub-rest-proxy', - labelNames: ['channel'], -}); - -const publishCount = new client.Counter({ - name: 'pubsub_rest_proxy_publish_total', - help: 'Count of all messages published by pubsub-rest-proxy', - labelNames: ['state', 'channel'], -}); - -const requestSummary = new client.Summary({ - name: 'pubsub_rest_proxy_receive_to_publish_percentile', - help: 'Percentiles of time between receive and publish', - maxAgeSeconds: 600, - ageBuckets: 5, -}); - -const messageCount = new client.Counter({ - name: 'pubsub_rest_proxy_message_total', - help: 'Count of all individual messages published by pubsub-rest-proxy', - labelNames: ['state'], -}); - -let app = express(); - -app.get('/metrics', (req, res) => { - res.end(client.register.metrics()); -}); - -if( config.PROMETHEUS_EXPORTER ) { - console.log('Prometheus enabled on port: ' + config.PROMETHEUS_PORT); - app.listen(config.PROMETHEUS_PORT || 5000); -} - -module.exports = { - messageCount, - publishCount, - receiveCount, - requestSummary, -}; diff --git a/src/pubsub.js b/src/pubsub.js deleted file mode 100644 index 3e95a73..0000000 --- a/src/pubsub.js +++ /dev/null @@ -1,67 +0,0 @@ -'use strict'; - -let pubSubManager = require('@superbalist/js-pubsub-manager'); -let PubSubManager = pubSubManager.PubSubManager; -let ConnectionFactory = pubSubManager.PubSubConnectionFactory; -let prom = require('./prometheus'); -// create pubsub manager -let connectionFactory = new ConnectionFactory(); -let manager = new PubSubManager(connectionFactory, pubSubManager.config); -let connection = manager.connection(); -let config = require('./config'); -let validator = require('./validator'); - -let logger = require('./logger'); - -let publish = async (channel, messages) => { - let errors = []; - let validMessages = []; - let invalidMessages = []; - // Map all messages to validate and publish each individually. - await Promise.all(messages.map((message)=>{ - return validator.validate(message).then(()=>{ - validMessages.push(message); - // If a message is valid then publish it. - prom.messageCount.inc({state: 'valid'}) - return connection.publish(channel, message); - }).catch((error) => { - if(error.name == 'ValidationError') { - logger.warn(`ValidationError: ${channel} - ${error.event.errors}`); - invalidMessages.push(error.event); - // If it is not valid then publish the invalid event to a separate channel - // For now we're going to dual publish the invalid messages. - // Wrap it in a try catch so that if it fails but publish succeeds it doesn't - // publish the same event twice. - prom.messageCount.inc({state: 'invalid'}); - try{ - let failPublish = connection.publish(config.VALIDATION_ERROR_CHANNEL, error.event); - if(!config.PUBLISH_INVALID) { - return failPublish; - } - } catch(error) { - // Do nothing unless we're only publishing the error - if(!config.PUBLISH_INVALID) { - throw new Error(error); - } - } - return connection.publish(channel, message); - } else { - // Throw an exception if it cannot publish or run validation. - throw new Error(error); - } - }).catch((error) => { - // This could be errors in validation or publishing - // Add messages with errors to the errors array. - errors.push({error: error.message, message}); - logger.error(error); - return error; - }); - })); - // Return an object with all messages. errors may contain valid and invalid - // messages that were unable to be published. - return {validMessages, invalidMessages, errors}; -}; - -module.exports = { - publish, -}; diff --git a/src/queue.js b/src/queue.js deleted file mode 100644 index eaf1908..0000000 --- a/src/queue.js +++ /dev/null @@ -1,18 +0,0 @@ -let q = require('queue'); -let logger = require('./logger'); - -let queue = q(); - -// handle queue events -queue.on('error', function(err, job) { - logger.error(err); - setTimeout(() => { - queue.push(job); - }, 2000); -}); - -queue.timer = setInterval(() => { - queue.start(); -}, 300); - -module.exports = queue; diff --git a/src/rabbit.js b/src/rabbit.js deleted file mode 100644 index 16a3b12..0000000 --- a/src/rabbit.js +++ /dev/null @@ -1,44 +0,0 @@ -#!/usr/bin/env node -let amqp = require('amqplib'); -let config = require('./config'); - -let connection = amqp.connect({ - username: config.RABBIT.USER, - password: Buffer.from(config.RABBIT.PASSWORD, 'base64').toString(), - hostname: config.RABBIT.HOST, -}).catch((err)=>{ - console.log(err); -}); - -let publish = (channel, errors) => { - return connection.then(async (conn)=>{ - let amqpchannel = await conn.createChannel(); - return amqpchannel.sendToQueue( - 'failed_pubsub_publish', - Buffer.from(JSON.stringify({channel, errors})) - ); - }); -}; - -process.on('SIGTERM', () => { - setTimeout(()=>{ - connection.then((conn)=>{ - conn.close(); - }); - }, 5000); -}); - -process.on('SIGINT', () => { - setTimeout(()=>{ - connection.then((conn)=>{ - conn.close(); - }); - }, 5000); -}); - - -module.exports = { - publish, -}; - - diff --git a/src/validator.js b/src/validator.js deleted file mode 100644 index 9ea4918..0000000 --- a/src/validator.js +++ /dev/null @@ -1,81 +0,0 @@ -const config = require('./config'); -const logger = require('./logger'); -// By default validation is always happy -let validate = () => { - return Promise.resolve(); -}; -// If a schema url is supplied then validation will occur -if(config.VALIDATION_ERROR_SCHEMA_URL) { - let uris = {}; - let eventPubSub = require('@superbalist/js-event-pubsub'); - let request = require('request-promise-native'); - let JSONSchemaEventValidator = eventPubSub.validators.JSONSchemaEventValidator; - - let Ajv = require('ajv'); - let ajv = new Ajv({ - extendRefs: true, - loadSchema: async (uri) => { - // Use previously requested successful URIs - if (!uris[uri] || await uris[uri].catch(()=>{ - return 'Failed'; - }) == 'Failed') { - // In memory cache - uris[uri] = request({uri: uri, json: true}); - } - return uris[uri]; - }, - allErrors: true, - }); - ajv.addMetaSchema(require('ajv/lib/refs/json-schema-draft-04.json')); - - let validator = new JSONSchemaEventValidator(ajv); - let SchemaEvent = eventPubSub.events.SchemaEvent; - /** - * ValidationError Class - */ - class ValidationError extends Error { - /** - * Construct a ValidationError - * - * @param {ValidationResult} validationResult - */ - constructor(validationResult, ...params) { - // Pass remaining arguments (including vendor specific ones) to parent constructor - super(...params); - this.name = 'ValidationError'; - // Custom debugging information - this.event = { - 'schema': config.VALIDATION_ERROR_SCHEMA_URL, - 'meta': validationResult.event.attributes.meta, - 'event': validationResult.event.schema, - 'errors': validationResult.errors, - }; - } - } - - // Override validation with ajv validation. - validate = (message) => { - if(!message.schema) { - logger.error(`No schema: ${JSON.stringify(message.meta)}`); - return Promise.reject(new ValidationError({ - event: { - attributes: {meta: message.meta}, - }, - errors: ['No schema Provided'], - })); - } - return validator.validate(new SchemaEvent(message.schema, message)) - .then((validationResult)=>{ - if(validationResult.passes) { - return message; - } else { - // Throw an error for validation so that it can be caught higher up. - throw new ValidationError(validationResult); - } - }); - }; -} - -module.exports = { - validate, -};