1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00

Add session awareness to TCP nodes

Closes #63,#65
This commit is contained in:
Nick O'Leary 2013-12-19 21:16:25 +00:00
parent 02df584af6
commit 73f3ea52a5
2 changed files with 48 additions and 6 deletions

View File

@ -118,11 +118,12 @@
<script type="text/x-red" data-template-name="tcp out"> <script type="text/x-red" data-template-name="tcp out">
<div class="form-row"> <div class="form-row">
<label for="node-input-beserver"><i class="icon-resize-small"></i> Type</label> <label for="node-input-beserver"><i class="icon-resize-small"></i> Type</label>
<select id="node-input-beserver" style="width:120px; margin-right:5px;"> <select id="node-input-beserver" style="width:150px; margin-right:5px;">
<option value="server">Listen on</option> <option value="server">Listen on</option>
<option value="client">Connect to</option> <option value="client">Connect to</option>
<option value="reply">Reply to TCP</option>
</select> </select>
port <input type="text" id="node-input-port" style="width: 50px"> <span id="node-input-port-row">port <input type="text" id="node-input-port" style="width: 50px"></span>
</div> </div>
<div class="form-row hidden" id="node-input-host-row" style="padding-left: 110px;"> <div class="form-row hidden" id="node-input-host-row" style="padding-left: 110px;">
@ -143,7 +144,7 @@
<script type="text/x-red" data-help-name="tcp out"> <script type="text/x-red" data-help-name="tcp out">
<p>Provides a choice of tcp outputs. Can either connect to a remote tcp port, <p>Provides a choice of tcp outputs. Can either connect to a remote tcp port,
or accept incoming connections.</p> accept incoming connections, or reply to messages received from a TCP In node.</p>
<p>Only <b>msg.payload</b> is sent.</p> <p>Only <b>msg.payload</b> is sent.</p>
<p>If <b>msg.payload</b> is a string containing a base64 encoding of binary <p>If <b>msg.payload</b> is a string containing a base64 encoding of binary
data, the Base64 decoding option will cause it to be converted back to binary data, the Base64 decoding option will cause it to be converted back to binary
@ -155,8 +156,8 @@
category: 'output', category: 'output',
color:"Silver", color:"Silver",
defaults: { defaults: {
host: {value:"",validate:function(v) { return (this.beserver == "server")||v.length > 0;} }, host: {value:"",validate:function(v) { return (this.beserver != "client")||v.length > 0;} },
port: {value:"",required:true}, port: {value:"",validate:function(v) { return (this.beserver == "reply")||RED.validators.number()(v) } },
beserver: {value:"client",required:true}, beserver: {value:"client",required:true},
base64: {value:false,required:true}, base64: {value:false,required:true},
name: {value:""} name: {value:""}
@ -174,6 +175,13 @@
oneditprepare: function() { oneditprepare: function() {
var updateOptions = function() { var updateOptions = function() {
var sockettype = $("#node-input-beserver option:selected").val(); var sockettype = $("#node-input-beserver option:selected").val();
if (sockettype == "reply") {
$("#node-input-port-row").hide();
$("#node-input-host-row").hide();
} else {
$("#node-input-port-row").show();
}
if (sockettype == "client") { if (sockettype == "client") {
$("#node-input-host-row").show(); $("#node-input-host-row").show();
} else { } else {

View File

@ -18,6 +18,10 @@ var RED = require(process.env.NODE_RED_HOME+"/red/red");
var reconnectTime = RED.settings.socketReconnectTime||10000; var reconnectTime = RED.settings.socketReconnectTime||10000;
var net = require('net'); var net = require('net');
var connectionPool = {};
function TcpIn(n) { function TcpIn(n) {
RED.nodes.createNode(this,n); RED.nodes.createNode(this,n);
this.host = n.host; this.host = n.host;
@ -37,10 +41,12 @@ function TcpIn(n) {
var reconnectTimeout; var reconnectTimeout;
function setupTcpClient() { function setupTcpClient() {
node.log("connecting to "+node.host+":"+node.port); node.log("connecting to "+node.host+":"+node.port);
var id = (1+Math.random()*4294967295).toString(16);
client = net.connect(node.port, node.host, function() { client = net.connect(node.port, node.host, function() {
buffer = (node.datatype == 'buffer')? new Buffer(0):""; buffer = (node.datatype == 'buffer')? new Buffer(0):"";
node.log("connected to "+node.host+":"+node.port); node.log("connected to "+node.host+":"+node.port);
}); });
connectionPool[id] = client;
client.on('data', function (data) { client.on('data', function (data) {
if (node.datatype != 'buffer') { if (node.datatype != 'buffer') {
@ -52,11 +58,13 @@ function TcpIn(n) {
var parts = buffer.split(node.newline); var parts = buffer.split(node.newline);
for (var i = 0;i<parts.length-1;i+=1) { for (var i = 0;i<parts.length-1;i+=1) {
var msg = {topic:node.topic, payload:parts[i]}; var msg = {topic:node.topic, payload:parts[i]};
msg._session = {type:"tcp",id:id};
node.send(msg); node.send(msg);
} }
buffer = parts[parts.length-1]; buffer = parts[parts.length-1];
} else { } else {
var msg = {topic:node.topic, payload:data}; var msg = {topic:node.topic, payload:data};
msg._session = {type:"tcp",id:id};
node.send(msg); node.send(msg);
} }
} else { } else {
@ -70,12 +78,14 @@ function TcpIn(n) {
client.on('end', function() { client.on('end', function() {
if (!node.stream || (node.datatype == "utf8" && node.newline != "" && buffer.length > 0)) { if (!node.stream || (node.datatype == "utf8" && node.newline != "" && buffer.length > 0)) {
var msg = {topic:node.topic,payload:buffer}; var msg = {topic:node.topic,payload:buffer};
msg._session = {type:"tcp",id:id};
node.send(msg); node.send(msg);
buffer = null; buffer = null;
} }
}); });
client.on('close', function() { client.on('close', function() {
delete connectionPool[id];
node.log("connection lost to "+node.host+":"+node.port); node.log("connection lost to "+node.host+":"+node.port);
if (!node.closing) { if (!node.closing) {
reconnectTimeout = setTimeout(setupTcpClient, reconnectTime); reconnectTimeout = setTimeout(setupTcpClient, reconnectTime);
@ -95,6 +105,9 @@ function TcpIn(n) {
}); });
} else { } else {
var server = net.createServer(function (socket) { var server = net.createServer(function (socket) {
var id = (1+Math.random()*4294967295).toString(16);
connectionPool[id] = socket;
var buffer = (node.datatype == 'buffer')? new Buffer(0):""; var buffer = (node.datatype == 'buffer')? new Buffer(0):"";
socket.on('data', function (data) { socket.on('data', function (data) {
if (node.datatype != 'buffer') { if (node.datatype != 'buffer') {
@ -107,11 +120,13 @@ function TcpIn(n) {
var parts = buffer.split(node.newline); var parts = buffer.split(node.newline);
for (var i = 0;i<parts.length-1;i+=1) { for (var i = 0;i<parts.length-1;i+=1) {
var msg = {topic:node.topic, payload:parts[i],ip:socket.remoteAddress,port:socket.remotePort}; var msg = {topic:node.topic, payload:parts[i],ip:socket.remoteAddress,port:socket.remotePort};
msg._session = {type:"tcp",id:id};
node.send(msg); node.send(msg);
} }
buffer = parts[parts.length-1]; buffer = parts[parts.length-1];
} else { } else {
var msg = {topic:node.topic, payload:data}; var msg = {topic:node.topic, payload:data};
msg._session = {type:"tcp",id:id};
node.send(msg); node.send(msg);
} }
} else { } else {
@ -125,10 +140,14 @@ function TcpIn(n) {
socket.on('end', function() { socket.on('end', function() {
if (!node.stream || (node.datatype == "utf8" && node.newline != "" && buffer.length > 0)) { if (!node.stream || (node.datatype == "utf8" && node.newline != "" && buffer.length > 0)) {
var msg = {topic:node.topic,payload:buffer}; var msg = {topic:node.topic,payload:buffer};
msg._session = {type:"tcp",id:id};
node.send(msg); node.send(msg);
buffer = null; buffer = null;
} }
}); });
socket.on('close', function() {
delete connectionPool[id];
});
socket.on('error',function(err) { socket.on('error',function(err) {
node.log(err); node.log(err);
}); });
@ -206,6 +225,21 @@ function TcpOut(n) {
clearTimeout(reconnectTimeout); clearTimeout(reconnectTimeout);
}); });
} else if (node.beserver == "reply") {
node.on("input",function(msg) {
if (msg._session && msg._session.type == "tcp") {
var client = connectionPool[msg._session.id];
if (client) {
if (Buffer.isBuffer(msg.payload)) {
client.write(msg.payload);
} else if (typeof msg.payload === "string" && node.base64) {
client.write(new Buffer(msg.payload,'base64'));
} else {
client.write(new Buffer(""+msg.payload));
}
}
}
});
} else { } else {
var connectedSockets = []; var connectedSockets = [];
var server = net.createServer(function (socket) { var server = net.createServer(function (socket) {