diff --git a/nodes/core/io/31-tcpin.js b/nodes/core/io/31-tcpin.js index 1e8ae8795..8c3b7bbd5 100644 --- a/nodes/core/io/31-tcpin.js +++ b/nodes/core/io/31-tcpin.js @@ -83,7 +83,7 @@ module.exports = function(RED) { } }); client.on('end', function() { - if (!node.stream || (node.datatype == "utf8" && node.newline != "" && buffer.length > 0)) { + if (!node.stream || (node.datatype == "utf8" && node.newline !== "" && buffer.length > 0)) { var msg = {topic:node.topic, payload:buffer}; msg._session = {type:"tcp",id:id}; if (buffer.length !== 0) { @@ -407,66 +407,73 @@ module.exports = function(RED) { } // jshint ignore:line } - var buf; - if (this.out == "count") { - if (this.splitc === 0) { buf = new Buffer(1); } - else { buf = new Buffer(this.splitc); } - } - else { buf = new Buffer(65536); } // set it to 64k... hopefully big enough for most TCP packets.... but only hopefully - this.connected = false; var node = this; - var client; - var m; + + var clients = {}; this.on("input", function(msg) { - m = msg; var i = 0; if ((!Buffer.isBuffer(msg.payload)) && (typeof msg.payload !== "string")) { msg.payload = msg.payload.toString(); } + + var host = node.server || msg.host; + var port = node.port || msg.port; + var connection_id = host + ":" + port; + if (!node.connected) { - client = net.Socket(); - if (socketTimeout !== null) { client.setTimeout(socketTimeout); } - var host = node.server || msg.host; - var port = node.port || msg.port; + var buf; + if (this.out == "count") { + if (this.splitc === 0) { buf = new Buffer(1); } + else { buf = new Buffer(this.splitc); } + } + else { buf = new Buffer(65536); } // set it to 64k... hopefully big enough for most TCP packets.... but only hopefully + + clients[connection_id] = net.Socket(); + if (socketTimeout !== null) { clients[connection_id].setTimeout(socketTimeout);} if (host && port) { - client.connect(port, host, function() { + clients[connection_id].connect(port, host, function() { //node.log(RED._("tcpin.errors.client-connected")); node.status({fill:"green",shape:"dot",text:"common.status.connected"}); node.connected = true; - client.write(msg.payload); + if (clients[connection_id]) { + clients[connection_id].write(msg.payload); + } }); } else { node.warn(RED._("tcpin.errors.no-host")); } - client.on('data', function(data) { + clients[connection_id].on('data', function(data) { if (node.out == "sit") { // if we are staying connected just send the buffer - m.payload = data; - node.send(m); + msg.payload = data; + node.send(RED.util.cloneMessage(msg)); } else if (node.splitc === 0) { msg.payload = data; - node.send(msg); + node.send(RED.util.cloneMessage(msg)); } else { for (var j = 0; j < data.length; j++ ) { if (node.out === "time") { // do the timer thing - if (node.tout) { + if (clients[connection_id] && clients[connection_id].timeout) { i += 1; buf[i] = data[j]; } else { - node.tout = setTimeout(function () { - node.tout = null; + clients[connection_id].timeout = setTimeout(function () { + clients[connection_id].timeout = null; msg.payload = new Buffer(i+1); buf.copy(msg.payload,0,0,i+1); node.send(msg); - if (client) { node.status({}); client.destroy(); } + if (clients[connection_id]) { + node.status({}); clients[connection_id].destroy(); + delete clients[connection_id]; + } }, node.splitc); i = 0; buf[0] = data[j]; @@ -480,7 +487,10 @@ module.exports = function(RED) { msg.payload = new Buffer(i); buf.copy(msg.payload,0,0,i); node.send(msg); - if (client) { node.status({}); client.destroy(); } + if (clients[connection_id]) { + node.status({}); clients[connection_id].destroy(); + delete clients[connection_id]; + } i = 0; } } @@ -492,7 +502,10 @@ module.exports = function(RED) { msg.payload = new Buffer(i); buf.copy(msg.payload,0,0,i); node.send(msg); - if (client) { node.status({}); client.destroy(); } + if (clients[connection_id]) { + node.status({}); clients[connection_id].destroy(); + delete clients[connection_id]; + } i = 0; } } @@ -500,48 +513,53 @@ module.exports = function(RED) { } }); - client.on('end', function() { + clients[connection_id].on('end', function() { //console.log("END"); node.connected = false; node.status({fill:"grey",shape:"ring",text:"common.status.disconnected"}); - client = null; + clients[connection_id] = null; }); - client.on('close', function() { + clients[connection_id].on('close', function() { //console.log("CLOSE"); node.connected = false; if (node.done) { node.done(); } }); - client.on('error', function() { + clients[connection_id].on('error', function() { //console.log("ERROR"); node.connected = false; node.status({fill:"red",shape:"ring",text:"common.status.error"}); node.error(RED._("tcpin.errors.connect-fail"),msg); - if (client) { client.destroy(); } + if (clients[connection_id]) { + clients[connection_id].destroy(); + delete clients[connection_id]; + } }); - client.on('timeout',function() { - //console.log("TIMEOUT"); + clients[connection_id].on('timeout',function() { node.connected = false; node.status({fill:"grey",shape:"dot",text:"tcpin.errors.connect-timeout"}); //node.warn(RED._("tcpin.errors.connect-timeout")); - if (client) { - client.connect(port, host, function() { + if (clients[connection_id]) { + clients[connection_id].connect(port, host, function() { node.connected = true; node.status({fill:"green",shape:"dot",text:"common.status.connected"}); }); } }); } - else { client.write(msg.payload); } + else { + clients[connection_id].write(msg.payload); + } }); this.on("close", function(done) { node.done = done; - if (client) { - client.destroy(); + for (var client in clients) { + clients[client].destroy(); } + clients = {}; node.status({}); if (!node.connected) { done(); } });