Rework start/stop api to use runtime-event notification message

This commit is contained in:
Nick O'Leary
2022-06-29 10:27:44 +01:00
parent 68c1e49f62
commit f33848e16b
11 changed files with 107 additions and 140 deletions

View File

@@ -73,10 +73,6 @@ var api = module.exports = {
if (deploymentType === 'reload') {
apiPromise = runtime.flows.loadFlows(true);
} else {
//ensure the runtime running/stopped state matches the deploying editor. If not, then copy the _rev number to flows.rev
if(flows.hasOwnProperty('_rev') && !flows.hasOwnProperty('rev') && (flows.runtimeState !== "stopped" || runtime.flows.started)) {
flows.rev = flows._rev
}
if (flows.hasOwnProperty('rev')) {
var currentVersion = runtime.flows.getFlows().rev;
if (currentVersion !== flows.rev) {
@@ -271,9 +267,7 @@ var api = module.exports = {
getState: async function(opts) {
runtime.log.audit({event: "flows.getState"}, opts.req);
const result = {
state: runtime.flows.started ? "started" : "stopped",
started: !!runtime.flows.started,
rev: runtime.flows.getFlows().rev
state: runtime.flows.state()
}
return result;
},
@@ -282,7 +276,7 @@ var api = module.exports = {
* @param {Object} opts
* @param {Object} opts.req - the request to log (optional)
* @param {User} opts.user - the user calling the api
* @param {string} opts.requestedState - the requested state. Valid values are "start" and "stop".
* @param {string} opts.state - the requested state. Valid values are "start" and "stop".
* @return {Promise<Flow>} - the active flow configuration
* @memberof @node-red/runtime_flows
*/
@@ -295,7 +289,7 @@ var api = module.exports = {
err.code = err.code || errcode || "unexpected_error"
runtime.log.audit({
event: "flows.setState",
state: opts.requestedState || "",
state: opts.state || "",
error: errcode || "unexpected_error",
message: err.code
}, opts.req);
@@ -304,21 +298,22 @@ var api = module.exports = {
const getState = () => {
return {
state: runtime.flows.started ? "started" : "stopped",
started: !!runtime.flows.started,
rev: runtime.flows.getFlows().rev,
state: runtime.flows.state()
}
}
if(!runtime.settings.runtimeState || runtime.settings.runtimeState.enabled !== true) {
throw (makeError("Method Not Allowed", "not_allowed", 405))
}
switch (opts.requestedState) {
switch (opts.state) {
case "start":
try {
try {
runtime.settings.set('flowsRunStateRequested', opts.requestedState);
} catch(err) { }
runtime.settings.set('runtimeFlowState', opts.state);
} catch(err) {}
if (runtime.settings.safeMode) {
delete runtime.settings.safeMode
}
await runtime.flows.startFlows("full")
return getState()
} catch (err) {
@@ -327,15 +322,15 @@ var api = module.exports = {
case "stop":
try {
try {
runtime.settings.set('flowsRunStateRequested', opts.requestedState);
} catch(err) { }
runtime.settings.set('runtimeFlowState', opts.state);
} catch(err) {}
await runtime.flows.stopFlows("full")
return getState()
} catch (err) {
throw (makeError(err, err.code, 500))
}
default:
throw (makeError(`Cannot change flows runtime state to '${opts.requestedState}'}`, "invalid_run_state", 400))
throw (makeError(`Cannot change flows runtime state to '${opts.state}'}`, "invalid_run_state", 400))
}
},
}

View File

@@ -36,6 +36,8 @@ var activeFlowConfig = null;
var activeFlows = {};
var started = false;
var state = 'stop'
var credentialsPendingReset = false;
var activeNodesToFlow = {};
@@ -50,6 +52,7 @@ function init(runtime) {
storage = runtime.storage;
log = runtime.log;
started = false;
state = 'stop';
if (!typeEventRegistered) {
events.on('type-registered',function(type) {
if (activeFlowConfig && activeFlowConfig.missingTypes.length > 0) {
@@ -214,19 +217,26 @@ function setFlows(_config,_credentials,type,muteLog,forceStart,user) {
// Flows are running (or should be)
// Stop the active flows (according to deploy type and the diff)
return stop(type,diff,muteLog).then(() => {
return stop(type,diff,muteLog,true).then(() => {
// Once stopped, allow context to remove anything no longer needed
return context.clean(activeFlowConfig)
}).then(() => {
if (!isLoad) {
log.info(log._("nodes.flows.updated-flows"));
}
// Start the active flows
start(type,diff,muteLog).then(() => {
start(type,diff,muteLog,true).then(() => {
events.emit("runtime-event",{id:"runtime-deploy",payload:{revision:flowRevision},retain: true});
});
// Return the new revision asynchronously to the actual start
return flowRevision;
}).catch(function(err) { })
} else {
if (!isLoad) {
log.info(log._("nodes.flows.updated-flows"));
}
events.emit("runtime-event",{id:"runtime-deploy",payload:{revision:flowRevision},retain: true});
return flowRevision;
}
});
}
@@ -259,10 +269,10 @@ function getFlows() {
return activeConfig;
}
async function start(type,diff,muteLog) {
type = type||"full";
let reallyStarted = started
async function start(type,diff,muteLog,isDeploy) {
type = type || "full";
started = true;
state = 'start'
var i;
// If there are missing types, report them, emit the necessary runtime event and return
if (activeFlowConfig.missingTypes.length > 0) {
@@ -284,7 +294,7 @@ async function start(type,diff,muteLog) {
log.info(log._("nodes.flows.missing-type-install-2"));
log.info(" "+settings.userDir);
}
events.emit("runtime-event",{id:"runtime-state",payload:{error:"missing-types", type:"warning",text:"notification.warnings.missing-types",types:activeFlowConfig.missingTypes},retain:true});
events.emit("runtime-event",{id:"runtime-state",payload:{state: 'stop', error:"missing-types", type:"warning",text:"notification.warnings.missing-types",types:activeFlowConfig.missingTypes},retain:true});
return;
}
@@ -298,7 +308,7 @@ async function start(type,diff,muteLog) {
missingModules.push({module:err[i].module.module, error: err[i].error.code || err[i].error.toString()})
log.info(` - ${err[i].module.spec} [${err[i].error.code || "unknown_error"}]`);
}
events.emit("runtime-event",{id:"runtime-state",payload:{error:"missing-modules", type:"warning",text:"notification.warnings.missing-modules",modules:missingModules},retain:true});
events.emit("runtime-event",{id:"runtime-state",payload:{state: 'stop', error:"missing-modules", type:"warning",text:"notification.warnings.missing-modules",modules:missingModules},retain:true});
return;
}
@@ -307,10 +317,20 @@ async function start(type,diff,muteLog) {
log.info("*****************************************************************")
log.info(log._("nodes.flows.safe-mode"));
log.info("*****************************************************************")
events.emit("runtime-event",{id:"runtime-state",payload:{error:"safe-mode", type:"warning",text:"notification.warnings.safe-mode"},retain:true});
state = 'safe'
events.emit("runtime-event",{id:"runtime-state",payload:{state: 'safe', error:"safe-mode", type:"warning",text:"notification.warnings.safe-mode"},retain:true});
return;
}
const runtimeState = settings.get('runtimeFlowState') || 'start'
if (runtimeState === 'stop') {
log.info(log._("nodes.flows.stopped-flows"));
events.emit("runtime-event",{id:"runtime-state",payload:{ state: 'stop', deploy:isDeploy },retain:true});
state = 'stop'
started = false
return
}
if (!muteLog) {
if (type !== "full") {
log.info(log._("nodes.flows.starting-modified-"+type));
@@ -365,51 +385,31 @@ async function start(type,diff,muteLog) {
}
}
}
// Having created or updated all flows, now start them.
let startFlows = true
try {
startFlows = settings.get('flowsRunStateRequested');
} catch(err) {
}
startFlows = (startFlows !== "stop");
if (startFlows) {
for (id in activeFlows) {
if (activeFlows.hasOwnProperty(id)) {
try {
activeFlows[id].start(diff);
// Create a map of node id to flow id and also a subflowInstance lookup map
var activeNodes = activeFlows[id].getActiveNodes();
Object.keys(activeNodes).forEach(function(nid) {
activeNodesToFlow[nid] = id;
});
} catch(err) {
console.log(err.stack);
}
for (id in activeFlows) {
if (activeFlows.hasOwnProperty(id)) {
try {
activeFlows[id].start(diff);
// Create a map of node id to flow id and also a subflowInstance lookup map
var activeNodes = activeFlows[id].getActiveNodes();
Object.keys(activeNodes).forEach(function(nid) {
activeNodesToFlow[nid] = id;
});
} catch(err) {
console.log(err.stack);
}
}
reallyStarted = true;
events.emit("flows:started", {config: activeConfig, type: type, diff: diff});
// Deprecated event
events.emit("nodes-started");
} else {
started = false;
}
const state = {
started: reallyStarted,
state: reallyStarted ? "started" : "stopped",
}
events.emit("runtime-event",{id:"flows-run-state", payload: state, retain:true});
events.emit("flows:started", {config: activeConfig, type: type, diff: diff});
// Deprecated event
events.emit("nodes-started");
if (credentialsPendingReset === true) {
credentialsPendingReset = false;
} else {
events.emit("runtime-event",{id:"runtime-state",retain:true});
events.emit("runtime-event",{id:"runtime-state", payload:{ state: 'start', deploy:isDeploy}, retain:true});
}
if (!muteLog && reallyStarted) {
if (!muteLog) {
if (type !== "full") {
log.info(log._("nodes.flows.started-modified-"+type));
} else {
@@ -419,7 +419,7 @@ async function start(type,diff,muteLog) {
return;
}
function stop(type,diff,muteLog) {
function stop(type,diff,muteLog,isDeploy) {
if (!started) {
return Promise.resolve();
}
@@ -439,6 +439,7 @@ function stop(type,diff,muteLog) {
}
}
started = false;
state = 'stop'
var promises = [];
var stopList;
var removedList = diff.removed;
@@ -490,7 +491,8 @@ function stop(type,diff,muteLog) {
}
}
events.emit("flows:stopped",{config: activeConfig, type: type, diff: diff});
events.emit("runtime-event",{id:"flows-run-state", payload: {started: false, state: "stopped"}, retain:true});
events.emit("runtime-event",{ id:"runtime-state", payload:{ state: 'stop', deploy:isDeploy }, retain:true });
// Deprecated event
events.emit("nodes-stopped");
});
@@ -810,7 +812,7 @@ module.exports = {
stopFlows: stop,
get started() { return started },
state: () => { return state },
// handleError: handleError,
// handleStatus: handleStatus,

View File

@@ -215,7 +215,7 @@ function start() {
}
}
return redNodes.loadContextsPlugin().then(function () {
redNodes.loadFlows().then(redNodes.startFlows).catch(function(err) {});
redNodes.loadFlows().then(() => { redNodes.startFlows() }).catch(function(err) {});
started = true;
});
});
@@ -399,12 +399,12 @@ module.exports = {
* @memberof @node-red/runtime
*/
version: externalAPI.version,
/**
* @memberof @node-red/diagnostics
*/
diagnostics:externalAPI.diagnostics,
storage: storage,
events: events,
hooks: hooks,

View File

@@ -122,6 +122,7 @@
"stopped-flows": "Stopped flows",
"stopped": "Stopped",
"stopping-error": "Error stopping node: __message__",
"updated-flows": "Updated flows",
"added-flow": "Adding flow: __label__",
"updated-flow": "Updated flow: __label__",
"removed-flow": "Removed flow: __label__",