Add err handler on tcpout/listener. Fixes #50

This commit is contained in:
Nicholas O'Leary 2013-10-27 17:57:46 +00:00
parent 6fb8506722
commit 488a039781
1 changed files with 93 additions and 94 deletions

View File

@ -19,106 +19,105 @@ var reconnectTime = RED.settings.socketReconnectTime||10000;
var net = require('net'); var net = require('net');
function TcpOut(n) { function TcpOut(n) {
RED.nodes.createNode(this,n); RED.nodes.createNode(this,n);
this.host = n.host; this.host = n.host;
this.port = n.port * 1; this.port = n.port * 1;
this.base64 = n.base64; this.base64 = n.base64;
this.beserver = n.beserver; this.beserver = n.beserver;
this.name = n.name; this.name = n.name;
this.closing = false; this.closing = false;
var node = this; var node = this;
if (!node.beserver||node.beserver=="client") { if (!node.beserver||node.beserver=="client") {
var reconnectTimeout; var reconnectTimeout;
var client = null; var client = null;
var connected = false; var connected = false;
function setupTcpClient() { function setupTcpClient() {
node.log("connecting to "+node.host+":"+node.port); node.log("connecting to "+node.host+":"+node.port);
client = net.connect(node.port, node.host, function() { client = net.connect(node.port, node.host, function() {
connected = true; connected = true;
node.log("connected to "+node.host+":"+node.port); node.log("connected to "+node.host+":"+node.port);
}); });
client.on('error', function (err) { client.on('error', function (err) {
node.log('error : '+err); node.log('error : '+err);
}); });
client.on('end', function (err) { client.on('end', function (err) {
}); });
client.on('close', function() { client.on('close', function() {
node.log("connection lost to "+node.host+":"+node.port); node.log("connection lost to "+node.host+":"+node.port);
connected = false; connected = false;
client.destroy(); client.destroy();
if (!node.closing) { if (!node.closing) {
reconnectTimeout = setTimeout(setupTcpClient,reconnectTime); reconnectTimeout = setTimeout(setupTcpClient,reconnectTime);
} }
}); });
} }
setupTcpClient(); setupTcpClient();
node.on("input", function(msg) { node.on("input", function(msg) {
if (connected && msg.payload != null) { if (connected && msg.payload != null) {
if (Buffer.isBuffer(msg.payload)) { if (Buffer.isBuffer(msg.payload)) {
client.write(msg.payload); client.write(msg.payload);
} else if (typeof msg.payload === "string" && node.base64) { } else if (typeof msg.payload === "string" && node.base64) {
client.write(new Buffer(msg.payload,'base64')); client.write(new Buffer(msg.payload,'base64'));
} else { } else {
client.write(new Buffer(""+msg.payload)); client.write(new Buffer(""+msg.payload));
} }
} }
}); });
node.on("close", function() {
this._close = function() { this.closing = true;
this.closing = true; client.end();
client.end(); clearTimeout(reconnectTimeout);
clearTimeout(reconnectTimeout); });
}
} } else {
var connectedSockets = [];
else { var server = net.createServer(function (socket) {
var connectedSockets = []; var remoteDetails = socket.remoteAddress+":"+socket.remotePort;
var server = net.createServer(function (socket) { node.log("connection from "+remoteDetails);
var remoteDetails = socket.remoteAddress+":"+socket.remotePort; connectedSockets.push(socket);
node.log("connection from "+remoteDetails); socket.on('close',function() {
connectedSockets.push(socket); node.log("connection closed from "+remoteDetails);
socket.on('close',function() { connectedSockets.splice(connectedSockets.indexOf(socket),1);
node.log("connection closed from "+remoteDetails); });
socket.on('error',function() {
node.log("socket error from "+remoteDetails);
connectedSockets.splice(connectedSockets.indexOf(socket),1); connectedSockets.splice(connectedSockets.indexOf(socket),1);
}); });
});
node.on("input", function(msg) {
if (msg.payload != null) {
var buffer;
if (Buffer.isBuffer(msg.payload)) {
buffer = msg.payload;
} else if (typeof msg.payload === "string" && node.base64) {
buffer = new Buffer(msg.payload,'base64');
} else {
buffer = new Buffer(""+msg.payload);
}
for (var i = 0; i<connectedSockets.length;i+=1) {
connectedSockets[i].write(buffer);
}
}
});
server.listen(node.port); });
node.log('listening on port '+node.port); node.on("input", function(msg) {
if (msg.payload != null) {
var buffer;
if (Buffer.isBuffer(msg.payload)) {
buffer = msg.payload;
} else if (typeof msg.payload === "string" && node.base64) {
buffer = new Buffer(msg.payload,'base64');
} else {
buffer = new Buffer(""+msg.payload);
}
for (var i = 0; i<connectedSockets.length;i+=1) {
connectedSockets[i].write(buffer);
}
}
});
this._close = function() { server.listen(node.port);
server.close(); node.log('listening on port '+node.port);
node.log('stopped listening on port '+node.port);
} node.on('close', function() {
} server.close();
node.log('stopped listening on port '+node.port);
});
}
} }
RED.nodes.registerType("tcp out",TcpOut); RED.nodes.registerType("tcp out",TcpOut);
TcpOut.prototype.close = function() {
this._close();
}