mirror of
				https://github.com/node-red/node-red.git
				synced 2025-03-01 10:36:34 +00:00 
			
		
		
		
	Tidy up when usage in Flow and Node
This commit is contained in:
		@@ -16,7 +16,6 @@
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
var util = require("util");
 | 
					var util = require("util");
 | 
				
			||||||
var EventEmitter = require("events").EventEmitter;
 | 
					var EventEmitter = require("events").EventEmitter;
 | 
				
			||||||
var when = require("when");
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
var redUtil = require("@node-red/util").util;
 | 
					var redUtil = require("@node-red/util").util;
 | 
				
			||||||
var Log = require("@node-red/util").log; // TODO: separate module
 | 
					var Log = require("@node-red/util").log; // TODO: separate module
 | 
				
			||||||
@@ -92,21 +91,34 @@ Node.prototype.close = function(removed) {
 | 
				
			|||||||
        var callback = this._closeCallbacks[i];
 | 
					        var callback = this._closeCallbacks[i];
 | 
				
			||||||
        if (callback.length > 0) {
 | 
					        if (callback.length > 0) {
 | 
				
			||||||
            promises.push(
 | 
					            promises.push(
 | 
				
			||||||
                when.promise(function(resolve) {
 | 
					                new Promise((resolve) => {
 | 
				
			||||||
                    var args = [];
 | 
					                    try {
 | 
				
			||||||
                    if (callback.length === 2) {
 | 
					                        var args = [];
 | 
				
			||||||
                        args.push(!!removed);
 | 
					                        if (callback.length === 2) {
 | 
				
			||||||
 | 
					                            args.push(!!removed);
 | 
				
			||||||
 | 
					                        }
 | 
				
			||||||
 | 
					                        args.push(() => {
 | 
				
			||||||
 | 
					                            resolve();
 | 
				
			||||||
 | 
					                        });
 | 
				
			||||||
 | 
					                        callback.apply(node, args);
 | 
				
			||||||
 | 
					                    } catch(err) {
 | 
				
			||||||
 | 
					                        // TODO: error thrown in node async close callback
 | 
				
			||||||
 | 
					                        // We've never logged this properly.
 | 
				
			||||||
 | 
					                        resolve();
 | 
				
			||||||
                    }
 | 
					                    }
 | 
				
			||||||
                    args.push(resolve);
 | 
					 | 
				
			||||||
                    callback.apply(node, args);
 | 
					 | 
				
			||||||
                })
 | 
					                })
 | 
				
			||||||
            );
 | 
					            );
 | 
				
			||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
            callback.call(node);
 | 
					            try {
 | 
				
			||||||
 | 
					                callback.call(node);
 | 
				
			||||||
 | 
					            } catch(err) {
 | 
				
			||||||
 | 
					                // TODO: error thrown in node sync close callback
 | 
				
			||||||
 | 
					                // We've never logged this properly.
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    if (promises.length > 0) {
 | 
					    if (promises.length > 0) {
 | 
				
			||||||
        return when.settle(promises).then(function() {
 | 
					        return Promise.all(promises).then(function() {
 | 
				
			||||||
            if (this._context) {
 | 
					            if (this._context) {
 | 
				
			||||||
               return context.delete(this._alias||this.id,this.z);
 | 
					               return context.delete(this._alias||this.id,this.z);
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
@@ -115,7 +127,7 @@ Node.prototype.close = function(removed) {
 | 
				
			|||||||
        if (this._context) {
 | 
					        if (this._context) {
 | 
				
			||||||
            return context.delete(this._alias||this.id,this.z);
 | 
					            return context.delete(this._alias||this.id,this.z);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        return;
 | 
					        return Promise.resolve();
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -138,8 +150,6 @@ Node.prototype.send = function(msg) {
 | 
				
			|||||||
            /* istanbul ignore else */
 | 
					            /* istanbul ignore else */
 | 
				
			||||||
            if (node) {
 | 
					            if (node) {
 | 
				
			||||||
                node.receive(msg);
 | 
					                node.receive(msg);
 | 
				
			||||||
            } else {
 | 
					 | 
				
			||||||
                console.log("trying to send to a node not on this flow",this._wire);
 | 
					 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            return;
 | 
					            return;
 | 
				
			||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
@@ -187,8 +197,6 @@ Node.prototype.send = function(msg) {
 | 
				
			|||||||
                                }
 | 
					                                }
 | 
				
			||||||
                            }
 | 
					                            }
 | 
				
			||||||
                        }
 | 
					                        }
 | 
				
			||||||
                    } else {
 | 
					 | 
				
			||||||
                        console.log("trying to send to a node not on this flow",this._wire);
 | 
					 | 
				
			||||||
                    }
 | 
					                    }
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
@@ -295,4 +303,5 @@ Node.prototype.metric = function(eventname, msg, metricValue) {
 | 
				
			|||||||
Node.prototype.status = function(status) {
 | 
					Node.prototype.status = function(status) {
 | 
				
			||||||
    this._flow.handleStatus(this,status);
 | 
					    this._flow.handleStatus(this,status);
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
module.exports = Node;
 | 
					module.exports = Node;
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -14,14 +14,14 @@
 | 
				
			|||||||
 * limitations under the License.
 | 
					 * limitations under the License.
 | 
				
			||||||
 **/
 | 
					 **/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var when = require("when");
 | 
					 | 
				
			||||||
var clone = require("clone");
 | 
					var clone = require("clone");
 | 
				
			||||||
var Subflow;
 | 
					 | 
				
			||||||
var Log;
 | 
					 | 
				
			||||||
var redUtil = require("@node-red/util").util;
 | 
					var redUtil = require("@node-red/util").util;
 | 
				
			||||||
var flowUtil = require("./util");
 | 
					var flowUtil = require("./util");
 | 
				
			||||||
var events = require("../../events");
 | 
					var events = require("../../events");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var Subflow;
 | 
				
			||||||
 | 
					var Log;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var nodeCloseTimeout = 15000;
 | 
					var nodeCloseTimeout = 15000;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/**
 | 
					/**
 | 
				
			||||||
@@ -227,55 +227,42 @@ class Flow {
 | 
				
			|||||||
     * @return {[type]}             [description]
 | 
					     * @return {[type]}             [description]
 | 
				
			||||||
     */
 | 
					     */
 | 
				
			||||||
    stop(stopList, removedList) {
 | 
					    stop(stopList, removedList) {
 | 
				
			||||||
        return new Promise((resolve,reject) => {
 | 
					        this.trace("stop "+this.TYPE);
 | 
				
			||||||
            this.trace("stop "+this.TYPE);
 | 
					        var i;
 | 
				
			||||||
            var i;
 | 
					        if (!stopList) {
 | 
				
			||||||
            if (stopList) {
 | 
					            stopList = Object.keys(this.activeNodes);
 | 
				
			||||||
                // for (i=0;i<stopList.length;i++) {
 | 
					        }
 | 
				
			||||||
                //     if (this.subflowInstanceNodes[stopList[i]]) {
 | 
					        // Convert the list to a map to avoid multiple scans of the list
 | 
				
			||||||
                //         console.log("NEED TO STOP A SUBFLOW",stopList[i]);
 | 
					        var removedMap = {};
 | 
				
			||||||
                //         // The first in the list is the instance node we already
 | 
					        removedList = removedList || [];
 | 
				
			||||||
                //         // know about
 | 
					        removedList.forEach(function(id) {
 | 
				
			||||||
                //         // stopList = stopList.concat(this.subflowInstanceNodes[stopList[i]].slice(1))
 | 
					            removedMap[id] = true;
 | 
				
			||||||
                //     }
 | 
					        });
 | 
				
			||||||
                // }
 | 
					 | 
				
			||||||
            } else {
 | 
					 | 
				
			||||||
                stopList = Object.keys(this.activeNodes);
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
            // Convert the list to a map to avoid multiple scans of the list
 | 
					 | 
				
			||||||
            var removedMap = {};
 | 
					 | 
				
			||||||
            removedList = removedList || [];
 | 
					 | 
				
			||||||
            removedList.forEach(function(id) {
 | 
					 | 
				
			||||||
                removedMap[id] = true;
 | 
					 | 
				
			||||||
            });
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
            var promises = [];
 | 
					        var promises = [];
 | 
				
			||||||
            for (i=0;i<stopList.length;i++) {
 | 
					        for (i=0;i<stopList.length;i++) {
 | 
				
			||||||
                var node = this.activeNodes[stopList[i]];
 | 
					            var node = this.activeNodes[stopList[i]];
 | 
				
			||||||
                if (node) {
 | 
					            if (node) {
 | 
				
			||||||
                    delete this.activeNodes[stopList[i]];
 | 
					                delete this.activeNodes[stopList[i]];
 | 
				
			||||||
                    if (this.subflowInstanceNodes[stopList[i]]) {
 | 
					                if (this.subflowInstanceNodes[stopList[i]]) {
 | 
				
			||||||
                        try {
 | 
					                    try {
 | 
				
			||||||
                            var subflow = this.subflowInstanceNodes[stopList[i]];
 | 
					                        var subflow = this.subflowInstanceNodes[stopList[i]];
 | 
				
			||||||
                            promises.push(stopNode(node,false).then(() => { subflow.stop() }));
 | 
					                        promises.push(stopNode(node,false).then(() => { subflow.stop() }));
 | 
				
			||||||
                        } catch(err) {
 | 
					                    } catch(err) {
 | 
				
			||||||
                            node.error(err);
 | 
					                        node.error(err);
 | 
				
			||||||
                        }
 | 
					                    }
 | 
				
			||||||
                        delete this.subflowInstanceNodes[stopList[i]];
 | 
					                    delete this.subflowInstanceNodes[stopList[i]];
 | 
				
			||||||
                    } else {
 | 
					                } else {
 | 
				
			||||||
                        try {
 | 
					                    try {
 | 
				
			||||||
                            var removed = removedMap[stopList[i]];
 | 
					                        var removed = removedMap[stopList[i]];
 | 
				
			||||||
                            promises.push(stopNode(node,removed));
 | 
					                        promises.push(stopNode(node,removed).catch(()=>{}));
 | 
				
			||||||
                        } catch(err) {
 | 
					                    } catch(err) {
 | 
				
			||||||
                            node.error(err);
 | 
					                        node.error(err);
 | 
				
			||||||
                        }
 | 
					 | 
				
			||||||
                    }
 | 
					                    }
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            when.settle(promises).then(function(results) {
 | 
					        }
 | 
				
			||||||
                resolve();
 | 
					        return Promise.all(promises);
 | 
				
			||||||
            });
 | 
					 | 
				
			||||||
        });
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -455,22 +442,20 @@ class Flow {
 | 
				
			|||||||
 * @return {[type]}         [description]
 | 
					 * @return {[type]}         [description]
 | 
				
			||||||
 */
 | 
					 */
 | 
				
			||||||
function stopNode(node,removed) {
 | 
					function stopNode(node,removed) {
 | 
				
			||||||
    return when.promise(function(resolve, reject) {
 | 
					    Log.trace("Stopping node "+node.type+":"+node.id+(removed?" removed":""));
 | 
				
			||||||
        var start;
 | 
					    const start = Date.now();
 | 
				
			||||||
        when.promise(function(resolve) {
 | 
					    const closePromise = node.close(removed);
 | 
				
			||||||
            Log.trace("Stopping node "+node.type+":"+node.id+(removed?" removed":""));
 | 
					    const closeTimeout = new Promise((resolve,reject) => {
 | 
				
			||||||
            start = Date.now();
 | 
					        setTimeout(() => {
 | 
				
			||||||
            resolve(node.close(removed));
 | 
					            reject("Close timed out");
 | 
				
			||||||
        }).timeout(nodeCloseTimeout).then(function(){
 | 
					        }, nodeCloseTimeout);
 | 
				
			||||||
            var delta = Date.now() - start;
 | 
					    });
 | 
				
			||||||
            Log.trace("Stopped node "+node.type+":"+node.id+" ("+delta+"ms)" );
 | 
					    return Promise.race([closePromise,closeTimeout]).then(() => {
 | 
				
			||||||
            resolve(delta);
 | 
					        var delta = Date.now() - start;
 | 
				
			||||||
        },function(err) {
 | 
					        Log.trace("Stopped node "+node.type+":"+node.id+" ("+delta+"ms)" );
 | 
				
			||||||
            var delta = Date.now() - start;
 | 
					    }).catch(err => {
 | 
				
			||||||
            node.error(Log._("nodes.flows.stopping-error",{message:err}));
 | 
					        node.error(Log._("nodes.flows.stopping-error",{message:err}));
 | 
				
			||||||
            Log.debug(err.stack);
 | 
					        Log.debug(err.stack);
 | 
				
			||||||
            reject(err);
 | 
					 | 
				
			||||||
        });
 | 
					 | 
				
			||||||
    })
 | 
					    })
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -357,7 +357,7 @@ function start(type,diff,muteLog) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
function stop(type,diff,muteLog) {
 | 
					function stop(type,diff,muteLog) {
 | 
				
			||||||
    if (!started) {
 | 
					    if (!started) {
 | 
				
			||||||
        return when.resolve();
 | 
					        return Promise.resolve();
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    type = type||"full";
 | 
					    type = type||"full";
 | 
				
			||||||
    diff = diff||{
 | 
					    diff = diff||{
 | 
				
			||||||
@@ -395,29 +395,27 @@ function stop(type,diff,muteLog) {
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    return when.promise(function(resolve,reject) {
 | 
					    return Promise.resolve(promises).then(function() {
 | 
				
			||||||
        when.settle(promises).then(function() {
 | 
					        for (id in activeNodesToFlow) {
 | 
				
			||||||
            for (id in activeNodesToFlow) {
 | 
					            if (activeNodesToFlow.hasOwnProperty(id)) {
 | 
				
			||||||
                if (activeNodesToFlow.hasOwnProperty(id)) {
 | 
					                if (!activeFlows[activeNodesToFlow[id]]) {
 | 
				
			||||||
                    if (!activeFlows[activeNodesToFlow[id]]) {
 | 
					 | 
				
			||||||
                        delete activeNodesToFlow[id];
 | 
					 | 
				
			||||||
                    }
 | 
					 | 
				
			||||||
                }
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
            if (stopList) {
 | 
					 | 
				
			||||||
                stopList.forEach(function(id) {
 | 
					 | 
				
			||||||
                    delete activeNodesToFlow[id];
 | 
					                    delete activeNodesToFlow[id];
 | 
				
			||||||
                });
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
            if (!muteLog) {
 | 
					 | 
				
			||||||
                if (type !== "full") {
 | 
					 | 
				
			||||||
                    log.info(log._("nodes.flows.stopped-modified-"+type));
 | 
					 | 
				
			||||||
                } else {
 | 
					 | 
				
			||||||
                    log.info(log._("nodes.flows.stopped-flows"));
 | 
					 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            resolve();
 | 
					        }
 | 
				
			||||||
        });
 | 
					        if (stopList) {
 | 
				
			||||||
 | 
					            stopList.forEach(function(id) {
 | 
				
			||||||
 | 
					                delete activeNodesToFlow[id];
 | 
				
			||||||
 | 
					            });
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        if (!muteLog) {
 | 
				
			||||||
 | 
					            if (type !== "full") {
 | 
				
			||||||
 | 
					                log.info(log._("nodes.flows.stopped-modified-"+type));
 | 
				
			||||||
 | 
					            } else {
 | 
				
			||||||
 | 
					                log.info(log._("nodes.flows.stopped-flows"));
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        events.emit("nodes-stopped");
 | 
				
			||||||
    });
 | 
					    });
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -55,8 +55,7 @@ describe('Node', function() {
 | 
				
			|||||||
            n.on('close',function() {
 | 
					            n.on('close',function() {
 | 
				
			||||||
                done();
 | 
					                done();
 | 
				
			||||||
            });
 | 
					            });
 | 
				
			||||||
            var p = n.close();
 | 
					            n.close();
 | 
				
			||||||
            should.not.exist(p);
 | 
					 | 
				
			||||||
        });
 | 
					        });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        it('returns a promise when provided a callback with a done parameter',function(testdone) {
 | 
					        it('returns a promise when provided a callback with a done parameter',function(testdone) {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -346,7 +346,7 @@ describe('Flow', function() {
 | 
				
			|||||||
                stoppedNodes.should.have.a.property("2");
 | 
					                stoppedNodes.should.have.a.property("2");
 | 
				
			||||||
                stoppedNodes.should.have.a.property("3");
 | 
					                stoppedNodes.should.have.a.property("3");
 | 
				
			||||||
                done();
 | 
					                done();
 | 
				
			||||||
            });
 | 
					            }).catch(done);
 | 
				
			||||||
        });
 | 
					        });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        it("stops specified nodes",function(done) {
 | 
					        it("stops specified nodes",function(done) {
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user