1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00

Fix tcp node miscounting bytes, and check if staying connected first.

This commit is contained in:
dceejay 2015-05-26 20:08:58 +01:00
parent 98c9e40349
commit 4f174308b9
2 changed files with 23 additions and 23 deletions

View File

@ -66,6 +66,7 @@
category: 'input', category: 'input',
color:"Silver", color:"Silver",
defaults: { defaults: {
name: {value:""},
server: {value:"server",required:true}, server: {value:"server",required:true},
host: {value:"",validate:function(v) { return (this.server == "server")||v.length > 0;} }, host: {value:"",validate:function(v) { return (this.server == "server")||v.length > 0;} },
port: {value:"",required:true,validate:RED.validators.number()}, port: {value:"",required:true,validate:RED.validators.number()},
@ -73,7 +74,6 @@
datatype:{value:"buffer"}, datatype:{value:"buffer"},
newline:{value:""}, newline:{value:""},
topic: {value:""}, topic: {value:""},
name: {value:""},
base64: {/*deprecated*/ value:false,required:true} base64: {/*deprecated*/ value:false,required:true}
}, },
inputs:0, inputs:0,
@ -229,7 +229,7 @@
</div> </div>
<div class="form-row"> <div class="form-row">
<label for="node-input-out"><i class="fa fa-sign-out"></i> Return</label> <label for="node-input-out"><i class="fa fa-sign-out"></i> Return</label>
<select type="text" id="node-input-out" style="width:52%;"> <select type="text" id="node-input-out" style="width:56%;">
<option value="time">after a fixed timeout of</option> <option value="time">after a fixed timeout of</option>
<option value="char">when character received is</option> <option value="char">when character received is</option>
<option value="count">a fixed number of characters</option> <option value="count">a fixed number of characters</option>

View File

@ -1,5 +1,5 @@
/** /**
* Copyright 2013,2014 IBM Corp. * Copyright 2013,2015 IBM Corp.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -84,7 +84,7 @@ module.exports = function(RED) {
}); });
client.on('end', function() { client.on('end', function() {
if (!node.stream || (node.datatype == "utf8" && node.newline != "" && buffer.length > 0)) { if (!node.stream || (node.datatype == "utf8" && node.newline != "" && buffer.length > 0)) {
var msg = {topic:node.topic,payload:buffer}; var msg = {topic:node.topic, payload:buffer};
msg._session = {type:"tcp",id:id}; msg._session = {type:"tcp",id:id};
if (buffer.length !== 0) { if (buffer.length !== 0) {
end = true; // only ask for fast re-connect if we actually got something end = true; // only ask for fast re-connect if we actually got something
@ -141,13 +141,13 @@ module.exports = function(RED) {
buffer = buffer+data; buffer = buffer+data;
var parts = buffer.split(node.newline); var parts = buffer.split(node.newline);
for (var i = 0; i<parts.length-1; i+=1) { for (var i = 0; i<parts.length-1; i+=1) {
msg = {topic:node.topic, payload:parts[i],ip:socket.remoteAddress,port:socket.remotePort,connected:true}; msg = {topic:node.topic, payload:parts[i],ip:socket.remoteAddress,port:socket.remotePort};
msg._session = {type:"tcp",id:id}; msg._session = {type:"tcp",id:id};
node.send(msg); node.send(msg);
} }
buffer = parts[parts.length-1]; buffer = parts[parts.length-1];
} else { } else {
msg = {topic:node.topic, payload:data, connected:true}; msg = {topic:node.topic, payload:data};
msg._session = {type:"tcp",id:id}; msg._session = {type:"tcp",id:id};
node.send(msg); node.send(msg);
} }
@ -162,7 +162,7 @@ module.exports = function(RED) {
socket.on('end', function() { socket.on('end', function() {
if (!node.stream || (node.datatype === "utf8" && node.newline !== "")) { if (!node.stream || (node.datatype === "utf8" && node.newline !== "")) {
if (buffer.length > 0) { if (buffer.length > 0) {
var msg = {topic:node.topic,payload:buffer}; var msg = {topic:node.topic, payload:buffer};
msg._session = {type:"tcp",id:id}; msg._session = {type:"tcp",id:id};
node.send(msg); node.send(msg);
} }
@ -236,7 +236,7 @@ module.exports = function(RED) {
node.status({fill:"green",shape:"dot",text:"connected"}); node.status({fill:"green",shape:"dot",text:"connected"});
}); });
client.on('error', function (err) { client.on('error', function (err) {
node.log('error : '+err); node.log(err);
}); });
client.on('end', function (err) { client.on('end', function (err) {
}); });
@ -334,7 +334,7 @@ module.exports = function(RED) {
} else { } else {
buffer = new Buffer(""+msg.payload); buffer = new Buffer(""+msg.payload);
} }
for (var i = 0; i<connectedSockets.length;i+=1) { for (var i = 0; i < connectedSockets.length; i += 1) {
if (node.doend === true) { connectedSockets[i].end(buffer); } if (node.doend === true) { connectedSockets[i].end(buffer); }
else { connectedSockets[i].write(buffer); } else { connectedSockets[i].write(buffer); }
} }
@ -376,11 +376,11 @@ module.exports = function(RED) {
this.splitc = n.splitc; this.splitc = n.splitc;
if (this.out != "char") { this.splitc = Number(this.splitc); } if (this.out != "char") { this.splitc = Number(this.splitc); }
else { this.splitc.replace("\\n","\n").replace("\\r","\r").replace("\\t","\t").replace("\\e","\e").replace("\\f","\f").replace("\\0","\0"); } // jshint ignore:line else { this.splitc = this.splitc.replace("\\n",0x0A).replace("\\r",0x0D).replace("\\t",0x09).replace("\\e",0x1B).replace("\\f",0x0C).replace("\\0",0x00); } // jshint ignore:line
var buf; var buf;
if (this.out == "count") { buf = new Buffer(this.splitc); } if (this.out == "count") { buf = new Buffer(this.splitc); }
else { buf = new Buffer(32768); } // set it to 32k... hopefully big enough for most.... but only hopefully else { buf = new Buffer(65536); } // set it to 64k... hopefully big enough for most TCP packets.... but only hopefully
this.connected = false; this.connected = false;
var node = this; var node = this;
@ -394,9 +394,11 @@ module.exports = function(RED) {
if (!node.connected) { if (!node.connected) {
client = net.Socket(); client = net.Socket();
client.setTimeout(socketTimeout); client.setTimeout(socketTimeout);
node.status({}); //node.status({});
var host = node.server || msg.host; var host = node.server || msg.host;
var port = node.port || msg.port; var port = node.port || msg.port;
var m;
if (host && port) { if (host && port) {
client.connect(port, host, function() { client.connect(port, host, function() {
//node.log('client connected'); //node.log('client connected');
@ -411,10 +413,10 @@ module.exports = function(RED) {
client.on('data', function(data) { client.on('data', function(data) {
//node.log("data:"+ data.length+":"+ data); //node.log("data:"+ data.length+":"+ data);
if (node.splitc === 0) { if (node.out == "sit") { // if we are staying connected just send the buffer
node.send({"payload": data}); node.send({"payload": data});
} }
else if (node.out === "sit") { // if we are staying connected just send the buffer else if (node.splitc === 0) {
node.send({"payload": data}); node.send({"payload": data});
} }
else { else {
@ -428,11 +430,10 @@ module.exports = function(RED) {
else { else {
node.tout = setTimeout(function () { node.tout = setTimeout(function () {
node.tout = null; node.tout = null;
var m = new Buffer(i+1); m = new Buffer(i+1);
buf.copy(m,0,0,i+1); buf.copy(m,0,0,i+1);
node.send({"payload": m}); node.send({"payload":m});
if (client) { client.end(); } if (client) { client.end(); }
m = null;
}, node.splitc); }, node.splitc);
i = 0; i = 0;
buf[0] = data[j]; buf[0] = data[j];
@ -442,8 +443,9 @@ module.exports = function(RED) {
else if (node.out == "count") { else if (node.out == "count") {
buf[i] = data[j]; buf[i] = data[j];
i += 1; i += 1;
if ( i >= node.serialConfig.count) { if ( i >= node.splitc) {
node.send({"payload": buf}); m = new Buffer(i);
buf.copy(m,0,0,i);
if (client) { client.end(); } if (client) { client.end(); }
i = 0; i = 0;
} }
@ -453,11 +455,9 @@ module.exports = function(RED) {
buf[i] = data[j]; buf[i] = data[j];
i += 1; i += 1;
if (data[j] == node.splitc) { if (data[j] == node.splitc) {
var m = new Buffer(i); m = new Buffer(i);
buf.copy(m,0,0,i); buf.copy(m,0,0,i);
node.send({"payload": m});
if (client) { client.end(); } if (client) { client.end(); }
m = null;
i = 0; i = 0;
} }
} }
@ -466,9 +466,9 @@ module.exports = function(RED) {
}); });
client.on('end', function() { client.on('end', function() {
//node.log('client disconnected');
node.connected = false; node.connected = false;
node.status({}); node.status({});
node.send({"payload":m});
client = null; client = null;
}); });