diff --git a/nodes/io/31-tcpin.html b/nodes/io/31-tcpin.html index d1f3b7f67..4257d4a50 100644 --- a/nodes/io/31-tcpin.html +++ b/nodes/io/31-tcpin.html @@ -16,22 +16,36 @@ diff --git a/nodes/io/31-tcpin.js b/nodes/io/31-tcpin.js index f548ebfe2..78b7ab731 100644 --- a/nodes/io/31-tcpin.js +++ b/nodes/io/31-tcpin.js @@ -15,7 +15,7 @@ **/ var RED = require("../../red/red"); -var reConnect = RED.settings.socketReconnectTime||10000; +var reconnectTime = RED.settings.socketReconnectTime||10000; var net = require('net'); function TcpIn(n) { @@ -23,76 +23,126 @@ function TcpIn(n) { this.host = n.host; this.port = n.port * 1; this.topic = n.topic; + this.stream = (!n.datamode||n.datamode=='stream'); /* stream,single*/ + this.datatype = n.datatype||'buffer'; /* buffer,utf8,base64 */ + this.newline = (n.newline||"").replace("\\n","\n").replace("\\r","\r"); this.base64 = n.base64; - this.server = n.server; + this.server = (typeof n.server == 'boolean')?n.server:(n.server == "server"); + this.closing = false; var node = this; - + if (!node.server) { + var buffer = null; var client; - var to; - + var reconnectTimeout; function setupTcpClient() { - node.log('connecting to port '+node.port); + node.log("connecting to "+node.host+":"+node.port); client = net.connect(node.port, node.host, function() { - node.log("input connected to "+node.host+":"+node.port); + buffer = (node.datatype == 'buffer')? new Buffer(0):""; + node.log("connected to "+node.host+":"+node.port); }); - + client.on('data', function (data) { - var msg; - if (node.base64) { msg = { topic:node.topic, payload:new Buffer(data).toString('base64') }; } - else { msg = {topic:node.topic, payload:data}; } - node.send(msg); + if (node.datatype != 'buffer') { + data = data.toString(node.datatype); + } + if (node.stream) { + if ((typeof data) === "string" && node.newline != "") { + buffer = buffer+data; + var parts = buffer.split(node.newline); + for (var i = 0;i 0)) { + var msg = {topic:node.topic,payload:buffer}; + node.send(msg); + buffer = null; + } }); client.on('close', function() { - client.destroy(); - node.log('closed'); - to = setTimeout(setupTcpClient, reConnect); + node.log("connection lost to "+node.host+":"+node.port); + if (!node.closing) { + reconnectTimeout = setTimeout(setupTcpClient, reconnectTime); + } }); client.on('error', function(err) { - node.log('error : '+err); - //to = setTimeout(setupTcpClient, reConnect); + node.log(err); }); } setupTcpClient(); this._close = function() { + this.closing = true; client.end(); - clearTimeout(to); - node.log('input stopped'); + clearTimeout(reconnectTimeout); } - } - else { + } else { var server = net.createServer(function (socket) { - var buffer = null; - socket.on('data', function (chunk) { - //if (buffer == null) { - // buffer = chunk; - //} else { - //buffer = Buffer.concat([buffer,chunk]); - var msg = {topic:node.topic, payload:chunk, fromip:socket.remoteAddress+':'+socket.remotePort}; - node.send(msg); - //} - }); - socket.on('end', function() { - var msg = {topic:node.topic, payload:buffer, fromip:socket.remoteAddress+':'+socket.remotePort}; - node.send(msg); - }); + var buffer = (node.datatype == 'buffer')? new Buffer(0):""; + socket.on('data', function (data) { + if (node.datatype != 'buffer') { + data = data.toString(node.datatype); + } + + if (node.stream) { + if ((typeof data) === "string" && node.newline != "") { + buffer = buffer+data; + var parts = buffer.split(node.newline); + for (var i = 0;i 0)) { + var msg = {topic:node.topic,payload:buffer}; + node.send(msg); + buffer = null; + } + }); + socket.on('error',function(err) { + node.log(err); + }); }); server.listen(node.port); - node.log('socket input on port '+node.port); - + node.log('listening on port '+node.port); + this._close = function() { + this.closing = true; server.close(); - node.log('socket input stopped'); + node.log('stopped listening on port '+node.port); } } - + } RED.nodes.registerType("tcp in",TcpIn); diff --git a/nodes/io/31-tcpout.html b/nodes/io/31-tcpout.html index d8febe216..f0457fb87 100644 --- a/nodes/io/31-tcpout.html +++ b/nodes/io/31-tcpout.html @@ -16,34 +16,37 @@ diff --git a/nodes/io/31-tcpout.js b/nodes/io/31-tcpout.js index c7bb30a47..1a626736b 100644 --- a/nodes/io/31-tcpout.js +++ b/nodes/io/31-tcpout.js @@ -15,7 +15,7 @@ **/ var RED = require("../../red/red"); -var reConnect = RED.settings.socketReconnectTime||10000; +var reconnectTime = RED.settings.socketReconnectTime||10000; var net = require('net'); function TcpOut(n) { @@ -25,67 +25,93 @@ function TcpOut(n) { this.base64 = n.base64; this.beserver = n.beserver; this.name = n.name; + this.closing = false; var node = this; - if (!node.beserver) { - var client = new net.Socket(); - var to; - + if (!node.beserver||node.beserver=="client") { + var reconnectTimeout; + var client = null; + var connected = false; + function setupTcpClient() { - client.connect(node.port, node.host, function() { - node.log("output connected to "+node.host+":"+node.port); + node.log("connecting to "+node.host+":"+node.port); + client = net.connect(node.port, node.host, function() { + connected = true; + node.log("connected to "+node.host+":"+node.port); }); - + client.on('error', function (err) { - node.error('error : '+err); - to = setTimeout(setupTcpClient, reConnect); + node.log('error : '+err); }); - + client.on('end', function (err) { - node.log("output disconnected"); - to = setTimeout(setupTcpClient, reConnect); }); - + client.on('close', function() { - client.destroy(); - node.log('closed'); - to = setTimeout(setupTcpClient, reConnect); - }); - - node.on("input", function(msg) { - if (msg.payload != null) { - if (node.base64) { client.write(new Buffer(msg.payload,'base64')); } - else { client.write(msg.payload);} - } + node.log("connection lost to "+node.host+":"+node.port); + connected = false; + client.destroy(); + if (!node.closing) { + reconnectTimeout = setTimeout(setupTcpClient,reconnectTime); + } }); } setupTcpClient(); - + + + node.on("input", function(msg) { + if (connected && msg.payload != null) { + if (Buffer.isBuffer(msg.payload)) { + client.write(msg.payload); + } else if (typeof msg.payload === "string" && node.base64) { + client.write(new Buffer(msg.payload,'base64')); + } else { + client.write(new Buffer(""+msg.payload)); + } + } + }); + + this._close = function() { + this.closing = true; client.end(); - clearTimeout(to); - node.log('output stopped'); + clearTimeout(reconnectTimeout); } } else { + var connectedSockets = []; var server = net.createServer(function (socket) { - socket.on("connect",function() { - node.log("Connection from "+socket.remoteAddress); - }); - node.on("input", function(msg) { - if (msg.payload != null) { - if (node.base64) { socket.write(new Buffer(msg.payload,'base64')); } - else { socket.write(msg.payload);} - } - }); + var remoteDetails = socket.remoteAddress+":"+socket.remotePort; + node.log("connection from "+remoteDetails); + connectedSockets.push(socket); + socket.on('close',function() { + node.log("connection closed from "+remoteDetails); + connectedSockets.splice(connectedSockets.indexOf(socket),1); + }); }); + node.on("input", function(msg) { + if (msg.payload != null) { + var buffer; + if (Buffer.isBuffer(msg.payload)) { + buffer = msg.payload; + } else if (typeof msg.payload === "string" && node.base64) { + buffer = new Buffer(msg.payload,'base64'); + } else { + buffer = new Buffer(""+msg.payload); + } + for (var i = 0; i