diff --git a/red/comms.js b/red/comms.js index f16ba6f5a..be23729a6 100644 --- a/red/comms.js +++ b/red/comms.js @@ -50,7 +50,13 @@ function start() { } }); ws.on('message', function(data,flags) { - var msg = JSON.parse(data); + var msg = null; + try { + msg = JSON.parse(data); + } catch(err) { + util.log("[red:comms] received malformed message : "+err.toString()); + return; + } if (msg.subscribe) { handleRemoteSubscription(ws,msg.subscribe); } diff --git a/test/red/comms_spec.js b/test/red/comms_spec.js index 071beceeb..f83a7839a 100644 --- a/test/red/comms_spec.js +++ b/test/red/comms_spec.js @@ -13,10 +13,114 @@ * See the License for the specific language governing permissions and * limitations under the License. **/ -var should = require("should"); -describe("red/comms", function() { - it('can be required without errors', function() { - require("../../red/comms"); +var should = require("should"); +var http = require('http'); +var express = require('express'); +var app = express(); +var WebSocket = require('ws'); + +var comms = require("../../red/comms.js"); +var address = '127.0.0.1'; +var listenPort = 0; // use ephemeral port + +describe("comms", function() { + var server; + var url; + var port; + before(function(done) { + server = http.createServer(function(req,res){app(req,res)}); + comms.init(server, {}); + server.listen(listenPort, address); + server.on('listening', function() { + port = server.address().port; + url = 'http://' + address + ':' + port + '/comms'; + comms.start(); + done(); + }); }); + + it('accepts connection', function(done) { + var ws = new WebSocket(url); + ws.on('open', function() { + ws.close(); + done(); + }); + }); + + it('publishes message after subscription', function(done) { + var ws = new WebSocket(url); + ws.on('open', function() { + ws.send('{"subscribe":"topic1"}'); + comms.publish('topic1', 'foo'); + }); + ws.on('message', function(msg) { + msg.should.equal('{"topic":"topic1","data":"foo"}'); + ws.close(); + done(); + }); + }); + + it('publishes retained message for subscription', function(done) { + comms.publish('topic2', 'bar', true); + var ws = new WebSocket(url); + ws.on('open', function() { + ws.send('{"subscribe":"topic2"}'); + }); + ws.on('message', function(msg) { + msg.should.equal('{"topic":"topic2","data":"bar"}'); + ws.close(); + done(); + }); + }); + + it('retained message is deleted by non-retained message', function(done) { + comms.publish('topic3', 'retained', true); + comms.publish('topic3', 'non-retained'); + var ws = new WebSocket(url); + ws.on('open', function() { + ws.send('{"subscribe":"topic3"}'); + comms.publish('topic3', 'new'); + }); + ws.on('message', function(msg) { + msg.should.equal('{"topic":"topic3","data":"new"}'); + ws.close(); + done(); + }); + }); + + it('malformed messages are ignored',function(done) { + var ws = new WebSocket(url); + ws.on('open', function() { + ws.send('not json'); + ws.send('[]'); + ws.send('{"subscribe":"topic3"}'); + comms.publish('topic3', 'correct'); + }); + ws.on('message', function(msg) { + msg.should.equal('{"topic":"topic3","data":"correct"}'); + ws.close(); + done(); + }); + }); + + // The following test currently fails due to minimum viable + // implementation. More test should be written to test topic + // matching once this one is passing + + if (0) { + it('receives message on correct topic', function(done) { + var ws = new WebSocket(url); + ws.on('open', function() { + ws.send('{"subscribe":"topic4"}'); + comms.publish('topic5', 'foo'); + comms.publish('topic4', 'bar'); + }); + ws.on('message', function(msg) { + msg.should.equal('{"topic":"topic4","data":"bar"}'); + ws.close(); + done(); + }); + }); + } });