Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow receive command notifications from CB #663

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- allow receive command notifications from CB (iotagent-node-lib#1455
61 changes: 52 additions & 9 deletions lib/bindings/HTTPBindings.js
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ function handleIncomingMeasure(req, res, next) {
* @param {Object} attribute Attribute in NGSI format.
* @return {Function} Command execution function ready to be called with async.series.
*/
function generateCommandExecution(apiKey, device, attribute) {
function generateCommandExecution(apiKey, device, group, attribute) {
const cmdName = attribute.name;
const cmdAttributes = attribute.value;
context = fillService(context, device);
Expand Down Expand Up @@ -362,7 +362,7 @@ function generateCommandExecution(apiKey, device, attribute) {
function commandHandler(device, attributes, callback) {
context = fillService(context, device);
utils.getEffectiveApiKey(device.service, device.subservice, device, function (error, apiKey) {
async.series(attributes.map(generateCommandExecution.bind(null, apiKey, device)), function (error) {
async.series(attributes.map(generateCommandExecution.bind(null, apiKey, device, null)), function (error) {
if (error) {
// prettier-ignore
config.getLogger().error(context,
Expand Down Expand Up @@ -526,13 +526,15 @@ function stop(callback) {
}
}

function sendPushNotifications(device, values, callback) {
async.series(values.map(generateCommandExecution.bind(null, null, device)), function (error) {
function sendPushNotifications(device, group, values, callback) {
const executions = _.flatten(values.map(generateCommandExecution.bind(null, group.apikey, device, group)));

async.series(executions, function (error) {
callback(error);
});
}

function storePollNotifications(device, values, callback) {
function storePollNotifications(device, group, values, callback) {
function addPollNotification(item, innerCallback) {
iotAgentLib.addCommand(device.service, device.subservice, device.id, item, innerCallback);
}
Expand All @@ -541,11 +543,52 @@ function storePollNotifications(device, values, callback) {
}

function notificationHandler(device, values, callback) {
if (device.endpoint) {
sendPushNotifications(device, values, callback);
} else {
storePollNotifications(device, values, callback);
config.getLogger().debug(context, 'values for command %j and device %j', values, device);

function invokeWithConfiguration(apiKey, callback) {
let group = {};
iotAgentLib.getConfigurationSilently(config.getConfig().iota.defaultResource || '', apiKey, function (
error,
foundGroup
) {
if (!error) {
group = foundGroup;
}
var cmdValue = { type: 'command' };
for (let val of values) {
if (val.name === 'cmd') {
cmdValue.name = val.value;
} else if (val.name === 'params') {
cmdValue.value = val.value;
} else {
// other fields like status, info, onDelivered, OnError
cmdValue[val.name] = val.value;
}
}
var cmdValues = [cmdValue];
config.getLogger().debug(context, 'cmdValues %j', cmdValues);
iotAgentLib.executeUpdateSideEffects(
device,
device.id,
device.type,
device.service,
device.subservice,
cmdValues,
function () {
if (device.endpoint || group.endpoint) {
sendPushNotifications(device, group, cmdValues, callback);
} else {
storePollNotifications(device, group, cmdValues, callback);
}
}
);
});
}

async.waterfall(
[apply(utils.getEffectiveApiKey, device.service, device.subservice, device), invokeWithConfiguration],
callback
);
}

exports.start = start;
Expand Down
26 changes: 22 additions & 4 deletions lib/iotagent-ul.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,28 @@ function deviceUpdatingHandler(device, callback) {
* @param {Array} values Values recieved in the notification.
*/
function notificationHandler(device, values, callback) {
transportSelector.applyFunctionFromBinding(
[device, values],
'notificationHandler',
device.transport || config.getConfig().defaultTransport,
function invokeWithConfiguration(apiKey, callback) {
let group = {};
iotAgentLib.getConfigurationSilently(config.getConfig().iota.defaultResource || '', apiKey, function (
error,
foundGroup
) {
if (!error) {
group = foundGroup;
}
transportSelector.applyFunctionFromBinding(
[device, values],
'notificationHandler',
device.transport ||
(group && group.transport ? group.transport : undefined) ||
config.getConfig().defaultTransport,
callback
);
});
}

async.waterfall(
[apply(iotaUtils.getEffectiveApiKey, device.service, device.subservice, device), invokeWithConfiguration],
callback
);
}
Expand Down
135 changes: 135 additions & 0 deletions test/unit/ngsiv2/HTTP_commands_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,44 @@ describe('HTTP: Commands', function () {
});
});
});

describe('When a command arrive to the Agent for a device with the HTTP protocol throught a CB notification', function () {
const commandOptions = {
url: 'http://localhost:' + config.iota.server.port + '/notify',
method: 'POST',
json: utils.readExampleFile('./test/unit/ngsiv2/contextRequests/notifyCommand.json'),
headers: {
'fiware-service': 'smartgondor',
'fiware-servicepath': '/gardens'
}
};

beforeEach(function () {
contextBrokerMock
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', '/gardens')
.post(
'/v2/entities?options=upsert',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/updateStatus1.json')
)
.reply(204);

mockedClientServer = nock('http://localhost:9876')
.post('/command', function (body) {
return body === 'MQTT_2@PING|data=22';
})
.reply(200, 'MQTT_2@PING|data=22');
});

it('should return a 200 OK without errors', function (done) {
request(commandOptions, function (error, response, body) {
should.not.exist(error);
response.statusCode.should.equal(200);
contextBrokerMock.done();
done();
});
});
});
});

describe('HTTP: Commands with expressions', function () {
Expand Down Expand Up @@ -435,3 +473,100 @@ describe('HTTP: Commands with expressions 2', function () {
});
});
});

describe('HTTP: Commands with expressions', function () {
beforeEach(function (done) {
const provisionOptions = {
url: 'http://localhost:' + config.iota.server.port + '/iot/devices',
method: 'POST',
json: utils.readExampleFile('./test/deviceProvisioning/provisionCommand7.json'),
headers: {
'fiware-service': 'smartgondor',
'fiware-servicepath': '/gardens'
}
};

config.logLevel = 'INFO';

nock.cleanAll();

contextBrokerMock = nock('http://192.168.1.1:1026')
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', '/gardens')
.post('/v2/registrations')
.reply(201, null, { Location: '/v2/registrations/6319a7f5254b05844116584d' });

iotagentMqtt.start(config, function () {
request(provisionOptions, function (error, response, body) {
done();
});
});
});

afterEach(function (done) {
nock.cleanAll();
async.series([iotAgentLib.clearAll, iotagentMqtt.stop], done);
});

describe('When a command arrive to the Agent for a device with the HTTP protocol', function () {
const commandOptions = {
url: 'http://localhost:' + config.iota.server.port + '/v2/op/update',
method: 'POST',
json: utils.readExampleFile('./test/unit/ngsiv2/contextRequests/updateCommand1.json'),
headers: {
'fiware-service': 'smartgondor',
'fiware-servicepath': '/gardens'
}
};

beforeEach(function () {
contextBrokerMock
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', '/gardens')
.post(
'/v2/entities?options=upsert',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/updateStatus1.json')
)
.reply(204);

contextBrokerMock
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', '/gardens')
.post(
'/v2/entities?options=upsert',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/updateStatus6.json')
)
.reply(204);

mockedClientServer = nock('http://localhost:9876')
.post('/command', function (body) {
return body === 'MQTT_2@PING|MQTT_2AnMQTTDevice';
})
.reply(200, 'MQTT_2@PING|data=22');
});

it('should return a 204 OK without errors', function (done) {
request(commandOptions, function (error, response, body) {
should.not.exist(error);
response.statusCode.should.equal(204);
done();
});
});
it('should update the status in the Context Broker', function (done) {
request(commandOptions, function (error, response, body) {
setTimeout(function () {
contextBrokerMock.done();
done();
}, 100);
});
});
it('should publish the command information in the MQTT topic', function (done) {
request(commandOptions, function (error, response, body) {
setTimeout(function () {
mockedClientServer.done();
done();
}, 100);
});
});
});
});
64 changes: 64 additions & 0 deletions test/unit/ngsiv2/contextRequests/notifyCommand.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
{
"subscriptionId": "60b0cedd497e8b681d40b58e",
"data": [{
"id": "123456abcdefg",
"type": "cmd1Execution",
"targetEntityId": {
"type": "Text",
"value": "Second MQTT Device",
"metadata": {}
},
"targetEntityType": {
"type": "Text",
"value": "AnMQTTDevice",
"metadata": {}
},
"execTs": {
"type": "DateTime",
"value": "2020-05-27T00:00:00.000Z",
"metadata": {}
},
"cmd": {
"type": "Text",
"value": "PING",
"metadata": {}
},
"params": {
"type": "Text",
"value": { "data": "22" },
"metadata": {}
},
"status": {
"type": "Text",
"value": "FORWARDED",
"metadata": {}
},
"info": {
"type": "Text",
"value": null,
"metadata": {}
},
"onDelivered": {
"type": "Request"
},
"onOk": {
"type": "Request"
},
"onError": {
"type": "Request"
},
"onInfo": {
"type": "Request"
},
"cmdExecution": {
"type": "value",
"value": true,
"metadata": {}
},
"dateExpiration": {
"type": "DateTime",
"value": "2030-05-27T20:00:00.000Z",
"metadata": {}
}
}]
}
Loading