Allow TCP node option to break connections per message (and auto

reconnect) - eg for file trnasfer to indicate EOF.

Change to FA icons.
This commit is contained in:
Dave C-J 2014-09-03 20:06:29 +01:00
parent e136080888
commit fc679adefb
2 changed files with 172 additions and 93 deletions

View File

@ -16,7 +16,7 @@
<script type="text/x-red" data-template-name="tcp in"> <script type="text/x-red" data-template-name="tcp in">
<div class="form-row"> <div class="form-row">
<label for="node-input-server"><i class="icon-resize-small"></i> Type</label> <label for="node-input-server"><i class="fa fa-dot-circle-o"></i> Type</label>
<select id="node-input-server" style="width:120px; margin-right:5px;"> <select id="node-input-server" style="width:120px; margin-right:5px;">
<option value="server">Listen on</option> <option value="server">Listen on</option>
<option value="client">Connect to</option> <option value="client">Connect to</option>
@ -28,7 +28,7 @@
</div> </div>
<div class="form-row"> <div class="form-row">
<label><i class="icon-th"></i> Output</label> <label><i class="fa fa-th"></i> Output</label>
a a
<select id="node-input-datamode" style="width:110px;"> <select id="node-input-datamode" style="width:110px;">
<option value="stream">stream of</option> <option value="stream">stream of</option>
@ -47,11 +47,11 @@
</div> </div>
<div class="form-row"> <div class="form-row">
<label for="node-input-topic"><i class="icon-tasks"></i> Topic</label> <label for="node-input-topic"><i class="fa fa-tasks"></i> Topic</label>
<input type="text" id="node-input-topic" placeholder="Topic"> <input type="text" id="node-input-topic" placeholder="Topic">
</div> </div>
<div class="form-row"> <div class="form-row">
<label for="node-input-name"><i class="icon-tag"></i> Name</label> <label for="node-input-name"><i class="fa fa-tag"></i> Name</label>
<input type="text" id="node-input-name" placeholder="Name"> <input type="text" id="node-input-name" placeholder="Name">
</div> </div>
</script> </script>
@ -118,7 +118,7 @@
<script type="text/x-red" data-template-name="tcp out"> <script type="text/x-red" data-template-name="tcp out">
<div class="form-row"> <div class="form-row">
<label for="node-input-beserver"><i class="icon-resize-small"></i> Type</label> <label for="node-input-beserver"><i class="fa fa-dot-circle-o"></i> Type</label>
<select id="node-input-beserver" style="width:150px; margin-right:5px;"> <select id="node-input-beserver" style="width:150px; margin-right:5px;">
<option value="server">Listen on</option> <option value="server">Listen on</option>
<option value="client">Connect to</option> <option value="client">Connect to</option>
@ -128,7 +128,13 @@
</div> </div>
<div class="form-row hidden" id="node-input-host-row" style="padding-left: 110px;"> <div class="form-row hidden" id="node-input-host-row" style="padding-left: 110px;">
at host <input type="text" id="node-input-host" placeholder="localhost" style="width: 40%;"> at host <input type="text" id="node-input-host" placeholder="localhost" style="width: 60%;">
</div>
<div class="form-row hidden" id="node-input-end-row">
<label>&nbsp;</label>
<input type="checkbox" id="node-input-end" style="display: inline-block; width: auto; vertical-align: top;">
<label for="node-input-end" style="width: 70%;">Close connection after each message is sent ?</label>
</div> </div>
<div class="form-row"> <div class="form-row">
@ -138,16 +144,23 @@
</div> </div>
<div class="form-row"> <div class="form-row">
<label for="node-input-name"><i class="icon-tag"></i> Name</label> <label for="node-input-name"><i class="fa fa-tag"></i> Name</label>
<input type="text" id="node-input-name" placeholder="Name"> <input type="text" id="node-input-name" placeholder="Name">
</div> </div>
<div class="form-tips hidden" id="fin-tip">
<b>Note:</b> Closing the connection after each message is generally not a good thing - but is useful to indicate an end-of-file for example.
</div>
<div class="form-tips hidden" id="fin-tip2">
<b>Note:</b> Closing the connection after each message is generally not a good thing - but is useful to indicate an end-of-file for example. The receiving client will need to reconnect.
</div>
</script> </script>
<script type="text/x-red" data-help-name="tcp out"> <script type="text/x-red" data-help-name="tcp out">
<p>Provides a choice of tcp outputs. Can either connect to a remote tcp port, <p>Provides a choice of tcp outputs. Can either connect to a remote tcp port,
accept incoming connections, or reply to messages received from a TCP In node.</p> accept incoming connections, or reply to messages received from a TCP In node.</p>
<p>Only <b>msg.payload</b> is sent.</p> <p>Only <b>msg.payload</b> is sent.</p>
<p>If <b>msg.payload</b> is a string containing a base64 encoding of binary <p>If <b>msg.payload</b> is a string containing a Base64 encoding of binary
data, the Base64 decoding option will cause it to be converted back to binary data, the Base64 decoding option will cause it to be converted back to binary
before being sent.</p> before being sent.</p>
</script> </script>
@ -161,6 +174,7 @@
port: {value:"",validate:function(v) { return (this.beserver == "reply")||RED.validators.number()(v) } }, port: {value:"",validate:function(v) { return (this.beserver == "reply")||RED.validators.number()(v) } },
beserver: {value:"client",required:true}, beserver: {value:"client",required:true},
base64: {value:false,required:true}, base64: {value:false,required:true},
end: {value:false,required:true},
name: {value:""} name: {value:""}
}, },
inputs:1, inputs:1,
@ -179,15 +193,27 @@
if (sockettype == "reply") { if (sockettype == "reply") {
$("#node-input-port-row").hide(); $("#node-input-port-row").hide();
$("#node-input-host-row").hide(); $("#node-input-host-row").hide();
$("#node-input-end-row").hide();
} else { } else {
$("#node-input-port-row").show(); $("#node-input-port-row").show();
$("#node-input-end-row").show();
} }
if (sockettype == "client") { if (sockettype == "client") {
$("#node-input-host-row").show(); $("#node-input-host-row").show();
$("#fin-tip").show();
} else { } else {
$("#node-input-host-row").hide(); $("#node-input-host-row").hide();
$("#fin-tip").hide();
} }
if (sockettype == "server") {
$("#fin-tip2").show();
}
else {
$("#fin-tip2").hide();
}
}; };
updateOptions(); updateOptions();
$("#node-input-beserver").change(updateOptions); $("#node-input-beserver").change(updateOptions);
@ -208,12 +234,13 @@
<option value="time">after a fixed timeout of</option> <option value="time">after a fixed timeout of</option>
<option value="char">when character received is</option> <option value="char">when character received is</option>
<option value="count">a fixed number of characters</option> <option value="count">a fixed number of characters</option>
<option value="sit">never. Keep connection open</option>
</select> </select>
<input type="text" id="node-input-splitc" style="width:50px;"> <input type="text" id="node-input-splitc" style="width:50px;">
<span id="node-units"></span> <span id="node-units"></span>
</div> </div>
<div class="form-row"> <div class="form-row">
<label for="node-input-name"><i class="icon-tag"></i> Name</label> <label for="node-input-name"><i class="fa fa-tag"></i> Name</label>
<input type="text" id="node-input-name" placeholder="Name"> <input type="text" id="node-input-name" placeholder="Name">
</div> </div>
<div class="form-tips"><b>Tip:</b> outputs a binary <b>Buffer</b>, so you may want to .toString() it.</div> <div class="form-tips"><b>Tip:</b> outputs a binary <b>Buffer</b>, so you may want to .toString() it.</div>
@ -229,18 +256,22 @@
if (previous != "time") $("#node-input-splitc").val("0"); if (previous != "time") $("#node-input-splitc").val("0");
$("#node-units").text("ms"); $("#node-units").text("ms");
} }
else { else if ($("#node-input-out").val() == "count") {
if (previous != "count") $("#node-input-splitc").val("12"); if (previous != "count") $("#node-input-splitc").val("12");
$("#node-units").text("chars"); $("#node-units").text("chars");
} }
else {
if (previous != "sit") $("#node-input-splitc").val("0");
$("#node-units").text("");
}
}); });
</script> </script>
<script type="text/x-red" data-help-name="tcp request"> <script type="text/x-red" data-help-name="tcp request">
<p>A simple TCP request node - sends the <b>msg.payload</b> to a server tcp port and expects a response.</p> <p>A simple TCP request node - sends the <b>msg.payload</b> to a server tcp port and expects a response.</p>
<p>Connects, sends the "request", reads the "response" and disconnects. It can either count a number of <p>Connects, sends the "request", reads the "response". It can either count a number of
returned characters into a fixed buffer, match a specified character before returning, returned characters into a fixed buffer, match a specified character before returning,
or wait a fixed timeout from first reply and then return.</p> wait a fixed timeout from first reply and then return, or just sit and wait for data.</p>
<p>The response will be output in <b>msg.payload</b> as a buffer, so you may want to .toString() it.</p> <p>The response will be output in <b>msg.payload</b> as a buffer, so you may want to .toString() it.</p>
</script> </script>

View File

@ -34,11 +34,13 @@ module.exports = function(RED) {
this.server = (typeof n.server == 'boolean')?n.server:(n.server == "server"); this.server = (typeof n.server == 'boolean')?n.server:(n.server == "server");
this.closing = false; this.closing = false;
var node = this; var node = this;
var count = 0;
if (!node.server) { if (!node.server) {
var buffer = null; var buffer = null;
var client; var client;
var reconnectTimeout; var reconnectTimeout;
var end = false;
var setupTcpClient = function() { var setupTcpClient = function() {
node.log("connecting to "+node.host+":"+node.port); node.log("connecting to "+node.host+":"+node.port);
node.status({fill:"grey",shape:"dot",text:"connecting"}); node.status({fill:"grey",shape:"dot",text:"connecting"});
@ -81,16 +83,25 @@ module.exports = function(RED) {
if (!node.stream || (node.datatype == "utf8" && node.newline != "" && buffer.length > 0)) { if (!node.stream || (node.datatype == "utf8" && node.newline != "" && buffer.length > 0)) {
var msg = {topic:node.topic,payload:buffer}; var msg = {topic:node.topic,payload:buffer};
msg._session = {type:"tcp",id:id}; msg._session = {type:"tcp",id:id};
node.send(msg); if (buffer.length !== 0) {
end = true; // only ask for fast re-connect if we actually got something
node.send(msg);
}
buffer = null; buffer = null;
} }
}); });
client.on('close', function() { client.on('close', function() {
delete connectionPool[id]; delete connectionPool[id];
node.log("connection lost to "+node.host+":"+node.port);
node.status({fill:"red",shape:"ring",text:"disconnected"}); node.status({fill:"red",shape:"ring",text:"disconnected"});
if (!node.closing) { if (!node.closing) {
reconnectTimeout = setTimeout(setupTcpClient, reconnectTime); if (end) { // if we were asked to close then try to reconnect once very quick.
end = false;
reconnectTimeout = setTimeout(setupTcpClient, 20);
}
else {
node.log("connection lost to "+node.host+":"+node.port);
reconnectTimeout = setTimeout(setupTcpClient, reconnectTime);
}
} }
}); });
client.on('error', function(err) { client.on('error', function(err) {
@ -109,13 +120,13 @@ module.exports = function(RED) {
if (socketTimeout !== null) { socket.setTimeout(socketTimeout); } if (socketTimeout !== null) { socket.setTimeout(socketTimeout); }
var id = (1+Math.random()*4294967295).toString(16); var id = (1+Math.random()*4294967295).toString(16);
connectionPool[id] = socket; connectionPool[id] = socket;
node.status({text:++count+" connections"});
var buffer = (node.datatype == 'buffer')? new Buffer(0):""; var buffer = (node.datatype == 'buffer')? new Buffer(0):"";
socket.on('data', function (data) { socket.on('data', function (data) {
if (node.datatype != 'buffer') { if (node.datatype != 'buffer') {
data = data.toString(node.datatype); data = data.toString(node.datatype);
} }
if (node.stream) { if (node.stream) {
if ((typeof data) === "string" && node.newline != "") { if ((typeof data) === "string" && node.newline != "") {
buffer = buffer+data; buffer = buffer+data;
@ -140,10 +151,12 @@ module.exports = function(RED) {
} }
}); });
socket.on('end', function() { socket.on('end', function() {
if (!node.stream || (node.datatype == "utf8" && node.newline != "" && buffer.length > 0)) { if (!node.stream || (node.datatype === "utf8" && node.newline !== "")) {
var msg = {topic:node.topic,payload:buffer}; if (buffer.length > 0) {
msg._session = {type:"tcp",id:id}; var msg = {topic:node.topic,payload:buffer};
node.send(msg); msg._session = {type:"tcp",id:id};
node.send(msg);
}
buffer = null; buffer = null;
} }
}); });
@ -153,6 +166,7 @@ module.exports = function(RED) {
}); });
socket.on('close', function() { socket.on('close', function() {
delete connectionPool[id]; delete connectionPool[id];
node.status({text:--count+" connections"});
}); });
socket.on('error',function(err) { socket.on('error',function(err) {
node.log(err); node.log(err);
@ -186,6 +200,7 @@ module.exports = function(RED) {
this.host = n.host; this.host = n.host;
this.port = n.port * 1; this.port = n.port * 1;
this.base64 = n.base64; this.base64 = n.base64;
this.doend = n.end || false;
this.beserver = n.beserver; this.beserver = n.beserver;
this.name = n.name; this.name = n.name;
this.closing = false; this.closing = false;
@ -195,6 +210,7 @@ module.exports = function(RED) {
var reconnectTimeout; var reconnectTimeout;
var client = null; var client = null;
var connected = false; var connected = false;
var end = false;
var setupTcpClient = function() { var setupTcpClient = function() {
node.log("connecting to "+node.host+":"+node.port); node.log("connecting to "+node.host+":"+node.port);
@ -210,12 +226,18 @@ module.exports = function(RED) {
client.on('end', function (err) { client.on('end', function (err) {
}); });
client.on('close', function() { client.on('close', function() {
node.log("connection lost to "+node.host+":"+node.port);
node.status({fill:"red",shape:"ring",text:"disconnected"}); node.status({fill:"red",shape:"ring",text:"disconnected"});
connected = false; connected = false;
client.destroy(); client.destroy();
if (!node.closing) { if (!node.closing) {
reconnectTimeout = setTimeout(setupTcpClient,reconnectTime); if (end) {
end = false;
reconnectTimeout = setTimeout(setupTcpClient,20);
}
else {
node.log("connection lost to "+node.host+":"+node.port);
reconnectTimeout = setTimeout(setupTcpClient,reconnectTime);
}
} }
}); });
} }
@ -230,6 +252,10 @@ module.exports = function(RED) {
} else { } else {
client.write(new Buffer(""+msg.payload)); client.write(new Buffer(""+msg.payload));
} }
if (node.doend === true) {
end = true;
client.end();
}
} }
}); });
@ -256,11 +282,13 @@ module.exports = function(RED) {
}); });
} else { } else {
var connectedSockets = []; var connectedSockets = [];
node.status({text:"0 connections"});
var server = net.createServer(function (socket) { var server = net.createServer(function (socket) {
if (socketTimeout !== null) { socket.setTimeout(socketTimeout); } if (socketTimeout !== null) { socket.setTimeout(socketTimeout); }
var remoteDetails = socket.remoteAddress+":"+socket.remotePort; var remoteDetails = socket.remoteAddress+":"+socket.remotePort;
node.log("connection from "+remoteDetails); node.log("connection from "+remoteDetails);
connectedSockets.push(socket); connectedSockets.push(socket);
node.status({text:connectedSockets.length+" connections"});
socket.on('timeout', function() { socket.on('timeout', function() {
node.log('timeout closed socket port '+node.port); node.log('timeout closed socket port '+node.port);
socket.end(); socket.end();
@ -268,12 +296,15 @@ module.exports = function(RED) {
socket.on('close',function() { socket.on('close',function() {
node.log("connection closed from "+remoteDetails); node.log("connection closed from "+remoteDetails);
connectedSockets.splice(connectedSockets.indexOf(socket),1); connectedSockets.splice(connectedSockets.indexOf(socket),1);
node.status({text:connectedSockets.length+" connections"});
}); });
socket.on('error',function() { socket.on('error',function() {
node.log("socket error from "+remoteDetails); node.log("socket error from "+remoteDetails);
connectedSockets.splice(connectedSockets.indexOf(socket),1); connectedSockets.splice(connectedSockets.indexOf(socket),1);
node.status({text:connectedSockets.length+" connections"});
}); });
}); });
node.on("input", function(msg) { node.on("input", function(msg) {
if (msg.payload != null) { if (msg.payload != null) {
var buffer; var buffer;
@ -285,7 +316,8 @@ module.exports = function(RED) {
buffer = new Buffer(""+msg.payload); buffer = new Buffer(""+msg.payload);
} }
for (var i = 0; i<connectedSockets.length;i+=1) { for (var i = 0; i<connectedSockets.length;i+=1) {
connectedSockets[i].write(buffer); if (node.doend === true) { connectedSockets[i].end(buffer); }
else { connectedSockets[i].write(buffer); }
} }
} }
}); });
@ -325,94 +357,110 @@ module.exports = function(RED) {
if (this.out == "count") { buf = new Buffer(this.splitc); } if (this.out == "count") { buf = new Buffer(this.splitc); }
else { buf = new Buffer(32768); } // set it to 32k... hopefully big enough for most.... but only hopefully else { buf = new Buffer(32768); } // set it to 32k... hopefully big enough for most.... but only hopefully
this.connected = false;
var node = this; var node = this;
var client; var client;
this.on("input", function(msg) { this.on("input", function(msg) {
var i = 0; var i = 0;
if (!(msg.payload instanceof Buffer)) { if ((!Buffer.isBuffer(msg.payload)) && (typeof msg.payload !== "string")) {
msg.payload = msg.payload.toString(); msg.payload = msg.payload.toString();
} }
client = net.Socket(); if (!node.connected) {
client.setTimeout(socketTimeout); client = net.Socket();
client.connect(node.port, node.server, function() { client.setTimeout(socketTimeout);
//node.log('client connected'); node.status({});
client.write(msg.payload); client.connect(node.port, node.server, function() {
}); //node.log('client connected');
client.on('data', function(data) { node.status({fill:"green",shape:"dot",text:"connected"});
//node.log("data", data.length, data); node.connected = true;
client.write(msg.payload);
});
if (node.splitc === 0) { client.on('data', function(data) {
node.send({"payload": data}); //node.log("data:"+ data.length+":"+ data);
} if (node.splitc === 0) {
else { node.send({"payload": data});
for (var j = 0; j < data.length; j++ ) { }
if (node.out == "time") { else if (node.out === "sit") { // if we are staying connected just send the buffer
// do the timer thing node.send({"payload": data});
if (node.tout) { }
i += 1; else {
buf[i] = data[j]; for (var j = 0; j < data.length; j++ ) {
if (node.out === "time") {
// do the timer thing
if (node.tout) {
i += 1;
buf[i] = data[j];
}
else {
node.tout = setTimeout(function () {
node.tout = null;
var m = new Buffer(i+1);
buf.copy(m,0,0,i+1);
node.send({"payload": m});
client.end();
m = null;
}, node.splitc);
i = 0;
buf[0] = data[j];
}
} }
// count bytes into a buffer...
else if (node.out == "count") {
buf[i] = data[j];
i += 1;
if ( i >= node.serialConfig.count) {
node.send({"payload": buf});
client.end();
i = 0;
}
}
// look for a char
else { else {
node.tout = setTimeout(function () { buf[i] = data[j];
node.tout = null; i += 1;
var m = new Buffer(i+1); if (data[j] == node.splitc) {
buf.copy(m,0,0,i+1); var m = new Buffer(i);
buf.copy(m,0,0,i);
node.send({"payload": m}); node.send({"payload": m});
client.end(); client.end();
m = null; m = null;
}, node.splitc); i = 0;
i = 0; }
buf[0] = data[j];
}
}
// count bytes into a buffer...
else if (node.out == "count") {
buf[i] = data[j];
i += 1;
if ( i >= node.serialConfig.count) {
node.send({"payload": buf});
client.end();
i = 0;
}
}
// look for a char
else {
buf[i] = data[j];
i += 1;
if (data[j] == node.splitc) {
var m = new Buffer(i);
buf.copy(m,0,0,i);
node.send({"payload": m});
client.end();
m = null;
i = 0;
} }
} }
} }
} });
}); client.on('end', function() {
client.on('end', function() { //node.log('client disconnected');
//node.log('client disconnected'); node.connected = false;
}); node.status({});
client.on('error', function() { client = null;
node.log('connect failed'); });
if (client) { client.end(); }
});
client.on('timeout',function() {
node.log('connect timeout');
if (client) {
client.end();
setTimeout(function() {
client.connect(node.port, node.server, function() {
//node.log('client connected');
client.write(msg.payload);
});
},reconnectTime);
}
}); client.on('error', function() {
node.log('connect failed');
node.status({fill:"red",shape:"ring",text:"error"});
if (client) { client.end(); }
});
client.on('timeout',function() {
node.log('connect timeout');
if (client) {
client.end();
setTimeout(function() {
client.connect(node.port, node.server, function() {
//node.log('client connected');
node.connected = true;
client.write(msg.payload);
});
},reconnectTime);
}
});
}
else { client.write(msg.payload); }
}); });
this.on("close", function() { this.on("close", function() {