Skip to content

Commit

Permalink
PLAT-1037 leaky bucket (#12)
Browse files Browse the repository at this point in the history
Some refactor
Packages updated and version-pinned
More aggressive management of RabbitFallback
  • Loading branch information
mholtzhausen authored Oct 19, 2020
1 parent 7b5a82a commit 52bb3fb
Show file tree
Hide file tree
Showing 25 changed files with 583 additions and 512 deletions.
2 changes: 1 addition & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ root = true

[*]
indent_style = space
indent_size = 2
indent_size = 4
end_of_line = lf
charset = utf-8
trim_trailing_whitespace = true
Expand Down
20 changes: 0 additions & 20 deletions .eslintrc

This file was deleted.

36 changes: 36 additions & 0 deletions .eslintrc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"root": true,
"env": {
"es6": true,
"node": true
},
"extends": [
"eslint:recommended"
],
"parserOptions": {
"ecmaVersion": 8
},
"rules": {
"quote-props": ["warn", "consistent-as-needed"],
"arrow-parens": 0,
"generator-star-spacing": 0,
"no-debugger": 0,
"array-element-newline": ["error", "consistent"],
"array-bracket-newline": ["error", "consistent"],
"array-bracket-spacing": ["error", "never"],
"no-console": "error",
"indent": ["error", 4, { "SwitchCase": 1 }],
"space-infix-ops": ["warn"],
"eol-last":["warn"],
"no-multi-spaces": ["warn", { "ignoreEOLComments": true }],
"no-multiple-empty-lines": ["error", { "max": 2, "maxEOF": 1 }],
"comma-dangle": ["warn", "never"],
"eqeqeq": ["warn"],
"semi": ["warn", "never"],
"keyword-spacing": ["warn"],
"space-before-blocks": ["warn"],
"comma-spacing": ["warn", { "before": false, "after": true }]
}
}


4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,10 @@ typings/
# dotenv environment variables file
.env

# vscode editor settings
.vscode/


# End of https://www.gitignore.io/api/node

.idea
lib
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
FROM node:7.9.0
FROM node:12.18.3
MAINTAINER Superbalist <tech+docker@superbalist.com>

RUN mkdir -p /usr/src/app
WORKDIR /usr/src/app

COPY package.json /usr/src/app/
RUN yarn install
RUN npm install --only=prod

COPY src /usr/src/app/src/

Expand Down
4 changes: 2 additions & 2 deletions Dockerfile-alpine
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
FROM node:7.9.0-alpine
FROM node:12.18.3-alpine
MAINTAINER Superbalist <tech+docker@superbalist.com>

RUN mkdir -p /usr/src/app
WORKDIR /usr/src/app

COPY package.json /usr/src/app/
RUN yarn install
RUN npm install --only=prod

COPY src /usr/src/app/src/

Expand Down
5 changes: 5 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## 1.3.0 2020-09-15
* Badly formed channel messages result in `400` error
* When this service is shutting down, it won't be receiving any more messages
* When a message fails with `503`, the caller should retry

## 1.2.0 2018-12-21

* Fallback to rabbitmq if messages fail 3 times.
Expand Down
27 changes: 13 additions & 14 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
{
"name": "@superbalist/js-pubsub-rest-proxy",
"version": "1.2.0",
"version": "1.3.0",
"description": "An HTTP server which acts as a gateway for publishing messages via a js-pubsub adapter",
"private": true,
"scripts": {
"start": "node ./src/bin/www",
"start-dev": "nodemon -L ./src/bin/www"
"start-dev": "nodemon -L ./src/bin/www -- --trace-sync-io"
},
"repository": {
"type": "git",
Expand All @@ -24,20 +24,19 @@
"dependencies": {
"@superbalist/js-event-pubsub": "^3.0.2",
"@superbalist/js-pubsub-manager": "^3.0.1",
"ajv": "^5.0.1",
"amqplib": "^0.5.3",
"body-parser": "^1.17.2",
"debug": "~2.6.3",
"express": "^4.15.3",
"morgan": "^1.8.1",
"prom-client": "^10.2.2",
"queue": "^4.2.1",
"ajv": "5.0.1",
"amqplib": "0.6.0",
"body-parser": "1.19.0",
"debug": "4.1.1",
"express": "4.17.1",
"morgan": "1.10.0",
"prom-client": "12.0.0",
"queue": "6.0.1",
"raven": "^2.0.1",
"winston": "^2.3.1"
"winston": "3.3.3"
},
"devDependencies": {
"eslint": "^6.8.0",
"eslint-config-google": "^0.7.1",
"nodemon": "^1.18.7"
"eslint": "7.9.0",
"nodemon": "2.0.4"
}
}
222 changes: 98 additions & 124 deletions src/app.js
Original file line number Diff line number Diff line change
@@ -1,133 +1,107 @@
'use strict';

let config = require('./config');
let express = require('express');
let morgan = require('morgan');
let bodyParser = require('body-parser');
let Raven = require('raven');
if (config.SENTRY_DSN) {
Raven.config(config.SENTRY_DSN)
.install();
'use strict'

// get configuration
const {
SENTRY_DSN,
MAX_POST_SIZE,
FALLBACK,
RABBIT
} = require('./config')

let serviceIsAvailable = false
const express = require('express')
const morgan = require('morgan')
const bodyParser = require('body-parser')
const Raven = require('raven')

//TODO Upgrade to sentry pls -- this is deprecated
if (SENTRY_DSN) {
Raven.config(SENTRY_DSN)
.install()
}

if (config.FALLBACK) {
// Use var intentionally so that rabbit doesn't need to be defined
var rabbit = require('./rabbit'); // eslint-disable-line no-var
}
const prom = require('./lib/prometheus')
const logger = require('./lib/logger')
const queue = require('./lib/queue')
const ServiceError = require('./lib/ServiceError')
const rabbitController = require('./lib/rabbit')

let prom = require('./prometheus');

let logger = require('./logger');
let pubsub = require('./pubsub');
let queue = require('./queue');

// bootstrap app
let app = express();
app.use(morgan('dev'));
app.use(bodyParser.json({ limit: config.MAX_POST_SIZE }));
app.use(bodyParser.urlencoded({extended: false}));

// register routes
app.get('/', (req, res, next) => {
res.json({ping: 'pong'});
});

app.get('/healthz', (req, res, next) => {
res.json({ping: 'pong'});
});

app.post('/messages/:channel', (req, res, next) => {
let end = prom.requestSummary.startTimer();
let channel = req.params.channel;
prom.receiveCount.inc({channel});
let messages = req.body.messages || [];
queue.push(publishJob(channel, messages, end));
res.json({success: true});
});


/**
* Return a job
*
* @param {String} channel
* @param {[Object]} messages
* @param {Function} end
* @param {Integer} retries
*
* @return {Function}
*/
function publishJob(channel, messages, end, retries=0) {
return function(cb) {
pubsub.publish(channel, messages).then((result)=>{
let errors = result.errors;
if(errors.length > 0) {
prom.publishCount.inc({state: 'failed', channel});
if(config.FALLBACK && retries >= 2) {
end();
return rabbit.publish(channel, errors);
}
retries++;
queue.push(publishJob(channel, errors.map((error) => error.message), end, retries));
} else {
prom.publishCount.inc({state: 'success', channel});
end();
}
cb();
}).catch((error) => {
cb(error);
});
};
}
if (FALLBACK) rabbitController.start()

const app = express()

app.use(morgan('dev'))
app.use(bodyParser.json({ limit: MAX_POST_SIZE }))
app.use(bodyParser.urlencoded({ extended: false }))

// why is this here? :shrug:
app.get('/', async (req, res) => {
// logger.info('ping')
res.json({ ping: 'pong' }).end()
})

// k8s health check endpoint
app.get('/healthz', (req, res) => {
if (!serviceIsAvailable) throw new ServiceError('This service is not currently accepting requests', 400)
res.json({ ping: 'pong' })
} )

// bind middleware
app.use((req, res, next) => {
let err = new Error('Not Found');
err.status = 404;
next(err);
});
// handle incoming message post requests
app.post('/messages/:channel', async (req, res) => {
const end = prom.requestSummary.startTimer()
const channel = req.params.channel
const messages = req.body.messages

// Early exit if the format is wrong
if (!Array.isArray(messages)) throw new ServiceError('`messages` property is expected to be an array', 400)

// Count this channel add
prom.receiveCount.inc({ channel })
queue.addPublishJob(channel, messages, end)
res.json({ success: true }).end()
})


// Anything requests that have not been dealt with by this point is 404
app.use((req, res, next) => next(new ServiceError('Not Found', 404)))

// Express Error Handler
// eslint-disable-next-line no-unused-vars
app.use((err, req, res, next) => {
// set locals, only providing error in development
res.locals.message = err.message;
res.locals.error = req.app.get('env') === 'development' ? err : {};

// return a json error response
let status = err.status || 500;
res.status(status);
res.json({
status: status,
message: err.message,
});
});

let onExitHandler = () => {
logger.info('Preparing to shutdown application');

logger.info('Stopping queue timer');
clearInterval(queue.timer);

logger.info('Closing express server socket');
// When the HTTP server closes we want to empty the job queue.
app.server.close(emptyQueue);
};


/**
* Run the queue and exit if it is empty
*/
function emptyQueue() {
if(queue.length > 0) {
logger.info('Processing remaining jobs on queue');
queue.start(emptyQueue);
}
// set locals, only providing error in development
res.locals.message = err.message
res.locals.error = req.app.get('env') === 'development' ? err : {}

// return a json error response
let status = err.status || 500
res.status(status)
res.json({
status,
message: err.message
})
})

// Graceful shutdown
let exitHandler = async () => {
serviceIsAvailable = false // Health endpoint will start reporting failure

logger.info('Shutdown Initiated.')

logger.info('Express server has shut down.')

Promise.all([
queue.end(), //Empty queue
rabbitController.stop(RABBIT.SHUTDOWN_WAIT) //Wait for rabbit connection to close
]).then(()=>{
logger.info('PSRP Terminating.')

}).catch(()=>{})
}

process.on('SIGTERM', () => {
onExitHandler();
});

process.on('SIGINT', () => {
onExitHandler();
});
// Make sure we exit cleanly
process.on('SIGTERM', exitHandler) // regular termination signal
process.on('SIGINT', exitHandler) // for ^C
// process.on('SIGUSR2', exitHandler) // for nodemon during dev

module.exports = app;
serviceIsAvailable = true // Health endpoint will start reporting success
module.exports = app
Loading

0 comments on commit 52bb3fb

Please sign in to comment.