diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index b5369a470..5c9194b1a 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -370,7 +370,6 @@ module.exports = function(RED) { node.connecting = true; try { node.serverProperties = {}; - // debug("MQTT: ⬆️ mqtt.connect(node.brokerurl ,node.options)", node.brokerurl, node.options); node.client = mqtt.connect(node.brokerurl ,node.options); node.client.setMaxListeners(0); // Register successful connect or reconnect handler @@ -396,7 +395,7 @@ module.exports = function(RED) { setStrProp(connack.properties, node.serverProperties, "assignedClientIdentifier"); setStrProp(connack.properties, node.serverProperties, "reasonString"); setUserProperties(connack.properties, node.serverProperties); - debug("MQTTBrokerNode: ⬆ CONNECTED. node.serverProperties ==> ", node.serverProperties ); + debug("MQTTBrokerNode: ⬆ CONNECTED. node.serverProperties ==> ", node.serverProperties );//TODO: remove } } for (var id in node.users) { @@ -417,12 +416,12 @@ module.exports = function(RED) { if (node.subscriptions[s].hasOwnProperty(r)) { qos = Math.max(qos,node.subscriptions[s][r].qos); _options = node.subscriptions[s][r].options; - debug(`MQTTBrokerNode:${node.id}: Re-subscribe - registering handler ref ${r} for ${s} `); + debug(`MQTTBrokerNode:${node.id}: Re-subscribe - registering handler ref ${r} for ${s} `);//TODO: remove node.client.on('message',node.subscriptions[s][r].handler); } } _options.qos = _options.qos || qos; - debug(`MQTTBrokerNode:${node.id}: Re-subscribe - subscribing to topic '${topic}'`, _options); + debug(`MQTTBrokerNode:${node.id}: Re-subscribe - subscribing to topic '${topic}'`, _options);//TODO: remove node.client.subscribe(topic, _options); } } @@ -433,6 +432,7 @@ module.exports = function(RED) { } }); node.client.on("reconnect", function() { + debug('MQTTBrokerNode reconnect event', packet); //TODO: remove for (var id in node.users) { if (node.users.hasOwnProperty(id)) { node.users[id].status({fill:"yellow",shape:"ring",text:"node-red:common.status.connecting"}); @@ -442,13 +442,14 @@ module.exports = function(RED) { //TODO: what to do with this event? Anything? Necessary? node.client.on("disconnect", function(packet) { //Emitted after receiving disconnect packet from broker. MQTT 5.0 feature. - console.log('MQTTBrokerNode disconnect', packet) + debug('MQTTBrokerNode disconnect event', packet); //TODO: remove //TODO: remove var rc = packet && packet.properties && packet.properties.reasonString; var rc = packet && packet.properties && packet.reasonCode; + //TODO: If keeping this event, do we use these? log these? }); // Register disconnect handlers node.client.on('close', function () { - console.log('MQTTBrokerNode closed', arguments) + debug('MQTTBrokerNode close event', arguments); //TODO: remove if (node.connected) { node.connected = false; node.log(RED._("mqtt.state.disconnected",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl})); @@ -465,7 +466,7 @@ module.exports = function(RED) { // Register connect error handler // The client's own reconnect logic will take care of errors node.client.on('error', function (error) { - console.log('MQTTBrokerNode errored', error) + debug('MQTTBrokerNode error event', error); //TODO: remove }); }catch(err) { console.log(err); @@ -489,21 +490,21 @@ module.exports = function(RED) { qos:qos, options:options, handler:function(mtopic,mpayload, mpacket) { - debug(`MQTTBrokerNode:${node.id}: this.subscribe.handler - attempting to match '${topic}' to '${mtopic}' `, mpacket); + debug(`MQTTBrokerNode:${node.id}: this.subscribe.handler - attempting to match '${topic}' to '${mtopic}' `, mpacket); //TODO: remove if(mpacket.properties && options.properties && mpacket.properties.subscriptionIdentifier && options.properties.subscriptionIdentifier && (mpacket.properties.subscriptionIdentifier !== options.properties.subscriptionIdentifier) ) { //do nothing as subscriptionIdentifier does not match - debug(`MQTTBrokerNode:${node.id}: > no match - this nodes subID (${options.properties.subscriptionIdentifier}) !== packet subID (${mpacket.properties.subscriptionIdentifier})`); + debug(`MQTTBrokerNode:${node.id}: > no match - this nodes subID (${options.properties.subscriptionIdentifier}) !== packet subID (${mpacket.properties.subscriptionIdentifier})`); //TODO: remove } else if (matchTopic(topic,mtopic)) { - debug(`MQTTBrokerNode:${node.id}: > MATCHED '${topic}' to '${mtopic}' - performing callback`); + debug(`MQTTBrokerNode:${node.id}: > MATCHED '${topic}' to '${mtopic}' - performing callback`); //TODO: remove callback(mtopic,mpayload, mpacket); } else - debug(`MQTTBrokerNode:${node.id}: > no match / no callback`); + debug(`MQTTBrokerNode:${node.id}: > no match / no callback`); //TODO: remove }, ref: ref }; node.subscriptions[topic][ref] = sub; if (node.connected) { - debug(`MQTTBrokerNode:${node.id}: this.subscribe - registering handler ref ${ref} for ${topic} and subscribing`, options); + debug(`MQTTBrokerNode:${node.id}: this.subscribe - registering handler ref ${ref} for ${topic} and subscribing`, options); //TODO: remove node.client.on('message',sub.handler); node.client.subscribe(topic, options); } @@ -512,29 +513,31 @@ module.exports = function(RED) { this.unsubscribe = function (topic, ref, removed) { ref = ref||0; var sub = node.subscriptions[topic]; - var _debug = `MQTTBrokerNode ${node.id}: unsubscribe for topic ${topic} called... ` ; + var _debug = `MQTTBrokerNode ${node.id}: unsubscribe for topic ${topic} called... ` ; //TODO: remove if (sub) { - _debug += "sub found. " + _debug += "sub found. " //TODO: remove if (sub[ref]) { - // debug(`MQTTBrokerNode:${node.id}: this.unsubscribe - removing handler ref ${ref} for ${topic} `); + // debug(`MQTTBrokerNode:${node.id}: this.unsubscribe - removing handler ref ${ref} for ${topic} `); //TODO: remove _debug += `removing handler ref ${ref} for ${topic}. ` node.client.removeListener('message',sub[ref].handler); delete sub[ref]; } + //TODO: Review. The `if(removed)` was commented out to always delete and remove subscriptions. + // if we dont then property changes dont get applied and old subs still trigger //if (removed) { + if (Object.keys(sub).length === 0) { delete node.subscriptions[topic]; if (node.connected) { - _debug += `calling client.unsubscribe to remove topic ${topic}` - // debug(`MQTTBrokerNode:${node.id}: this.unsubscribe - calling client.unsubscribe to remove topic ${topic} `); + _debug += `calling client.unsubscribe to remove topic ${topic}` //TODO: remove node.client.unsubscribe(topic); } } //} } else { - _debug += "sub not found! " + _debug += "sub not found! "; //TODO: remove } - debug(_debug); + debug(_debug); //TODO: remove }; @@ -565,7 +568,7 @@ module.exports = function(RED) { setUserProperties(msg.userProperties, options.properties); setBufferProp(msg, options.properties, "correlationData"); } - debug(`MQTTBrokerNode:${node.id}: publish - sending payload to ${msg.topic ? msg.topic : (msg.topicAlias ? 'topicAlias-'+msg.topicAlias : '???') } `, options); + debug(`MQTTBrokerNode:${node.id}: publish - sending payload to ${msg.topic ? msg.topic : (msg.topicAlias ? 'topicAlias-'+msg.topicAlias : '???') } `, options);//TODO: remove node.client.publish(msg.topic, msg.payload, options, function(err) { if(err) node.error(err,msg);//catch errors done && done(); @@ -643,9 +646,9 @@ module.exports = function(RED) { if(node.rap === "true" || node.rap === true) options.rap = true; else if(node.rap === "false" || node.rap === false) options.rap = false; } - // debug("MQTT: ⬅️⬅️ this.brokerConn.subscribe",this.topic,options); + this.brokerConn.subscribe(this.topic,options,function(topic,payload,packet) { - debug(`MQTTInNode:${node.id}: Broker sent ${topic}, datatype ${node.datatype}`, packet); + debug(`MQTTInNode:${node.id}: Broker sent ${topic}, datatype ${node.datatype}`, packet);//TODO: remove if (node.datatype === "buffer") { // payload = payload; } else if (node.datatype === "base64") { @@ -773,7 +776,7 @@ module.exports = function(RED) { } } if (topicOK) { // topic must exist - debug(`MQTTOutNode:${node.id}: sending msg to ${msg.topic}`, msg); + debug(`MQTTOutNode:${node.id}: sending msg to ${msg.topic}`, msg);//TODO: remove this.brokerConn.publish(msg, function(){ let args = arguments; let l = args.length;