MVP-9756: Added event handling for function node

This commit is contained in:
michaeltreyvaud 2024-01-11 08:48:40 +00:00
parent fe189e0f38
commit e8edb4437b

View File

@ -14,375 +14,420 @@
* limitations under the License.
**/
const clone = require("clone");
const PayloadValidator = require("../../PayloadValidator");
const clone = require("clone");
const PayloadValidator = require("../../PayloadValidator");
module.exports = function (RED) {
"use strict";
var util = require("util");
var vm2 = require("vm2");
const processServisbotActions = (originalMessage, message) => {
if (message.servisbot && message.servisbot.actions && Array.isArray(message.servisbot.actions) && message.servisbot.actions.length > 0) {
message.servisbot.actions.forEach((action) => {
const [func, args] = action;
if (originalMessage && originalMessage.servisbot && originalMessage.servisbot[func]) {
originalMessage.servisbot[func](args);
}
});
}
}
function sendResults(node, _msgid, msgs) {
if (msgs == null) {
return;
} else if (!util.isArray(msgs)) {
msgs = [msgs];
}
var msgCount = 0;
for (var m = 0; m < msgs.length; m++) {
if (msgs[m]) {
if (!util.isArray(msgs[m])) {
msgs[m] = [msgs[m]];
}
for (var n = 0; n < msgs[m].length; n++) {
var msg = msgs[m][n];
if (msg !== null && msg !== undefined) {
if (
typeof msg === "object" &&
!Buffer.isBuffer(msg) &&
!util.isArray(msg)
) {
msg._msgid = _msgid;
msgCount++;
} else {
var type = typeof msg;
if (type === "object") {
type = Buffer.isBuffer(msg)
? "Buffer"
: util.isArray(msg)
? "Array"
: "Date";
}
node.error(
RED._("function.error.non-message-returned", { type: type })
);
}
}
}
}
}
if (msgCount > 0) {
node.send(msgs);
}
}
const handleCodeFile = async (node, sendResults, {
logger, msg, codefile, afterVm2
}) => {
const {
payload: {
system: { organization },
},
event: {
workers: [{ id: workerId }],
},
} = msg;
const workId = workerId.split(':::')[0];
const nodeId = node.id.split(`${organization}-${workId}-`)[1];
try {
const beforeCodefile = process.hrtime();
const {
payload: {
result,
error
},
requestId
} = await codefile.run({
srcCode: node.func,
context: {
msg,
node: {
id: nodeId,
name: node.name
}
}
});
function FunctionNode(n) {
RED.nodes.createNode(this, n);
var node = this;
this.name = n.name;
this.func = n.func;
var functionText =
"var results = null;" +
"results = (function(msg){ " +
"var __msgid__ = msg._msgid;" +
"var node = {" +
"log:__node__.log," +
"error:__node__.error," +
"warn:__node__.warn," +
"debug:__node__.debug," +
"trace:__node__.trace," +
"on:__node__.on," +
"status:__node__.status," +
"send:function(msgs){ __node__.send(__msgid__,msgs);}" +
"};\n" +
this.func +
"\n" +
"})(msg);";
this.topic = n.topic;
this.outstandingTimers = [];
this.outstandingIntervals = [];
var sandbox = {
console: console,
util: util,
//Buffer:Buffer,
//Date: Date,
RED: {
util: RED.util,
},
__node__: {
log: function () {
node.log.apply(node, arguments);
},
error: function () {
node.error.apply(node, arguments);
},
warn: function () {
node.warn.apply(node, arguments);
},
debug: function () {
node.debug.apply(node, arguments);
},
trace: function () {
node.trace.apply(node, arguments);
},
send: function (id, msgs) {
sendResults(node, id, msgs);
},
on: function () {
if (arguments[0] === "input") {
throw new Error(RED._("function.error.inputListener"));
}
node.on.apply(node, arguments);
},
status: function () {
node.status.apply(node, arguments);
},
},
context: {
set: function () {
node.context().set.apply(node, arguments);
},
get: function () {
return node.context().get.apply(node, arguments);
},
keys: function () {
return node.context().keys.apply(node, arguments);
},
get global() {
return node.context().global;
},
get flow() {
return node.context().flow;
},
},
flow: {
set: function () {
node.context().flow.set.apply(node, arguments);
},
get: function () {
return node.context().flow.get.apply(node, arguments);
},
keys: function () {
return node.context().flow.keys.apply(node, arguments);
},
},
// global: {
// set: function() {
// node.context().global.set.apply(node,arguments);
// },
// get: function() {
// return node.context().global.get.apply(node,arguments);
// },
// keys: function() {
// return node.context().global.keys.apply(node,arguments);
// }
// },
setTimeout: function () {
var func = arguments[0];
var timerId;
arguments[0] = function () {
sandbox.clearTimeout(timerId);
try {
func.apply(this, arguments);
} catch (err) {
node.error(err, {});
}
};
timerId = setTimeout.apply(this, arguments);
node.outstandingTimers.push(timerId);
return timerId;
},
clearTimeout: function (id) {
clearTimeout(id);
var index = node.outstandingTimers.indexOf(id);
if (index > -1) {
node.outstandingTimers.splice(index, 1);
}
},
setInterval: function () {
var func = arguments[0];
var timerId;
arguments[0] = function () {
try {
func.apply(this, arguments);
} catch (err) {
node.error(err, {});
}
};
timerId = setInterval.apply(this, arguments);
node.outstandingIntervals.push(timerId);
return timerId;
},
clearInterval: function (id) {
clearInterval(id);
var index = node.outstandingIntervals.indexOf(id);
if (index > -1) {
node.outstandingIntervals.splice(index, 1);
}
},
};
const afterCodefile = process.hrtime(beforeCodefile);
if (util.hasOwnProperty("promisify")) {
sandbox.setTimeout[util.promisify.custom] = function (after, value) {
return new Promise(function (resolve, reject) {
sandbox.setTimeout(function () {
resolve(value);
}, after);
});
};
}
try {
this.on("input", async function (msg) {
try {
const originalMessage = clone(msg);
const payloadValidator = new PayloadValidator(msg, this.id);
var start = process.hrtime();
sandbox.msg = msg;
const vm2Instance = new vm2.VM({ sandbox, timeout: 5000 });
const beforeVm2 = process.hrtime();
const result = vm2Instance.run(functionText);
const afterVm2 = process.hrtime(beforeVm2);
payloadValidator.verify(result);
sendResults(this, msg._msgid, result);
const logger = clone(msg.logger);
let lambdaRequestId;
let {
payload: {
system: { organization },
},
event: {
workers: [{ id: workerId }],
},
} = originalMessage;
const metrics = {
lambdaRequestId: requestId,
action: 'codefile-success',
organization,
workerId,
nodeId,
rawCode: node.func,
vm2Runtime: `${
Math.floor((afterVm2[0] * 1e9 + afterVm2[1]) / 10000) / 100
}ms`,
codefileRuntime: `${
Math.floor(
(afterCodefile[0] * 1e9 + afterCodefile[1]) / 10000
) / 100
}ms`,
};
if (result) {
const payloadValidator = new PayloadValidator(msg, node.id);
payloadValidator.verify(result);
// Re-attach logger to msgs as they get lost when passing over to the lambda
let messageToForward = result;
if (Array.isArray(result)) {
// Array result, re-attach logger and process any servisbot.log actions returned from the lambda
messageToForward = result.map((_res) => {
if (_res !== null && typeof _res === 'object') {
_res.logger = logger;
processServisbotActions(msg, _res);
if (msg.servisbot) {
_res.servisbot = msg.servisbot;
}
}
return _res;
});
} else if (typeof result === 'object') {
result.logger = logger;
processServisbotActions(msg, result);
if (msg.servisbot) {
result.servisbot = msg.servisbot;
}
}
sendResults(node, msg._msgid, messageToForward);
} else {
metrics.error = error;
metrics.action = 'codefile-error';
}
logger.info(metrics);
} catch (e) {
logger.error(e);
logger.error({
message: 'Error running codefile',
action: 'codefile-error',
error: e.message,
organization,
workerId,
nodeId,
rawCode: node.func,
});
}
};
const {
settings: {
api: { codefile = false },
},
} = RED;
module.exports = function (RED) {
"use strict";
var util = require("util");
var vm2 = require("vm2");
if (codefile) {
workerId = workerId.split(":::")[0];
const nodeId = this.id.split(`${organization}-${workerId}-`)[1];
try {
const messageToSend = clone(msg);
delete messageToSend.logger;
function sendResults(node, _msgid, msgs) {
if (msgs == null) {
return;
} else if (!util.isArray(msgs)) {
msgs = [msgs];
}
var msgCount = 0;
for (var m = 0; m < msgs.length; m++) {
if (msgs[m]) {
if (!util.isArray(msgs[m])) {
msgs[m] = [msgs[m]];
}
for (var n = 0; n < msgs[m].length; n++) {
var msg = msgs[m][n];
if (msg !== null && msg !== undefined) {
if (
typeof msg === "object" &&
!Buffer.isBuffer(msg) &&
!util.isArray(msg)
) {
msg._msgid = _msgid;
msgCount++;
} else {
var type = typeof msg;
if (type === "object") {
type = Buffer.isBuffer(msg)
? "Buffer"
: util.isArray(msg)
? "Array"
: "Date";
}
node.error(
RED._("function.error.non-message-returned", { type: type })
);
}
}
}
}
}
if (msgCount > 0) {
node.send(msgs);
}
}
const beforeCodefile = process.hrtime();
const {
payload: {
result,
error
},
requestId
} = await codefile.run({ srcCode: this.func, context: { msg } });
function FunctionNode(n) {
RED.nodes.createNode(this, n);
var node = this;
this.name = n.name;
this.func = n.func;
var functionText =
"var results = null;" +
"results = (function(msg){ " +
"var __msgid__ = msg._msgid;" +
"var node = {" +
"log:__node__.log," +
"error:__node__.error," +
"warn:__node__.warn," +
"debug:__node__.debug," +
"trace:__node__.trace," +
"on:__node__.on," +
"status:__node__.status," +
"send:function(msgs){ __node__.send(__msgid__,msgs);}" +
"};\n" +
this.func +
"\n" +
"})(msg);";
this.topic = n.topic;
this.outstandingTimers = [];
this.outstandingIntervals = [];
var sandbox = {
console: console,
util: util,
//Buffer:Buffer,
//Date: Date,
RED: {
util: RED.util,
},
__node__: {
log: function () {
node.log.apply(node, arguments);
},
error: function () {
node.error.apply(node, arguments);
},
warn: function () {
node.warn.apply(node, arguments);
},
debug: function () {
node.debug.apply(node, arguments);
},
trace: function () {
node.trace.apply(node, arguments);
},
send: function (id, msgs) {
sendResults(node, id, msgs);
},
on: function () {
if (arguments[0] === "input") {
throw new Error(RED._("function.error.inputListener"));
}
node.on.apply(node, arguments);
},
status: function () {
node.status.apply(node, arguments);
},
},
context: {
set: function () {
node.context().set.apply(node, arguments);
},
get: function () {
return node.context().get.apply(node, arguments);
},
keys: function () {
return node.context().keys.apply(node, arguments);
},
get global() {
return node.context().global;
},
get flow() {
return node.context().flow;
},
},
flow: {
set: function () {
node.context().flow.set.apply(node, arguments);
},
get: function () {
return node.context().flow.get.apply(node, arguments);
},
keys: function () {
return node.context().flow.keys.apply(node, arguments);
},
},
// global: {
// set: function() {
// node.context().global.set.apply(node,arguments);
// },
// get: function() {
// return node.context().global.get.apply(node,arguments);
// },
// keys: function() {
// return node.context().global.keys.apply(node,arguments);
// }
// },
setTimeout: function () {
var func = arguments[0];
var timerId;
arguments[0] = function () {
sandbox.clearTimeout(timerId);
try {
func.apply(this, arguments);
} catch (err) {
node.error(err, {});
}
};
timerId = setTimeout.apply(this, arguments);
node.outstandingTimers.push(timerId);
return timerId;
},
clearTimeout: function (id) {
clearTimeout(id);
var index = node.outstandingTimers.indexOf(id);
if (index > -1) {
node.outstandingTimers.splice(index, 1);
}
},
setInterval: function () {
var func = arguments[0];
var timerId;
arguments[0] = function () {
try {
func.apply(this, arguments);
} catch (err) {
node.error(err, {});
}
};
timerId = setInterval.apply(this, arguments);
node.outstandingIntervals.push(timerId);
return timerId;
},
clearInterval: function (id) {
clearInterval(id);
var index = node.outstandingIntervals.indexOf(id);
if (index > -1) {
node.outstandingIntervals.splice(index, 1);
}
},
};
const afterCodefile = process.hrtime(beforeCodefile);
if (util.hasOwnProperty("promisify")) {
sandbox.setTimeout[util.promisify.custom] = function (after, value) {
return new Promise(function (resolve, reject) {
sandbox.setTimeout(function () {
resolve(value);
}, after);
});
};
}
try {
this.on("input", async function (msg) {
try {
const originalMessage = clone(msg);
const payloadValidator = new PayloadValidator(msg, this.id);
var start = process.hrtime();
sandbox.msg = msg;
// const vm2Instance = new vm2.VM({ sandbox, timeout: 5000 });
const beforeVm2 = process.hrtime();
// const result = vm2Instance.run(functionText);
const afterVm2 = process.hrtime(beforeVm2);
// payloadValidator.verify(result);
// sendResults(this, msg._msgid, result);
const logger = clone(msg.logger);
const metrics = {
lambdaRequestId: requestId,
action:'codefile-success',
organization,
workerId: workerId,
nodeId: nodeId,
rawCode: this.func,
vm2Runtime: `${
Math.floor((afterVm2[0] * 1e9 + afterVm2[1]) / 10000) / 100
}ms`,
codefileRuntime: `${
Math.floor(
(afterCodefile[0] * 1e9 + afterCodefile[1]) / 10000
) / 100
}ms`,
};
if(result){
// not required right now since we dont go via this path
// const responseMessage = result.msg
// responseMessage.logger = logger;
// payloadValidator.verify(responseMessage);
// sendResults(this,msg._msgid, responseMessage);
}
else{
metrics.error = error;
metrics.action = 'codefile-error';
}
logger.info(metrics);
} catch (e) {
logger.error(e)
logger.error({
message: "Error running codefile",
action:'codefile-error',
error: e.message,
organization,
workerId: workerId,
nodeId: nodeId,
rawCode: this.func,
});
}
}
const {
settings: {
api: { codefile = false },
},
} = RED;
// sendResults(this,msg._msgid, responseMessage);
var duration = process.hrtime(start);
var converted =
Math.floor((duration[0] * 1e9 + duration[1]) / 10000) / 100;
this.metric("duration", msg, converted);
if (process.env.NODE_RED_FUNCTION_TIME) {
this.status({ fill: "yellow", shape: "dot", text: "" + converted });
}
} catch (err) {
//remove unwanted part
var index = err.stack.search(
/\n\s*at ContextifyScript.Script.runInContext/
);
err.stack = err.stack
.slice(0, index)
.split("\n")
.slice(0, -1)
.join("\n");
var stack = err.stack.split(/\r?\n/);
if (codefile) {
await handleCodeFile(this, sendResults, {
logger,
msg: originalMessage,
codefile,
afterVm2,
});
}
//store the error in msg to be used in flows
msg.error = err;
var duration = process.hrtime(start);
var converted =
Math.floor((duration[0] * 1e9 + duration[1]) / 10000) / 100;
this.metric("duration", msg, converted);
if (process.env.NODE_RED_FUNCTION_TIME) {
this.status({ fill: "yellow", shape: "dot", text: "" + converted });
}
} catch (err) {
//remove unwanted part
var index = err.stack.search(
/\n\s*at ContextifyScript.Script.runInContext/
);
err.stack = err.stack
.slice(0, index)
.split("\n")
.slice(0, -1)
.join("\n");
var stack = err.stack.split(/\r?\n/);
var line = 0;
var errorMessage;
var stack = err.stack.split(/\r?\n/);
if (stack.length > 0) {
while (
line < stack.length &&
stack[line].indexOf("ReferenceError") !== 0
) {
line++;
}
//store the error in msg to be used in flows
msg.error = err;
if (line < stack.length) {
errorMessage = stack[line];
var m = /:(\d+):(\d+)$/.exec(stack[line + 1]);
if (m) {
var lineno = Number(m[1]) - 1;
var cha = m[2];
errorMessage += " (line " + lineno + ", col " + cha + ")";
}
}
}
if (!errorMessage) {
errorMessage = err.toString();
}
var line = 0;
var errorMessage;
var stack = err.stack.split(/\r?\n/);
if (stack.length > 0) {
while (
line < stack.length &&
stack[line].indexOf("ReferenceError") !== 0
) {
line++;
}
// gives access to the msg object in custom logger
const temp = errorMessage;
errorMessage = msg;
errorMessage.toString = () => temp; // preserve original error message in logs
msg.errorMessage = temp;
if (line < stack.length) {
errorMessage = stack[line];
var m = /:(\d+):(\d+)$/.exec(stack[line + 1]);
if (m) {
var lineno = Number(m[1]) - 1;
var cha = m[2];
errorMessage += " (line " + lineno + ", col " + cha + ")";
}
}
}
if (!errorMessage) {
errorMessage = err.toString();
}
// gives access to the msg object in custom logger
const temp = errorMessage;
errorMessage = msg;
errorMessage.toString = () => temp; // preserve original error message in logs
msg.errorMessage = temp;
this.error(errorMessage, msg);
}
});
this.on("close", function () {
while (node.outstandingTimers.length > 0) {
clearTimeout(node.outstandingTimers.pop());
}
while (node.outstandingIntervals.length > 0) {
clearInterval(node.outstandingIntervals.pop());
}
this.status({});
});
} catch (err) {
// eg SyntaxError - which v8 doesn't include line number information
// so we can't do better than this
this.error(err);
}
}
RED.nodes.registerType("function", FunctionNode);
RED.library.register("functions");
};
this.error(errorMessage, msg);
}
});
this.on("close", function () {
while (node.outstandingTimers.length > 0) {
clearTimeout(node.outstandingTimers.pop());
}
while (node.outstandingIntervals.length > 0) {
clearInterval(node.outstandingIntervals.pop());
}
this.status({});
});
} catch (err) {
// eg SyntaxError - which v8 doesn't include line number information
// so we can't do better than this
this.error(err);
}
}
RED.nodes.registerType("function", FunctionNode);
RED.library.register("functions");
};