diff --git a/nodes/io/31-tcpin.html b/nodes/io/31-tcpin.html index 4257d4a50..48c3f983f 100644 --- a/nodes/io/31-tcpin.html +++ b/nodes/io/31-tcpin.html @@ -114,3 +114,74 @@ } }); + + + + + + diff --git a/nodes/io/31-tcpin.js b/nodes/io/31-tcpin.js index 8d52df7a9..80d6000c8 100644 --- a/nodes/io/31-tcpin.js +++ b/nodes/io/31-tcpin.js @@ -19,134 +19,235 @@ var reconnectTime = RED.settings.socketReconnectTime||10000; var net = require('net'); function TcpIn(n) { - RED.nodes.createNode(this,n); - this.host = n.host; - this.port = n.port * 1; - this.topic = n.topic; - this.stream = (!n.datamode||n.datamode=='stream'); /* stream,single*/ - this.datatype = n.datatype||'buffer'; /* buffer,utf8,base64 */ - this.newline = (n.newline||"").replace("\\n","\n").replace("\\r","\r"); - this.base64 = n.base64; - this.server = (typeof n.server == 'boolean')?n.server:(n.server == "server"); - this.closing = false; - var node = this; + RED.nodes.createNode(this,n); + this.host = n.host; + this.port = n.port * 1; + this.topic = n.topic; + this.stream = (!n.datamode||n.datamode=='stream'); /* stream,single*/ + this.datatype = n.datatype||'buffer'; /* buffer,utf8,base64 */ + this.newline = (n.newline||"").replace("\\n","\n").replace("\\r","\r"); + this.base64 = n.base64; + this.server = (typeof n.server == 'boolean')?n.server:(n.server == "server"); + this.closing = false; + var node = this; - if (!node.server) { + if (!node.server) { var buffer = null; - var client; - var reconnectTimeout; - function setupTcpClient() { - node.log("connecting to "+node.host+":"+node.port); - client = net.connect(node.port, node.host, function() { - buffer = (node.datatype == 'buffer')? new Buffer(0):""; - node.log("connected to "+node.host+":"+node.port); - }); + var client; + var reconnectTimeout; + function setupTcpClient() { + node.log("connecting to "+node.host+":"+node.port); + client = net.connect(node.port, node.host, function() { + buffer = (node.datatype == 'buffer')? new Buffer(0):""; + node.log("connected to "+node.host+":"+node.port); + }); - client.on('data', function (data) { - if (node.datatype != 'buffer') { - data = data.toString(node.datatype); - } - if (node.stream) { - if ((node.datatype) === "utf8" && node.newline != "") { - buffer = buffer+data; - var parts = buffer.split(node.newline); - for (var i = 0;i 0)) { - var msg = {topic:node.topic,payload:buffer}; - node.send(msg); - buffer = null; - } - }); + client.on('data', function (data) { + if (node.datatype != 'buffer') { + data = data.toString(node.datatype); + } + if (node.stream) { + if ((node.datatype) === "utf8" && node.newline != "") { + buffer = buffer+data; + var parts = buffer.split(node.newline); + for (var i = 0;i 0)) { + var msg = {topic:node.topic,payload:buffer}; + node.send(msg); + buffer = null; + } + }); - client.on('close', function() { - node.log("connection lost to "+node.host+":"+node.port); - if (!node.closing) { - reconnectTimeout = setTimeout(setupTcpClient, reconnectTime); - } - }); + client.on('close', function() { + node.log("connection lost to "+node.host+":"+node.port); + if (!node.closing) { + reconnectTimeout = setTimeout(setupTcpClient, reconnectTime); + } + }); - client.on('error', function(err) { - node.log(err); - }); - } - setupTcpClient(); + client.on('error', function(err) { + node.log(err); + }); + } + setupTcpClient(); - this._close = function() { - this.closing = true; - client.end(); - clearTimeout(reconnectTimeout); - } - } else { - var server = net.createServer(function (socket) { - var buffer = (node.datatype == 'buffer')? new Buffer(0):""; - socket.on('data', function (data) { - if (node.datatype != 'buffer') { - data = data.toString(node.datatype); - } + this.on('close', function() { + this.closing = true; + client.end(); + clearTimeout(reconnectTimeout); + }); + } else { + var server = net.createServer(function (socket) { + var buffer = (node.datatype == 'buffer')? new Buffer(0):""; + socket.on('data', function (data) { + if (node.datatype != 'buffer') { + data = data.toString(node.datatype); + } - if (node.stream) { - if ((typeof data) === "string" && node.newline != "") { - buffer = buffer+data; - var parts = buffer.split(node.newline); - for (var i = 0;i 0)) { - var msg = {topic:node.topic,payload:buffer}; - node.send(msg); - buffer = null; - } - }); - socket.on('error',function(err) { - node.log(err); - }); - }); - server.listen(node.port); - node.log('listening on port '+node.port); + if (node.stream) { + if ((typeof data) === "string" && node.newline != "") { + buffer = buffer+data; + var parts = buffer.split(node.newline); + for (var i = 0;i 0)) { + var msg = {topic:node.topic,payload:buffer}; + node.send(msg); + buffer = null; + } + }); + socket.on('error',function(err) { + node.log(err); + }); + }); + server.listen(node.port); + node.log('listening on port '+node.port); - this._close = function() { - this.closing = true; - server.close(); - node.log('stopped listening on port '+node.port); - } - } + this.on('close', function() { + this.closing = true; + server.close(); + node.log('stopped listening on port '+node.port); + }); + } } RED.nodes.registerType("tcp in",TcpIn); -TcpIn.prototype.close = function() { - this._close(); +function TcpOut(n) { + RED.nodes.createNode(this,n); + this.host = n.host; + this.port = n.port * 1; + this.base64 = n.base64; + this.beserver = n.beserver; + this.name = n.name; + this.closing = false; + var node = this; + + if (!node.beserver||node.beserver=="client") { + var reconnectTimeout; + var client = null; + var connected = false; + + function setupTcpClient() { + node.log("connecting to "+node.host+":"+node.port); + client = net.connect(node.port, node.host, function() { + connected = true; + node.log("connected to "+node.host+":"+node.port); + }); + + client.on('error', function (err) { + node.log('error : '+err); + }); + + client.on('end', function (err) { + }); + + client.on('close', function() { + node.log("connection lost to "+node.host+":"+node.port); + connected = false; + client.destroy(); + if (!node.closing) { + reconnectTimeout = setTimeout(setupTcpClient,reconnectTime); + } + }); + } + setupTcpClient(); + + + node.on("input", function(msg) { + if (connected && msg.payload != null) { + if (Buffer.isBuffer(msg.payload)) { + client.write(msg.payload); + } else if (typeof msg.payload === "string" && node.base64) { + client.write(new Buffer(msg.payload,'base64')); + } else { + client.write(new Buffer(""+msg.payload)); + } + } + }); + + node.on("close", function() { + this.closing = true; + client.end(); + clearTimeout(reconnectTimeout); + }); + + } else { + var connectedSockets = []; + var server = net.createServer(function (socket) { + var remoteDetails = socket.remoteAddress+":"+socket.remotePort; + node.log("connection from "+remoteDetails); + connectedSockets.push(socket); + socket.on('close',function() { + node.log("connection closed from "+remoteDetails); + connectedSockets.splice(connectedSockets.indexOf(socket),1); + }); + socket.on('error',function() { + node.log("socket error from "+remoteDetails); + connectedSockets.splice(connectedSockets.indexOf(socket),1); + }); + + }); + node.on("input", function(msg) { + if (msg.payload != null) { + var buffer; + if (Buffer.isBuffer(msg.payload)) { + buffer = msg.payload; + } else if (typeof msg.payload === "string" && node.base64) { + buffer = new Buffer(msg.payload,'base64'); + } else { + buffer = new Buffer(""+msg.payload); + } + for (var i = 0; i - - - - - - diff --git a/nodes/io/31-tcpout.js b/nodes/io/31-tcpout.js deleted file mode 100644 index a157c9f39..000000000 --- a/nodes/io/31-tcpout.js +++ /dev/null @@ -1,123 +0,0 @@ -/** - * Copyright 2013 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -var RED = require("../../red/red"); -var reconnectTime = RED.settings.socketReconnectTime||10000; -var net = require('net'); - -function TcpOut(n) { - RED.nodes.createNode(this,n); - this.host = n.host; - this.port = n.port * 1; - this.base64 = n.base64; - this.beserver = n.beserver; - this.name = n.name; - this.closing = false; - var node = this; - - if (!node.beserver||node.beserver=="client") { - var reconnectTimeout; - var client = null; - var connected = false; - - function setupTcpClient() { - node.log("connecting to "+node.host+":"+node.port); - client = net.connect(node.port, node.host, function() { - connected = true; - node.log("connected to "+node.host+":"+node.port); - }); - - client.on('error', function (err) { - node.log('error : '+err); - }); - - client.on('end', function (err) { - }); - - client.on('close', function() { - node.log("connection lost to "+node.host+":"+node.port); - connected = false; - client.destroy(); - if (!node.closing) { - reconnectTimeout = setTimeout(setupTcpClient,reconnectTime); - } - }); - } - setupTcpClient(); - - - node.on("input", function(msg) { - if (connected && msg.payload != null) { - if (Buffer.isBuffer(msg.payload)) { - client.write(msg.payload); - } else if (typeof msg.payload === "string" && node.base64) { - client.write(new Buffer(msg.payload,'base64')); - } else { - client.write(new Buffer(""+msg.payload)); - } - } - }); - - node.on("close", function() { - this.closing = true; - client.end(); - clearTimeout(reconnectTimeout); - }); - - } else { - var connectedSockets = []; - var server = net.createServer(function (socket) { - var remoteDetails = socket.remoteAddress+":"+socket.remotePort; - node.log("connection from "+remoteDetails); - connectedSockets.push(socket); - socket.on('close',function() { - node.log("connection closed from "+remoteDetails); - connectedSockets.splice(connectedSockets.indexOf(socket),1); - }); - socket.on('error',function() { - node.log("socket error from "+remoteDetails); - connectedSockets.splice(connectedSockets.indexOf(socket),1); - }); - - }); - node.on("input", function(msg) { - if (msg.payload != null) { - var buffer; - if (Buffer.isBuffer(msg.payload)) { - buffer = msg.payload; - } else if (typeof msg.payload === "string" && node.base64) { - buffer = new Buffer(msg.payload,'base64'); - } else { - buffer = new Buffer(""+msg.payload); - } - for (var i = 0; i