From fc679adefb9c5d20d4461fa2b35449e24a67a89a Mon Sep 17 00:00:00 2001 From: Dave C-J Date: Wed, 3 Sep 2014 20:06:29 +0100 Subject: [PATCH] Allow TCP node option to break connections per message (and auto reconnect) - eg for file trnasfer to indicate EOF. Change to FA icons. --- nodes/core/io/31-tcpin.html | 55 +++++++--- nodes/core/io/31-tcpin.js | 210 ++++++++++++++++++++++-------------- 2 files changed, 172 insertions(+), 93 deletions(-) diff --git a/nodes/core/io/31-tcpin.html b/nodes/core/io/31-tcpin.html index 1f6ae983c..fff132686 100644 --- a/nodes/core/io/31-tcpin.html +++ b/nodes/core/io/31-tcpin.html @@ -16,7 +16,7 @@ @@ -118,7 +118,7 @@ @@ -161,6 +174,7 @@ port: {value:"",validate:function(v) { return (this.beserver == "reply")||RED.validators.number()(v) } }, beserver: {value:"client",required:true}, base64: {value:false,required:true}, + end: {value:false,required:true}, name: {value:""} }, inputs:1, @@ -179,15 +193,27 @@ if (sockettype == "reply") { $("#node-input-port-row").hide(); $("#node-input-host-row").hide(); + $("#node-input-end-row").hide(); } else { $("#node-input-port-row").show(); + $("#node-input-end-row").show(); } if (sockettype == "client") { $("#node-input-host-row").show(); + $("#fin-tip").show(); } else { $("#node-input-host-row").hide(); + $("#fin-tip").hide(); } + + if (sockettype == "server") { + $("#fin-tip2").show(); + } + else { + $("#fin-tip2").hide(); + } + }; updateOptions(); $("#node-input-beserver").change(updateOptions); @@ -208,12 +234,13 @@ +
- +
Tip: outputs a binary Buffer, so you may want to .toString() it.
@@ -229,18 +256,22 @@ if (previous != "time") $("#node-input-splitc").val("0"); $("#node-units").text("ms"); } - else { + else if ($("#node-input-out").val() == "count") { if (previous != "count") $("#node-input-splitc").val("12"); $("#node-units").text("chars"); } + else { + if (previous != "sit") $("#node-input-splitc").val("0"); + $("#node-units").text(""); + } }); diff --git a/nodes/core/io/31-tcpin.js b/nodes/core/io/31-tcpin.js index 34044acb2..2e4e5e7b2 100644 --- a/nodes/core/io/31-tcpin.js +++ b/nodes/core/io/31-tcpin.js @@ -34,11 +34,13 @@ module.exports = function(RED) { this.server = (typeof n.server == 'boolean')?n.server:(n.server == "server"); this.closing = false; var node = this; + var count = 0; if (!node.server) { var buffer = null; var client; var reconnectTimeout; + var end = false; var setupTcpClient = function() { node.log("connecting to "+node.host+":"+node.port); node.status({fill:"grey",shape:"dot",text:"connecting"}); @@ -81,16 +83,25 @@ module.exports = function(RED) { if (!node.stream || (node.datatype == "utf8" && node.newline != "" && buffer.length > 0)) { var msg = {topic:node.topic,payload:buffer}; msg._session = {type:"tcp",id:id}; - node.send(msg); + if (buffer.length !== 0) { + end = true; // only ask for fast re-connect if we actually got something + node.send(msg); + } buffer = null; } }); client.on('close', function() { delete connectionPool[id]; - node.log("connection lost to "+node.host+":"+node.port); node.status({fill:"red",shape:"ring",text:"disconnected"}); if (!node.closing) { - reconnectTimeout = setTimeout(setupTcpClient, reconnectTime); + if (end) { // if we were asked to close then try to reconnect once very quick. + end = false; + reconnectTimeout = setTimeout(setupTcpClient, 20); + } + else { + node.log("connection lost to "+node.host+":"+node.port); + reconnectTimeout = setTimeout(setupTcpClient, reconnectTime); + } } }); client.on('error', function(err) { @@ -109,13 +120,13 @@ module.exports = function(RED) { if (socketTimeout !== null) { socket.setTimeout(socketTimeout); } var id = (1+Math.random()*4294967295).toString(16); connectionPool[id] = socket; + node.status({text:++count+" connections"}); var buffer = (node.datatype == 'buffer')? new Buffer(0):""; socket.on('data', function (data) { if (node.datatype != 'buffer') { data = data.toString(node.datatype); } - if (node.stream) { if ((typeof data) === "string" && node.newline != "") { buffer = buffer+data; @@ -140,10 +151,12 @@ module.exports = function(RED) { } }); socket.on('end', function() { - if (!node.stream || (node.datatype == "utf8" && node.newline != "" && buffer.length > 0)) { - var msg = {topic:node.topic,payload:buffer}; - msg._session = {type:"tcp",id:id}; - node.send(msg); + if (!node.stream || (node.datatype === "utf8" && node.newline !== "")) { + if (buffer.length > 0) { + var msg = {topic:node.topic,payload:buffer}; + msg._session = {type:"tcp",id:id}; + node.send(msg); + } buffer = null; } }); @@ -153,6 +166,7 @@ module.exports = function(RED) { }); socket.on('close', function() { delete connectionPool[id]; + node.status({text:--count+" connections"}); }); socket.on('error',function(err) { node.log(err); @@ -186,6 +200,7 @@ module.exports = function(RED) { this.host = n.host; this.port = n.port * 1; this.base64 = n.base64; + this.doend = n.end || false; this.beserver = n.beserver; this.name = n.name; this.closing = false; @@ -195,6 +210,7 @@ module.exports = function(RED) { var reconnectTimeout; var client = null; var connected = false; + var end = false; var setupTcpClient = function() { node.log("connecting to "+node.host+":"+node.port); @@ -210,12 +226,18 @@ module.exports = function(RED) { client.on('end', function (err) { }); client.on('close', function() { - node.log("connection lost to "+node.host+":"+node.port); node.status({fill:"red",shape:"ring",text:"disconnected"}); connected = false; client.destroy(); if (!node.closing) { - reconnectTimeout = setTimeout(setupTcpClient,reconnectTime); + if (end) { + end = false; + reconnectTimeout = setTimeout(setupTcpClient,20); + } + else { + node.log("connection lost to "+node.host+":"+node.port); + reconnectTimeout = setTimeout(setupTcpClient,reconnectTime); + } } }); } @@ -230,6 +252,10 @@ module.exports = function(RED) { } else { client.write(new Buffer(""+msg.payload)); } + if (node.doend === true) { + end = true; + client.end(); + } } }); @@ -256,11 +282,13 @@ module.exports = function(RED) { }); } else { var connectedSockets = []; + node.status({text:"0 connections"}); var server = net.createServer(function (socket) { if (socketTimeout !== null) { socket.setTimeout(socketTimeout); } var remoteDetails = socket.remoteAddress+":"+socket.remotePort; node.log("connection from "+remoteDetails); connectedSockets.push(socket); + node.status({text:connectedSockets.length+" connections"}); socket.on('timeout', function() { node.log('timeout closed socket port '+node.port); socket.end(); @@ -268,12 +296,15 @@ module.exports = function(RED) { socket.on('close',function() { node.log("connection closed from "+remoteDetails); connectedSockets.splice(connectedSockets.indexOf(socket),1); + node.status({text:connectedSockets.length+" connections"}); }); socket.on('error',function() { node.log("socket error from "+remoteDetails); connectedSockets.splice(connectedSockets.indexOf(socket),1); + node.status({text:connectedSockets.length+" connections"}); }); }); + node.on("input", function(msg) { if (msg.payload != null) { var buffer; @@ -285,7 +316,8 @@ module.exports = function(RED) { buffer = new Buffer(""+msg.payload); } for (var i = 0; i= node.serialConfig.count) { + node.send({"payload": buf}); + client.end(); + i = 0; + } + } + // look for a char else { - node.tout = setTimeout(function () { - node.tout = null; - var m = new Buffer(i+1); - buf.copy(m,0,0,i+1); + buf[i] = data[j]; + i += 1; + if (data[j] == node.splitc) { + var m = new Buffer(i); + buf.copy(m,0,0,i); node.send({"payload": m}); client.end(); m = null; - }, node.splitc); - i = 0; - buf[0] = data[j]; - } - } - // count bytes into a buffer... - else if (node.out == "count") { - buf[i] = data[j]; - i += 1; - if ( i >= node.serialConfig.count) { - node.send({"payload": buf}); - client.end(); - i = 0; - } - } - // look for a char - else { - buf[i] = data[j]; - i += 1; - if (data[j] == node.splitc) { - var m = new Buffer(i); - buf.copy(m,0,0,i); - node.send({"payload": m}); - client.end(); - m = null; - i = 0; + i = 0; + } } } } - } + }); - }); - client.on('end', function() { - //node.log('client disconnected'); - }); - client.on('error', function() { - node.log('connect failed'); - if (client) { client.end(); } - }); - client.on('timeout',function() { - node.log('connect timeout'); - if (client) { - client.end(); - setTimeout(function() { - client.connect(node.port, node.server, function() { - //node.log('client connected'); - client.write(msg.payload); - }); - },reconnectTime); - } + client.on('end', function() { + //node.log('client disconnected'); + node.connected = false; + node.status({}); + client = null; + }); - }); + client.on('error', function() { + node.log('connect failed'); + node.status({fill:"red",shape:"ring",text:"error"}); + if (client) { client.end(); } + }); + + client.on('timeout',function() { + node.log('connect timeout'); + if (client) { + client.end(); + setTimeout(function() { + client.connect(node.port, node.server, function() { + //node.log('client connected'); + node.connected = true; + client.write(msg.payload); + }); + },reconnectTime); + } + }); + } + else { client.write(msg.payload); } }); this.on("close", function() {