mirror of
https://github.com/node-red/node-red.git
synced 2023-10-10 13:36:53 +02:00
update promise & message handling
This commit is contained in:
parent
161f6090c1
commit
c649e1b4a2
@ -87,17 +87,6 @@ module.exports = function(RED) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function processAsyncResult(result, callbacks) {
|
|
||||||
var promises = callbacks;
|
|
||||||
if (Array.isArray(result)) {
|
|
||||||
promises = promises.concat(result);
|
|
||||||
}
|
|
||||||
else if(result) {
|
|
||||||
promises = promises.concat([result]);
|
|
||||||
}
|
|
||||||
return Promise.all(promises);
|
|
||||||
}
|
|
||||||
|
|
||||||
function FunctionNode(n) {
|
function FunctionNode(n) {
|
||||||
RED.nodes.createNode(this,n);
|
RED.nodes.createNode(this,n);
|
||||||
var node = this;
|
var node = this;
|
||||||
@ -108,44 +97,6 @@ module.exports = function(RED) {
|
|||||||
|
|
||||||
var handleNodeDoneCall = true;
|
var handleNodeDoneCall = true;
|
||||||
|
|
||||||
var callbackPromises = [];
|
|
||||||
function createAsyncCallback() {
|
|
||||||
var result = undefined;
|
|
||||||
var callbacks = undefined;
|
|
||||||
|
|
||||||
var promise = new Promise((resolve, reject) => {
|
|
||||||
if (result) {
|
|
||||||
if (result.error) {
|
|
||||||
reject(result.error);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
resolve(result.value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
callbacks = {
|
|
||||||
resolve: resolve,
|
|
||||||
reject: reject
|
|
||||||
};
|
|
||||||
}
|
|
||||||
});
|
|
||||||
var cb = function(err, val) {
|
|
||||||
if (callbacks) {
|
|
||||||
if (err) {
|
|
||||||
callbacks.reject(err);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
callbacks.resolve(val);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
result = { error: err, value: val };
|
|
||||||
}
|
|
||||||
};
|
|
||||||
callbackPromises.push(promise);
|
|
||||||
return cb;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check to see if the Function appears to call `node.done()`. If so,
|
// Check to see if the Function appears to call `node.done()`. If so,
|
||||||
// we will assume it is well written and does actually call node.done().
|
// we will assume it is well written and does actually call node.done().
|
||||||
// Otherwise, we will call node.done() after the function returns regardless.
|
// Otherwise, we will call node.done() after the function returns regardless.
|
||||||
@ -171,13 +122,14 @@ module.exports = function(RED) {
|
|||||||
"};\n"+
|
"};\n"+
|
||||||
node.func+"\n"+
|
node.func+"\n"+
|
||||||
"})(msg,send,done);";
|
"})(msg,send,done);";
|
||||||
var iniText = "(function () {\n"+node.ini +"\n})();";
|
var iniText = "(async function () {\n"+node.ini +"\n})();";
|
||||||
var finText = "(function () {\n"+node.fin +"\n})();";
|
var finText = "(function () {\n"+node.fin +"\n})();";
|
||||||
var finScript = null;
|
var finScript = null;
|
||||||
var finOpt = null;
|
var finOpt = null;
|
||||||
node.topic = n.topic;
|
node.topic = n.topic;
|
||||||
node.outstandingTimers = [];
|
node.outstandingTimers = [];
|
||||||
node.outstandingIntervals = [];
|
node.outstandingIntervals = [];
|
||||||
|
var initValue = undefined;
|
||||||
var sandbox = {
|
var sandbox = {
|
||||||
console:console,
|
console:console,
|
||||||
util:util,
|
util:util,
|
||||||
@ -305,8 +257,9 @@ module.exports = function(RED) {
|
|||||||
node.outstandingIntervals.splice(index,1);
|
node.outstandingIntervals.splice(index,1);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
promisify: (util.hasOwnProperty("promisify") ? util.promisify : undefined),
|
getInitValue: function() {
|
||||||
asyncCallback: createAsyncCallback
|
return initValue;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
if (util.hasOwnProperty('promisify')) {
|
if (util.hasOwnProperty('promisify')) {
|
||||||
sandbox.setTimeout[util.promisify.custom] = function(after, value) {
|
sandbox.setTimeout[util.promisify.custom] = function(after, value) {
|
||||||
@ -314,6 +267,7 @@ module.exports = function(RED) {
|
|||||||
sandbox.setTimeout(function(){ resolve(value); }, after);
|
sandbox.setTimeout(function(){ resolve(value); }, after);
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
sandbox.promisify = util.promisify;
|
||||||
}
|
}
|
||||||
var context = vm.createContext(sandbox);
|
var context = vm.createContext(sandbox);
|
||||||
try {
|
try {
|
||||||
@ -330,91 +284,119 @@ module.exports = function(RED) {
|
|||||||
}
|
}
|
||||||
var promise = Promise.resolve();
|
var promise = Promise.resolve();
|
||||||
if (iniScript) {
|
if (iniScript) {
|
||||||
var result = vm.runInContext(iniText, context, iniOpt);
|
promise = iniScript.runInContext(context, iniOpt);
|
||||||
if (result || callbackPromises) {
|
}
|
||||||
promise = processAsyncResult(result, callbackPromises);
|
|
||||||
|
function processMessage(msg, send, done) {
|
||||||
|
try {
|
||||||
|
var start = process.hrtime();
|
||||||
|
context.msg = msg;
|
||||||
|
context.send = send;
|
||||||
|
context.done = done;
|
||||||
|
|
||||||
|
node.script.runInContext(context);
|
||||||
|
sendResults(node,send,msg._msgid,context.results,false);
|
||||||
|
if (handleNodeDoneCall) {
|
||||||
|
done();
|
||||||
|
}
|
||||||
|
|
||||||
|
var duration = process.hrtime(start);
|
||||||
|
var converted = Math.floor((duration[0] * 1e9 + duration[1])/10000)/100;
|
||||||
|
node.metric("duration", msg, converted);
|
||||||
|
if (process.env.NODE_RED_FUNCTION_TIME) {
|
||||||
|
node.status({fill:"yellow",shape:"dot",text:""+converted});
|
||||||
|
}
|
||||||
|
} catch(err) {
|
||||||
|
if ((typeof err === "object") && err.hasOwnProperty("stack")) {
|
||||||
|
//remove unwanted part
|
||||||
|
var index = err.stack.search(/\n\s*at ContextifyScript.Script.runInContext/);
|
||||||
|
err.stack = err.stack.slice(0, index).split('\n').slice(0,-1).join('\n');
|
||||||
|
var stack = err.stack.split(/\r?\n/);
|
||||||
|
|
||||||
|
//store the error in msg to be used in flows
|
||||||
|
msg.error = err;
|
||||||
|
|
||||||
|
var line = 0;
|
||||||
|
var errorMessage;
|
||||||
|
if (stack.length > 0) {
|
||||||
|
while (line < stack.length && stack[line].indexOf("ReferenceError") !== 0) {
|
||||||
|
line++;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (line < stack.length) {
|
||||||
|
errorMessage = stack[line];
|
||||||
|
var m = /:(\d+):(\d+)$/.exec(stack[line+1]);
|
||||||
|
if (m) {
|
||||||
|
var lineno = Number(m[1])-1;
|
||||||
|
var cha = m[2];
|
||||||
|
errorMessage += " (line "+lineno+", col "+cha+")";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!errorMessage) {
|
||||||
|
errorMessage = err.toString();
|
||||||
|
}
|
||||||
|
done(errorMessage);
|
||||||
|
}
|
||||||
|
else if (typeof err === "string") {
|
||||||
|
done(err);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
done(JSON.stringify(err));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
promise.then(function (v) {
|
|
||||||
node.on("input", function(msg,send,done) {
|
const RESOLVING = 0;
|
||||||
|
const RESOLVED = 1;
|
||||||
|
const ERROR = 2;
|
||||||
|
var state = RESOLVING;
|
||||||
|
var messages = [];
|
||||||
|
|
||||||
|
node.on("input", function(msg,send,done) {
|
||||||
|
if(state === RESOLVING) {
|
||||||
|
messages.push({msg:msg, send:send, done:done});
|
||||||
|
}
|
||||||
|
else if(state === RESOLVED) {
|
||||||
|
processMessage(msg, send, done);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
node.on("close", function() {
|
||||||
|
if (finScript) {
|
||||||
try {
|
try {
|
||||||
var start = process.hrtime();
|
finScript.runInContext(context, finOpt);
|
||||||
context.msg = msg;
|
|
||||||
context.send = send;
|
|
||||||
context.done = done;
|
|
||||||
|
|
||||||
node.script.runInContext(context);
|
|
||||||
sendResults(node,send,msg._msgid,context.results,false);
|
|
||||||
if (handleNodeDoneCall) {
|
|
||||||
done();
|
|
||||||
}
|
|
||||||
|
|
||||||
var duration = process.hrtime(start);
|
|
||||||
var converted = Math.floor((duration[0] * 1e9 + duration[1])/10000)/100;
|
|
||||||
node.metric("duration", msg, converted);
|
|
||||||
if (process.env.NODE_RED_FUNCTION_TIME) {
|
|
||||||
node.status({fill:"yellow",shape:"dot",text:""+converted});
|
|
||||||
}
|
|
||||||
} catch(err) {
|
|
||||||
if ((typeof err === "object") && err.hasOwnProperty("stack")) {
|
|
||||||
//remove unwanted part
|
|
||||||
var index = err.stack.search(/\n\s*at ContextifyScript.Script.runInContext/);
|
|
||||||
err.stack = err.stack.slice(0, index).split('\n').slice(0,-1).join('\n');
|
|
||||||
var stack = err.stack.split(/\r?\n/);
|
|
||||||
|
|
||||||
//store the error in msg to be used in flows
|
|
||||||
msg.error = err;
|
|
||||||
|
|
||||||
var line = 0;
|
|
||||||
var errorMessage;
|
|
||||||
if (stack.length > 0) {
|
|
||||||
while (line < stack.length && stack[line].indexOf("ReferenceError") !== 0) {
|
|
||||||
line++;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (line < stack.length) {
|
|
||||||
errorMessage = stack[line];
|
|
||||||
var m = /:(\d+):(\d+)$/.exec(stack[line+1]);
|
|
||||||
if (m) {
|
|
||||||
var lineno = Number(m[1])-1;
|
|
||||||
var cha = m[2];
|
|
||||||
errorMessage += " (line "+lineno+", col "+cha+")";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (!errorMessage) {
|
|
||||||
errorMessage = err.toString();
|
|
||||||
}
|
|
||||||
done(errorMessage);
|
|
||||||
}
|
|
||||||
else if (typeof err === "string") {
|
|
||||||
done(err);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
done(JSON.stringify(err));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
catch (err) {
|
||||||
node.on("close", function() {
|
node.error(err);
|
||||||
if (finScript) {
|
|
||||||
try {
|
|
||||||
finScript.runInContext(context, finOpt);
|
|
||||||
}
|
|
||||||
catch (err) {
|
|
||||||
node.error(err);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
while (node.outstandingTimers.length > 0) {
|
}
|
||||||
clearTimeout(node.outstandingTimers.pop());
|
while (node.outstandingTimers.length > 0) {
|
||||||
}
|
clearTimeout(node.outstandingTimers.pop());
|
||||||
while (node.outstandingIntervals.length > 0) {
|
}
|
||||||
clearInterval(node.outstandingIntervals.pop());
|
while (node.outstandingIntervals.length > 0) {
|
||||||
}
|
clearInterval(node.outstandingIntervals.pop());
|
||||||
node.status({});
|
}
|
||||||
});
|
node.status({});
|
||||||
|
});
|
||||||
|
|
||||||
|
promise.then(function (v) {
|
||||||
|
initValue = v;
|
||||||
|
var msgs = messages;
|
||||||
|
messages = [];
|
||||||
|
while (msgs.length > 0) {
|
||||||
|
msgs.forEach(function (s) {
|
||||||
|
processMessage(s.msg, s.send, s.done);
|
||||||
|
});
|
||||||
|
msgs = messages;
|
||||||
|
messages = [];
|
||||||
|
}
|
||||||
|
state = RESOLVED;
|
||||||
}).catch((error) => {
|
}).catch((error) => {
|
||||||
|
messages = [];
|
||||||
|
state = ERROR;
|
||||||
node.error(error);
|
node.error(error);
|
||||||
});
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
catch(err) {
|
catch(err) {
|
||||||
// eg SyntaxError - which v8 doesn't include line number information
|
// eg SyntaxError - which v8 doesn't include line number information
|
||||||
@ -426,3 +408,4 @@ module.exports = function(RED) {
|
|||||||
RED.nodes.registerType("function",FunctionNode);
|
RED.nodes.registerType("function",FunctionNode);
|
||||||
RED.library.register("functions");
|
RED.library.register("functions");
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user