From 2ba5e0fe3e1df3207c8be2a7a4b51b8511f559ff Mon Sep 17 00:00:00 2001 From: Dave C-J Date: Tue, 24 Dec 2013 13:12:17 +0000 Subject: [PATCH] Add socketTimeout to settings.js for TCP server sockets Fixes #125 adds an optional socketTimeout param to settings.js file to add a TCP server socket timeout. Default is no timeout. --- nodes/core/io/31-tcpin.js | 263 +++++++++++++++++++------------------- settings.js | 20 +-- 2 files changed, 143 insertions(+), 140 deletions(-) diff --git a/nodes/core/io/31-tcpin.js b/nodes/core/io/31-tcpin.js index fdfac98c0..100f41fbc 100644 --- a/nodes/core/io/31-tcpin.js +++ b/nodes/core/io/31-tcpin.js @@ -16,12 +16,11 @@ var RED = require(process.env.NODE_RED_HOME+"/red/red"); var reconnectTime = RED.settings.socketReconnectTime||10000; +var socketTimeout = RED.settings.socketTimeout||null; var net = require('net'); var connectionPool = {}; - - function TcpIn(n) { RED.nodes.createNode(this,n); this.host = n.host; @@ -43,55 +42,53 @@ function TcpIn(n) { node.log("connecting to "+node.host+":"+node.port); var id = (1+Math.random()*4294967295).toString(16); client = net.connect(node.port, node.host, function() { - buffer = (node.datatype == 'buffer')? new Buffer(0):""; - node.log("connected to "+node.host+":"+node.port); + buffer = (node.datatype == 'buffer')? new Buffer(0):""; + node.log("connected to "+node.host+":"+node.port); }); connectionPool[id] = client; - + client.on('data', function (data) { - if (node.datatype != 'buffer') { - data = data.toString(node.datatype); - } - if (node.stream) { - if ((node.datatype) === "utf8" && node.newline != "") { - buffer = buffer+data; - var parts = buffer.split(node.newline); - for (var i = 0;i 0)) { - var msg = {topic:node.topic,payload:buffer}; + var msg = {topic:node.topic, payload:data}; msg._session = {type:"tcp",id:id}; node.send(msg); - buffer = null; } + } else { + if ((typeof data) === "string") { + buffer = buffer+data; + } else { + buffer = Buffer.concat([buffer,data],buffer.length+data.length); + } + } + }); + client.on('end', function() { + if (!node.stream || (node.datatype == "utf8" && node.newline != "" && buffer.length > 0)) { + var msg = {topic:node.topic,payload:buffer}; + msg._session = {type:"tcp",id:id}; + node.send(msg); + buffer = null; + } }); - client.on('close', function() { - delete connectionPool[id]; - node.log("connection lost to "+node.host+":"+node.port); - if (!node.closing) { - reconnectTimeout = setTimeout(setupTcpClient, reconnectTime); - } + delete connectionPool[id]; + node.log("connection lost to "+node.host+":"+node.port); + if (!node.closing) { + reconnectTimeout = setTimeout(setupTcpClient, reconnectTime); + } }); - client.on('error', function(err) { node.log(err); }); @@ -105,52 +102,57 @@ function TcpIn(n) { }); } else { var server = net.createServer(function (socket) { - var id = (1+Math.random()*4294967295).toString(16); - connectionPool[id] = socket; - - var buffer = (node.datatype == 'buffer')? new Buffer(0):""; - socket.on('data', function (data) { - if (node.datatype != 'buffer') { - data = data.toString(node.datatype); - } + if (socketTimeout !== null) { socket.setTimeout(socketTimeout); } + var id = (1+Math.random()*4294967295).toString(16); + connectionPool[id] = socket; - if (node.stream) { - if ((typeof data) === "string" && node.newline != "") { - buffer = buffer+data; - var parts = buffer.split(node.newline); - for (var i = 0;i 0)) { - var msg = {topic:node.topic,payload:buffer}; + var buffer = (node.datatype == 'buffer')? new Buffer(0):""; + socket.on('data', function (data) { + if (node.datatype != 'buffer') { + data = data.toString(node.datatype); + } + + if (node.stream) { + if ((typeof data) === "string" && node.newline != "") { + buffer = buffer+data; + var parts = buffer.split(node.newline); + for (var i = 0;i 0)) { + var msg = {topic:node.topic,payload:buffer}; + msg._session = {type:"tcp",id:id}; + node.send(msg); + buffer = null; + } + }); + socket.on('timeout', function() { + node.log('timeout closed socket port '+node.port); + socket.end(); + }); + socket.on('close', function() { + delete connectionPool[id]; + }); + socket.on('error',function(err) { + node.log(err); + }); }); server.listen(node.port); node.log('listening on port '+node.port); @@ -163,7 +165,6 @@ function TcpIn(n) { } } - RED.nodes.registerType("tcp in",TcpIn); function TcpOut(n) { @@ -180,51 +181,47 @@ function TcpOut(n) { var reconnectTimeout; var client = null; var connected = false; - + function setupTcpClient() { node.log("connecting to "+node.host+":"+node.port); client = net.connect(node.port, node.host, function() { - connected = true; - node.log("connected to "+node.host+":"+node.port); + connected = true; + node.log("connected to "+node.host+":"+node.port); }); - client.on('error', function (err) { - node.log('error : '+err); + node.log('error : '+err); }); - client.on('end', function (err) { }); - client.on('close', function() { - node.log("connection lost to "+node.host+":"+node.port); - connected = false; - client.destroy(); - if (!node.closing) { - reconnectTimeout = setTimeout(setupTcpClient,reconnectTime); - } + node.log("connection lost to "+node.host+":"+node.port); + connected = false; + client.destroy(); + if (!node.closing) { + reconnectTimeout = setTimeout(setupTcpClient,reconnectTime); + } }); } setupTcpClient(); - - + node.on("input", function(msg) { - if (connected && msg.payload != null) { - if (Buffer.isBuffer(msg.payload)) { - client.write(msg.payload); - } else if (typeof msg.payload === "string" && node.base64) { - client.write(new Buffer(msg.payload,'base64')); - } else { - client.write(new Buffer(""+msg.payload)); - } + if (connected && msg.payload != null) { + if (Buffer.isBuffer(msg.payload)) { + client.write(msg.payload); + } else if (typeof msg.payload === "string" && node.base64) { + client.write(new Buffer(msg.payload,'base64')); + } else { + client.write(new Buffer(""+msg.payload)); } + } }); - + node.on("close", function() { this.closing = true; client.end(); clearTimeout(reconnectTimeout); }); - + } else if (node.beserver == "reply") { node.on("input",function(msg) { if (msg._session && msg._session.type == "tcp") { @@ -243,33 +240,37 @@ function TcpOut(n) { } else { var connectedSockets = []; var server = net.createServer(function (socket) { - var remoteDetails = socket.remoteAddress+":"+socket.remotePort; - node.log("connection from "+remoteDetails); - connectedSockets.push(socket); - socket.on('close',function() { - node.log("connection closed from "+remoteDetails); - connectedSockets.splice(connectedSockets.indexOf(socket),1); - }); - socket.on('error',function() { - node.log("socket error from "+remoteDetails); - connectedSockets.splice(connectedSockets.indexOf(socket),1); - }); - + if (socketTimeout !== null) { socket.setTimeout(socketTimeout); } + var remoteDetails = socket.remoteAddress+":"+socket.remotePort; + node.log("connection from "+remoteDetails); + connectedSockets.push(socket); + socket.on('timeout', function() { + node.log('timeout closed socket port '+node.port); + socket.end(); + }); + socket.on('close',function() { + node.log("connection closed from "+remoteDetails); + connectedSockets.splice(connectedSockets.indexOf(socket),1); + }); + socket.on('error',function() { + node.log("socket error from "+remoteDetails); + connectedSockets.splice(connectedSockets.indexOf(socket),1); + }); }); node.on("input", function(msg) { - if (msg.payload != null) { - var buffer; - if (Buffer.isBuffer(msg.payload)) { - buffer = msg.payload; - } else if (typeof msg.payload === "string" && node.base64) { - buffer = new Buffer(msg.payload,'base64'); - } else { - buffer = new Buffer(""+msg.payload); - } - for (var i = 0; i.json //flowFile: 'flows.json', - + // By default, all user data is stored in the Node-RED install directory. To // use a different location, the following property can be used //userDir: '/home/nol/.node-red/', - + // Node-RED scans the `nodes` directory in the install directory to find nodes. // The following property can be used to specify an additional directory to scan. //nodesDir: '/home/nol/.node-red/nodes', - + // You can protect the user interface with a userid and password by using the following property // the password must be an md5 hash eg.. 5f4dcc3b5aa765d61d8327deb882cf99 ('password') //httpAuth: {user:"user",pass:"5f4dcc3b5aa765d61d8327deb882cf99"}, @@ -53,11 +57,11 @@ module.exports = { // The following property can be used to specifiy a different root path. //httpRoot: '/admin', - // When httpRoot is used to move the UI to a different root path, the + // When httpRoot is used to move the UI to a different root path, the // following property can be used to identify a directory of static content // that should be served at http://localhost:1880/. //httpStatic: '/home/nol/node-red-dashboard/', - + // The following property can be used to enable HTTPS // See http://nodejs.org/api/https.html#https_https_createserver_options_requestlistener // for details on its contents.