mirror of
				https://github.com/node-red/node-red.git
				synced 2025-03-01 10:36:34 +00:00 
			
		
		
		
	Merge pull request #3654 from Steve-Mcl/mqtt-backport
Mqtt fixes in v3 for v2.x (backports #3563 #3594 #3626 to v2.x)
This commit is contained in:
		@@ -84,8 +84,8 @@
 | 
			
		||||
        "bcrypt": "5.0.1"
 | 
			
		||||
    },
 | 
			
		||||
    "devDependencies": {
 | 
			
		||||
        "dompurify": "2.3.5",
 | 
			
		||||
        "grunt": "1.4.1",
 | 
			
		||||
        "dompurify": "2.3.6",
 | 
			
		||||
        "grunt": "1.5.2",
 | 
			
		||||
        "grunt-chmod": "~1.1.1",
 | 
			
		||||
        "grunt-cli": "~1.4.3",
 | 
			
		||||
        "grunt-concurrent": "3.0.0",
 | 
			
		||||
 
 | 
			
		||||
@@ -442,7 +442,17 @@
 | 
			
		||||
        }
 | 
			
		||||
        return defaultContentType || 'none'
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Test a topic string is valid for publishing
 | 
			
		||||
     * @param {string} topic
 | 
			
		||||
     * @returns `true` if it is a valid topic
 | 
			
		||||
     */
 | 
			
		||||
    function validateMQTTPublishTopic(topic, opts) {
 | 
			
		||||
        if(!topic || topic == "" || !/[\+#\b\f\n\r\t\v\0]/.test(topic)) {
 | 
			
		||||
            return true;
 | 
			
		||||
        }
 | 
			
		||||
        return RED._("node-red:mqtt.errors.invalid-topic");
 | 
			
		||||
    }
 | 
			
		||||
    RED.nodes.registerType('mqtt-broker',{
 | 
			
		||||
        category: 'config',
 | 
			
		||||
        defaults: {
 | 
			
		||||
 
 | 
			
		||||
@@ -68,12 +68,21 @@ module.exports = function(RED) {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Test a topic string is valid
 | 
			
		||||
     * Test a topic string is valid for subscription
 | 
			
		||||
     * @param {string} topic
 | 
			
		||||
     * @returns `true` if it is a valid topic
 | 
			
		||||
     */
 | 
			
		||||
    function isValidSubscriptionTopic(topic) {
 | 
			
		||||
        return /^(#$|(\+|[^+#]*)(\/(\+|[^+#]*))*(\/(\+|#|[^+#]*))?$)/.test(topic)
 | 
			
		||||
        return /^(#$|(\+|[^+#]*)(\/(\+|[^+#]*))*(\/(\+|#|[^+#]*))?$)/.test(topic);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Test a topic string is valid for publishing
 | 
			
		||||
     * @param {string} topic
 | 
			
		||||
     * @returns `true` if it is a valid topic
 | 
			
		||||
     */
 | 
			
		||||
    function isValidPublishTopic(topic) {
 | 
			
		||||
        return !/[\+#\b\f\n\r\t\v\0]/.test(topic);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
@@ -288,7 +297,7 @@ module.exports = function(RED) {
 | 
			
		||||
                        //TODO: delete msg.responseTopic - to prevent it being resent?
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
                topicOK = topicOK && !/[\+#\b\f\n\r\t\v\0]/.test(msg.topic);
 | 
			
		||||
                topicOK = topicOK && isValidPublishTopic(msg.topic);
 | 
			
		||||
 | 
			
		||||
                if (topicOK) {
 | 
			
		||||
                    node.brokerConn.publish(msg, done); // send the message
 | 
			
		||||
@@ -362,7 +371,7 @@ module.exports = function(RED) {
 | 
			
		||||
                    node.brokerConn.connect(function () {
 | 
			
		||||
                        done();
 | 
			
		||||
                    });
 | 
			
		||||
                }, true)
 | 
			
		||||
                })
 | 
			
		||||
            } else {
 | 
			
		||||
                // Without force flag, we will refuse to cycle an active connection
 | 
			
		||||
                done(new Error(RED._('mqtt.errors.invalid-action-alreadyconnected')));
 | 
			
		||||
@@ -391,6 +400,7 @@ module.exports = function(RED) {
 | 
			
		||||
        node.options = {};
 | 
			
		||||
        node.queue = [];
 | 
			
		||||
        node.subscriptions = {};
 | 
			
		||||
        node.clientListeners = []
 | 
			
		||||
        /** @type {mqtt.MqttClient}*/ this.client;
 | 
			
		||||
        node.setOptions = function(opts, init) {
 | 
			
		||||
            if(!opts || typeof opts !== "object") {
 | 
			
		||||
@@ -529,7 +539,7 @@ module.exports = function(RED) {
 | 
			
		||||
                    // Only for ws or wss, check if proxy env var for additional configuration
 | 
			
		||||
                    if (node.brokerurl.indexOf("wss://") > -1 || node.brokerurl.indexOf("ws://") > -1) {
 | 
			
		||||
                        // check if proxy is set in env
 | 
			
		||||
                        let prox, noprox;
 | 
			
		||||
                        let prox, noprox, noproxy;
 | 
			
		||||
                        if (process.env.http_proxy) { prox = process.env.http_proxy; }
 | 
			
		||||
                        if (process.env.HTTP_PROXY) { prox = process.env.HTTP_PROXY; }
 | 
			
		||||
                        if (process.env.no_proxy) { noprox = process.env.no_proxy.split(","); }
 | 
			
		||||
@@ -656,11 +666,16 @@ module.exports = function(RED) {
 | 
			
		||||
                setStatusConnecting(node, true);
 | 
			
		||||
                try {
 | 
			
		||||
                    node.serverProperties = {};
 | 
			
		||||
                    if(node.client) {
 | 
			
		||||
                        //belt and braces to avoid left over clients
 | 
			
		||||
                        node.client.end(true);
 | 
			
		||||
                        node._clientRemoveListeners();
 | 
			
		||||
                    }
 | 
			
		||||
                    node.client = mqtt.connect(node.brokerurl, node.options);
 | 
			
		||||
                    node.client.setMaxListeners(0);
 | 
			
		||||
                    let callbackDone = false; //prevent re-connects causing node.client.on('connect' firing callback multiple times
 | 
			
		||||
                    let callbackDone = false; //prevent re-connects causing node._clientOn('connect' firing callback multiple times
 | 
			
		||||
                    // Register successful connect or reconnect handler
 | 
			
		||||
                    node.client.on('connect', function (connack) {
 | 
			
		||||
                    node._clientOn('connect', function (connack) {
 | 
			
		||||
                        node.closing = false;
 | 
			
		||||
                        node.connecting = false;
 | 
			
		||||
                        node.connected = true;
 | 
			
		||||
@@ -692,7 +707,7 @@ module.exports = function(RED) {
 | 
			
		||||
                        }
 | 
			
		||||
                        setStatusConnected(node, true);
 | 
			
		||||
                        // Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection
 | 
			
		||||
                        node.client.removeAllListeners('message');
 | 
			
		||||
                        node._clientRemoveListeners('message');
 | 
			
		||||
 | 
			
		||||
                        // Re-subscribe to stored topics
 | 
			
		||||
                        for (var s in node.subscriptions) {
 | 
			
		||||
@@ -704,7 +719,7 @@ module.exports = function(RED) {
 | 
			
		||||
                                    if (node.subscriptions[s].hasOwnProperty(r)) {
 | 
			
		||||
                                        qos = Math.max(qos,node.subscriptions[s][r].qos);
 | 
			
		||||
                                        _options = node.subscriptions[s][r].options;
 | 
			
		||||
                                        node.client.on('message',node.subscriptions[s][r].handler);
 | 
			
		||||
                                        node._clientOn('message',node.subscriptions[s][r].handler);
 | 
			
		||||
                                    }
 | 
			
		||||
                                }
 | 
			
		||||
                                _options.qos = _options.qos || qos;
 | 
			
		||||
@@ -717,11 +732,11 @@ module.exports = function(RED) {
 | 
			
		||||
                            node.publish(node.birthMessage);
 | 
			
		||||
                        }
 | 
			
		||||
                    });
 | 
			
		||||
                    node.client.on("reconnect", function() {
 | 
			
		||||
                    node._clientOn("reconnect", function() {
 | 
			
		||||
                        setStatusConnecting(node, true);
 | 
			
		||||
                    });
 | 
			
		||||
                    //Broker Disconnect - V5 event
 | 
			
		||||
                    node.client.on("disconnect", function(packet) {
 | 
			
		||||
                    node._clientOn("disconnect", function(packet) {
 | 
			
		||||
                        //Emitted after receiving disconnect packet from broker. MQTT 5.0 feature.
 | 
			
		||||
                        const rc = (packet && packet.properties && packet.reasonCode) || packet.reasonCode;
 | 
			
		||||
                        const rs = packet && packet.properties && packet.properties.reasonString || "";
 | 
			
		||||
@@ -735,7 +750,7 @@ module.exports = function(RED) {
 | 
			
		||||
                        setStatusDisconnected(node, true);
 | 
			
		||||
                    });
 | 
			
		||||
                    // Register disconnect handlers
 | 
			
		||||
                    node.client.on('close', function () {
 | 
			
		||||
                    node._clientOn('close', function () {
 | 
			
		||||
                        if (node.connected) {
 | 
			
		||||
                            node.connected = false;
 | 
			
		||||
                            node.log(RED._("mqtt.state.disconnected",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl}));
 | 
			
		||||
@@ -747,42 +762,59 @@ module.exports = function(RED) {
 | 
			
		||||
 | 
			
		||||
                    // Register connect error handler
 | 
			
		||||
                    // The client's own reconnect logic will take care of errors
 | 
			
		||||
                    node.client.on('error', function (error) {
 | 
			
		||||
                    node._clientOn('error', function (error) {
 | 
			
		||||
                    });
 | 
			
		||||
                }catch(err) {
 | 
			
		||||
                    console.log(err);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
        node.disconnect = function (callback, force) {
 | 
			
		||||
            const _callback = function (resetNodeConnectedState, _force) {
 | 
			
		||||
                setStatusDisconnected(node, true);
 | 
			
		||||
                if(resetNodeConnectedState || _force) {
 | 
			
		||||
                    node.client.removeAllListeners();
 | 
			
		||||
                    node.closing = true;
 | 
			
		||||
                    node.connecting = false;
 | 
			
		||||
                    node.connected = false;
 | 
			
		||||
 | 
			
		||||
        node.disconnect = function (callback) {
 | 
			
		||||
            const _callback = function () {
 | 
			
		||||
                if(node.connected || node.connecting) {
 | 
			
		||||
                    setStatusDisconnected(node, true);
 | 
			
		||||
                }
 | 
			
		||||
                if(node.client) { node._clientRemoveListeners(); }
 | 
			
		||||
                node.connecting = false;
 | 
			
		||||
                node.connected = false;
 | 
			
		||||
                callback && typeof callback == "function" && callback();
 | 
			
		||||
            };
 | 
			
		||||
            if(node.closing) {
 | 
			
		||||
                return _callback(false, force);
 | 
			
		||||
            }
 | 
			
		||||
            var endCallBack = function endCallBack() {
 | 
			
		||||
            }
 | 
			
		||||
            if(!node.client) { return _callback(); }
 | 
			
		||||
            if(node.closing) { return _callback(); }
 | 
			
		||||
 | 
			
		||||
            let waitEnd = (client, ms) => {
 | 
			
		||||
                return new Promise( (resolve, reject) => {
 | 
			
		||||
                    node.closing = true;
 | 
			
		||||
                    if(!client) { 
 | 
			
		||||
                        resolve();
 | 
			
		||||
                     } else {
 | 
			
		||||
                        const t = setTimeout(() => {
 | 
			
		||||
                            //clean end() has exceeded WAIT_END, lets force end!
 | 
			
		||||
                            client && client.end(true);
 | 
			
		||||
                            reject();
 | 
			
		||||
                        }, ms);
 | 
			
		||||
                        client.end(() => {
 | 
			
		||||
                             clearTimeout(t);
 | 
			
		||||
                             resolve()
 | 
			
		||||
                         });
 | 
			
		||||
                     }
 | 
			
		||||
                });
 | 
			
		||||
            };
 | 
			
		||||
            if(node.connected && node.closeMessage) {
 | 
			
		||||
                node.publish(node.closeMessage, function (err) {
 | 
			
		||||
                    node.client.end(endCallBack);
 | 
			
		||||
                    _callback(true, force);
 | 
			
		||||
                    waitEnd(node.client, 2000).then(() => {
 | 
			
		||||
                        _callback();
 | 
			
		||||
                    }).catch((e) => {
 | 
			
		||||
                        _callback();
 | 
			
		||||
                    })
 | 
			
		||||
                });
 | 
			
		||||
            } else if(node.connected) {
 | 
			
		||||
                node.client.end(endCallBack);
 | 
			
		||||
                _callback(true, force);
 | 
			
		||||
            } else if(node.connecting) {
 | 
			
		||||
                node.client.end();
 | 
			
		||||
                _callback(true, true);
 | 
			
		||||
            } else {
 | 
			
		||||
                _callback(false, force);
 | 
			
		||||
                waitEnd(node.client, 2000).then(() => {
 | 
			
		||||
                    _callback();
 | 
			
		||||
                }).catch((e) => {
 | 
			
		||||
                    _callback();
 | 
			
		||||
                })
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        node.subscriptionIds = {};
 | 
			
		||||
@@ -819,7 +851,7 @@ module.exports = function(RED) {
 | 
			
		||||
            };
 | 
			
		||||
            node.subscriptions[topic][ref] = sub;
 | 
			
		||||
            if (node.connected) {
 | 
			
		||||
                node.client.on('message',sub.handler);
 | 
			
		||||
                node._clientOn('message',sub.handler);
 | 
			
		||||
                node.client.subscribe(topic, options);
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
@@ -830,7 +862,7 @@ module.exports = function(RED) {
 | 
			
		||||
            if (sub) {
 | 
			
		||||
                if (sub[ref]) {
 | 
			
		||||
                    if(node.client) {
 | 
			
		||||
                        node.client.removeListener('message',sub[ref].handler);
 | 
			
		||||
                        node._clientRemoveListeners('message',sub[ref].handler);
 | 
			
		||||
                    }
 | 
			
		||||
                    delete sub[ref];
 | 
			
		||||
                }
 | 
			
		||||
@@ -864,8 +896,18 @@ module.exports = function(RED) {
 | 
			
		||||
                    qos: msg.qos || 0,
 | 
			
		||||
                    retain: msg.retain || false
 | 
			
		||||
                };
 | 
			
		||||
                let topicOK = hasProperty(msg, "topic") && (typeof msg.topic === "string") && (isValidPublishTopic(msg.topic));
 | 
			
		||||
                //https://github.com/mqttjs/MQTT.js/blob/master/README.md#mqttclientpublishtopic-message-options-callback
 | 
			
		||||
                if(node.options.protocolVersion == 5) {
 | 
			
		||||
                    const bsp = node.serverProperties || {};
 | 
			
		||||
                    if (msg.userProperties && typeof msg.userProperties !== "object") {
 | 
			
		||||
                        delete msg.userProperties;
 | 
			
		||||
                    }
 | 
			
		||||
                    if (hasProperty(msg, "topicAlias") && !isNaN(Number(msg.topicAlias))) {
 | 
			
		||||
                        msg.topicAlias = parseInt(msg.topicAlias);
 | 
			
		||||
                    } else {
 | 
			
		||||
                        delete msg.topicAlias;
 | 
			
		||||
                    }
 | 
			
		||||
                    options.properties = options.properties || {};
 | 
			
		||||
                    setStrProp(msg, options.properties, "responseTopic");
 | 
			
		||||
                    setBufferProp(msg, options.properties, "correlationData");
 | 
			
		||||
@@ -875,31 +917,75 @@ module.exports = function(RED) {
 | 
			
		||||
                    setIntProp(msg, options.properties, "topicAlias", 1, node.serverProperties.topicAliasMaximum || 0);
 | 
			
		||||
                    setBoolProp(msg, options.properties, "payloadFormatIndicator");
 | 
			
		||||
                    //FUTURE setIntProp(msg, options.properties, "subscriptionIdentifier", 1, 268435455);
 | 
			
		||||
                    if (options.properties.topicAlias) {
 | 
			
		||||
                        if (!node.topicAliases.hasOwnProperty(options.properties.topicAlias) && msg.topic == "") {
 | 
			
		||||
 | 
			
		||||
                    //check & sanitise topic
 | 
			
		||||
                    if (topicOK && options.properties.topicAlias) {
 | 
			
		||||
                        let aliasValid = (bsp.topicAliasMaximum && bsp.topicAliasMaximum >= options.properties.topicAlias);
 | 
			
		||||
                        if (!aliasValid) {
 | 
			
		||||
                            done("Invalid topicAlias");
 | 
			
		||||
                            return
 | 
			
		||||
                        }
 | 
			
		||||
                        if (node.topicAliases[options.properties.topicAlias] === msg.topic) {
 | 
			
		||||
                            msg.topic = ""
 | 
			
		||||
                            msg.topic = "";
 | 
			
		||||
                        } else {
 | 
			
		||||
                            node.topicAliases[options.properties.topicAlias] = msg.topic
 | 
			
		||||
                            node.topicAliases[options.properties.topicAlias] = msg.topic;
 | 
			
		||||
                        }
 | 
			
		||||
                    } else if (!msg.topic && options.properties.responseTopic) {
 | 
			
		||||
                        msg.topic = msg.responseTopic;
 | 
			
		||||
                        topicOK = isValidPublishTopic(msg.topic);
 | 
			
		||||
                        delete msg.responseTopic; //prevent responseTopic being resent?
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                node.client.publish(msg.topic, msg.payload, options, function(err) {
 | 
			
		||||
                    done && done(err);
 | 
			
		||||
                    return
 | 
			
		||||
                });
 | 
			
		||||
                if (topicOK) {
 | 
			
		||||
                    node.client.publish(msg.topic, msg.payload, options, function(err) {
 | 
			
		||||
                        done && done(err);
 | 
			
		||||
                        return
 | 
			
		||||
                    });
 | 
			
		||||
                } else {
 | 
			
		||||
                    const error = new Error(RED._("mqtt.errors.invalid-topic"));
 | 
			
		||||
                    error.warn = true;
 | 
			
		||||
                    done(error);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        node.on('close', function(done) {
 | 
			
		||||
            node.closing = true;
 | 
			
		||||
            node.disconnect(done);
 | 
			
		||||
            node.disconnect(function() {
 | 
			
		||||
                done();
 | 
			
		||||
            });
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        /**
 | 
			
		||||
         * Add event handlers to the MQTT.js client and track them so that
 | 
			
		||||
         * we do not remove any handlers that the MQTT client uses internally.  
 | 
			
		||||
         * Use {@link node._clientRemoveListeners `node._clientRemoveListeners`} to remove handlers
 | 
			
		||||
         * @param {string} event The name of the event
 | 
			
		||||
         * @param {function} handler The handler for this event
 | 
			
		||||
         */
 | 
			
		||||
         node._clientOn = function(event, handler) {
 | 
			
		||||
            node.clientListeners.push({event, handler})
 | 
			
		||||
            node.client.on(event, handler)
 | 
			
		||||
        } 
 | 
			
		||||
 | 
			
		||||
        /**
 | 
			
		||||
         * Remove event handlers from the MQTT.js client & only the events 
 | 
			
		||||
         * that we attached in {@link node._clientOn `node._clientOn`}.  
 | 
			
		||||
         * * If `event` is omitted, then all events matching `handler` are removed
 | 
			
		||||
         * * If `handler` is omitted, then all events named `event` are removed
 | 
			
		||||
         * * If both parameters are omitted, then all events are removed
 | 
			
		||||
         * @param {string} [event] The name of the event (optional)
 | 
			
		||||
         * @param {function} [handler] The handler for this event (optional)
 | 
			
		||||
         */
 | 
			
		||||
         node._clientRemoveListeners = function(event, handler) {
 | 
			
		||||
            node.clientListeners = node.clientListeners.filter((l) => {
 | 
			
		||||
                if (event && event !== l.event) { return true; }
 | 
			
		||||
                if (handler && handler !== l.handler) { return true; }
 | 
			
		||||
                node.client.removeListener(l.event, l.handler)
 | 
			
		||||
                return false; //found and removed, filter out this one
 | 
			
		||||
            })
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    RED.nodes.registerType("mqtt-broker",MQTTBrokerNode,{
 | 
			
		||||
@@ -1074,6 +1160,7 @@ module.exports = function(RED) {
 | 
			
		||||
                        node.brokerConn.unsubscribe(node.topic,node.id, removed);
 | 
			
		||||
                    }
 | 
			
		||||
                    node.brokerConn.deregister(node, done);
 | 
			
		||||
                    node.brokerConn = null;
 | 
			
		||||
                } else {
 | 
			
		||||
                    done();
 | 
			
		||||
                }
 | 
			
		||||
@@ -1138,6 +1225,7 @@ module.exports = function(RED) {
 | 
			
		||||
            node.on('close', function(done) {
 | 
			
		||||
                if (node.brokerConn) {
 | 
			
		||||
                    node.brokerConn.deregister(node,done);
 | 
			
		||||
                    node.brokerConn = null;
 | 
			
		||||
                } else {
 | 
			
		||||
                    done();
 | 
			
		||||
                }
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user