diff --git a/packages/node_modules/@node-red/nodes/core/io/31-tcpin.js b/packages/node_modules/@node-red/nodes/core/io/31-tcpin.js index aa72f0fe9..eb365a54c 100644 --- a/packages/node_modules/@node-red/nodes/core/io/31-tcpin.js +++ b/packages/node_modules/@node-red/nodes/core/io/31-tcpin.js @@ -132,7 +132,7 @@ module.exports = function(RED) { reconnectTimeout = setTimeout(setupTcpClient, reconnectTime); } } else { - if (node.done) { node.done(); } + if (node.doneClose) { node.doneClose(); } } }); client.on('error', function(err) { @@ -142,7 +142,7 @@ module.exports = function(RED) { setupTcpClient(); this.on('close', function(done) { - node.done = done; + node.doneClose = done; this.closing = true; if (client) { client.destroy(); } clearTimeout(reconnectTimeout); @@ -305,13 +305,13 @@ module.exports = function(RED) { reconnectTimeout = setTimeout(setupTcpClient,reconnectTime); } } else { - if (node.done) { node.done(); } + if (node.doneClose) { node.doneClose(); } } }); } setupTcpClient(); - node.on("input", function(msg) { + node.on("input", function(msg,nodeSend,nodeDone) { if (node.connected && msg.payload != null) { if (Buffer.isBuffer(msg.payload)) { client.write(msg.payload); @@ -325,10 +325,11 @@ module.exports = function(RED) { if (client) { node.status({}); client.destroy(); } } } + nodeDone(); }); node.on("close", function(done) { - node.done = done; + node.doneClose = done; this.closing = true; if (client) { client.destroy(); } clearTimeout(reconnectTimeout); @@ -337,7 +338,7 @@ module.exports = function(RED) { } else if (node.beserver == "reply") { - node.on("input",function(msg) { + node.on("input",function(msg, nodeSend, nodeDone) { if (msg._session && msg._session.type == "tcp") { var client = connectionPool[msg._session.id]; if (client) { @@ -361,6 +362,7 @@ module.exports = function(RED) { } } } + nodeDone(); }); } else { @@ -389,7 +391,7 @@ module.exports = function(RED) { }); }); - node.on("input", function(msg) { + node.on("input", function(msg, nodeSend, nodeDone) { if (msg.payload != null) { var buffer; if (Buffer.isBuffer(msg.payload)) { @@ -404,6 +406,7 @@ module.exports = function(RED) { else { connectedSockets[i].write(buffer); } } } + nodeDone(); }); server.on('error', function(err) { @@ -461,7 +464,7 @@ module.exports = function(RED) { var clients = {}; - this.on("input", function(msg) { + this.on("input", function(msg, nodeSend, nodeDone) { var i = 0; if ((!Buffer.isBuffer(msg.payload)) && (typeof msg.payload !== "string")) { msg.payload = msg.payload.toString(); @@ -483,7 +486,7 @@ module.exports = function(RED) { connected: false, connecting: false }; - enqueue(clients[connection_id].msgQueue, msg); + enqueue(clients[connection_id].msgQueue, {msg:msg,nodeSend:nodeSend, nodeDone: nodeDone}); clients[connection_id].lastMsg = msg; if (!clients[connection_id].connecting && !clients[connection_id].connected) { @@ -505,9 +508,10 @@ module.exports = function(RED) { if (clients[connection_id] && clients[connection_id].client) { clients[connection_id].connected = true; clients[connection_id].connecting = false; - let msg; - while (msg = dequeue(clients[connection_id].msgQueue)) { - clients[connection_id].client.write(msg.payload); + let event; + while (event = dequeue(clients[connection_id].msgQueue)) { + clients[connection_id].client.write(event.msg.payload); + event.nodeDone(); } if (node.out === "time" && node.splitc < 0) { clients[connection_id].connected = clients[connection_id].connecting = false; @@ -527,7 +531,7 @@ module.exports = function(RED) { if (clients[connection_id]) { const msg = clients[connection_id].lastMsg || {}; msg.payload = data; - node.send(RED.util.cloneMessage(msg)); + nodeSend(RED.util.cloneMessage(msg)); } } // else if (node.splitc === 0) { @@ -550,7 +554,7 @@ module.exports = function(RED) { const msg = clients[connection_id].lastMsg || {}; msg.payload = Buffer.alloc(i+1); buf.copy(msg.payload,0,0,i+1); - node.send(msg); + nodeSend(msg); if (clients[connection_id].client) { node.status({}); clients[connection_id].client.destroy(); @@ -572,7 +576,7 @@ module.exports = function(RED) { const msg = clients[connection_id].lastMsg || {}; msg.payload = Buffer.alloc(i); buf.copy(msg.payload,0,0,i); - node.send(msg); + nodeSend(msg); if (clients[connection_id].client) { node.status({}); clients[connection_id].client.destroy(); @@ -591,7 +595,7 @@ module.exports = function(RED) { const msg = clients[connection_id].lastMsg || {}; msg.payload = Buffer.alloc(i); buf.copy(msg.payload,0,0,i); - node.send(msg); + nodeSend(msg); if (clients[connection_id].client) { node.status({}); clients[connection_id].client.destroy(); @@ -628,9 +632,9 @@ module.exports = function(RED) { break; } } - if (node.done && !anyConnected) { + if (node.doneClose && !anyConnected) { clients = {}; - node.done(); + node.doneClose(); } }); @@ -663,13 +667,15 @@ module.exports = function(RED) { } else if (!clients[connection_id].connecting && clients[connection_id].connected) { if (clients[connection_id] && clients[connection_id].client) { - clients[connection_id].client.write(dequeue(clients[connection_id].msgQueue).payload); + let event = dequeue(clients[connection_id].msgQueue) + clients[connection_id].client.write(event.msg.payload); + event.nodeDone(); } } }); this.on("close", function(done) { - node.done = done; + node.doneClose = done; for (var cl in clients) { if (clients[cl].hasOwnProperty("client")) { clients[cl].client.destroy();