From db1b0ccb79ee74d3e72ec3ea8fe16e11df77ca50 Mon Sep 17 00:00:00 2001 From: Christopher Hiller Date: Thu, 23 Aug 2018 00:50:51 -0700 Subject: [PATCH] fix lost messages / properties in TCPRequest Node; closes #1863 (#1864) - Added some more checks around this. - We're choosing to only use the latest message when sending, which is effectively what was happening before the queue implementation. --- nodes/core/io/31-tcpin.js | 13 +- test/nodes/core/io/31-tcprequest_spec.js | 163 ++++++++++++++++++++--- 2 files changed, 151 insertions(+), 25 deletions(-) diff --git a/nodes/core/io/31-tcpin.js b/nodes/core/io/31-tcpin.js index 63eb4a478..26593647a 100644 --- a/nodes/core/io/31-tcpin.js +++ b/nodes/core/io/31-tcpin.js @@ -467,6 +467,7 @@ module.exports = function(RED) { connecting: false }; enqueue(clients[connection_id].msgQueue, msg); + clients[connection_id].lastMsg = msg; if (!clients[connection_id].connecting && !clients[connection_id].connected) { var buf; @@ -507,8 +508,7 @@ module.exports = function(RED) { clients[connection_id].client.on('data', function(data) { if (node.out === "sit") { // if we are staying connected just send the buffer if (clients[connection_id]) { - let msg = dequeue(clients[connection_id].msgQueue) || {}; - clients[connection_id].msgQueue.unshift(msg); + const msg = clients[connection_id].lastMsg || {}; msg.payload = data; node.send(RED.util.cloneMessage(msg)); } @@ -530,8 +530,7 @@ module.exports = function(RED) { clients[connection_id].timeout = setTimeout(function () { if (clients[connection_id]) { clients[connection_id].timeout = null; - let msg = dequeue(clients[connection_id].msgQueue) || {}; - clients[connection_id].msgQueue.unshift(msg); + const msg = clients[connection_id].lastMsg || {}; msg.payload = Buffer.alloc(i+1); buf.copy(msg.payload,0,0,i+1); node.send(msg); @@ -553,8 +552,7 @@ module.exports = function(RED) { i += 1; if ( i >= node.splitc) { if (clients[connection_id]) { - let msg = dequeue(clients[connection_id].msgQueue) || {}; - clients[connection_id].msgQueue.unshift(msg); + const msg = clients[connection_id].lastMsg || {}; msg.payload = Buffer.alloc(i); buf.copy(msg.payload,0,0,i); node.send(msg); @@ -573,8 +571,7 @@ module.exports = function(RED) { i += 1; if (data[j] == node.splitc) { if (clients[connection_id]) { - let msg = dequeue(clients[connection_id].msgQueue) || {}; - clients[connection_id].msgQueue.unshift(msg); + const msg = clients[connection_id].lastMsg || {}; msg.payload = Buffer.alloc(i); buf.copy(msg.payload,0,0,i); node.send(msg); diff --git a/test/nodes/core/io/31-tcprequest_spec.js b/test/nodes/core/io/31-tcprequest_spec.js index 3cc0e1224..8aed61ef5 100644 --- a/test/nodes/core/io/31-tcprequest_spec.js +++ b/test/nodes/core/io/31-tcprequest_spec.js @@ -59,7 +59,11 @@ describe('TCP Request Node', function() { var n2 = helper.getNode("n2"); n2.on("input", function(msg) { try { - msg.should.have.property('payload', Buffer(val1)); + if (typeof val1 === 'object') { + msg.should.have.properties(Object.assign({}, val1, {payload: Buffer(val1.payload)})); + } else { + msg.should.have.property('payload', Buffer(val1)); + } done(); } catch(err) { done(err); @@ -79,7 +83,11 @@ describe('TCP Request Node', function() { const n2 = helper.getNode("n2"); n2.on("input", msg => { try { - msg.should.have.property('payload', Buffer(result)); + if (typeof result === 'object') { + msg.should.have.properties(Object.assign({}, result, {payload: Buffer(result.payload)})); + } else { + msg.should.have.property('payload', Buffer(result)); + } done(); } catch(err) { done(err); @@ -95,31 +103,75 @@ describe('TCP Request Node', function() { it('should send & recv data', function(done) { var flow = [{id:"n1", type:"tcp request", server:"localhost", port:port, out:"time", splitc: "0", wires:[["n2"]] }, {id:"n2", type:"helper"}]; - testTCP(flow, "foo", "ACK:foo", done) + testTCP(flow, { + payload: 'foo', + topic: 'bar' + }, { + payload: 'ACK:foo', + topic: 'bar' + }, done); + }); + + it('should retain complete message', function(done) { + var flow = [{id:"n1", type:"tcp request", server:"localhost", port:port, out:"time", splitc: "0", wires:[["n2"]] }, + {id:"n2", type:"helper"}]; + testTCP(flow, { + payload: 'foo', + topic: 'bar' + }, { + payload: 'ACK:foo', + topic: 'bar' + }, done); }); it('should send & recv data when specified character received', function(done) { var flow = [{id:"n1", type:"tcp request", server:"localhost", port:port, out:"char", splitc: "0", wires:[["n2"]] }, {id:"n2", type:"helper"}]; - testTCP(flow, "foo0bar0", "ACK:foo0", done); + testTCP(flow, { + payload: 'foo0bar0', + topic: 'bar' + }, { + payload: 'ACK:foo0', + topic: 'bar' + }, done); }); it('should send & recv data after fixed number of chars received', function(done) { var flow = [{id:"n1", type:"tcp request", server:"localhost", port:port, out:"count", splitc: "7", wires:[["n2"]] }, {id:"n2", type:"helper"}]; - testTCP(flow, "foo bar", "ACK:foo", done); + testTCP(flow, { + payload: 'foo bar', + topic: 'bar' + }, { + payload: 'ACK:foo', + topic: 'bar' + }, done); }); it('should send & receive, then keep connection', function(done) { var flow = [{id:"n1", type:"tcp request", server:"localhost", port:port, out:"sit", splitc: "5", wires:[["n2"]] }, {id:"n2", type:"helper"}]; - testTCP(flow, "foo", "ACK:foo", done); + testTCP(flow, { + payload: 'foo', + topic: 'bar' + }, { + payload: 'ACK:foo', + topic: 'bar' + }, done); }); it('should send & recv data to/from server:port from msg', function(done) { var flow = [{id:"n1", type:"tcp request", server:"", port:"", out:"time", splitc: "0", wires:[["n2"]] }, {id:"n2", type:"helper"}]; - testTCP(flow, {payload:"foo", host:"localhost", port:port}, "ACK:foo", done) + testTCP(flow, { + payload: "foo", + host: "localhost", + port: port + }, { + payload: "ACK:foo", + host: 'localhost', + port: port + }, done); }); }); @@ -127,36 +179,95 @@ describe('TCP Request Node', function() { it('should send & recv data', function(done) { var flow = [{id:"n1", type:"tcp request", server:"localhost", port:port, out:"time", splitc: "0", wires:[["n2"]] }, {id:"n2", type:"helper"}]; - - testTCPMany(flow, ['f', 'o', 'o'], 'ACK:foo', done); + testTCPMany(flow, [{ + payload: 'f', + topic: 'bar' + }, { + payload: 'o', + topic: 'bar' + }, { + payload: 'o', + topic: 'bar' + }], { + payload: 'ACK:foo', + topic: 'bar' + }, done); }); it('should send & recv data when specified character received', function(done) { var flow = [{id:"n1", type:"tcp request", server:"localhost", port:port, out:"char", splitc: "0", wires:[["n2"]] }, {id:"n2", type:"helper"}]; - testTCPMany(flow, ["foo0","bar0"], "ACK:foo0", done); + testTCPMany(flow, [{ + payload: "foo0", + topic: 'bar' + }, { + payload: "bar0", + topic: 'bar' + }], { + payload: "ACK:foo0", + topic: 'bar' + }, done); }); it('should send & recv data after fixed number of chars received', function(done) { var flow = [{id:"n1", type:"tcp request", server:"localhost", port:port, out:"count", splitc: "7", wires:[["n2"]] }, {id:"n2", type:"helper"}]; - testTCPMany(flow, ["fo", "ob", "ar"], "ACK:foo", done); + testTCPMany(flow, [{ + payload: "fo", + topic: 'bar' + }, { + payload: "ob", + topic: 'bar' + }, { + payload: "ar", + topic: 'bar' + }], { + payload: "ACK:foo", + topic: 'bar' + }, done); }); - it('should send & receive, then keep connection', function(done) { var flow = [{id:"n1", type:"tcp request", server:"localhost", port:port, out:"sit", splitc: "5", wires:[["n2"]] }, {id:"n2", type:"helper"}]; - testTCPMany(flow, ["foo", "bar", "baz"], "ACK:foobarbaz", done); + testTCPMany(flow, [{ + payload: "foo", + topic: 'bar' + }, { + payload: "bar", + topic: 'bar' + }, { + payload: "baz", + topic: 'bar' + }], { + payload: "ACK:foobarbaz", + topic: 'bar' + }, done); }); it('should send & recv data to/from server:port from msg', function(done) { var flow = [{id:"n1", type:"tcp request", server:"", port:"", out:"time", splitc: "0", wires:[["n2"]] }, {id:"n2", type:"helper"}]; - testTCPMany(flow, [ - {payload:"f", host:"localhost", port:port}, - {payload:"o", host:"localhost", port:port}, - {payload:"o", host:"localhost", port:port}], "ACK:foo", done); + testTCPMany(flow, [{ + payload: "f", + host: "localhost", + port: port + }, + { + payload: "o", + host: "localhost", + port: port + }, + { + payload: "o", + host: "localhost", + port: port + } + ], { + payload: "ACK:foo", + host: 'localhost', + port: port + }, done); }); it('should limit the queue size', function (done) { @@ -168,5 +279,23 @@ describe('TCP Request Node', function() { const expected = msgs.slice(0, -1); testTCPMany(flow, msgs, "ACK:" + expected.join(''), done); }); + + it('should only retain the latest message', function(done) { + var flow = [{id:"n1", type:"tcp request", server:"localhost", port:port, out:"time", splitc: "0", wires:[["n2"]] }, + {id:"n2", type:"helper"}]; + testTCPMany(flow, [{ + payload: 'f', + topic: 'bar' + }, { + payload: 'o', + topic: 'baz' + }, { + payload: 'o', + topic: 'quux' + }], { + payload: 'ACK:foo', + topic: 'quux' + }, done); + }); }); });