pull out editor-client and editor-api

This commit is contained in:
Nick O'Leary
2018-08-17 22:10:54 +01:00
parent 6b79c6135f
commit e57d8ba0ef
287 changed files with 245 additions and 294 deletions

View File

@@ -0,0 +1,291 @@
/**
* Copyright JS Foundation and other contributors, http://js.foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
var util = require("util");
var EventEmitter = require("events").EventEmitter;
var when = require("when");
var redUtil = require("@node-red/util").util;
var Log = require("@node-red/util").log; // TODO: separate module
var context = require("./context");
var flows = require("./flows");
function Node(n) {
this.id = n.id;
this.type = n.type;
this.z = n.z;
this._closeCallbacks = [];
if (n.name) {
this.name = n.name;
}
if (n._alias) {
this._alias = n._alias;
}
this.updateWires(n.wires);
}
util.inherits(Node, EventEmitter);
Node.prototype.updateWires = function(wires) {
//console.log("UPDATE",this.id);
this.wires = wires || [];
delete this._wire;
var wc = 0;
this.wires.forEach(function(w) {
wc+=w.length;
});
this._wireCount = wc;
if (wc === 0) {
// With nothing wired to the node, no-op send
this.send = function(msg) {}
} else {
this.send = Node.prototype.send;
if (this.wires.length === 1 && this.wires[0].length === 1) {
// Single wire, so we can shortcut the send when
// a single message is sent
this._wire = this.wires[0][0];
}
}
}
Node.prototype.context = function() {
if (!this._context) {
this._context = context.get(this._alias||this.id,this.z);
}
return this._context;
}
Node.prototype._on = Node.prototype.on;
Node.prototype.on = function(event, callback) {
var node = this;
if (event == "close") {
this._closeCallbacks.push(callback);
} else {
this._on(event, callback);
}
};
Node.prototype.close = function(removed) {
//console.log(this.type,this.id,removed);
var promises = [];
var node = this;
for (var i=0;i<this._closeCallbacks.length;i++) {
var callback = this._closeCallbacks[i];
if (callback.length > 0) {
promises.push(
when.promise(function(resolve) {
var args = [];
if (callback.length === 2) {
args.push(!!removed);
}
args.push(resolve);
callback.apply(node, args);
})
);
} else {
callback.call(node);
}
}
if (promises.length > 0) {
return when.settle(promises).then(function() {
if (this._context) {
return context.delete(this._alias||this.id,this.z);
}
});
} else {
if (this._context) {
return context.delete(this._alias||this.id,this.z);
}
return;
}
};
Node.prototype.send = function(msg) {
var msgSent = false;
var node;
if (msg === null || typeof msg === "undefined") {
return;
} else if (!util.isArray(msg)) {
if (this._wire) {
// A single message and a single wire on output 0
// TODO: pre-load flows.get calls - cannot do in constructor
// as not all nodes are defined at that point
if (!msg._msgid) {
msg._msgid = redUtil.generateId();
}
this.metric("send",msg);
node = flows.get(this._wire);
/* istanbul ignore else */
if (node) {
node.receive(msg);
}
return;
} else {
msg = [msg];
}
}
var numOutputs = this.wires.length;
// Build a list of send events so that all cloning is done before
// any calls to node.receive
var sendEvents = [];
var sentMessageId = null;
// for each output of node eg. [msgs to output 0, msgs to output 1, ...]
for (var i = 0; i < numOutputs; i++) {
var wires = this.wires[i]; // wires leaving output i
/* istanbul ignore else */
if (i < msg.length) {
var msgs = msg[i]; // msgs going to output i
if (msgs !== null && typeof msgs !== "undefined") {
if (!util.isArray(msgs)) {
msgs = [msgs];
}
var k = 0;
// for each recipent node of that output
for (var j = 0; j < wires.length; j++) {
node = flows.get(wires[j]); // node at end of wire j
if (node) {
// for each msg to send eg. [[m1, m2, ...], ...]
for (k = 0; k < msgs.length; k++) {
var m = msgs[k];
if (m !== null && m !== undefined) {
/* istanbul ignore else */
if (!sentMessageId) {
sentMessageId = m._msgid;
}
if (msgSent) {
var clonedmsg = redUtil.cloneMessage(m);
sendEvents.push({n:node,m:clonedmsg});
} else {
sendEvents.push({n:node,m:m});
msgSent = true;
}
}
}
}
}
}
}
}
/* istanbul ignore else */
if (!sentMessageId) {
sentMessageId = redUtil.generateId();
}
this.metric("send",{_msgid:sentMessageId});
for (i=0;i<sendEvents.length;i++) {
var ev = sendEvents[i];
/* istanbul ignore else */
if (!ev.m._msgid) {
ev.m._msgid = sentMessageId;
}
ev.n.receive(ev.m);
}
};
Node.prototype.receive = function(msg) {
if (!msg) {
msg = {};
}
if (!msg._msgid) {
msg._msgid = redUtil.generateId();
}
this.metric("receive",msg);
try {
this.emit("input", msg);
} catch(err) {
this.error(err,msg);
}
};
function log_helper(self, level, msg) {
var o = {
level: level,
id: self.id,
type: self.type,
msg: msg
};
if (self._alias) {
o._alias = self._alias;
}
if (self.z) {
o.z = self.z;
}
if (self.name) {
o.name = self.name;
}
Log.log(o);
}
Node.prototype.log = function(msg) {
log_helper(this, Log.INFO, msg);
};
Node.prototype.warn = function(msg) {
log_helper(this, Log.WARN, msg);
};
Node.prototype.error = function(logMessage,msg) {
if (typeof logMessage != 'boolean') {
logMessage = logMessage || "";
}
var handled = false;
if (msg) {
handled = flows.handleError(this,logMessage,msg);
}
if (!handled) {
log_helper(this, Log.ERROR, logMessage);
}
};
Node.prototype.debug = function(msg) {
log_helper(this, Log.DEBUG, msg);
}
Node.prototype.trace = function(msg) {
log_helper(this, Log.TRACE, msg);
}
/**
* If called with no args, returns whether metric collection is enabled
*/
Node.prototype.metric = function(eventname, msg, metricValue) {
if (typeof eventname === "undefined") {
return Log.metric();
}
var metrics = {};
metrics.level = Log.METRIC;
metrics.nodeid = this.id;
metrics.event = "node."+this.type+"."+eventname;
metrics.msgid = msg._msgid;
metrics.value = metricValue;
Log.log(metrics);
}
/**
* status: { fill:"red|green", shape:"dot|ring", text:"blah" }
*/
Node.prototype.status = function(status) {
flows.handleStatus(this,status);
};
module.exports = Node;

View File

@@ -0,0 +1,384 @@
/**
* Copyright JS Foundation and other contributors, http://js.foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
var clone = require("clone");
var log = require("@node-red/util").log; // TODO: separate module
var memory = require("./memory");
var settings;
// A map of scope id to context instance
var contexts = {};
// A map of store name to instance
var stores = {};
var storeList = [];
var defaultStore;
// Whether there context storage has been configured or left as default
var hasConfiguredStore = false;
// Unknown Stores
var unknownStores = {};
function logUnknownStore(name) {
if (name) {
var count = unknownStores[name] || 0;
if (count == 0) {
log.warn(log._("context.unknown-store", {name: name}));
count++;
unknownStores[name] = count;
}
}
}
function init(_settings) {
settings = _settings;
contexts = {};
stores = {};
storeList = [];
hasConfiguredStore = false;
var seed = settings.functionGlobalContext || {};
contexts['global'] = createContext("global",seed);
// create a default memory store - used by the unit tests that skip the full
// `load()` initialisation sequence.
// If the user has any stores configured, this will be disgarded
stores["_"] = new memory();
defaultStore = "memory";
}
function load() {
return new Promise(function(resolve,reject) {
// load & init plugins in settings.contextStorage
var plugins = settings.contextStorage || {};
var defaultIsAlias = false;
var promises = [];
if (plugins && Object.keys(plugins).length > 0) {
var hasDefault = plugins.hasOwnProperty('default');
var defaultName;
for (var pluginName in plugins) {
if (plugins.hasOwnProperty(pluginName)) {
// "_" is a reserved name - do not allow it to be overridden
if (pluginName === "_") {
continue;
}
if (!/^[a-zA-Z0-9_]+$/.test(pluginName)) {
return reject(new Error(log._("context.error-invalid-module-name", {name:pluginName})));
}
// Check if this is setting the 'default' context to be a named plugin
if (pluginName === "default" && typeof plugins[pluginName] === "string") {
// Check the 'default' alias exists before initialising anything
if (!plugins.hasOwnProperty(plugins[pluginName])) {
return reject(new Error(log._("context.error-invalid-default-module", {storage:plugins["default"]})));
}
defaultIsAlias = true;
continue;
}
if (!hasDefault && !defaultName) {
defaultName = pluginName;
}
var plugin;
if (plugins[pluginName].hasOwnProperty("module")) {
// Get the provided config and copy in the 'approved' top-level settings (eg userDir)
var config = plugins[pluginName].config || {};
copySettings(config, settings);
if (typeof plugins[pluginName].module === "string") {
// This config identifies the module by name - assume it is a built-in one
// TODO: check it exists locally, if not, try to require it as-is
try {
plugin = require("./"+plugins[pluginName].module);
} catch(err) {
return reject(new Error(log._("context.error-loading-module", {module:plugins[pluginName].module,message:err.toString()})));
}
} else {
// Assume `module` is an already-required module we can use
plugin = plugins[pluginName].module;
}
try {
// Create a new instance of the plugin by calling its module function
stores[pluginName] = plugin(config);
var moduleInfo = plugins[pluginName].module;
if (typeof moduleInfo !== 'string') {
if (moduleInfo.hasOwnProperty("toString")) {
moduleInfo = moduleInfo.toString();
} else {
moduleInfo = "custom";
}
}
log.info(log._("context.log-store-init", {name:pluginName, info:"module="+moduleInfo}));
} catch(err) {
return reject(new Error(log._("context.error-loading-module",{module:pluginName,message:err.toString()})));
}
} else {
// Plugin does not specify a 'module'
return reject(new Error(log._("context.error-module-not-defined", {storage:pluginName})));
}
}
}
// Open all of the configured contexts
for (var plugin in stores) {
if (stores.hasOwnProperty(plugin)) {
promises.push(stores[plugin].open());
}
}
// There is a 'default' listed in the configuration
if (hasDefault) {
// If 'default' is an alias, point it at the right module - we have already
// checked that it exists. If it isn't an alias, then it will
// already be set to a configured store
if (defaultIsAlias) {
stores["_"] = stores[plugins["default"]];
defaultStore = plugins["default"];
} else {
stores["_"] = stores["default"];
defaultStore = "default";
}
} else if (defaultName) {
// No 'default' listed, so pick first in list as the default
stores["_"] = stores[defaultName];
defaultStore = defaultName;
defaultIsAlias = true;
} else {
// else there were no stores list the config object - fall through
// to below where we default to a memory store
storeList = ["memory"];
defaultStore = "memory";
}
hasConfiguredStore = true;
storeList = Object.keys(stores).filter(n=>!(defaultIsAlias && n==="default") && n!== "_");
} else {
// No configured plugins
log.info(log._("context.log-store-init", {name:"default", info:"module=memory"}));
promises.push(stores["_"].open())
storeList = ["memory"];
defaultStore = "memory";
}
return resolve(Promise.all(promises));
}).catch(function(err) {
throw new Error(log._("context.error-loading-module",{message:err.toString()}));
});
}
function copySettings(config, settings){
var copy = ["userDir"]
config.settings = {};
copy.forEach(function(setting){
config.settings[setting] = clone(settings[setting]);
});
}
function getContextStorage(storage) {
if (stores.hasOwnProperty(storage)) {
// A known context
return stores[storage];
} else if (stores.hasOwnProperty("_")) {
// Not known, but we have a default to fall back to
if (storage !== defaultStore) {
// It isn't the default store either, so log it
logUnknownStore(storage);
}
return stores["_"];
}
}
function createContext(id,seed) {
// Seed is only set for global context - sourced from functionGlobalContext
var scope = id;
var obj = seed || {};
var seedKeys;
var insertSeedValues;
if (seed) {
seedKeys = Object.keys(seed);
insertSeedValues = function(keys,values) {
if (!Array.isArray(keys)) {
if (values[0] === undefined) {
values[0] = seed[keys];
}
} else {
for (var i=0;i<keys.length;i++) {
if (values[i] === undefined) {
values[i] = seed[keys[i]];
}
}
}
}
}
obj.get = function(key, storage, callback) {
var context;
if (!storage && !callback) {
context = stores["_"];
} else {
if (typeof storage === 'function') {
callback = storage;
storage = "_";
}
if (callback && typeof callback !== 'function'){
throw new Error("Callback must be a function");
}
context = getContextStorage(storage);
}
if (callback) {
if (!seed) {
context.get(scope,key,callback);
} else {
context.get(scope,key,function() {
if (arguments[0]) {
callback(arguments[0]);
return;
}
var results = Array.prototype.slice.call(arguments,[1]);
insertSeedValues(key,results);
// Put the err arg back
results.unshift(undefined);
callback.apply(null,results);
});
}
} else {
// No callback, attempt to do this synchronously
var results = context.get(scope,key);
if (seed) {
if (Array.isArray(key)) {
insertSeedValues(key,results);
} else if (results === undefined){
results = seed[key];
}
}
return results;
}
};
obj.set = function(key, value, storage, callback) {
var context;
if (!storage && !callback) {
context = stores["_"];
} else {
if (typeof storage === 'function') {
callback = storage;
storage = "_";
}
if (callback && typeof callback !== 'function') {
throw new Error("Callback must be a function");
}
context = getContextStorage(storage);
}
context.set(scope, key, value, callback);
};
obj.keys = function(storage, callback) {
var context;
if (!storage && !callback) {
context = stores["_"];
} else {
if (typeof storage === 'function') {
callback = storage;
storage = "_";
}
if (callback && typeof callback !== 'function') {
throw new Error("Callback must be a function");
}
context = getContextStorage(storage);
}
if (seed) {
if (callback) {
context.keys(scope, function(err,keys) {
callback(err,Array.from(new Set(seedKeys.concat(keys)).keys()));
});
} else {
var keys = context.keys(scope);
return Array.from(new Set(seedKeys.concat(keys)).keys())
}
} else {
return context.keys(scope, callback);
}
};
return obj;
}
function getContext(localId,flowId) {
var contextId = localId;
if (flowId) {
contextId = localId+":"+flowId;
}
if (contexts.hasOwnProperty(contextId)) {
return contexts[contextId];
}
var newContext = createContext(contextId);
if (flowId) {
newContext.flow = getContext(flowId);
}
newContext.global = contexts['global'];
contexts[contextId] = newContext;
return newContext;
}
function deleteContext(id,flowId) {
if(!hasConfiguredStore){
// only delete context if there's no configured storage.
var contextId = id;
if (flowId) {
contextId = id+":"+flowId;
}
delete contexts[contextId];
return stores["_"].delete(contextId);
}else{
return Promise.resolve();
}
}
function clean(flowConfig) {
var promises = [];
for(var plugin in stores){
if(stores.hasOwnProperty(plugin)){
promises.push(stores[plugin].clean(Object.keys(flowConfig.allNodes)));
}
}
for (var id in contexts) {
if (contexts.hasOwnProperty(id) && id !== "global") {
var idParts = id.split(":");
if (!flowConfig.allNodes.hasOwnProperty(idParts[0])) {
delete contexts[id];
}
}
}
return Promise.all(promises);
}
function close() {
var promises = [];
for(var plugin in stores){
if(stores.hasOwnProperty(plugin)){
promises.push(stores[plugin].close());
}
}
return Promise.all(promises);
}
function listStores() {
return {default:defaultStore,stores:storeList};
}
module.exports = {
init: init,
load: load,
listStores: listStores,
get: getContext,
delete: deleteContext,
clean: clean,
close: close
};

View File

@@ -0,0 +1,377 @@
/**
* Copyright JS Foundation and other contributors, http://js.foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
/**
* Local file-system based context storage
*
* Configuration options:
* {
* base: "context", // the base directory to use
* // default: "context"
* dir: "/path/to/storage", // the directory to create the base directory in
* // default: settings.userDir
* cache: true, // whether to cache contents in memory
* // default: true
* flushInterval: 30 // if cache is enabled, the minimum interval
* // between writes to storage, in seconds. This
* can be used to reduce wear on underlying storage.
* default: 30 seconds
* }
*
*
* $HOME/.node-red/contexts
* ├── global
* │ └── global_context.json
* ├── <id of Flow 1>
* │ ├── flow_context.json
* │ ├── <id of Node a>.json
* │ └── <id of Node b>.json
* └── <id of Flow 2>
* ├── flow_context.json
* ├── <id of Node x>.json
* └── <id of Node y>.json
*/
var fs = require('fs-extra');
var path = require("path");
var util = require("@node-red/util").util;
var log = require("@node-red/util").log;
var safeJSONStringify = require("json-stringify-safe");
var MemoryStore = require("./memory");
function getStoragePath(storageBaseDir, scope) {
if(scope.indexOf(":") === -1){
if(scope === "global"){
return path.join(storageBaseDir,"global",scope);
}else{ // scope:flow
return path.join(storageBaseDir,scope,"flow");
}
}else{ // scope:local
var ids = scope.split(":")
return path.join(storageBaseDir,ids[1],ids[0]);
}
}
function getBasePath(config) {
var base = config.base || "context";
var storageBaseDir;
if (!config.dir) {
if(config.settings && config.settings.userDir){
storageBaseDir = path.join(config.settings.userDir, base);
}else{
try {
fs.statSync(path.join(process.env.NODE_RED_HOME,".config.json"));
storageBaseDir = path.join(process.env.NODE_RED_HOME, base);
} catch(err) {
try {
// Consider compatibility for older versions
if (process.env.HOMEPATH) {
fs.statSync(path.join(process.env.HOMEPATH,".node-red",".config.json"));
storageBaseDir = path.join(process.env.HOMEPATH, ".node-red", base);
}
} catch(err) {
}
if (!storageBaseDir) {
storageBaseDir = path.join(process.env.HOME || process.env.USERPROFILE || process.env.HOMEPATH || process.env.NODE_RED_HOME,".node-red", base);
}
}
}
}else{
storageBaseDir = path.join(config.dir, base);
}
return storageBaseDir;
}
function loadFile(storagePath){
return fs.pathExists(storagePath).then(function(exists){
if(exists === true){
return fs.readFile(storagePath, "utf8");
}else{
return Promise.resolve(undefined);
}
});
}
function listFiles(storagePath) {
var promises = [];
return fs.readdir(storagePath).then(function(files) {
files.forEach(function(file) {
if (!/^\./.test(file)) {
var fullPath = path.join(storagePath,file);
var stats = fs.statSync(fullPath);
if (stats.isDirectory()) {
promises.push(fs.readdir(fullPath).then(function(subdirFiles) {
var result = [];
subdirFiles.forEach(subfile => {
if (/\.json$/.test(subfile)) {
result.push(path.join(file,subfile))
}
});
return result;
}))
}
}
});
return Promise.all(promises);
}).then(dirs => dirs.reduce((acc, val) => acc.concat(val), []));
}
function stringify(value) {
var hasCircular;
var result = safeJSONStringify(value,null,4,function(k,v){hasCircular = true})
return { json: result, circular: hasCircular };
}
function LocalFileSystem(config){
this.config = config;
this.storageBaseDir = getBasePath(this.config);
if (config.hasOwnProperty('cache')?config.cache:true) {
this.cache = MemoryStore({});
}
this.pendingWrites = {};
this.knownCircularRefs = {};
if (config.hasOwnProperty('flushInterval')) {
this.flushInterval = Math.max(0,config.flushInterval) * 1000;
} else {
this.flushInterval = 30000;
}
}
LocalFileSystem.prototype.open = function(){
var self = this;
if (this.cache) {
var scopes = [];
var promises = [];
return listFiles(self.storageBaseDir).then(function(files) {
files.forEach(function(file) {
var parts = file.split(path.sep);
if (parts[0] === 'global') {
scopes.push("global");
} else if (parts[1] === 'flow.json') {
scopes.push(parts[0])
} else {
scopes.push(parts[1].substring(0,parts[1].length-5)+":"+parts[0]);
}
promises.push(loadFile(path.join(self.storageBaseDir,file)));
})
return Promise.all(promises);
}).then(function(res) {
scopes.forEach(function(scope,i) {
var data = res[i]?JSON.parse(res[i]):{};
Object.keys(data).forEach(function(key) {
self.cache.set(scope,key,data[key]);
})
});
}).catch(function(err){
if(err.code == 'ENOENT') {
return fs.ensureDir(self.storageBaseDir);
}else{
throw err;
}
}).then(function() {
self._flushPendingWrites = function() {
var scopes = Object.keys(self.pendingWrites);
self.pendingWrites = {};
var promises = [];
var newContext = self.cache._export();
scopes.forEach(function(scope) {
var storagePath = getStoragePath(self.storageBaseDir,scope);
var context = newContext[scope];
var stringifiedContext = stringify(context);
if (stringifiedContext.circular && !self.knownCircularRefs[scope]) {
log.warn(log._("error-circular",{scope:scope}));
self.knownCircularRefs[scope] = true;
} else {
delete self.knownCircularRefs[scope];
}
log.debug("Flushing localfilesystem context scope "+scope);
promises.push(fs.outputFile(storagePath + ".json", stringifiedContext.json, "utf8"));
});
delete self._pendingWriteTimeout;
return Promise.all(promises);
}
});
} else {
return fs.ensureDir(self.storageBaseDir);
}
}
LocalFileSystem.prototype.close = function(){
if (this.cache && this._flushPendingWrites) {
clearTimeout(this._pendingWriteTimeout);
delete this._pendingWriteTimeout;
return this._flushPendingWrites();
}
return Promise.resolve();
}
LocalFileSystem.prototype.get = function(scope, key, callback) {
if (this.cache) {
return this.cache.get(scope,key,callback);
}
if(typeof callback !== "function"){
throw new Error("Callback must be a function");
}
var storagePath = getStoragePath(this.storageBaseDir ,scope);
loadFile(storagePath + ".json").then(function(data){
if(data){
data = JSON.parse(data);
if (!Array.isArray(key)) {
callback(null, util.getObjectProperty(data,key));
} else {
var results = [undefined];
for (var i=0;i<key.length;i++) {
results.push(util.getObjectProperty(data,key[i]))
}
callback.apply(null,results);
}
}else{
callback(null, undefined);
}
}).catch(function(err){
callback(err);
});
};
LocalFileSystem.prototype.set = function(scope, key, value, callback) {
var self = this;
var storagePath = getStoragePath(this.storageBaseDir ,scope);
if (this.cache) {
this.cache.set(scope,key,value,callback);
this.pendingWrites[scope] = true;
if (this._pendingWriteTimeout) {
// there's a pending write which will handle this
return;
} else {
this._pendingWriteTimeout = setTimeout(function() {
self._flushPendingWrites.call(self).catch(function(err) {
log.error(log._("context.localfilesystem.error-write",{message:err.toString()}))
});
}, this.flushInterval);
}
} else if (callback && typeof callback !== 'function') {
throw new Error("Callback must be a function");
} else {
loadFile(storagePath + ".json").then(function(data){
var obj = data ? JSON.parse(data) : {}
if (!Array.isArray(key)) {
key = [key];
value = [value];
} else if (!Array.isArray(value)) {
// key is an array, but value is not - wrap it as an array
value = [value];
}
for (var i=0;i<key.length;i++) {
var v = null;
if (i<value.length) {
v = value[i];
}
util.setObjectProperty(obj,key[i],v);
}
var stringifiedContext = stringify(obj);
if (stringifiedContext.circular && !self.knownCircularRefs[scope]) {
log.warn(log._("error-circular",{scope:scope}));
self.knownCircularRefs[scope] = true;
} else {
delete self.knownCircularRefs[scope];
}
return fs.outputFile(storagePath + ".json", stringifiedContext.json, "utf8");
}).then(function(){
if(typeof callback === "function"){
callback(null);
}
}).catch(function(err){
if(typeof callback === "function"){
callback(err);
}
});
}
};
LocalFileSystem.prototype.keys = function(scope, callback){
if (this.cache) {
return this.cache.keys(scope,callback);
}
if(typeof callback !== "function"){
throw new Error("Callback must be a function");
}
var storagePath = getStoragePath(this.storageBaseDir ,scope);
loadFile(storagePath + ".json").then(function(data){
if(data){
callback(null, Object.keys(JSON.parse(data)));
}else{
callback(null, []);
}
}).catch(function(err){
callback(err);
});
};
LocalFileSystem.prototype.delete = function(scope){
var cachePromise;
if (this.cache) {
cachePromise = this.cache.delete(scope);
} else {
cachePromise = Promise.resolve();
}
var that = this;
delete this.pendingWrites[scope];
return cachePromise.then(function() {
var storagePath = getStoragePath(that.storageBaseDir,scope);
return fs.remove(storagePath + ".json");
});
}
LocalFileSystem.prototype.clean = function(_activeNodes) {
var activeNodes = {};
_activeNodes.forEach(function(node) { activeNodes[node] = true });
var self = this;
var cachePromise;
if (this.cache) {
cachePromise = this.cache.clean(_activeNodes);
} else {
cachePromise = Promise.resolve();
}
this.knownCircularRefs = {};
return cachePromise.then(() => listFiles(self.storageBaseDir)).then(function(files) {
var promises = [];
files.forEach(function(file) {
var parts = file.split(path.sep);
var removePromise;
if (parts[0] === 'global') {
// never clean global
return;
} else if (!activeNodes[parts[0]]) {
// Flow removed - remove the whole dir
removePromise = fs.remove(path.join(self.storageBaseDir,parts[0]));
} else if (parts[1] !== 'flow.json' && !activeNodes[parts[1].substring(0,parts[1].length-5)]) {
// Node removed - remove the context file
removePromise = fs.remove(path.join(self.storageBaseDir,file));
}
if (removePromise) {
promises.push(removePromise);
}
});
return Promise.all(promises)
})
}
module.exports = function(config){
return new LocalFileSystem(config);
};

View File

@@ -0,0 +1,166 @@
/**
* Copyright JS Foundation and other contributors, http://js.foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
var util = require("@node-red/util").util;
function Memory(config){
this.data = {};
}
Memory.prototype.open = function(){
return Promise.resolve();
};
Memory.prototype.close = function(){
return Promise.resolve();
};
Memory.prototype._getOne = function(scope, key) {
var value;
var error;
if(this.data[scope]){
value = util.getObjectProperty(this.data[scope], key);
}
return value;
}
Memory.prototype.get = function(scope, key, callback) {
var value;
var error;
if (!Array.isArray(key)) {
try {
value = this._getOne(scope,key);
} catch(err) {
if (!callback) {
throw err;
}
error = err;
}
if (callback) {
callback(error,value);
return;
} else {
return value;
}
}
value = [];
for (var i=0; i<key.length; i++) {
try {
value.push(this._getOne(scope,key[i]));
} catch(err) {
if (!callback) {
throw err;
} else {
callback(err);
return;
}
}
}
if (callback) {
callback.apply(null, [undefined].concat(value));
} else {
return value;
}
};
Memory.prototype.set = function(scope, key, value, callback) {
if(!this.data[scope]){
this.data[scope] = {};
}
var error;
if (!Array.isArray(key)) {
key = [key];
value = [value];
} else if (!Array.isArray(value)) {
// key is an array, but value is not - wrap it as an array
value = [value];
}
try {
for (var i=0; i<key.length; i++) {
var v = null;
if (i < value.length) {
v = value[i];
}
util.setObjectProperty(this.data[scope],key[i],v);
}
} catch(err) {
if (callback) {
error = err;
} else {
throw err;
}
}
if(callback){
callback(error);
}
};
Memory.prototype.keys = function(scope, callback){
var values = [];
var error;
try{
if(this.data[scope]){
if (scope !== "global") {
values = Object.keys(this.data[scope]);
} else {
values = Object.keys(this.data[scope]).filter(function (key) {
return key !== "set" && key !== "get" && key !== "keys";
});
}
}
}catch(err){
if(callback){
error = err;
}else{
throw err;
}
}
if(callback){
if(error){
callback(error);
} else {
callback(null, values);
}
} else {
return values;
}
};
Memory.prototype.delete = function(scope){
delete this.data[scope];
return Promise.resolve();
};
Memory.prototype.clean = function(activeNodes){
for(var id in this.data){
if(this.data.hasOwnProperty(id) && id !== "global"){
var idParts = id.split(":");
if(activeNodes.indexOf(idParts[0]) === -1){
delete this.data[id];
}
}
}
return Promise.resolve();
}
Memory.prototype._export = function() {
return this.data;
}
module.exports = function(config){
return new Memory(config);
};

View File

@@ -0,0 +1,432 @@
/**
* Copyright JS Foundation and other contributors, http://js.foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
var when = require("when");
var crypto = require('crypto');
var runtime;
var settings;
var log;
var encryptedCredentials = null;
var credentialCache = {};
var credentialsDef = {};
var dirty = false;
var removeDefaultKey = false;
var encryptionEnabled = null;
var encryptionKeyType; // disabled, system, user, project
var encryptionAlgorithm = "aes-256-ctr";
var encryptionKey;
function decryptCredentials(key,credentials) {
var creds = credentials["$"];
var initVector = new Buffer(creds.substring(0, 32),'hex');
creds = creds.substring(32);
var decipher = crypto.createDecipheriv(encryptionAlgorithm, key, initVector);
var decrypted = decipher.update(creds, 'base64', 'utf8') + decipher.final('utf8');
return JSON.parse(decrypted);
}
function encryptCredentials(key,credentials) {
var initVector = crypto.randomBytes(16);
var cipher = crypto.createCipheriv(encryptionAlgorithm, key, initVector);
return {"$":initVector.toString('hex') + cipher.update(JSON.stringify(credentials), 'utf8', 'base64') + cipher.final('base64')};
}
var api = module.exports = {
init: function(_runtime) {
runtime = _runtime;
log = runtime.log;
settings = runtime.settings;
dirty = false;
credentialCache = {};
credentialsDef = {};
encryptionEnabled = null;
},
/**
* Sets the credentials from storage.
*/
load: function (credentials) {
dirty = false;
var credentialsEncrypted = credentials.hasOwnProperty("$") && Object.keys(credentials).length === 1;
// Case 1: Active Project in place
// - use whatever its config says
// Case 2: _credentialSecret unset, credentialSecret unset
// - generate _credentialSecret and encrypt
// Case 3: _credentialSecret set, credentialSecret set
// - migrate from _credentialSecret to credentialSecret
// - delete _credentialSecret
// Case 4: credentialSecret set
// - use it
var setupEncryptionPromise = when.resolve();
var projectKey = false;
var activeProject;
encryptionKeyType = "";
if (runtime.storage && runtime.storage.projects) {
// projects enabled
activeProject = runtime.storage.projects.getActiveProject();
if (activeProject) {
projectKey = activeProject.credentialSecret;
if (!projectKey) {
log.debug("red/runtime/nodes/credentials.load : using active project key - disabled");
encryptionKeyType = "disabled";
encryptionEnabled = false;
} else {
log.debug("red/runtime/nodes/credentials.load : using active project key");
encryptionKeyType = "project";
encryptionKey = crypto.createHash('sha256').update(projectKey).digest();
encryptionEnabled = true;
}
}
}
if (encryptionKeyType === '') {
var defaultKey;
try {
defaultKey = settings.get('_credentialSecret');
} catch(err) {
}
if (defaultKey) {
defaultKey = crypto.createHash('sha256').update(defaultKey).digest();
encryptionKeyType = "system";
}
var userKey;
try {
userKey = settings.get('credentialSecret');
} catch(err) {
userKey = false;
}
if (userKey === false) {
encryptionKeyType = "disabled";
log.debug("red/runtime/nodes/credentials.load : user disabled encryption");
// User has disabled encryption
encryptionEnabled = false;
// Check if we have a generated _credSecret to decrypt with and remove
if (defaultKey) {
log.debug("red/runtime/nodes/credentials.load : default key present. Will migrate");
if (credentialsEncrypted) {
try {
credentials = decryptCredentials(defaultKey,credentials)
} catch(err) {
credentials = {};
log.warn(log._("nodes.credentials.error",{message:err.toString()}))
var error = new Error("Failed to decrypt credentials");
error.code = "credentials_load_failed";
return when.reject(error);
}
}
dirty = true;
removeDefaultKey = true;
}
} else if (typeof userKey === 'string') {
if (!projectKey) {
log.debug("red/runtime/nodes/credentials.load : user provided key");
}
if (encryptionKeyType !== 'project') {
encryptionKeyType = 'user';
}
// User has provided own encryption key, get the 32-byte hash of it
encryptionKey = crypto.createHash('sha256').update(userKey).digest();
encryptionEnabled = true;
if (encryptionKeyType !== 'project' && defaultKey) {
log.debug("red/runtime/nodes/credentials.load : default key present. Will migrate");
// User has provided their own key, but we already have a default key
// Decrypt using default key
if (credentialsEncrypted) {
try {
credentials = decryptCredentials(defaultKey,credentials)
} catch(err) {
credentials = {};
log.warn(log._("nodes.credentials.error",{message:err.toString()}))
var error = new Error("Failed to decrypt credentials");
error.code = "credentials_load_failed";
return when.reject(error);
}
}
dirty = true;
removeDefaultKey = true;
}
} else {
log.debug("red/runtime/nodes/credentials.load : no user key present");
// User has not provide their own key
if (encryptionKeyType !== 'project') {
encryptionKeyType = 'system';
}
encryptionKey = defaultKey;
encryptionEnabled = true;
if (encryptionKey === undefined) {
log.debug("red/runtime/nodes/credentials.load : no default key present - generating one");
// No user-provided key, no generated key
// Generate a new key
defaultKey = crypto.randomBytes(32).toString('hex');
try {
setupEncryptionPromise = settings.set('_credentialSecret',defaultKey);
encryptionKey = crypto.createHash('sha256').update(defaultKey).digest();
} catch(err) {
log.debug("red/runtime/nodes/credentials.load : settings unavailable - disabling encryption");
// Settings unavailable
encryptionEnabled = false;
encryptionKey = null;
}
dirty = true;
} else {
log.debug("red/runtime/nodes/credentials.load : using default key");
}
}
}
if (encryptionEnabled && !dirty) {
encryptedCredentials = credentials;
}
log.debug("red/runtime/nodes/credentials.load : keyType="+encryptionKeyType);
if (encryptionKeyType === 'system') {
log.warn(log._("nodes.credentials.system-key-warning"));
}
return setupEncryptionPromise.then(function() {
var clearInvalidFlag = false;
if (credentials.hasOwnProperty("$")) {
if (encryptionEnabled === false) {
// The credentials appear to be encrypted, but our config
// thinks they are not.
var error = new Error("Failed to decrypt credentials");
error.code = "credentials_load_failed";
if (activeProject) {
// This is a project with a bad key. Mark it as invalid
// TODO: this delves too deep into Project structure
activeProject.credentialSecretInvalid = true;
return when.reject(error);
}
return when.reject(error);
}
// These are encrypted credentials
try {
credentialCache = decryptCredentials(encryptionKey,credentials)
clearInvalidFlag = true;
} catch(err) {
credentialCache = {};
dirty = true;
log.warn(log._("nodes.credentials.error",{message:err.toString()}))
var error = new Error("Failed to decrypt credentials");
error.code = "credentials_load_failed";
if (activeProject) {
// This is a project with a bad key. Mark it as invalid
// TODO: this delves too deep into Project structure
activeProject.credentialSecretInvalid = true;
return when.reject(error);
}
return when.reject(error);
}
} else {
credentialCache = credentials;
}
if (clearInvalidFlag) {
// TODO: this delves too deep into Project structure
if (activeProject) {
delete activeProject.credentialSecretInvalid;
}
}
});
},
/**
* Adds a set of credentials for the given node id.
* @param id the node id for the credentials
* @param creds an object of credential key/value pairs
* @return a promise for backwards compatibility TODO: can this be removed?
*/
add: function (id, creds) {
if (!credentialCache.hasOwnProperty(id) || JSON.stringify(creds) !== JSON.stringify(credentialCache[id])) {
credentialCache[id] = creds;
dirty = true;
}
return when.resolve();
},
/**
* Gets the credentials for the given node id.
* @param id the node id for the credentials
* @return the credentials
*/
get: function (id) {
return credentialCache[id];
},
/**
* Deletes the credentials for the given node id.
* @param id the node id for the credentials
* @return a promise for the saving of credentials to storage
*/
delete: function (id) {
delete credentialCache[id];
dirty = true;
},
clear: function() {
credentialCache = {};
dirty = true;
},
/**
* Deletes any credentials for nodes that no longer exist
* @param config a flow config
* @return a promise for the saving of credentials to storage
*/
clean: function (config) {
var existingIds = {};
config.forEach(function(n) {
existingIds[n.id] = true;
if (n.credentials) {
api.extract(n);
}
});
var deletedCredentials = false;
for (var c in credentialCache) {
if (credentialCache.hasOwnProperty(c)) {
if (!existingIds[c]) {
deletedCredentials = true;
delete credentialCache[c];
}
}
}
if (deletedCredentials) {
dirty = true;
}
return when.resolve();
},
/**
* Registers a node credential definition.
* @param type the node type
* @param definition the credential definition
*/
register: function (type, definition) {
var dashedType = type.replace(/\s+/g, '-');
credentialsDef[dashedType] = definition;
},
/**
* Extracts and stores any credential updates in the provided node.
* The provided node may have a .credentials property that contains
* new credentials for the node.
* This function loops through the credentials in the definition for
* the node-type and applies any of the updates provided in the node.
*
* This function does not save the credentials to disk as it is expected
* to be called multiple times when a new flow is deployed.
*
* @param node the node to extract credentials from
*/
extract: function(node) {
var nodeID = node.id;
var nodeType = node.type;
var newCreds = node.credentials;
if (newCreds) {
delete node.credentials;
var savedCredentials = credentialCache[nodeID] || {};
var dashedType = nodeType.replace(/\s+/g, '-');
var definition = credentialsDef[dashedType];
if (!definition) {
log.warn(log._("nodes.credentials.not-registered",{type:nodeType}));
return;
}
for (var cred in definition) {
if (definition.hasOwnProperty(cred)) {
if (newCreds[cred] === undefined) {
continue;
}
if (definition[cred].type == "password" && newCreds[cred] == '__PWRD__') {
continue;
}
if (0 === newCreds[cred].length || /^\s*$/.test(newCreds[cred])) {
delete savedCredentials[cred];
dirty = true;
continue;
}
if (!savedCredentials.hasOwnProperty(cred) || JSON.stringify(savedCredentials[cred]) !== JSON.stringify(newCreds[cred])) {
savedCredentials[cred] = newCreds[cred];
dirty = true;
}
}
}
credentialCache[nodeID] = savedCredentials;
}
},
/**
* Gets the credential definition for the given node type
* @param type the node type
* @return the credential definition
*/
getDefinition: function (type) {
return credentialsDef[type];
},
dirty: function() {
return dirty;
},
setKey: function(key) {
if (key) {
encryptionKey = crypto.createHash('sha256').update(key).digest();
encryptionEnabled = true;
dirty = true;
encryptionKeyType = "project";
} else {
encryptionKey = null;
encryptionEnabled = false;
dirty = true;
encryptionKeyType = "disabled";
}
},
getKeyType: function() {
return encryptionKeyType;
},
export: function() {
var result = credentialCache;
if (encryptionEnabled) {
if (dirty) {
try {
log.debug("red/runtime/nodes/credentials.export : encrypting");
result = encryptCredentials(encryptionKey, credentialCache);
} catch(err) {
log.warn(log._("nodes.credentials.error-saving",{message:err.toString()}))
}
} else {
result = encryptedCredentials;
}
}
dirty = false;
if (removeDefaultKey) {
log.debug("red/runtime/nodes/credentials.export : removing unused default key");
return settings.delete('_credentialSecret').then(function() {
removeDefaultKey = false;
return result;
})
} else {
return when.resolve(result);
}
}
}

View File

@@ -0,0 +1,511 @@
/**
* Copyright JS Foundation and other contributors, http://js.foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
var when = require("when");
var clone = require("clone");
var typeRegistry = require("@node-red/registry");
var Log;
var redUtil = require("@node-red/util").util;
var flowUtil = require("./util");
var Node;
var nodeCloseTimeout = 15000;
function Flow(global,flow) {
if (typeof flow === 'undefined') {
flow = global;
}
var activeNodes = {};
var subflowInstanceNodes = {};
var catchNodeMap = {};
var statusNodeMap = {};
this.start = function(diff) {
var node;
var newNode;
var id;
catchNodeMap = {};
statusNodeMap = {};
var configNodes = Object.keys(flow.configs);
var configNodeAttempts = {};
while (configNodes.length > 0) {
id = configNodes.shift();
node = flow.configs[id];
if (!activeNodes[id]) {
var readyToCreate = true;
// This node doesn't exist.
// Check it doesn't reference another non-existent config node
for (var prop in node) {
if (node.hasOwnProperty(prop) && prop !== 'id' && prop !== 'wires' && prop !== '_users' && flow.configs[node[prop]]) {
if (!activeNodes[node[prop]]) {
// References a non-existent config node
// Add it to the back of the list to try again later
configNodes.push(id);
configNodeAttempts[id] = (configNodeAttempts[id]||0)+1;
if (configNodeAttempts[id] === 100) {
throw new Error("Circular config node dependency detected: "+id);
}
readyToCreate = false;
break;
}
}
}
if (readyToCreate) {
newNode = createNode(node.type,node);
if (newNode) {
activeNodes[id] = newNode;
}
}
}
}
if (diff && diff.rewired) {
for (var j=0;j<diff.rewired.length;j++) {
var rewireNode = activeNodes[diff.rewired[j]];
if (rewireNode) {
rewireNode.updateWires(flow.nodes[rewireNode.id].wires);
}
}
}
for (id in flow.nodes) {
if (flow.nodes.hasOwnProperty(id)) {
node = flow.nodes[id];
if (!node.subflow) {
if (!activeNodes[id]) {
newNode = createNode(node.type,node);
if (newNode) {
activeNodes[id] = newNode;
}
}
} else {
if (!subflowInstanceNodes[id]) {
try {
var nodes = createSubflow(flow.subflows[node.subflow]||global.subflows[node.subflow],node,flow.subflows,global.subflows,activeNodes);
subflowInstanceNodes[id] = nodes.map(function(n) { return n.id});
for (var i=0;i<nodes.length;i++) {
if (nodes[i]) {
activeNodes[nodes[i].id] = nodes[i];
}
}
} catch(err) {
console.log(err.stack)
}
}
}
}
}
for (id in activeNodes) {
if (activeNodes.hasOwnProperty(id)) {
node = activeNodes[id];
if (node.type === "catch") {
catchNodeMap[node.z] = catchNodeMap[node.z] || [];
catchNodeMap[node.z].push(node);
} else if (node.type === "status") {
statusNodeMap[node.z] = statusNodeMap[node.z] || [];
statusNodeMap[node.z].push(node);
}
}
}
}
this.stop = function(stopList, removedList) {
return when.promise(function(resolve) {
var i;
if (stopList) {
for (i=0;i<stopList.length;i++) {
if (subflowInstanceNodes[stopList[i]]) {
// The first in the list is the instance node we already
// know about
stopList = stopList.concat(subflowInstanceNodes[stopList[i]].slice(1))
}
}
} else {
stopList = Object.keys(activeNodes);
}
// Convert the list to a map to avoid multiple scans of the list
var removedMap = {};
removedList = removedList || [];
removedList.forEach(function(id) {
removedMap[id] = true;
});
var promises = [];
for (i=0;i<stopList.length;i++) {
var node = activeNodes[stopList[i]];
if (node) {
delete activeNodes[stopList[i]];
if (subflowInstanceNodes[stopList[i]]) {
delete subflowInstanceNodes[stopList[i]];
}
try {
var removed = removedMap[stopList[i]];
promises.push(
when.promise(function(resolve, reject) {
var start;
var nt = node.type;
var nid = node.id;
var n = node;
when.promise(function(resolve) {
Log.trace("Stopping node "+nt+":"+nid+(removed?" removed":""));
start = Date.now();
resolve(n.close(removed));
}).timeout(nodeCloseTimeout).then(function(){
var delta = Date.now() - start;
Log.trace("Stopped node "+nt+":"+nid+" ("+delta+"ms)" );
resolve(delta);
},function(err) {
var delta = Date.now() - start;
n.error(Log._("nodes.flows.stopping-error",{message:err}));
Log.debug(err.stack);
reject(err);
});
})
);
} catch(err) {
node.error(err);
}
}
}
when.settle(promises).then(function(results) {
resolve();
});
});
}
this.update = function(_global,_flow) {
global = _global;
flow = _flow;
}
this.getNode = function(id) {
return activeNodes[id];
}
this.getActiveNodes = function() {
return activeNodes;
}
this.handleStatus = function(node,statusMessage) {
var targetStatusNodes = null;
var reportingNode = node;
var handled = false;
while (reportingNode && !handled) {
targetStatusNodes = statusNodeMap[reportingNode.z];
if (targetStatusNodes) {
targetStatusNodes.forEach(function(targetStatusNode) {
if (targetStatusNode.scope && targetStatusNode.scope.indexOf(node.id) === -1) {
return;
}
var message = {
status: {
text: "",
source: {
id: node.id,
type: node.type,
name: node.name
}
}
};
if (statusMessage.hasOwnProperty("text")) {
message.status.text = statusMessage.text.toString();
}
targetStatusNode.receive(message);
handled = true;
});
}
if (!handled) {
reportingNode = activeNodes[reportingNode.z];
}
}
}
this.handleError = function(node,logMessage,msg) {
var count = 1;
if (msg && msg.hasOwnProperty("error") && msg.error !== null) {
if (msg.error.hasOwnProperty("source") && msg.error.source !== null) {
if (msg.error.source.id === node.id) {
count = msg.error.source.count+1;
if (count === 10) {
node.warn(Log._("nodes.flow.error-loop"));
return false;
}
}
}
}
var targetCatchNodes = null;
var throwingNode = node;
var handled = false;
while (throwingNode && !handled) {
targetCatchNodes = catchNodeMap[throwingNode.z];
if (targetCatchNodes) {
targetCatchNodes.forEach(function(targetCatchNode) {
if (targetCatchNode.scope && targetCatchNode.scope.indexOf(throwingNode.id) === -1) {
return;
}
var errorMessage;
if (msg) {
errorMessage = redUtil.cloneMessage(msg);
} else {
errorMessage = {};
}
if (errorMessage.hasOwnProperty("error")) {
errorMessage._error = errorMessage.error;
}
errorMessage.error = {
message: logMessage.toString(),
source: {
id: node.id,
type: node.type,
name: node.name,
count: count
}
};
if (logMessage.hasOwnProperty('stack')) {
errorMessage.error.stack = logMessage.stack;
}
targetCatchNode.receive(errorMessage);
handled = true;
});
}
if (!handled) {
throwingNode = activeNodes[throwingNode.z];
}
}
return handled;
}
}
function createNode(type,config) {
var nn = null;
try {
var nt = typeRegistry.get(type);
if (nt) {
var conf = clone(config);
delete conf.credentials;
for (var p in conf) {
if (conf.hasOwnProperty(p)) {
flowUtil.mapEnvVarProperties(conf,p);
}
}
try {
nn = new nt(conf);
}
catch (err) {
Log.log({
level: Log.ERROR,
id:conf.id,
type: type,
msg: err
});
}
} else {
Log.error(Log._("nodes.flow.unknown-type", {type:type}));
}
} catch(err) {
Log.error(err);
}
return nn;
}
function createSubflow(sf,sfn,subflows,globalSubflows,activeNodes) {
//console.log("CREATE SUBFLOW",sf.id,sfn.id);
var nodes = [];
var node_map = {};
var newNodes = [];
var node;
var wires;
var i,j,k;
var createNodeInSubflow = function(def) {
node = clone(def);
var nid = redUtil.generateId();
node_map[node.id] = node;
node._alias = node.id;
node.id = nid;
node.z = sfn.id;
newNodes.push(node);
}
// Clone all of the subflow node definitions and give them new IDs
for (i in sf.configs) {
if (sf.configs.hasOwnProperty(i)) {
createNodeInSubflow(sf.configs[i]);
}
}
// Clone all of the subflow node definitions and give them new IDs
for (i in sf.nodes) {
if (sf.nodes.hasOwnProperty(i)) {
createNodeInSubflow(sf.nodes[i]);
}
}
// Look for any catch/status nodes and update their scope ids
// Update all subflow interior wiring to reflect new node IDs
for (i=0;i<newNodes.length;i++) {
node = newNodes[i];
if (node.wires) {
var outputs = node.wires;
for (j=0;j<outputs.length;j++) {
wires = outputs[j];
for (k=0;k<wires.length;k++) {
outputs[j][k] = node_map[outputs[j][k]].id
}
}
if ((node.type === 'catch' || node.type === 'status') && node.scope) {
node.scope = node.scope.map(function(id) {
return node_map[id]?node_map[id].id:""
})
} else {
for (var prop in node) {
if (node.hasOwnProperty(prop) && prop !== '_alias') {
if (node_map[node[prop]]) {
//console.log("Mapped",node.type,node.id,prop,node_map[node[prop]].id);
node[prop] = node_map[node[prop]].id;
}
}
}
}
}
}
// Create a subflow node to accept inbound messages and route appropriately
var Node = require("../Node");
var subflowInstance = {
id: sfn.id,
type: sfn.type,
z: sfn.z,
name: sfn.name,
wires: []
}
if (sf.in) {
subflowInstance.wires = sf.in.map(function(n) { return n.wires.map(function(w) { return node_map[w.id].id;})})
subflowInstance._originalWires = clone(subflowInstance.wires);
}
var subflowNode = new Node(subflowInstance);
subflowNode.on("input", function(msg) { this.send(msg);});
subflowNode._updateWires = subflowNode.updateWires;
subflowNode.updateWires = function(newWires) {
// Wire the subflow outputs
if (sf.out) {
var node,wires,i,j;
// Restore the original wiring to the internal nodes
subflowInstance.wires = clone(subflowInstance._originalWires);
for (i=0;i<sf.out.length;i++) {
wires = sf.out[i].wires;
for (j=0;j<wires.length;j++) {
if (wires[j].id != sf.id) {
node = node_map[wires[j].id];
if (node._originalWires) {
node.wires = clone(node._originalWires);
}
}
}
}
var modifiedNodes = {};
var subflowInstanceModified = false;
for (i=0;i<sf.out.length;i++) {
wires = sf.out[i].wires;
for (j=0;j<wires.length;j++) {
if (wires[j].id === sf.id) {
subflowInstance.wires[wires[j].port] = subflowInstance.wires[wires[j].port].concat(newWires[i]);
subflowInstanceModified = true;
} else {
node = node_map[wires[j].id];
node.wires[wires[j].port] = node.wires[wires[j].port].concat(newWires[i]);
modifiedNodes[node.id] = node;
}
}
}
Object.keys(modifiedNodes).forEach(function(id) {
var node = modifiedNodes[id];
subflowNode.instanceNodes[id].updateWires(node.wires);
});
if (subflowInstanceModified) {
subflowNode._updateWires(subflowInstance.wires);
}
}
}
nodes.push(subflowNode);
// Wire the subflow outputs
if (sf.out) {
var modifiedNodes = {};
for (i=0;i<sf.out.length;i++) {
wires = sf.out[i].wires;
for (j=0;j<wires.length;j++) {
if (wires[j].id === sf.id) {
// A subflow input wired straight to a subflow output
subflowInstance.wires[wires[j].port] = subflowInstance.wires[wires[j].port].concat(sfn.wires[i])
subflowNode._updateWires(subflowInstance.wires);
} else {
node = node_map[wires[j].id];
modifiedNodes[node.id] = node;
if (!node._originalWires) {
node._originalWires = clone(node.wires);
}
node.wires[wires[j].port] = (node.wires[wires[j].port]||[]).concat(sfn.wires[i]);
}
}
}
}
// Instantiate the nodes
for (i=0;i<newNodes.length;i++) {
node = newNodes[i];
var type = node.type;
var m = /^subflow:(.+)$/.exec(type);
if (!m) {
var newNode = createNode(type,node);
if (newNode) {
activeNodes[node.id] = newNode;
nodes.push(newNode);
}
} else {
var subflowId = m[1];
nodes = nodes.concat(createSubflow(subflows[subflowId]||globalSubflows[subflowId],node,subflows,globalSubflows,activeNodes));
}
}
subflowNode.instanceNodes = {};
nodes.forEach(function(node) {
subflowNode.instanceNodes[node.id] = node;
});
return nodes;
}
module.exports = {
init: function(runtime) {
nodeCloseTimeout = runtime.settings.nodeCloseTimeout || 15000;
Log = runtime.log;
Node = require("../Node");
},
create: function(global,conf) {
return new Flow(global,conf);
}
}

View File

@@ -0,0 +1,735 @@
/**
* Copyright JS Foundation and other contributors, http://js.foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
var clone = require("clone");
var when = require("when");
var Flow = require('./Flow');
var typeRegistry = require("@node-red/registry");
var deprecated = typeRegistry.deprecated;
var context = require("../context")
var credentials = require("../credentials");
var flowUtil = require("./util");
var log;
var events = require("../../events");
var redUtil = require("@node-red/util").util;
var storage = null;
var settings = null;
var activeConfig = null;
var activeFlowConfig = null;
var activeFlows = {};
var started = false;
var credentialsPendingReset = false;
var activeNodesToFlow = {};
var subflowInstanceNodeMap = {};
var typeEventRegistered = false;
function init(runtime) {
if (started) {
throw new Error("Cannot init without a stop");
}
settings = runtime.settings;
storage = runtime.storage;
log = runtime.log;
started = false;
if (!typeEventRegistered) {
events.on('type-registered',function(type) {
if (activeFlowConfig && activeFlowConfig.missingTypes.length > 0) {
var i = activeFlowConfig.missingTypes.indexOf(type);
if (i != -1) {
log.info(log._("nodes.flows.registered-missing", {type:type}));
activeFlowConfig.missingTypes.splice(i,1);
if (activeFlowConfig.missingTypes.length === 0 && started) {
events.emit("runtime-event",{id:"runtime-state",retain: true});
start();
}
}
}
});
typeEventRegistered = true;
}
Flow.init(runtime);
}
function loadFlows() {
var config;
return storage.getFlows().then(function(_config) {
config = _config;
log.debug("loaded flow revision: "+config.rev);
return credentials.load(config.credentials).then(function() {
events.emit("runtime-event",{id:"runtime-state",retain:true});
return config;
});
}).catch(function(err) {
if (err.code === "credentials_load_failed" && !storage.projects) {
// project disabled, credential load failed
credentialsPendingReset = true;
log.warn(log._("nodes.flows.error",{message:err.toString()}));
events.emit("runtime-event",{id:"runtime-state",payload:{type:"warning",error:err.code,text:"notification.warnings.credentials_load_failed_reset"},retain:true});
return config;
} else {
activeConfig = null;
events.emit("runtime-event",{id:"runtime-state",payload:{type:"warning",error:err.code,project:err.project,text:"notification.warnings."+err.code},retain:true});
if (err.code === "project_not_found") {
log.warn(log._("storage.localfilesystem.projects.project-not-found",{project:err.project}));
} else {
log.warn(log._("nodes.flows.error",{message:err.toString()}));
}
throw err;
}
});
}
function load(forceStart) {
return setFlows(null,"load",false,forceStart);
}
/*
* _config - new node array configuration
* type - full/nodes/flows/load (default full)
* muteLog - don't emit the standard log messages (used for individual flow api)
*/
function setFlows(_config,type,muteLog,forceStart) {
type = type||"full";
var configSavePromise = null;
var config = null;
var diff;
var newFlowConfig;
var isLoad = false;
if (type === "load") {
isLoad = true;
configSavePromise = loadFlows().then(function(_config) {
config = clone(_config.flows);
newFlowConfig = flowUtil.parseConfig(clone(config));
type = "full";
return _config.rev;
});
} else {
config = clone(_config);
newFlowConfig = flowUtil.parseConfig(clone(config));
diff = flowUtil.diffConfigs(activeFlowConfig,newFlowConfig);
// Now the flows have been compared, remove any credentials from newFlowConfig
// so they don't cause false-positive diffs the next time a flow is deployed
for (var id in newFlowConfig.allNodes) {
if (newFlowConfig.allNodes.hasOwnProperty(id)) {
delete newFlowConfig.allNodes[id].credentials;
}
}
credentials.clean(config);
var credsDirty = credentials.dirty();
configSavePromise = credentials.export().then(function(creds) {
var saveConfig = {
flows: config,
credentialsDirty:credsDirty,
credentials: creds
}
return storage.saveFlows(saveConfig);
});
}
return configSavePromise
.then(function(flowRevision) {
if (!isLoad) {
log.debug("saved flow revision: "+flowRevision);
}
activeConfig = {
flows:config,
rev:flowRevision
};
activeFlowConfig = newFlowConfig;
if (forceStart || started) {
return stop(type,diff,muteLog).then(function() {
return context.clean(activeFlowConfig).then(function() {
start(type,diff,muteLog).then(function() {
events.emit("runtime-event",{id:"runtime-deploy",payload:{revision:flowRevision},retain: true});
});
return flowRevision;
});
}).catch(function(err) {
})
} else {
events.emit("runtime-event",{id:"runtime-deploy",payload:{revision:flowRevision},retain: true});
}
});
}
function getNode(id) {
var node;
if (activeNodesToFlow[id] && activeFlows[activeNodesToFlow[id]]) {
return activeFlows[activeNodesToFlow[id]].getNode(id);
}
for (var flowId in activeFlows) {
if (activeFlows.hasOwnProperty(flowId)) {
node = activeFlows[flowId].getNode(id);
if (node) {
return node;
}
}
}
return null;
}
function eachNode(cb) {
for (var id in activeFlowConfig.allNodes) {
if (activeFlowConfig.allNodes.hasOwnProperty(id)) {
cb(activeFlowConfig.allNodes[id]);
}
}
}
function getFlows() {
return activeConfig;
}
function delegateError(node,logMessage,msg) {
var handled = false;
if (activeFlows[node.z]) {
handled = activeFlows[node.z].handleError(node,logMessage,msg);
} else if (activeNodesToFlow[node.z] && activeFlows[activeNodesToFlow[node.z]]) {
handled = activeFlows[activeNodesToFlow[node.z]].handleError(node,logMessage,msg);
} else if (activeFlowConfig.subflows[node.z] && subflowInstanceNodeMap[node.id]) {
subflowInstanceNodeMap[node.id].forEach(function(n) {
handled = handled || delegateError(getNode(n),logMessage,msg);
});
}
return handled;
}
function handleError(node,logMessage,msg) {
var handled = false;
if (node.z) {
handled = delegateError(node,logMessage,msg);
} else {
if (activeFlowConfig.configs[node.id]) {
activeFlowConfig.configs[node.id]._users.forEach(function(id) {
var userNode = activeFlowConfig.allNodes[id];
handled = handled || delegateError(userNode,logMessage,msg);
})
}
}
return handled;
}
function delegateStatus(node,statusMessage) {
if (activeFlows[node.z]) {
activeFlows[node.z].handleStatus(node,statusMessage);
} else if (activeNodesToFlow[node.z] && activeFlows[activeNodesToFlow[node.z]]) {
activeFlows[activeNodesToFlow[node.z]].handleStatus(node,statusMessage);
}
}
function handleStatus(node,statusMessage) {
events.emit("node-status",{
id: node.id,
status:statusMessage
});
if (node.z) {
delegateStatus(node,statusMessage);
} else {
if (activeFlowConfig.configs[node.id]) {
activeFlowConfig.configs[node.id]._users.forEach(function(id) {
var userNode = activeFlowConfig.allNodes[id];
delegateStatus(userNode,statusMessage);
})
}
}
}
function start(type,diff,muteLog) {
//dumpActiveNodes();
type = type||"full";
started = true;
var i;
if (activeFlowConfig.missingTypes.length > 0) {
log.info(log._("nodes.flows.missing-types"));
var knownUnknowns = 0;
for (i=0;i<activeFlowConfig.missingTypes.length;i++) {
var nodeType = activeFlowConfig.missingTypes[i];
var info = deprecated.get(nodeType);
if (info) {
log.info(log._("nodes.flows.missing-type-provided",{type:activeFlowConfig.missingTypes[i],module:info.module}));
knownUnknowns += 1;
} else {
log.info(" - "+activeFlowConfig.missingTypes[i]);
}
}
if (knownUnknowns > 0) {
log.info(log._("nodes.flows.missing-type-install-1"));
log.info(" npm install <module name>");
log.info(log._("nodes.flows.missing-type-install-2"));
log.info(" "+settings.userDir);
}
events.emit("runtime-event",{id:"runtime-state",payload:{error:"missing-types", type:"warning",text:"notification.warnings.missing-types",types:activeFlowConfig.missingTypes},retain:true});
return when.resolve();
}
if (!muteLog) {
if (type !== "full") {
log.info(log._("nodes.flows.starting-modified-"+type));
} else {
log.info(log._("nodes.flows.starting-flows"));
}
}
var id;
if (type === "full") {
if (!activeFlows['global']) {
log.debug("red/nodes/flows.start : starting flow : global");
activeFlows['global'] = Flow.create(activeFlowConfig);
}
for (id in activeFlowConfig.flows) {
if (activeFlowConfig.flows.hasOwnProperty(id)) {
if (!activeFlowConfig.flows[id].disabled && !activeFlows[id]) {
activeFlows[id] = Flow.create(activeFlowConfig,activeFlowConfig.flows[id]);
log.debug("red/nodes/flows.start : starting flow : "+id);
} else {
log.debug("red/nodes/flows.start : not starting disabled flow : "+id);
}
}
}
} else {
activeFlows['global'].update(activeFlowConfig,activeFlowConfig);
for (id in activeFlowConfig.flows) {
if (activeFlowConfig.flows.hasOwnProperty(id)) {
if (!activeFlowConfig.flows[id].disabled) {
if (activeFlows[id]) {
activeFlows[id].update(activeFlowConfig,activeFlowConfig.flows[id]);
} else {
activeFlows[id] = Flow.create(activeFlowConfig,activeFlowConfig.flows[id]);
log.debug("red/nodes/flows.start : starting flow : "+id);
}
} else {
log.debug("red/nodes/flows.start : not starting disabled flow : "+id);
}
}
}
}
for (id in activeFlows) {
if (activeFlows.hasOwnProperty(id)) {
activeFlows[id].start(diff);
var activeNodes = activeFlows[id].getActiveNodes();
Object.keys(activeNodes).forEach(function(nid) {
activeNodesToFlow[nid] = id;
if (activeNodes[nid]._alias) {
subflowInstanceNodeMap[activeNodes[nid]._alias] = subflowInstanceNodeMap[activeNodes[nid]._alias] || [];
subflowInstanceNodeMap[activeNodes[nid]._alias].push(nid);
}
});
}
}
events.emit("nodes-started");
if (credentialsPendingReset === true) {
credentialsPendingReset = false;
} else {
events.emit("runtime-event",{id:"runtime-state",retain:true});
}
if (!muteLog) {
if (type !== "full") {
log.info(log._("nodes.flows.started-modified-"+type));
} else {
log.info(log._("nodes.flows.started-flows"));
}
}
return when.resolve();
}
function stop(type,diff,muteLog) {
if (!started) {
return when.resolve();
}
type = type||"full";
diff = diff||{
added:[],
changed:[],
removed:[],
rewired:[],
linked:[]
};
if (!muteLog) {
if (type !== "full") {
log.info(log._("nodes.flows.stopping-modified-"+type));
} else {
log.info(log._("nodes.flows.stopping-flows"));
}
}
started = false;
var promises = [];
var stopList;
var removedList = diff.removed;
if (type === 'nodes') {
stopList = diff.changed.concat(diff.removed);
} else if (type === 'flows') {
stopList = diff.changed.concat(diff.removed).concat(diff.linked);
}
for (var id in activeFlows) {
if (activeFlows.hasOwnProperty(id)) {
var flowStateChanged = diff && (diff.added.indexOf(id) !== -1 || diff.removed.indexOf(id) !== -1);
log.debug("red/nodes/flows.stop : stopping flow : "+id);
promises = promises.concat(activeFlows[id].stop(flowStateChanged?null:stopList,removedList));
if (type === "full" || flowStateChanged || diff.removed.indexOf(id)!==-1) {
delete activeFlows[id];
}
}
}
return when.promise(function(resolve,reject) {
when.settle(promises).then(function() {
for (id in activeNodesToFlow) {
if (activeNodesToFlow.hasOwnProperty(id)) {
if (!activeFlows[activeNodesToFlow[id]]) {
delete activeNodesToFlow[id];
}
}
}
if (stopList) {
stopList.forEach(function(id) {
delete activeNodesToFlow[id];
});
}
// Ideally we'd prune just what got stopped - but mapping stopList
// id to the list of subflow instance nodes is something only Flow
// can do... so cheat by wiping the map knowing it'll be rebuilt
// in start()
subflowInstanceNodeMap = {};
if (!muteLog) {
if (type !== "full") {
log.info(log._("nodes.flows.stopped-modified-"+type));
} else {
log.info(log._("nodes.flows.stopped-flows"));
}
}
resolve();
});
});
}
function checkTypeInUse(id) {
var nodeInfo = typeRegistry.getNodeInfo(id);
if (!nodeInfo) {
throw new Error(log._("nodes.index.unrecognised-id", {id:id}));
} else {
var inUse = {};
var config = getFlows();
config.flows.forEach(function(n) {
inUse[n.type] = (inUse[n.type]||0)+1;
});
var nodesInUse = [];
nodeInfo.types.forEach(function(t) {
if (inUse[t]) {
nodesInUse.push(t);
}
});
if (nodesInUse.length > 0) {
var msg = nodesInUse.join(", ");
var err = new Error(log._("nodes.index.type-in-use", {msg:msg}));
err.code = "type_in_use";
throw err;
}
}
}
function updateMissingTypes() {
var subflowInstanceRE = /^subflow:(.+)$/;
activeFlowConfig.missingTypes = [];
for (var id in activeFlowConfig.allNodes) {
if (activeFlowConfig.allNodes.hasOwnProperty(id)) {
var node = activeFlowConfig.allNodes[id];
if (node.type !== 'tab' && node.type !== 'subflow') {
var subflowDetails = subflowInstanceRE.exec(node.type);
if ( (subflowDetails && !activeFlowConfig.subflows[subflowDetails[1]]) || (!subflowDetails && !typeRegistry.get(node.type)) ) {
if (activeFlowConfig.missingTypes.indexOf(node.type) === -1) {
activeFlowConfig.missingTypes.push(node.type);
}
}
}
}
}
}
function addFlow(flow) {
var i,node;
if (!flow.hasOwnProperty('nodes')) {
throw new Error('missing nodes property');
}
flow.id = redUtil.generateId();
var tabNode = {
type:'tab',
label:flow.label,
id:flow.id
}
if (flow.hasOwnProperty('info')) {
tabNode.info = flow.info;
}
if (flow.hasOwnProperty('disabled')) {
tabNode.disabled = flow.disabled;
}
var nodes = [tabNode];
for (i=0;i<flow.nodes.length;i++) {
node = flow.nodes[i];
if (activeFlowConfig.allNodes[node.id]) {
// TODO nls
return when.reject(new Error('duplicate id'));
}
if (node.type === 'tab' || node.type === 'subflow') {
return when.reject(new Error('invalid node type: '+node.type));
}
node.z = flow.id;
nodes.push(node);
}
if (flow.configs) {
for (i=0;i<flow.configs.length;i++) {
node = flow.configs[i];
if (activeFlowConfig.allNodes[node.id]) {
// TODO nls
return when.reject(new Error('duplicate id'));
}
if (node.type === 'tab' || node.type === 'subflow') {
return when.reject(new Error('invalid node type: '+node.type));
}
node.z = flow.id;
nodes.push(node);
}
}
var newConfig = clone(activeConfig.flows);
newConfig = newConfig.concat(nodes);
return setFlows(newConfig,'flows',true).then(function() {
log.info(log._("nodes.flows.added-flow",{label:(flow.label?flow.label+" ":"")+"["+flow.id+"]"}));
return flow.id;
});
}
function getFlow(id) {
var flow;
if (id === 'global') {
flow = activeFlowConfig;
} else {
flow = activeFlowConfig.flows[id];
}
if (!flow) {
return null;
}
var result = {
id: id
};
if (flow.label) {
result.label = flow.label;
}
if (flow.disabled) {
result.disabled = flow.disabled;
}
if (flow.hasOwnProperty('info')) {
result.info = flow.info;
}
if (id !== 'global') {
result.nodes = [];
}
if (flow.nodes) {
var nodeIds = Object.keys(flow.nodes);
if (nodeIds.length > 0) {
result.nodes = nodeIds.map(function(nodeId) {
var node = clone(flow.nodes[nodeId]);
if (node.type === 'link out') {
delete node.wires;
}
return node;
})
}
}
if (flow.configs) {
var configIds = Object.keys(flow.configs);
result.configs = configIds.map(function(configId) {
return clone(flow.configs[configId]);
})
if (result.configs.length === 0) {
delete result.configs;
}
}
if (flow.subflows) {
var subflowIds = Object.keys(flow.subflows);
result.subflows = subflowIds.map(function(subflowId) {
var subflow = clone(flow.subflows[subflowId]);
var nodeIds = Object.keys(subflow.nodes);
subflow.nodes = nodeIds.map(function(id) {
return subflow.nodes[id];
});
if (subflow.configs) {
var configIds = Object.keys(subflow.configs);
subflow.configs = configIds.map(function(id) {
return subflow.configs[id];
})
}
delete subflow.instances;
return subflow;
});
if (result.subflows.length === 0) {
delete result.subflows;
}
}
return result;
}
function updateFlow(id,newFlow) {
var label = id;
if (id !== 'global') {
if (!activeFlowConfig.flows[id]) {
var e = new Error();
e.code = 404;
throw e;
}
label = activeFlowConfig.flows[id].label;
}
var newConfig = clone(activeConfig.flows);
var nodes;
if (id === 'global') {
// Remove all nodes whose z is not a known flow
// When subflows can be owned by a flow, this logic will have to take
// that into account
newConfig = newConfig.filter(function(node) {
return node.type === 'tab' || (node.hasOwnProperty('z') && activeFlowConfig.flows.hasOwnProperty(node.z));
})
// Add in the new config nodes
nodes = newFlow.configs||[];
if (newFlow.subflows) {
// Add in the new subflows
newFlow.subflows.forEach(function(sf) {
nodes = nodes.concat(sf.nodes||[]).concat(sf.configs||[]);
delete sf.nodes;
delete sf.configs;
nodes.push(sf);
});
}
} else {
newConfig = newConfig.filter(function(node) {
return node.z !== id && node.id !== id;
});
var tabNode = {
type:'tab',
label:newFlow.label,
id:id
}
if (newFlow.hasOwnProperty('info')) {
tabNode.info = newFlow.info;
}
if (newFlow.hasOwnProperty('disabled')) {
tabNode.disabled = newFlow.disabled;
}
nodes = [tabNode].concat(newFlow.nodes||[]).concat(newFlow.configs||[]);
nodes.forEach(function(n) {
n.z = id;
});
}
newConfig = newConfig.concat(nodes);
return setFlows(newConfig,'flows',true).then(function() {
log.info(log._("nodes.flows.updated-flow",{label:(label?label+" ":"")+"["+id+"]"}));
})
}
function removeFlow(id) {
if (id === 'global') {
// TODO: nls + error code
throw new Error('not allowed to remove global');
}
var flow = activeFlowConfig.flows[id];
if (!flow) {
var e = new Error();
e.code = 404;
throw e;
}
var newConfig = clone(activeConfig.flows);
newConfig = newConfig.filter(function(node) {
return node.z !== id && node.id !== id;
});
return setFlows(newConfig,'flows',true).then(function() {
log.info(log._("nodes.flows.removed-flow",{label:(flow.label?flow.label+" ":"")+"["+flow.id+"]"}));
});
}
module.exports = {
init: init,
/**
* Load the current flow configuration from storage
* @return a promise for the loading of the config
*/
load: load,
get:getNode,
eachNode: eachNode,
/**
* Gets the current flow configuration
*/
getFlows: getFlows,
/**
* Sets the current active config.
* @param config the configuration to enable
* @param type the type of deployment to do: full (default), nodes, flows, load
* @return a promise for the saving/starting of the new flow
*/
setFlows: setFlows,
/**
* Starts the current flow configuration
*/
startFlows: start,
/**
* Stops the current flow configuration
* @return a promise for the stopping of the flow
*/
stopFlows: stop,
get started() { return started },
handleError: handleError,
handleStatus: handleStatus,
checkTypeInUse: checkTypeInUse,
addFlow: addFlow,
getFlow: getFlow,
updateFlow: updateFlow,
removeFlow: removeFlow,
disableFlow:null,
enableFlow:null
};

View File

@@ -0,0 +1,447 @@
/**
* Copyright JS Foundation and other contributors, http://js.foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
var clone = require("clone");
var redUtil = require("@node-red/util").util;
var subflowInstanceRE = /^subflow:(.+)$/;
var typeRegistry = require("@node-red/registry");
function diffNodes(oldNode,newNode) {
if (oldNode == null) {
return true;
}
var oldKeys = Object.keys(oldNode).filter(function(p) { return p != "x" && p != "y" && p != "wires" });
var newKeys = Object.keys(newNode).filter(function(p) { return p != "x" && p != "y" && p != "wires" });
if (oldKeys.length != newKeys.length) {
return true;
}
for (var i=0;i<newKeys.length;i++) {
var p = newKeys[i];
if (!redUtil.compareObjects(oldNode[p],newNode[p])) {
return true;
}
}
return false;
}
var EnvVarPropertyRE_old = /^\$\((\S+)\)$/;
var EnvVarPropertyRE = /^\${(\S+)}$/;
function mapEnvVarProperties(obj,prop) {
if (Buffer.isBuffer(obj[prop])) {
return;
} else if (Array.isArray(obj[prop])) {
for (var i=0;i<obj[prop].length;i++) {
mapEnvVarProperties(obj[prop],i);
}
} else if (typeof obj[prop] === 'string') {
if (obj[prop][0] === "$" && (EnvVarPropertyRE_old.test(obj[prop]) || EnvVarPropertyRE.test(obj[prop])) ) {
var envVar = obj[prop].substring(2,obj[prop].length-1);
obj[prop] = process.env.hasOwnProperty(envVar)?process.env[envVar]:obj[prop];
}
} else {
for (var p in obj[prop]) {
if (obj[prop].hasOwnProperty(p)) {
mapEnvVarProperties(obj[prop],p);
}
}
}
}
module.exports = {
diffNodes: diffNodes,
mapEnvVarProperties: mapEnvVarProperties,
parseConfig: function(config) {
var flow = {};
flow.allNodes = {};
flow.subflows = {};
flow.configs = {};
flow.flows = {};
flow.missingTypes = [];
config.forEach(function(n) {
flow.allNodes[n.id] = clone(n);
if (n.type === 'tab') {
flow.flows[n.id] = n;
flow.flows[n.id].subflows = {};
flow.flows[n.id].configs = {};
flow.flows[n.id].nodes = {};
}
});
config.forEach(function(n) {
if (n.type === 'subflow') {
flow.subflows[n.id] = n;
flow.subflows[n.id].configs = {};
flow.subflows[n.id].nodes = {};
flow.subflows[n.id].instances = [];
}
});
var linkWires = {};
var linkOutNodes = [];
config.forEach(function(n) {
if (n.type !== 'subflow' && n.type !== 'tab') {
var subflowDetails = subflowInstanceRE.exec(n.type);
if ( (subflowDetails && !flow.subflows[subflowDetails[1]]) || (!subflowDetails && !typeRegistry.get(n.type)) ) {
if (flow.missingTypes.indexOf(n.type) === -1) {
flow.missingTypes.push(n.type);
}
}
var container = null;
if (flow.flows[n.z]) {
container = flow.flows[n.z];
} else if (flow.subflows[n.z]) {
container = flow.subflows[n.z];
}
if (n.hasOwnProperty('x') && n.hasOwnProperty('y')) {
if (subflowDetails) {
var subflowType = subflowDetails[1]
n.subflow = subflowType;
flow.subflows[subflowType].instances.push(n)
}
if (container) {
container.nodes[n.id] = n;
}
} else {
if (container) {
container.configs[n.id] = n;
} else {
flow.configs[n.id] = n;
flow.configs[n.id]._users = [];
}
}
if (n.type === 'link in' && n.links) {
// Ensure wires are present in corresponding link out nodes
n.links.forEach(function(id) {
linkWires[id] = linkWires[id]||{};
linkWires[id][n.id] = true;
})
} else if (n.type === 'link out' && n.links) {
linkWires[n.id] = linkWires[n.id]||{};
n.links.forEach(function(id) {
linkWires[n.id][id] = true;
})
linkOutNodes.push(n);
}
}
});
linkOutNodes.forEach(function(n) {
var links = linkWires[n.id];
var targets = Object.keys(links);
n.wires = [targets];
});
var addedTabs = {};
config.forEach(function(n) {
if (n.type !== 'subflow' && n.type !== 'tab') {
for (var prop in n) {
if (n.hasOwnProperty(prop) && prop !== 'id' && prop !== 'wires' && prop !== 'type' && prop !== '_users' && flow.configs.hasOwnProperty(n[prop])) {
// This property references a global config node
flow.configs[n[prop]]._users.push(n.id)
}
}
if (n.z && !flow.subflows[n.z]) {
if (!flow.flows[n.z]) {
flow.flows[n.z] = {type:'tab',id:n.z};
flow.flows[n.z].subflows = {};
flow.flows[n.z].configs = {};
flow.flows[n.z].nodes = {};
addedTabs[n.z] = flow.flows[n.z];
}
if (addedTabs[n.z]) {
if (n.hasOwnProperty('x') && n.hasOwnProperty('y')) {
addedTabs[n.z].nodes[n.id] = n;
} else {
addedTabs[n.z].configs[n.id] = n;
}
}
}
}
});
return flow;
},
diffConfigs: function(oldConfig, newConfig) {
var id;
var node;
var nn;
var wires;
var j,k;
if (!oldConfig) {
oldConfig = {
flows:{},
allNodes:{}
}
}
var changedSubflows = {};
var added = {};
var removed = {};
var changed = {};
var wiringChanged = {};
var linkMap = {};
var changedTabs = {};
// Look for tabs that have been removed
for (id in oldConfig.flows) {
if (oldConfig.flows.hasOwnProperty(id) && (!newConfig.flows.hasOwnProperty(id))) {
removed[id] = oldConfig.allNodes[id];
}
}
// Look for tabs that have been disabled
for (id in oldConfig.flows) {
if (oldConfig.flows.hasOwnProperty(id) && newConfig.flows.hasOwnProperty(id)) {
var originalState = oldConfig.flows[id].disabled||false;
var newState = newConfig.flows[id].disabled||false;
if (originalState !== newState) {
changedTabs[id] = true;
if (originalState) {
added[id] = oldConfig.allNodes[id];
} else {
removed[id] = oldConfig.allNodes[id];
}
}
}
}
for (id in oldConfig.allNodes) {
if (oldConfig.allNodes.hasOwnProperty(id)) {
node = oldConfig.allNodes[id];
if (node.type !== 'tab') {
// build the map of what this node was previously wired to
if (node.wires) {
linkMap[node.id] = linkMap[node.id] || [];
for (j=0;j<node.wires.length;j++) {
wires = node.wires[j];
for (k=0;k<wires.length;k++) {
linkMap[node.id].push(wires[k]);
nn = oldConfig.allNodes[wires[k]];
if (nn) {
linkMap[nn.id] = linkMap[nn.id] || [];
linkMap[nn.id].push(node.id);
}
}
}
}
// This node has been removed
if (removed[node.z] || !newConfig.allNodes.hasOwnProperty(id)) {
removed[id] = node;
// Mark the container as changed
if (!removed[node.z] && newConfig.allNodes[removed[id].z]) {
changed[removed[id].z] = newConfig.allNodes[removed[id].z];
if (changed[removed[id].z].type === "subflow") {
changedSubflows[removed[id].z] = changed[removed[id].z];
//delete removed[id];
}
}
} else {
if (added[node.z]) {
added[id] = node;
} else {
// This node has a material configuration change
if (diffNodes(node,newConfig.allNodes[id]) || newConfig.allNodes[id].credentials) {
changed[id] = newConfig.allNodes[id];
if (changed[id].type === "subflow") {
changedSubflows[id] = changed[id];
}
// Mark the container as changed
if (newConfig.allNodes[changed[id].z]) {
changed[changed[id].z] = newConfig.allNodes[changed[id].z];
if (changed[changed[id].z].type === "subflow") {
changedSubflows[changed[id].z] = changed[changed[id].z];
delete changed[id];
}
}
}
// This node's wiring has changed
if (!redUtil.compareObjects(node.wires,newConfig.allNodes[id].wires)) {
wiringChanged[id] = newConfig.allNodes[id];
// Mark the container as changed
if (newConfig.allNodes[wiringChanged[id].z]) {
changed[wiringChanged[id].z] = newConfig.allNodes[wiringChanged[id].z];
if (changed[wiringChanged[id].z].type === "subflow") {
changedSubflows[wiringChanged[id].z] = changed[wiringChanged[id].z];
delete wiringChanged[id];
}
}
}
}
}
}
}
}
// Look for added nodes
for (id in newConfig.allNodes) {
if (newConfig.allNodes.hasOwnProperty(id)) {
node = newConfig.allNodes[id];
// build the map of what this node is now wired to
if (node.wires) {
linkMap[node.id] = linkMap[node.id] || [];
for (j=0;j<node.wires.length;j++) {
wires = node.wires[j];
for (k=0;k<wires.length;k++) {
if (linkMap[node.id].indexOf(wires[k]) === -1) {
linkMap[node.id].push(wires[k]);
}
nn = newConfig.allNodes[wires[k]];
if (nn) {
linkMap[nn.id] = linkMap[nn.id] || [];
if (linkMap[nn.id].indexOf(node.id) === -1) {
linkMap[nn.id].push(node.id);
}
}
}
}
}
// This node has been added
if (!oldConfig.allNodes.hasOwnProperty(id)) {
added[id] = node;
// Mark the container as changed
if (newConfig.allNodes[added[id].z]) {
changed[added[id].z] = newConfig.allNodes[added[id].z];
if (changed[added[id].z].type === "subflow") {
changedSubflows[added[id].z] = changed[added[id].z];
delete added[id];
}
}
}
}
}
var madeChange;
// Loop through the nodes looking for references to changed config nodes
// Repeat the loop if anything is marked as changed as it may need to be
// propagated to parent nodes.
// TODO: looping through all nodes every time is a bit inefficient - could be more targeted
do {
madeChange = false;
for (id in newConfig.allNodes) {
if (newConfig.allNodes.hasOwnProperty(id)) {
node = newConfig.allNodes[id];
for (var prop in node) {
if (node.hasOwnProperty(prop) && prop != "z" && prop != "id" && prop != "wires") {
// This node has a property that references a changed/removed node
// Assume it is a config node change and mark this node as
// changed.
if (changed[node[prop]] || removed[node[prop]]) {
if (!changed[node.id]) {
madeChange = true;
changed[node.id] = node;
// This node exists within subflow template
// Mark the template as having changed
if (newConfig.allNodes[node.z]) {
changed[node.z] = newConfig.allNodes[node.z];
if (changed[node.z].type === "subflow") {
changedSubflows[node.z] = changed[node.z];
}
}
}
}
}
}
}
}
} while (madeChange===true)
// Find any nodes that exist on a subflow template and remove from changed
// list as the parent subflow will now be marked as containing a change
for (id in newConfig.allNodes) {
if (newConfig.allNodes.hasOwnProperty(id)) {
node = newConfig.allNodes[id];
if (newConfig.allNodes[node.z] && newConfig.allNodes[node.z].type === "subflow") {
delete changed[node.id];
}
}
}
// Recursively mark all instances of changed subflows as changed
var changedSubflowStack = Object.keys(changedSubflows);
while (changedSubflowStack.length > 0) {
var subflowId = changedSubflowStack.pop();
for (id in newConfig.allNodes) {
if (newConfig.allNodes.hasOwnProperty(id)) {
node = newConfig.allNodes[id];
if (node.type === 'subflow:'+subflowId) {
if (!changed[node.id]) {
changed[node.id] = node;
if (!changed[changed[node.id].z] && newConfig.allNodes[changed[node.id].z]) {
changed[changed[node.id].z] = newConfig.allNodes[changed[node.id].z];
if (newConfig.allNodes[changed[node.id].z].type === "subflow") {
// This subflow instance is inside a subflow. Add the
// containing subflow to the stack to mark
changedSubflowStack.push(changed[node.id].z);
delete changed[node.id];
}
}
}
}
}
}
}
var diff = {
added:Object.keys(added),
changed:Object.keys(changed),
removed:Object.keys(removed),
rewired:Object.keys(wiringChanged),
linked:[]
}
// Traverse the links of all modified nodes to mark the connected nodes
var modifiedNodes = diff.added.concat(diff.changed).concat(diff.removed).concat(diff.rewired);
var visited = {};
while (modifiedNodes.length > 0) {
node = modifiedNodes.pop();
if (!visited[node]) {
visited[node] = true;
if (linkMap[node]) {
if (!changed[node] && !added[node] && !removed[node] && !wiringChanged[node]) {
diff.linked.push(node);
}
modifiedNodes = modifiedNodes.concat(linkMap[node]);
}
}
}
// console.log(diff);
// for (id in newConfig.allNodes) {
// console.log(
// (added[id]?"+":(changed[id]?"!":" "))+(wiringChanged[id]?"w":" ")+(diff.linked.indexOf(id)!==-1?"~":" "),
// id,
// newConfig.allNodes[id].type,
// newConfig.allNodes[id].name||newConfig.allNodes[id].label||""
// );
// }
// for (id in removed) {
// console.log(
// "- "+(diff.linked.indexOf(id)!==-1?"~":" "),
// id,
// oldConfig.allNodes[id].type,
// oldConfig.allNodes[id].name||oldConfig.allNodes[id].label||""
// );
// }
return diff;
}
}

View File

@@ -0,0 +1,247 @@
/**
* Copyright JS Foundation and other contributors, http://js.foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
var when = require("when");
var path = require("path");
var fs = require("fs");
var clone = require("clone");
var util = require("util");
var registry = require("@node-red/registry");
var credentials = require("./credentials");
var flows = require("./flows");
var flowUtil = require("./flows/util")
var context = require("./context");
var Node = require("./Node");
var log;
var events = require("../events");
var settings;
/**
* Registers a node constructor
* @param nodeSet - the nodeSet providing the node (module/set)
* @param type - the string type name
* @param constructor - the constructor function for this node type
* @param opts - optional additional options for the node
*/
function registerType(nodeSet,type,constructor,opts) {
if (typeof type !== "string") {
// This is someone calling the api directly, rather than via the
// RED object provided to a node. Log a warning
log.warn("["+nodeSet+"] Deprecated call to RED.runtime.nodes.registerType - node-set name must be provided as first argument");
opts = constructor;
constructor = type;
type = nodeSet;
nodeSet = "";
}
if (opts) {
if (opts.credentials) {
credentials.register(type,opts.credentials);
}
if (opts.settings) {
try {
settings.registerNodeSettings(type,opts.settings);
} catch(err) {
log.warn("["+type+"] "+err.message);
}
}
}
if(!(constructor.prototype instanceof Node)) {
if(Object.getPrototypeOf(constructor.prototype) === Object.prototype) {
util.inherits(constructor,Node);
} else {
var proto = constructor.prototype;
while(Object.getPrototypeOf(proto) !== Object.prototype) {
proto = Object.getPrototypeOf(proto);
}
//TODO: This is a partial implementation of util.inherits >= node v5.0.0
// which should be changed when support for node < v5.0.0 is dropped
// see: https://github.com/nodejs/node/pull/3455
proto.constructor.super_ = Node;
if(Object.setPrototypeOf) {
Object.setPrototypeOf(proto, Node.prototype);
} else {
// hack for node v0.10
proto.__proto__ = Node.prototype;
}
}
}
registry.registerType(nodeSet,type,constructor);
}
/**
* Called from a Node's constructor function, invokes the super-class
* constructor and attaches any credentials to the node.
* @param node the node object being created
* @param def the instance definition for the node
*/
function createNode(node,def) {
Node.call(node,def);
var id = node.id;
if (def._alias) {
id = def._alias;
}
var creds = credentials.get(id);
if (creds) {
creds = clone(creds);
//console.log("Attaching credentials to ",node.id);
// allow $(foo) syntax to substitute env variables for credentials also...
for (var p in creds) {
if (creds.hasOwnProperty(p)) {
flowUtil.mapEnvVarProperties(creds,p);
}
}
node.credentials = creds;
} else if (credentials.getDefinition(node.type)) {
node.credentials = {};
}
}
function init(runtime) {
settings = runtime.settings;
log = runtime.log;
credentials.init(runtime);
flows.init(runtime);
registry.init(runtime);
context.init(runtime.settings);
}
function disableNode(id) {
flows.checkTypeInUse(id);
return registry.disableNode(id).then(function(info) {
reportNodeStateChange(info,false);
return info;
});
}
function enableNode(id) {
return registry.enableNode(id).then(function(info) {
reportNodeStateChange(info,true);
return info;
});
}
function reportNodeStateChange(info,enabled) {
if (info.enabled === enabled && !info.err) {
events.emit("runtime-event",{id:"node/"+(enabled?"enabled":"disabled"),retain:false,payload:info});
log.info(" "+log._("api.nodes."+(enabled?"enabled":"disabled")));
for (var i=0;i<info.types.length;i++) {
log.info(" - "+info.types[i]);
}
} else if (enabled && info.err) {
log.warn(log._("api.nodes.error-enable"));
log.warn(" - "+info.name+" : "+info.err);
}
}
function installModule(module,version) {
var ex_module = registry.getModuleInfo(module);
var isUpgrade = !!ex_module;
return registry.installModule(module,version).then(function(info) {
if (isUpgrade) {
events.emit("runtime-event",{id:"node/upgraded",retain:false,payload:{module:module,version:version}});
} else {
events.emit("runtime-event",{id:"node/added",retain:false,payload:info.nodes});
}
return info;
});
}
function uninstallModule(module) {
var info = registry.getModuleInfo(module);
if (!info) {
throw new Error(log._("nodes.index.unrecognised-module", {module:module}));
} else {
for (var i=0;i<info.nodes.length;i++) {
flows.checkTypeInUse(module+"/"+info.nodes[i].name);
}
return registry.uninstallModule(module).then(function(list) {
events.emit("runtime-event",{id:"node/removed",retain:false,payload:list});
return list;
});
}
}
module.exports = {
// Lifecycle
init: init,
load: registry.load,
// Node registry
createNode: createNode,
getNode: flows.get,
eachNode: flows.eachNode,
getContext: context.get,
paletteEditorEnabled: registry.paletteEditorEnabled,
installModule: installModule,
uninstallModule: uninstallModule,
enableNode: enableNode,
disableNode: disableNode,
// Node type registry
registerType: registerType,
getType: registry.get,
getNodeInfo: registry.getNodeInfo,
getNodeList: registry.getNodeList,
getModuleInfo: registry.getModuleInfo,
getNodeConfigs: registry.getNodeConfigs,
getNodeConfig: registry.getNodeConfig,
getNodeIconPath: registry.getNodeIconPath,
getNodeIcons: registry.getNodeIcons,
getNodeExampleFlows: registry.getNodeExampleFlows,
getNodeExampleFlowPath: registry.getNodeExampleFlowPath,
clearRegistry: registry.clear,
cleanModuleList: registry.cleanModuleList,
// Flow handling
loadFlows: flows.load,
startFlows: flows.startFlows,
stopFlows: flows.stopFlows,
setFlows: flows.setFlows,
getFlows: flows.getFlows,
addFlow: flows.addFlow,
getFlow: flows.getFlow,
updateFlow: flows.updateFlow,
removeFlow: flows.removeFlow,
// disableFlow: flows.disableFlow,
// enableFlow: flows.enableFlow,
// Credentials
addCredentials: credentials.add,
getCredentials: credentials.get,
deleteCredentials: credentials.delete,
getCredentialDefinition: credentials.getDefinition,
setCredentialSecret: credentials.setKey,
clearCredentials: credentials.clear,
exportCredentials: credentials.export,
getCredentialKeyType: credentials.getKeyType,
// Contexts
loadContextsPlugin: context.load,
closeContextsPlugin: context.close,
listContextStores: context.listStores
};