1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00

Update TCP nodes to node.done

This commit is contained in:
Nick O'Leary 2019-08-15 10:19:03 +01:00
parent 83932e1725
commit cd529d53ae
No known key found for this signature in database
GPG Key ID: 4F2157149161A6C9

View File

@ -132,7 +132,7 @@ module.exports = function(RED) {
reconnectTimeout = setTimeout(setupTcpClient, reconnectTime); reconnectTimeout = setTimeout(setupTcpClient, reconnectTime);
} }
} else { } else {
if (node.done) { node.done(); } if (node.doneClose) { node.doneClose(); }
} }
}); });
client.on('error', function(err) { client.on('error', function(err) {
@ -142,7 +142,7 @@ module.exports = function(RED) {
setupTcpClient(); setupTcpClient();
this.on('close', function(done) { this.on('close', function(done) {
node.done = done; node.doneClose = done;
this.closing = true; this.closing = true;
if (client) { client.destroy(); } if (client) { client.destroy(); }
clearTimeout(reconnectTimeout); clearTimeout(reconnectTimeout);
@ -305,13 +305,13 @@ module.exports = function(RED) {
reconnectTimeout = setTimeout(setupTcpClient,reconnectTime); reconnectTimeout = setTimeout(setupTcpClient,reconnectTime);
} }
} else { } else {
if (node.done) { node.done(); } if (node.doneClose) { node.doneClose(); }
} }
}); });
} }
setupTcpClient(); setupTcpClient();
node.on("input", function(msg) { node.on("input", function(msg,nodeSend,nodeDone) {
if (node.connected && msg.payload != null) { if (node.connected && msg.payload != null) {
if (Buffer.isBuffer(msg.payload)) { if (Buffer.isBuffer(msg.payload)) {
client.write(msg.payload); client.write(msg.payload);
@ -325,10 +325,11 @@ module.exports = function(RED) {
if (client) { node.status({}); client.destroy(); } if (client) { node.status({}); client.destroy(); }
} }
} }
nodeDone();
}); });
node.on("close", function(done) { node.on("close", function(done) {
node.done = done; node.doneClose = done;
this.closing = true; this.closing = true;
if (client) { client.destroy(); } if (client) { client.destroy(); }
clearTimeout(reconnectTimeout); clearTimeout(reconnectTimeout);
@ -337,7 +338,7 @@ module.exports = function(RED) {
} }
else if (node.beserver == "reply") { else if (node.beserver == "reply") {
node.on("input",function(msg) { node.on("input",function(msg, nodeSend, nodeDone) {
if (msg._session && msg._session.type == "tcp") { if (msg._session && msg._session.type == "tcp") {
var client = connectionPool[msg._session.id]; var client = connectionPool[msg._session.id];
if (client) { if (client) {
@ -361,6 +362,7 @@ module.exports = function(RED) {
} }
} }
} }
nodeDone();
}); });
} }
else { else {
@ -389,7 +391,7 @@ module.exports = function(RED) {
}); });
}); });
node.on("input", function(msg) { node.on("input", function(msg, nodeSend, nodeDone) {
if (msg.payload != null) { if (msg.payload != null) {
var buffer; var buffer;
if (Buffer.isBuffer(msg.payload)) { if (Buffer.isBuffer(msg.payload)) {
@ -404,6 +406,7 @@ module.exports = function(RED) {
else { connectedSockets[i].write(buffer); } else { connectedSockets[i].write(buffer); }
} }
} }
nodeDone();
}); });
server.on('error', function(err) { server.on('error', function(err) {
@ -461,7 +464,7 @@ module.exports = function(RED) {
var clients = {}; var clients = {};
this.on("input", function(msg) { this.on("input", function(msg, nodeSend, nodeDone) {
var i = 0; var i = 0;
if ((!Buffer.isBuffer(msg.payload)) && (typeof msg.payload !== "string")) { if ((!Buffer.isBuffer(msg.payload)) && (typeof msg.payload !== "string")) {
msg.payload = msg.payload.toString(); msg.payload = msg.payload.toString();
@ -483,7 +486,7 @@ module.exports = function(RED) {
connected: false, connected: false,
connecting: false connecting: false
}; };
enqueue(clients[connection_id].msgQueue, msg); enqueue(clients[connection_id].msgQueue, {msg:msg,nodeSend:nodeSend, nodeDone: nodeDone});
clients[connection_id].lastMsg = msg; clients[connection_id].lastMsg = msg;
if (!clients[connection_id].connecting && !clients[connection_id].connected) { if (!clients[connection_id].connecting && !clients[connection_id].connected) {
@ -505,9 +508,10 @@ module.exports = function(RED) {
if (clients[connection_id] && clients[connection_id].client) { if (clients[connection_id] && clients[connection_id].client) {
clients[connection_id].connected = true; clients[connection_id].connected = true;
clients[connection_id].connecting = false; clients[connection_id].connecting = false;
let msg; let event;
while (msg = dequeue(clients[connection_id].msgQueue)) { while (event = dequeue(clients[connection_id].msgQueue)) {
clients[connection_id].client.write(msg.payload); clients[connection_id].client.write(event.msg.payload);
event.nodeDone();
} }
if (node.out === "time" && node.splitc < 0) { if (node.out === "time" && node.splitc < 0) {
clients[connection_id].connected = clients[connection_id].connecting = false; clients[connection_id].connected = clients[connection_id].connecting = false;
@ -527,7 +531,7 @@ module.exports = function(RED) {
if (clients[connection_id]) { if (clients[connection_id]) {
const msg = clients[connection_id].lastMsg || {}; const msg = clients[connection_id].lastMsg || {};
msg.payload = data; msg.payload = data;
node.send(RED.util.cloneMessage(msg)); nodeSend(RED.util.cloneMessage(msg));
} }
} }
// else if (node.splitc === 0) { // else if (node.splitc === 0) {
@ -550,7 +554,7 @@ module.exports = function(RED) {
const msg = clients[connection_id].lastMsg || {}; const msg = clients[connection_id].lastMsg || {};
msg.payload = Buffer.alloc(i+1); msg.payload = Buffer.alloc(i+1);
buf.copy(msg.payload,0,0,i+1); buf.copy(msg.payload,0,0,i+1);
node.send(msg); nodeSend(msg);
if (clients[connection_id].client) { if (clients[connection_id].client) {
node.status({}); node.status({});
clients[connection_id].client.destroy(); clients[connection_id].client.destroy();
@ -572,7 +576,7 @@ module.exports = function(RED) {
const msg = clients[connection_id].lastMsg || {}; const msg = clients[connection_id].lastMsg || {};
msg.payload = Buffer.alloc(i); msg.payload = Buffer.alloc(i);
buf.copy(msg.payload,0,0,i); buf.copy(msg.payload,0,0,i);
node.send(msg); nodeSend(msg);
if (clients[connection_id].client) { if (clients[connection_id].client) {
node.status({}); node.status({});
clients[connection_id].client.destroy(); clients[connection_id].client.destroy();
@ -591,7 +595,7 @@ module.exports = function(RED) {
const msg = clients[connection_id].lastMsg || {}; const msg = clients[connection_id].lastMsg || {};
msg.payload = Buffer.alloc(i); msg.payload = Buffer.alloc(i);
buf.copy(msg.payload,0,0,i); buf.copy(msg.payload,0,0,i);
node.send(msg); nodeSend(msg);
if (clients[connection_id].client) { if (clients[connection_id].client) {
node.status({}); node.status({});
clients[connection_id].client.destroy(); clients[connection_id].client.destroy();
@ -628,9 +632,9 @@ module.exports = function(RED) {
break; break;
} }
} }
if (node.done && !anyConnected) { if (node.doneClose && !anyConnected) {
clients = {}; clients = {};
node.done(); node.doneClose();
} }
}); });
@ -663,13 +667,15 @@ module.exports = function(RED) {
} }
else if (!clients[connection_id].connecting && clients[connection_id].connected) { else if (!clients[connection_id].connecting && clients[connection_id].connected) {
if (clients[connection_id] && clients[connection_id].client) { if (clients[connection_id] && clients[connection_id].client) {
clients[connection_id].client.write(dequeue(clients[connection_id].msgQueue).payload); let event = dequeue(clients[connection_id].msgQueue)
clients[connection_id].client.write(event.msg.payload);
event.nodeDone();
} }
} }
}); });
this.on("close", function(done) { this.on("close", function(done) {
node.done = done; node.doneClose = done;
for (var cl in clients) { for (var cl in clients) {
if (clients[cl].hasOwnProperty("client")) { if (clients[cl].hasOwnProperty("client")) {
clients[cl].client.destroy(); clients[cl].client.destroy();