mirror of
				https://github.com/node-red/node-red.git
				synced 2025-03-01 10:36:34 +00:00 
			
		
		
		
	Merge branch 'issue/1414' of https://github.com/boneskull/node-red into boneskull-issue/1414
This commit is contained in:
		| @@ -18,10 +18,36 @@ module.exports = function(RED) { | ||||
|     "use strict"; | ||||
|     var reconnectTime = RED.settings.socketReconnectTime||10000; | ||||
|     var socketTimeout = RED.settings.socketTimeout||null; | ||||
|     const msgQueueSize = RED.settings.tcpMsgQueueSize || 1000; | ||||
|     const Denque = require('denque'); | ||||
|     var net = require('net'); | ||||
|  | ||||
|     var connectionPool = {}; | ||||
|  | ||||
|     /** | ||||
|      * Enqueue `item` in `queue` | ||||
|      * @param {Denque} queue - Queue | ||||
|      * @param {*} item - Item to enqueue | ||||
|      * @private | ||||
|      * @returns {Denque} `queue` | ||||
|      */ | ||||
|     const enqueue = (queue, item) => { | ||||
|         // drop msgs from front of queue if size is going to be exceeded | ||||
|         if (queue.size() === msgQueueSize) { | ||||
|             queue.shift(); | ||||
|         } | ||||
|         queue.push(item); | ||||
|         return queue; | ||||
|     }; | ||||
|  | ||||
|     /** | ||||
|      * Shifts item off front of queue | ||||
|      * @param {Deque} queue - Queue | ||||
|      * @private | ||||
|      * @returns {*} Item previously at front of queue | ||||
|      */ | ||||
|     const dequeue = queue => queue.shift(); | ||||
|  | ||||
|     function TcpIn(n) { | ||||
|         RED.nodes.createNode(this,n); | ||||
|         this.host = n.host; | ||||
| @@ -435,11 +461,14 @@ module.exports = function(RED) { | ||||
|             // the clients object will have: | ||||
|             // clients[id].client, clients[id].msg, clients[id].timeout | ||||
|             var connection_id = host + ":" + port; | ||||
|             clients[connection_id] = clients[connection_id] || {}; | ||||
|             clients[connection_id].msg = msg; | ||||
|             clients[connection_id].connected = clients[connection_id].connected || false; | ||||
|             clients[connection_id] = clients[connection_id] || { | ||||
|                 msgQueue: new Denque(), | ||||
|                 connected: false, | ||||
|                 connecting: false | ||||
|             }; | ||||
|             enqueue(clients[connection_id].msgQueue, msg); | ||||
|  | ||||
|             if (!clients[connection_id].connected) { | ||||
|             if (!clients[connection_id].connecting && !clients[connection_id].connected) { | ||||
|                 var buf; | ||||
|                 if (this.out == "count") { | ||||
|                     if (this.splitc === 0) { buf = Buffer.alloc(1); } | ||||
| @@ -451,14 +480,19 @@ module.exports = function(RED) { | ||||
|                 if (socketTimeout !== null) { clients[connection_id].client.setTimeout(socketTimeout);} | ||||
|  | ||||
|                 if (host && port) { | ||||
|                     clients[connection_id].connecting = true; | ||||
|                     clients[connection_id].client.connect(port, host, function() { | ||||
|                         //node.log(RED._("tcpin.errors.client-connected")); | ||||
|                         node.status({fill:"green",shape:"dot",text:"common.status.connected"}); | ||||
|                         if (clients[connection_id] && clients[connection_id].client) { | ||||
|                             clients[connection_id].connected = true; | ||||
|                             clients[connection_id].client.write(clients[connection_id].msg.payload); | ||||
|                             clients[connection_id].connecting = false; | ||||
|                             let msg; | ||||
|                             while (msg = dequeue(clients[connection_id].msgQueue)) { | ||||
|                                 clients[connection_id].client.write(msg.payload); | ||||
|                             } | ||||
|                             if (node.out === "time" && node.splitc < 0) { | ||||
|                                 clients[connection_id].connected = false; | ||||
|                                 clients[connection_id].connected = clients[connection_id].connecting = false; | ||||
|                                 clients[connection_id].client.end(); | ||||
|                                 delete clients[connection_id]; | ||||
|                                 node.status({}); | ||||
| @@ -473,9 +507,10 @@ module.exports = function(RED) { | ||||
|                 clients[connection_id].client.on('data', function(data) { | ||||
|                     if (node.out === "sit") { // if we are staying connected just send the buffer | ||||
|                         if (clients[connection_id]) { | ||||
|                             if (!clients[connection_id].hasOwnProperty("msg")) { clients[connection_id].msg = {}; } | ||||
|                             clients[connection_id].msg.payload = data; | ||||
|                             node.send(RED.util.cloneMessage(clients[connection_id].msg)); | ||||
|                             let msg = dequeue(clients[connection_id].msgQueue) || {}; | ||||
|                             clients[connection_id].msgQueue.unshift(msg); | ||||
|                             msg.payload = data; | ||||
|                             node.send(RED.util.cloneMessage(msg)); | ||||
|                         } | ||||
|                     } | ||||
|                     // else if (node.splitc === 0) { | ||||
| @@ -495,9 +530,11 @@ module.exports = function(RED) { | ||||
|                                         clients[connection_id].timeout = setTimeout(function () { | ||||
|                                             if (clients[connection_id]) { | ||||
|                                                 clients[connection_id].timeout = null; | ||||
|                                                 clients[connection_id].msg.payload = Buffer.alloc(i+1); | ||||
|                                                 buf.copy(clients[connection_id].msg.payload,0,0,i+1); | ||||
|                                                 node.send(clients[connection_id].msg); | ||||
|                                                 let msg = dequeue(clients[connection_id].msgQueue) || {}; | ||||
|                                                 clients[connection_id].msgQueue.unshift(msg); | ||||
|                                                 msg.payload = Buffer.alloc(i+1); | ||||
|                                                 buf.copy(msg.payload,0,0,i+1); | ||||
|                                                 node.send(msg); | ||||
|                                                 if (clients[connection_id].client) { | ||||
|                                                     node.status({}); | ||||
|                                                     clients[connection_id].client.destroy(); | ||||
| @@ -516,9 +553,11 @@ module.exports = function(RED) { | ||||
|                                 i += 1; | ||||
|                                 if ( i >= node.splitc) { | ||||
|                                     if (clients[connection_id]) { | ||||
|                                         clients[connection_id].msg.payload = Buffer.alloc(i); | ||||
|                                         buf.copy(clients[connection_id].msg.payload,0,0,i); | ||||
|                                         node.send(clients[connection_id].msg); | ||||
|                                         let msg = dequeue(clients[connection_id].msgQueue) || {}; | ||||
|                                         clients[connection_id].msgQueue.unshift(msg); | ||||
|                                         msg.payload = Buffer.alloc(i); | ||||
|                                         buf.copy(msg.payload,0,0,i); | ||||
|                                         node.send(msg); | ||||
|                                         if (clients[connection_id].client) { | ||||
|                                             node.status({}); | ||||
|                                             clients[connection_id].client.destroy(); | ||||
| @@ -534,9 +573,11 @@ module.exports = function(RED) { | ||||
|                                 i += 1; | ||||
|                                 if (data[j] == node.splitc) { | ||||
|                                     if (clients[connection_id]) { | ||||
|                                         clients[connection_id].msg.payload = Buffer.alloc(i); | ||||
|                                         buf.copy(clients[connection_id].msg.payload,0,0,i); | ||||
|                                         node.send(clients[connection_id].msg); | ||||
|                                         let msg = dequeue(clients[connection_id].msgQueue) || {}; | ||||
|                                         clients[connection_id].msgQueue.unshift(msg); | ||||
|                                         msg.payload = Buffer.alloc(i); | ||||
|                                         buf.copy(msg.payload,0,0,i); | ||||
|                                         node.send(msg); | ||||
|                                         if (clients[connection_id].client) { | ||||
|                                             node.status({}); | ||||
|                                             clients[connection_id].client.destroy(); | ||||
| @@ -554,7 +595,7 @@ module.exports = function(RED) { | ||||
|                     //console.log("END"); | ||||
|                     node.status({fill:"grey",shape:"ring",text:"common.status.disconnected"}); | ||||
|                     if (clients[connection_id] && clients[connection_id].client) { | ||||
|                         clients[connection_id].connected = false; | ||||
|                         clients[connection_id].connected = clients[connection_id].connecting = false; | ||||
|                         clients[connection_id].client = null; | ||||
|                     } | ||||
|                 }); | ||||
| @@ -562,7 +603,7 @@ module.exports = function(RED) { | ||||
|                 clients[connection_id].client.on('close', function() { | ||||
|                     //console.log("CLOSE"); | ||||
|                     if (clients[connection_id]) { | ||||
|                         clients[connection_id].connected = false; | ||||
|                         clients[connection_id].connected = clients[connection_id].connecting = false; | ||||
|                     } | ||||
|  | ||||
|                     var anyConnected = false; | ||||
| @@ -592,21 +633,23 @@ module.exports = function(RED) { | ||||
|                 clients[connection_id].client.on('timeout',function() { | ||||
|                     //console.log("TIMEOUT"); | ||||
|                     if (clients[connection_id]) { | ||||
|                         clients[connection_id].connected = false; | ||||
|                         clients[connection_id].connected = clients[connection_id].connecting = false; | ||||
|                         node.status({fill:"grey",shape:"dot",text:"tcpin.errors.connect-timeout"}); | ||||
|                         //node.warn(RED._("tcpin.errors.connect-timeout")); | ||||
|                         if (clients[connection_id].client) { | ||||
|                             clients[connection_id].connecting = true; | ||||
|                             clients[connection_id].client.connect(port, host, function() { | ||||
|                                 clients[connection_id].connected = true; | ||||
|                                 clients[connection_id].connecting = false; | ||||
|                                 node.status({fill:"green",shape:"dot",text:"common.status.connected"}); | ||||
|                             }); | ||||
|                         } | ||||
|                     } | ||||
|                 }); | ||||
|             } | ||||
|             else { | ||||
|             else if (!clients[connection_id].connecting && clients[connection_id].connected) { | ||||
|                 if (clients[connection_id] && clients[connection_id].client) { | ||||
|                     clients[connection_id].client.write(clients[connection_id].msg.payload); | ||||
|                     clients[connection_id].client.write(dequeue(clients[connection_id].msgQueue)); | ||||
|                 } | ||||
|             } | ||||
|         }); | ||||
|   | ||||
| @@ -43,6 +43,7 @@ | ||||
|     "cookie-parser": "1.4.3", | ||||
|     "cors": "2.8.4", | ||||
|     "cron": "1.3.0", | ||||
|     "denque": "1.3.0", | ||||
|     "express": "4.16.3", | ||||
|     "express-session": "1.15.6", | ||||
|     "fs-extra": "5.0.0", | ||||
|   | ||||
| @@ -41,6 +41,10 @@ module.exports = { | ||||
|     //  defaults to no timeout | ||||
|     //socketTimeout: 120000, | ||||
|  | ||||
|     // Maximum number of messages to wait in queue while attempting to connect to TCP socket | ||||
|     //  defaults to 1000 | ||||
|     //tcpMsgQueueSize: 2000, | ||||
|  | ||||
|     // Timeout in milliseconds for HTTP request connections | ||||
|     //  defaults to 120 seconds | ||||
|     //httpRequestTimeout: 120000, | ||||
|   | ||||
| @@ -19,6 +19,7 @@ var should = require("should"); | ||||
| var stoppable = require('stoppable'); | ||||
| var helper = require("node-red-node-test-helper"); | ||||
| var tcpinNode = require("../../../../nodes/core/io/31-tcpin.js"); | ||||
| const RED = require("../../../../red/red.js"); | ||||
|  | ||||
|  | ||||
| describe('TCP Request Node', function() { | ||||
| @@ -28,12 +29,12 @@ describe('TCP Request Node', function() { | ||||
|     function startServer(done) { | ||||
|         port += 1; | ||||
|         server = stoppable(net.createServer(function(c) { | ||||
| 	    c.on('data', function(data) { | ||||
| 		var rdata = "ACK:"+data.toString(); | ||||
| 		c.write(rdata); | ||||
| 	    }); | ||||
|             c.on('data', function(data) { | ||||
|                 var rdata = "ACK:"+data.toString(); | ||||
|                 c.write(rdata); | ||||
|             }); | ||||
|             c.on('error', function(err) { | ||||
| 		startServer(done); | ||||
|                 startServer(done); | ||||
|             }); | ||||
|         })).listen(port, "127.0.0.1", function(err) { | ||||
|             done(); | ||||
| @@ -64,48 +65,108 @@ describe('TCP Request Node', function() { | ||||
|                     done(err); | ||||
|                 } | ||||
|             }); | ||||
| 	    if((typeof val0) === 'object') { | ||||
| 		n1.receive(val0); | ||||
| 	    } else { | ||||
| 		n1.receive({payload:val0}); | ||||
| 	    } | ||||
|             if((typeof val0) === 'object') { | ||||
|                 n1.receive(val0); | ||||
|             } else { | ||||
|                 n1.receive({payload:val0}); | ||||
|             } | ||||
|         }); | ||||
|     } | ||||
|  | ||||
|     it('should send & recv data', function(done) { | ||||
|         var flow = [{id:"n1", type:"tcp request", server:"localhost", port:port, out:"time", splitc: "0", wires:[["n2"]] }, | ||||
|                     {id:"n2", type:"helper"}]; | ||||
| 	testTCP(flow, "foo", "ACK:foo", done) | ||||
|     function testTCPMany(flow, values, result, done) { | ||||
|         helper.load(tcpinNode, flow, () => { | ||||
|             const n1 = helper.getNode("n1"); | ||||
|             const n2 = helper.getNode("n2"); | ||||
|             n2.on("input", msg => { | ||||
|                 try { | ||||
|                     msg.should.have.property('payload', Buffer(result)); | ||||
|                     done(); | ||||
|                 } catch(err) { | ||||
|                     done(err); | ||||
|                 } | ||||
|             }); | ||||
|             values.forEach(value => { | ||||
|                 n1.receive(typeof value === 'object' ? value : {payload: value}); | ||||
|             }); | ||||
|         }); | ||||
|     } | ||||
|  | ||||
|     describe('single message', function () { | ||||
|         it('should send & recv data', function(done) { | ||||
|             var flow = [{id:"n1", type:"tcp request", server:"localhost", port:port, out:"time", splitc: "0", wires:[["n2"]] }, | ||||
|             {id:"n2", type:"helper"}]; | ||||
|             testTCP(flow, "foo", "ACK:foo", done) | ||||
|         }); | ||||
|  | ||||
|         it('should send & recv data when specified character received', function(done) { | ||||
|             var flow = [{id:"n1", type:"tcp request", server:"localhost", port:port, out:"char", splitc: "0", wires:[["n2"]] }, | ||||
|             {id:"n2", type:"helper"}]; | ||||
|             testTCP(flow, "foo0bar0", "ACK:foo0", done); | ||||
|         }); | ||||
|  | ||||
|         it('should send & recv data after fixed number of chars received', function(done) { | ||||
|             var flow = [{id:"n1", type:"tcp request", server:"localhost", port:port, out:"count", splitc: "7", wires:[["n2"]] }, | ||||
|             {id:"n2", type:"helper"}]; | ||||
|             testTCP(flow, "foo bar", "ACK:foo", done); | ||||
|         }); | ||||
|  | ||||
|         it('should send & receive, then keep connection', function(done) { | ||||
|             var flow = [{id:"n1", type:"tcp request", server:"localhost", port:port, out:"sit", splitc: "5", wires:[["n2"]] }, | ||||
|             {id:"n2", type:"helper"}]; | ||||
|             testTCP(flow, "foo", "ACK:foo", done); | ||||
|         }); | ||||
|  | ||||
|         it('should send & recv data to/from server:port from msg', function(done) { | ||||
|             var flow = [{id:"n1", type:"tcp request", server:"", port:"", out:"time", splitc: "0", wires:[["n2"]] }, | ||||
|             {id:"n2", type:"helper"}]; | ||||
|             testTCP(flow, {payload:"foo", host:"localhost", port:port}, "ACK:foo", done) | ||||
|         }); | ||||
|     }); | ||||
|  | ||||
|     it('should send & recv data when specified character received', function(done) { | ||||
|         var flow = [{id:"n1", type:"tcp request", server:"localhost", port:port, out:"char", splitc: "0", wires:[["n2"]] }, | ||||
|                     {id:"n2", type:"helper"}]; | ||||
| 	testTCP(flow, "foo0bar0", "ACK:foo0", done); | ||||
|     }); | ||||
|     describe('many messages', function () { | ||||
|         it('should send & recv data', function(done) { | ||||
|             var flow = [{id:"n1", type:"tcp request", server:"localhost", port:port, out:"time", splitc: "0", wires:[["n2"]] }, | ||||
|                         {id:"n2", type:"helper"}]; | ||||
|  | ||||
|     it('should send & recv data after fixed number of chars received', function(done) { | ||||
|         var flow = [{id:"n1", type:"tcp request", server:"localhost", port:port, out:"count", splitc: "7", wires:[["n2"]] }, | ||||
|                     {id:"n2", type:"helper"}]; | ||||
| 	testTCP(flow, "foo bar", "ACK:foo", done); | ||||
|     }); | ||||
|             testTCPMany(flow, ['f', 'o', 'o'], 'ACK:foo', done); | ||||
|         }); | ||||
|  | ||||
|     it('should send & receive, then keep connection', function(done) { | ||||
|         var flow = [{id:"n1", type:"tcp request", server:"localhost", port:port, out:"sit", splitc: "5", wires:[["n2"]] }, | ||||
|                     {id:"n2", type:"helper"}]; | ||||
| 	testTCP(flow, "foo", "ACK:foo", done); | ||||
|     }); | ||||
|         it('should send & recv data when specified character received', function(done) { | ||||
|             var flow = [{id:"n1", type:"tcp request", server:"localhost", port:port, out:"char", splitc: "0", wires:[["n2"]] }, | ||||
|                         {id:"n2", type:"helper"}]; | ||||
|             testTCPMany(flow, ["foo0","bar0"], "ACK:foo0", done); | ||||
|         }); | ||||
|  | ||||
|     it('should send & close', function(done) { | ||||
|         var flow = [{id:"n1", type:"tcp request", server:"localhost", port:port, out:"sit", splitc: "5", wires:[["n2"]] }, | ||||
|                     {id:"n2", type:"helper"}]; | ||||
| 	testTCP(flow, "foo", "ACK:foo", done); | ||||
|     }); | ||||
|         it('should send & recv data after fixed number of chars received', function(done) { | ||||
|             var flow = [{id:"n1", type:"tcp request", server:"localhost", port:port, out:"count", splitc: "7", wires:[["n2"]] }, | ||||
|                         {id:"n2", type:"helper"}]; | ||||
|             testTCPMany(flow, ["fo", "ob", "ar"], "ACK:foo", done); | ||||
|         }); | ||||
|  | ||||
|     it('should send & recv data to/from server:port from msg', function(done) { | ||||
|         var flow = [{id:"n1", type:"tcp request", server:"", port:"", out:"time", splitc: "0", wires:[["n2"]] }, | ||||
|                     {id:"n2", type:"helper"}]; | ||||
| 	testTCP(flow, {payload:"foo", host:"localhost", port:port}, "ACK:foo", done) | ||||
|     }); | ||||
|  | ||||
|         it('should send & receive, then keep connection', function(done) { | ||||
|             var flow = [{id:"n1", type:"tcp request", server:"localhost", port:port, out:"sit", splitc: "5", wires:[["n2"]] }, | ||||
|                         {id:"n2", type:"helper"}]; | ||||
|             testTCPMany(flow, ["foo", "bar", "baz"], "ACK:foobarbaz", done); | ||||
|         }); | ||||
|  | ||||
|         it('should send & recv data to/from server:port from msg', function(done) { | ||||
|             var flow = [{id:"n1", type:"tcp request", server:"", port:"", out:"time", splitc: "0", wires:[["n2"]] }, | ||||
|                         {id:"n2", type:"helper"}]; | ||||
|             testTCPMany(flow, [ | ||||
|                 {payload:"f", host:"localhost", port:port}, | ||||
|                 {payload:"o", host:"localhost", port:port}, | ||||
|                 {payload:"o", host:"localhost", port:port}], "ACK:foo", done); | ||||
|         }); | ||||
|  | ||||
|         it('should limit the queue size', function (done) { | ||||
|             RED.settings.tcpMsgQueueSize = 10; | ||||
|             var flow = [{id:"n1", type:"tcp request", server:"localhost", port:port, out:"sit", splitc: "5", wires:[["n2"]] }, | ||||
|                         {id:"n2", type:"helper"}]; | ||||
|             // create one more msg than is allowed | ||||
|             const msgs = new Array(RED.settings.tcpMsgQueueSize + 1).fill('x'); | ||||
|             const expected = msgs.slice(0, -1); | ||||
|             testTCPMany(flow, msgs, "ACK:" + expected.join(''), done); | ||||
|         }); | ||||
|     }); | ||||
| }); | ||||
|   | ||||
		Reference in New Issue
	
	Block a user