Merge pull request #2227 from node-red/node-done

Adds Done callback to Input event handler
This commit is contained in:
Nick O'Leary
2019-08-21 11:15:00 +01:00
committed by GitHub
25 changed files with 1333 additions and 565 deletions

View File

@@ -0,0 +1,136 @@
<script type="text/x-red" data-template-name="complete">
<div class="form-row node-input-target-row">
<button id="node-input-complete-target-select" class="red-ui-button" data-i18n="common.label.selectNodes"></button>
</div>
<div class="form-row node-input-target-row node-input-target-list-row" style="min-height: 100px">
<div id="node-input-complete-target-container-div"></div>
</div>
<div class="form-row">
<label for="node-input-name"><i class="fa fa-tag"></i> <span data-i18n="common.label.name"></span></label>
<input type="text" id="node-input-name" data-i18n="[placeholder]common.label.name">
</div>
</script>
<script type="text/javascript">
RED.nodes.registerType('complete',{
category: 'input',
color:"#c0edc0",
defaults: {
name: {value:""},
scope: {value:[]},
uncaught: {value:false}
},
inputs:0,
outputs:1,
icon: "alert.svg",
label: function() {
if (this.name) {
return this.name;
}
return this._("complete.completeNodes",{number:this.scope.length});
},
labelStyle: function() {
return this.name?"node_label_italic":"";
},
oneditprepare: function() {
var node = this;
var scope = node.scope || [];
this._resize = function() {
var rows = $("#dialog-form>div:not(.node-input-target-list-row)");
var height = $("#dialog-form").height();
for (var i=0;i<rows.length;i++) {
height -= $(rows[i]).outerHeight(true);
}
var editorRow = $("#dialog-form>div.node-input-target-list-row");
editorRow.css("height",height+"px");
};
var dirList = $("#node-input-complete-target-container-div").css({width: "100%", height: "100%"})
.treeList({multi:true}).on("treelistitemmouseover", function(e, item) {
item.node.highlighted = true;
item.node.dirty = true;
RED.view.redraw();
}).on("treelistitemmouseout", function(e, item) {
item.node.highlighted = false;
item.node.dirty = true;
RED.view.redraw();
})
var candidateNodes = RED.nodes.filterNodes({z:node.z});
var allChecked = true;
var items = [];
var nodeItemMap = {};
candidateNodes.forEach(function(n) {
if (n.id === node.id) {
return;
}
var isChecked = scope.indexOf(n.id) !== -1;
allChecked = allChecked && isChecked;
var nodeDef = RED.nodes.getType(n.type);
var label;
var sublabel;
if (nodeDef) {
var l = nodeDef.label;
label = (typeof l === "function" ? l.call(n) : l)||"";
sublabel = n.type;
if (sublabel.indexOf("subflow:") === 0) {
var subflowId = sublabel.substring(8);
var subflow = RED.nodes.subflow(subflowId);
sublabel = "subflow : "+subflow.name;
}
}
if (!nodeDef || !label) {
label = n.type;
}
nodeItemMap[n.id] = {
node: n,
label: label,
sublabel: sublabel,
selected: isChecked
};
items.push(nodeItemMap[n.id]);
});
dirList.treeList('data',items);
$("#node-input-complete-target-select").on("click", function(e) {
e.preventDefault();
var preselected = dirList.treeList('selected').map(function(n) {return n.node.id});
RED.tray.hide();
RED.view.selectNodes({
selected: preselected,
onselect: function(selection) {
RED.tray.show();
var newlySelected = {};
selection.forEach(function(n) {
newlySelected[n.id] = true;
if (nodeItemMap[n.id]) {
nodeItemMap[n.id].treeList.select(true);
}
})
preselected.forEach(function(id) {
if (!newlySelected[id]) {
nodeItemMap[id].treeList.select(false);
}
})
},
oncancel: function() {
RED.tray.show();
},
filter: function(n) {
return n.id !== node.id;
}
});
})
},
oneditsave: function() {
this.scope = $("#node-input-complete-target-container-div").treeList('selected').map(function(i) { return i.node.id})
},
oneditresize: function(size) {
this._resize();
}
});
</script>

View File

@@ -0,0 +1,30 @@
/**
* Copyright JS Foundation and other contributors, http://js.foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
module.exports = function(RED) {
"use strict";
function CompleteNode(n) {
RED.nodes.createNode(this,n);
var node = this;
this.scope = n.scope;
this.on("input",function(msg) {
this.send(msg);
});
}
RED.nodes.registerType("complete",CompleteNode);
}

View File

@@ -81,7 +81,7 @@ module.exports = function(RED) {
}
}
this.on("input", function(msg) {
this.on("input", function(msg, send, done) {
if (this.complete === "true") {
// debug complete msg object
if (this.console === "true") {
@@ -90,13 +90,14 @@ module.exports = function(RED) {
if (this.active && this.tosidebar) {
sendDebug({id:node.id, name:node.name, topic:msg.topic, msg:msg, _path:msg._path});
}
done();
} else {
prepareValue(msg,function(err,msg) {
prepareValue(msg,function(err,debugMsg) {
if (err) {
node.error(err);
return;
}
var output = msg.msg;
var output = debugMsg.msg;
if (node.console === "true") {
if (typeof output === "string") {
node.log((output.indexOf("\n") !== -1 ? "\n" : "") + output);
@@ -114,9 +115,10 @@ module.exports = function(RED) {
}
if (node.active) {
if (node.tosidebar == true) {
sendDebug(msg);
sendDebug(debugMsg);
}
}
done();
});
}
})

View File

@@ -38,7 +38,7 @@ module.exports = function(RED) {
//node.error("Exec node timeout");
}
this.on("input", function(msg) {
this.on("input", function(msg, nodeSend, nodeDone) {
if (msg.hasOwnProperty("kill")) {
if (typeof msg.kill !== "string" || msg.kill.length === 0 || !msg.kill.toUpperCase().startsWith("SIG") ) { msg.kill = "SIGTERM"; }
if (msg.hasOwnProperty("pid")) {
@@ -53,6 +53,7 @@ module.exports = function(RED) {
node.status({fill:"red",shape:"dot",text:"killed"});
}
}
nodeDone();
}
else {
var child;
@@ -85,14 +86,14 @@ module.exports = function(RED) {
// console.log('[exec] stdout: ' + data,child.pid);
if (isUtf8(data)) { msg.payload = data.toString(); }
else { msg.payload = data; }
node.send([RED.util.cloneMessage(msg),null,null]);
nodeSend([RED.util.cloneMessage(msg),null,null]);
}
});
child.stderr.on('data', function (data) {
if (node.activeProcesses.hasOwnProperty(child.pid) && node.activeProcesses[child.pid] !== null) {
if (isUtf8(data)) { msg.payload = data.toString(); }
else { msg.payload = Buffer.from(data); }
node.send([null,RED.util.cloneMessage(msg),null]);
nodeSend([null,RED.util.cloneMessage(msg),null]);
}
});
child.on('close', function (code,signal) {
@@ -108,8 +109,9 @@ module.exports = function(RED) {
if (code === null) { node.status({fill:"red",shape:"dot",text:"killed"}); }
else if (code < 0) { node.status({fill:"red",shape:"dot",text:"rc:"+code}); }
else { node.status({fill:"yellow",shape:"dot",text:"rc:"+code}); }
node.send([null,null,RED.util.cloneMessage(msg)]);
nodeSend([null,null,RED.util.cloneMessage(msg)]);
}
nodeDone();
});
child.on('error', function (code) {
if (child.tout) { clearTimeout(child.tout); }
@@ -154,9 +156,10 @@ module.exports = function(RED) {
msg.rc = msg3.payload;
if (msg2) { msg2.rc = msg3.payload; }
}
node.send([msg,msg2,msg3]);
nodeSend([msg,msg2,msg3]);
if (child.tout) { clearTimeout(child.tout); }
delete node.activeProcesses[child.pid];
nodeDone();
});
node.status({fill:"blue",shape:"dot",text:"pid:"+child.pid});
child.on('error',function() {});

View File

@@ -19,7 +19,7 @@ module.exports = function(RED) {
var util = require("util");
var vm = require("vm");
function sendResults(node,_msgid,msgs) {
function sendResults(node,send,_msgid,msgs) {
if (msgs == null) {
return;
} else if (!util.isArray(msgs)) {
@@ -49,7 +49,7 @@ module.exports = function(RED) {
}
}
if (msgCount>0) {
node.send(msgs);
send(msgs);
}
}
@@ -58,8 +58,17 @@ module.exports = function(RED) {
var node = this;
this.name = n.name;
this.func = n.func;
var handleNodeDoneCall = true;
// Check to see if the Function appears to call `node.done()`. If so,
// we will assume it is well written and does actually call node.done().
// Otherwise, we will call node.done() after the function returns regardless.
if (/node\.done\s*\(\s*\)/.test(this.func)) {
handleNodeDoneCall = false;
}
var functionText = "var results = null;"+
"results = (function(msg){ "+
"results = (function(msg,__send__,__done__){ "+
"var __msgid__ = msg._msgid;"+
"var node = {"+
"id:__node__.id,"+
@@ -71,10 +80,11 @@ module.exports = function(RED) {
"trace:__node__.trace,"+
"on:__node__.on,"+
"status:__node__.status,"+
"send:function(msgs){ __node__.send(__msgid__,msgs);}"+
"send:function(msgs){ __node__.send(__send__,__msgid__,msgs);},"+
"done:__done__"+
"};\n"+
this.func+"\n"+
"})(msg);";
"})(msg,send,done);";
this.topic = n.topic;
this.outstandingTimers = [];
this.outstandingIntervals = [];
@@ -104,8 +114,8 @@ module.exports = function(RED) {
trace: function() {
node.trace.apply(node, arguments);
},
send: function(id, msgs) {
sendResults(node, id, msgs);
send: function(send, id, msgs) {
sendResults(node, send, id, msgs);
},
on: function() {
if (arguments[0] === "input") {
@@ -223,12 +233,18 @@ module.exports = function(RED) {
// lineOffset: -11, // line number offset to be used for stack traces
// columnOffset: 0, // column number offset to be used for stack traces
});
this.on("input", function(msg) {
this.on("input", function(msg,send,done) {
try {
var start = process.hrtime();
context.msg = msg;
context.send = send;
context.done = done;
this.script.runInContext(context);
sendResults(this,msg._msgid,context.results);
sendResults(this,send,msg._msgid,context.results);
if (handleNodeDoneCall) {
done();
}
var duration = process.hrtime(start);
var converted = Math.floor((duration[0] * 1e9 + duration[1])/10000)/100;

View File

@@ -334,7 +334,7 @@ module.exports = function(RED) {
}
};
this.publish = function (msg) {
this.publish = function (msg,done) {
if (node.connected) {
if (msg.payload === null || msg.payload === undefined) {
msg.payload = "";
@@ -350,7 +350,10 @@ module.exports = function(RED) {
qos: msg.qos || 0,
retain: msg.retain || false
};
node.client.publish(msg.topic, msg.payload, options, function(err) {return});
node.client.publish(msg.topic, msg.payload, options, function(err) {
done && done();
return
});
}
};
@@ -453,7 +456,7 @@ module.exports = function(RED) {
if (this.brokerConn) {
this.status({fill:"red",shape:"ring",text:"node-red:common.status.disconnected"});
this.on("input",function(msg) {
this.on("input",function(msg,send,done) {
if (msg.qos) {
msg.qos = parseInt(msg.qos);
if ((msg.qos !== 0) && (msg.qos !== 1) && (msg.qos !== 2)) {
@@ -468,9 +471,13 @@ module.exports = function(RED) {
}
if ( msg.hasOwnProperty("payload")) {
if (msg.hasOwnProperty("topic") && (typeof msg.topic === "string") && (msg.topic !== "")) { // topic must exist
this.brokerConn.publish(msg); // send the message
this.brokerConn.publish(msg, done); // send the message
} else {
node.warn(RED._("mqtt.errors.invalid-topic"));
done();
}
else { node.warn(RED._("mqtt.errors.invalid-topic")); }
} else {
done();
}
});
if (this.brokerConn.connected) {

View File

@@ -50,7 +50,7 @@ module.exports = function(RED) {
noprox = proxyConfig.noproxy;
}
this.on("input",function(msg) {
this.on("input",function(msg,nodeSend,nodeDone) {
var preRequestTimestamp = process.hrtime();
node.status({fill:"blue",shape:"dot",text:"httpin.status.requesting"});
var url = nodeUrl || msg.url;
@@ -62,12 +62,14 @@ module.exports = function(RED) {
}
if (!url) {
node.error(RED._("httpin.errors.no-url"),msg);
nodeDone();
return;
}
// url must start http:// or https:// so assume http:// if not set
if (url.indexOf("://") !== -1 && url.indexOf("http") !== 0) {
node.warn(RED._("httpin.errors.invalid-transport"));
node.status({fill:"red",shape:"ring",text:"httpin.errors.invalid-transport"});
nodeDone();
return;
}
if (!((url.indexOf("http://") === 0) || (url.indexOf("https://") === 0))) {
@@ -261,10 +263,12 @@ module.exports = function(RED) {
}
} catch(err) {
node.error(RED._("httpin.errors.invalid-payload"),msg);
nodeDone();
return;
}
} else {
node.error(RED._("httpin.errors.invalid-payload"),msg);
nodeDone();
return;
}
}
@@ -320,7 +324,8 @@ module.exports = function(RED) {
}
msg.payload = err.toString() + " : " + url;
msg.statusCode = err.code;
node.send(msg);
nodeSend(msg);
nodeDone();
}else{
msg.statusCode = res.statusCode;
msg.headers = res.headers;
@@ -354,7 +359,8 @@ module.exports = function(RED) {
}
}
node.status({});
node.send(msg);
nodeSend(msg);
nodeDone();
}
});
});

View File

@@ -309,7 +309,7 @@ module.exports = function(RED) {
node.status(status);
});
}
this.on("input", function(msg) {
this.on("input", function(msg, nodeSend, nodeDone) {
var payload;
if (this.serverConfig.wholemsg) {
var sess;
@@ -337,6 +337,7 @@ module.exports = function(RED) {
});
}
}
nodeDone();
});
this.on('close', function() {
node.status({});

View File

@@ -132,7 +132,7 @@ module.exports = function(RED) {
reconnectTimeout = setTimeout(setupTcpClient, reconnectTime);
}
} else {
if (node.done) { node.done(); }
if (node.doneClose) { node.doneClose(); }
}
});
client.on('error', function(err) {
@@ -142,7 +142,7 @@ module.exports = function(RED) {
setupTcpClient();
this.on('close', function(done) {
node.done = done;
node.doneClose = done;
this.closing = true;
if (client) { client.destroy(); }
clearTimeout(reconnectTimeout);
@@ -305,13 +305,13 @@ module.exports = function(RED) {
reconnectTimeout = setTimeout(setupTcpClient,reconnectTime);
}
} else {
if (node.done) { node.done(); }
if (node.doneClose) { node.doneClose(); }
}
});
}
setupTcpClient();
node.on("input", function(msg) {
node.on("input", function(msg,nodeSend,nodeDone) {
if (node.connected && msg.payload != null) {
if (Buffer.isBuffer(msg.payload)) {
client.write(msg.payload);
@@ -325,10 +325,11 @@ module.exports = function(RED) {
if (client) { node.status({}); client.destroy(); }
}
}
nodeDone();
});
node.on("close", function(done) {
node.done = done;
node.doneClose = done;
this.closing = true;
if (client) { client.destroy(); }
clearTimeout(reconnectTimeout);
@@ -337,7 +338,7 @@ module.exports = function(RED) {
}
else if (node.beserver == "reply") {
node.on("input",function(msg) {
node.on("input",function(msg, nodeSend, nodeDone) {
if (msg._session && msg._session.type == "tcp") {
var client = connectionPool[msg._session.id];
if (client) {
@@ -361,6 +362,7 @@ module.exports = function(RED) {
}
}
}
nodeDone();
});
}
else {
@@ -389,7 +391,7 @@ module.exports = function(RED) {
});
});
node.on("input", function(msg) {
node.on("input", function(msg, nodeSend, nodeDone) {
if (msg.payload != null) {
var buffer;
if (Buffer.isBuffer(msg.payload)) {
@@ -404,6 +406,7 @@ module.exports = function(RED) {
else { connectedSockets[i].write(buffer); }
}
}
nodeDone();
});
server.on('error', function(err) {
@@ -461,7 +464,7 @@ module.exports = function(RED) {
var clients = {};
this.on("input", function(msg) {
this.on("input", function(msg, nodeSend, nodeDone) {
var i = 0;
if ((!Buffer.isBuffer(msg.payload)) && (typeof msg.payload !== "string")) {
msg.payload = msg.payload.toString();
@@ -483,7 +486,7 @@ module.exports = function(RED) {
connected: false,
connecting: false
};
enqueue(clients[connection_id].msgQueue, msg);
enqueue(clients[connection_id].msgQueue, {msg:msg,nodeSend:nodeSend, nodeDone: nodeDone});
clients[connection_id].lastMsg = msg;
if (!clients[connection_id].connecting && !clients[connection_id].connected) {
@@ -505,9 +508,10 @@ module.exports = function(RED) {
if (clients[connection_id] && clients[connection_id].client) {
clients[connection_id].connected = true;
clients[connection_id].connecting = false;
let msg;
while (msg = dequeue(clients[connection_id].msgQueue)) {
clients[connection_id].client.write(msg.payload);
let event;
while (event = dequeue(clients[connection_id].msgQueue)) {
clients[connection_id].client.write(event.msg.payload);
event.nodeDone();
}
if (node.out === "time" && node.splitc < 0) {
clients[connection_id].connected = clients[connection_id].connecting = false;
@@ -527,7 +531,7 @@ module.exports = function(RED) {
if (clients[connection_id]) {
const msg = clients[connection_id].lastMsg || {};
msg.payload = data;
node.send(RED.util.cloneMessage(msg));
nodeSend(RED.util.cloneMessage(msg));
}
}
// else if (node.splitc === 0) {
@@ -550,7 +554,7 @@ module.exports = function(RED) {
const msg = clients[connection_id].lastMsg || {};
msg.payload = Buffer.alloc(i+1);
buf.copy(msg.payload,0,0,i+1);
node.send(msg);
nodeSend(msg);
if (clients[connection_id].client) {
node.status({});
clients[connection_id].client.destroy();
@@ -572,7 +576,7 @@ module.exports = function(RED) {
const msg = clients[connection_id].lastMsg || {};
msg.payload = Buffer.alloc(i);
buf.copy(msg.payload,0,0,i);
node.send(msg);
nodeSend(msg);
if (clients[connection_id].client) {
node.status({});
clients[connection_id].client.destroy();
@@ -591,7 +595,7 @@ module.exports = function(RED) {
const msg = clients[connection_id].lastMsg || {};
msg.payload = Buffer.alloc(i);
buf.copy(msg.payload,0,0,i);
node.send(msg);
nodeSend(msg);
if (clients[connection_id].client) {
node.status({});
clients[connection_id].client.destroy();
@@ -628,9 +632,9 @@ module.exports = function(RED) {
break;
}
}
if (node.done && !anyConnected) {
if (node.doneClose && !anyConnected) {
clients = {};
node.done();
node.doneClose();
}
});
@@ -663,13 +667,15 @@ module.exports = function(RED) {
}
else if (!clients[connection_id].connecting && clients[connection_id].connected) {
if (clients[connection_id] && clients[connection_id].client) {
clients[connection_id].client.write(dequeue(clients[connection_id].msgQueue).payload);
let event = dequeue(clients[connection_id].msgQueue)
clients[connection_id].client.write(event.msg.payload);
event.nodeDone();
}
}
});
this.on("close", function(done) {
node.done = done;
node.doneClose = done;
for (var cl in clients) {
if (clients[cl].hasOwnProperty("client")) {
clients[cl].client.destroy();

View File

@@ -223,16 +223,19 @@ module.exports = function(RED) {
udpInputPortsInUse[p] = sock;
}
node.on("input", function(msg) {
node.on("input", function(msg, nodeSend, nodeDone) {
if (msg.hasOwnProperty("payload")) {
var add = node.addr || msg.ip || "";
var por = node.port || msg.port || 0;
if (add === "") {
node.warn(RED._("udp.errors.ip-notset"));
nodeDone();
} else if (por === 0) {
node.warn(RED._("udp.errors.port-notset"));
nodeDone();
} else if (isNaN(por) || (por < 1) || (por > 65535)) {
node.warn(RED._("udp.errors.port-invalid"));
nodeDone();
} else {
var message;
if (node.base64) {
@@ -247,6 +250,7 @@ module.exports = function(RED) {
node.error("udp : "+err,msg);
}
message = null;
nodeDone();
});
}
}

View File

@@ -328,13 +328,14 @@ module.exports = function(RED) {
}
if (valid) {
this.on('input', function(msg) {
this.on('input', function(msg, send, done) {
applyRules(msg, 0, (err,msg) => {
if (err) {
node.error(err,msg);
} else if (msg) {
node.send(msg);
send(msg);
}
done();
})
});
}

View File

@@ -34,8 +34,9 @@ module.exports = function(RED) {
}
return data.toString();
}
function FileNode(n) {
// Write/delete a file
RED.nodes.createNode(this,n);
this.filename = n.filename;
this.appendNewline = n.appendNewline;
@@ -48,7 +49,7 @@ module.exports = function(RED) {
node.closing = false;
node.closeCallback = null;
function processMsg(msg, done) {
function processMsg(msg,nodeSend, done) {
var filename = node.filename || msg.filename || "";
if ((!node.filename) && (!node.tout)) {
node.tout = setTimeout(function() {
@@ -68,7 +69,7 @@ module.exports = function(RED) {
if (RED.settings.verbose) {
node.log(RED._("file.status.deletedfile",{file:filename}));
}
node.send(msg);
nodeSend(msg);
}
done();
});
@@ -101,7 +102,7 @@ module.exports = function(RED) {
});
wstream.on("open", function() {
wstream.end(buf, function() {
node.send(msg);
nodeSend(msg);
done();
});
})
@@ -150,13 +151,13 @@ module.exports = function(RED) {
if (node.filename) {
// Static filename - write and reuse the stream next time
node.wstream.write(buf, function() {
node.send(msg);
nodeSend(msg);
done();
});
} else {
// Dynamic filename - write and close the stream
node.wstream.end(buf, function() {
node.send(msg);
nodeSend(msg);
delete node.wstream;
delete node.wstreamIno;
done();
@@ -169,12 +170,13 @@ module.exports = function(RED) {
}
}
function processQ(queue) {
var msg = queue[0];
processMsg(msg, function() {
function processQueue(queue) {
var event = queue[0];
processMsg(event.msg, event.send, function() {
event.done();
queue.shift();
if (queue.length > 0) {
processQ(queue);
processQueue(queue);
}
else if (node.closing) {
closeNode();
@@ -182,14 +184,19 @@ module.exports = function(RED) {
});
}
this.on("input", function(msg) {
this.on("input", function(msg,nodeSend,nodeDone) {
var msgQueue = node.msgQueue;
if (msgQueue.push(msg) > 1) {
msgQueue.push({
msg: msg,
send: nodeSend,
done: nodeDone
})
if (msgQueue.length > 1) {
// pending write exists
return;
}
try {
processQ(msgQueue);
processQueue(msgQueue);
}
catch (e) {
node.msgQueue = [];
@@ -234,6 +241,7 @@ module.exports = function(RED) {
function FileInNode(n) {
// Read a file
RED.nodes.createNode(this,n);
this.filename = n.filename;
this.format = n.format;
@@ -248,13 +256,14 @@ module.exports = function(RED) {
if (this.format === "stream") { this.chunk = true; }
var node = this;
this.on("input",function(msg) {
this.on("input",function(msg, nodeSend, nodeDone) {
var filename = (node.filename || msg.filename || "").replace(/\t|\r|\n/g,'');
if (!node.filename) {
node.status({fill:"grey",shape:"dot",text:filename});
}
if (filename === "") {
node.warn(RED._("file.errors.nofilename"));
nodeDone();
}
else {
msg.filename = filename;
@@ -288,7 +297,7 @@ module.exports = function(RED) {
parts:{index:count, ch:ch, type:type, id:msg._msgid}
}
count += 1;
node.send(m);
nodeSend(m);
}
spare = bits[i];
}
@@ -304,7 +313,7 @@ module.exports = function(RED) {
getout = false;
m.parts.count = count;
}
node.send(m);
nodeSend(m);
}
}
else {
@@ -318,8 +327,9 @@ module.exports = function(RED) {
var sendMessage = RED.util.cloneMessage(msg);
delete sendMessage.payload;
sendMessage.error = err;
node.send(sendMessage);
nodeSend(sendMessage);
}
nodeDone();
})
.on('end', function() {
if (node.chunk === false) {
@@ -327,7 +337,7 @@ module.exports = function(RED) {
msg.payload = decode(lines, node.encoding);
}
else { msg.payload = lines; }
node.send(msg);
nodeSend(msg);
}
else if (node.format === "lines") {
var m = { payload: spare,
@@ -339,12 +349,13 @@ module.exports = function(RED) {
id: msg._msgid
}
};
node.send(m);
nodeSend(m);
}
else if (getout) { // last chunk same size as high water mark - have to send empty extra packet.
var m = { parts:{index:count, count:count, ch:ch, type:type, id:msg._msgid} };
node.send(m);
nodeSend(m);
}
nodeDone();
});
}
});