Add node done API

This commit is contained in:
Nick O'Leary
2019-07-08 23:23:33 +01:00
parent f0a51bafbe
commit 3b5ea0f15f
17 changed files with 1039 additions and 498 deletions

View File

@@ -27,6 +27,8 @@ function Node(n) {
this.type = n.type;
this.z = n.z;
this._closeCallbacks = [];
this._inputCallback = null;
this._inputCallbacks = null;
if (n.name) {
this.name = n.name;
@@ -43,6 +45,10 @@ function Node(n) {
// as part of its constructure - config._flow will overwrite this._flow
// which we can tolerate as they are the same object.
Object.defineProperty(this,'_flow', {value: n._flow, enumerable: false, writable: true })
this._asyncDelivery = n._flow.asyncMessageDelivery;
}
if (this._asyncDelivery === undefined) {
this._asyncDelivery = true;
}
this.updateWires(n.wires);
}
@@ -79,17 +85,95 @@ Node.prototype.context = function() {
return this._context;
}
Node.prototype._on = Node.prototype.on;
Node.prototype._complete = function(msg,error) {
if (error) {
// For now, delegate this to this.error
// But at some point, the timeout handling will need to know about
// this as well.
this.error(error,msg);
} else {
this._flow.handleComplete(this,msg);
}
}
Node.prototype._on = Node.prototype.on;
Node.prototype.on = function(event, callback) {
var node = this;
if (event == "close") {
this._closeCallbacks.push(callback);
} else if (event === "input") {
if (this._inputCallback) {
this._inputCallbacks = [this._inputCallback, callback];
this._inputCallback = null;
} else if (this._inputCallbacks) {
this._inputCallbacks.push(callback);
} else {
this._inputCallback = callback;
}
} else {
this._on(event, callback);
}
};
Node.prototype._emit = Node.prototype.emit;
Node.prototype.emit = function(event,arg) {
var node = this;
if (event === "input") {
// When Pluggable Message Routing arrives, this will be called from
// that and will already be sync/async depending on the router.
if (this._asyncDelivery) {
setImmediate(function() {
node._emitInput(arg);
});
} else {
this._emitInput(arg);
}
} else {
this._emit(event,arg);
}
}
Node.prototype._emitInput = function(arg) {
var node = this;
if (node._inputCallback) {
try {
node._inputCallback(arg,function(err) { node._complete(arg,err); });
} catch(err) {
node.error(err,arg);
}
} else if (node._inputCallbacks) {
var c = node._inputCallbacks.length;
for (var i=0;i<c;i++) {
var cb = node._inputCallbacks[i];
if (cb.length === 2) {
c++;
}
try {
node._inputCallbacks[i](arg,function(err) {
c--;
if (c === 0) {
node._complete(arg,err);
}
});
} catch(err) {
node.error(err,msg);
}
}
}
}
Node.prototype._removeAllListeners = Node.prototype.removeAllListeners;
Node.prototype.removeAllListeners = function(name) {
if (name === "input") {
this._inputCallback = null;
this._inputCallbacks = null;
} else if (name === "close") {
this._closeCallbacks = [];
} else {
this._removeAllListeners(name);
}
}
Node.prototype.close = function(removed) {
//console.log(this.type,this.id,removed);
var promises = [];
@@ -233,11 +317,7 @@ Node.prototype.receive = function(msg) {
msg._msgid = redUtil.generateId();
}
this.metric("receive",msg);
try {
this.emit("input", msg);
} catch(err) {
this.error(err,msg);
}
this.emit("input",msg);
};
function log_helper(self, level, msg) {

View File

@@ -23,6 +23,7 @@ var Subflow;
var Log;
var nodeCloseTimeout = 15000;
var asyncMessageDelivery = true;
/**
* This class represents a flow within the runtime. It is responsible for
@@ -125,6 +126,7 @@ class Flow {
var id;
this.catchNodes = [];
this.statusNodes = [];
this.completeNodeMap = {};
var configNodes = Object.keys(this.flow.configs);
var configNodeAttempts = {};
@@ -228,7 +230,7 @@ class Flow {
this.trace(" id | type | alias");
this.trace("------------------|--------------|-----------------");
}
// Build the map of catch/status nodes.
// Build the map of catch/status/complete nodes.
for (id in this.activeNodes) {
if (this.activeNodes.hasOwnProperty(id)) {
node = this.activeNodes[id];
@@ -237,6 +239,13 @@ class Flow {
this.catchNodes.push(node);
} else if (node.type === "status") {
this.statusNodes.push(node);
} else if (node.type === "complete") {
if (node.scope) {
node.scope.forEach(id => {
this.completeNodeMap[id] = this.completeNodeMap[id] || [];
this.completeNodeMap[id].push(node);
})
}
}
}
}
@@ -516,6 +525,20 @@ class Flow {
return handled;
}
handleComplete(node,msg) {
if (this.completeNodeMap[node.id]) {
let toSend = msg;
this.completeNodeMap[node.id].forEach((completeNode,index) => {
toSend = redUtil.cloneMessage(msg);
completeNode.receive(toSend);
})
}
}
get asyncMessageDelivery() {
return asyncMessageDelivery
}
dump() {
console.log("==================")
console.log(this.TYPE, this.id);
@@ -562,6 +585,7 @@ function stopNode(node,removed) {
module.exports = {
init: function(runtime) {
nodeCloseTimeout = runtime.settings.nodeCloseTimeout || 15000;
asyncMessageDelivery = !runtime.settings.runtimeSyncDelivery
Log = runtime.log;
Subflow = require("./Subflow");
Subflow.init(runtime);

View File

@@ -729,6 +729,10 @@ module.exports = {
updateFlow: updateFlow,
removeFlow: removeFlow,
disableFlow:null,
enableFlow:null
enableFlow:null,
isDeliveryModeAsync: function() {
// If settings is null, this is likely being run by unit tests
return !settings || !settings.runtimeSyncDelivery
}
};