mirror of
https://github.com/node-red/node-red.git
synced 2023-10-10 13:36:53 +02:00
Track which event handlers we add to the mqtt client so we can removed them cleanly
This commit is contained in:
parent
3be75fa822
commit
02d9ad479d
@ -84,8 +84,8 @@
|
|||||||
"bcrypt": "5.0.1"
|
"bcrypt": "5.0.1"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"dompurify": "2.3.5",
|
"dompurify": "2.3.6",
|
||||||
"grunt": "1.4.1",
|
"grunt": "^1.5.2",
|
||||||
"grunt-chmod": "~1.1.1",
|
"grunt-chmod": "~1.1.1",
|
||||||
"grunt-cli": "~1.4.3",
|
"grunt-cli": "~1.4.3",
|
||||||
"grunt-concurrent": "3.0.0",
|
"grunt-concurrent": "3.0.0",
|
||||||
|
@ -391,6 +391,7 @@ module.exports = function(RED) {
|
|||||||
node.options = {};
|
node.options = {};
|
||||||
node.queue = [];
|
node.queue = [];
|
||||||
node.subscriptions = {};
|
node.subscriptions = {};
|
||||||
|
node.clientListeners = []
|
||||||
/** @type {mqtt.MqttClient}*/ this.client;
|
/** @type {mqtt.MqttClient}*/ this.client;
|
||||||
node.setOptions = function(opts, init) {
|
node.setOptions = function(opts, init) {
|
||||||
if(!opts || typeof opts !== "object") {
|
if(!opts || typeof opts !== "object") {
|
||||||
@ -658,9 +659,9 @@ module.exports = function(RED) {
|
|||||||
node.serverProperties = {};
|
node.serverProperties = {};
|
||||||
node.client = mqtt.connect(node.brokerurl, node.options);
|
node.client = mqtt.connect(node.brokerurl, node.options);
|
||||||
node.client.setMaxListeners(0);
|
node.client.setMaxListeners(0);
|
||||||
let callbackDone = false; //prevent re-connects causing node.client.on('connect' firing callback multiple times
|
let callbackDone = false; //prevent re-connects causing node._clientOn('connect' firing callback multiple times
|
||||||
// Register successful connect or reconnect handler
|
// Register successful connect or reconnect handler
|
||||||
node.client.on('connect', function (connack) {
|
node._clientOn('connect', function (connack) {
|
||||||
node.closing = false;
|
node.closing = false;
|
||||||
node.connecting = false;
|
node.connecting = false;
|
||||||
node.connected = true;
|
node.connected = true;
|
||||||
@ -692,7 +693,7 @@ module.exports = function(RED) {
|
|||||||
}
|
}
|
||||||
setStatusConnected(node, true);
|
setStatusConnected(node, true);
|
||||||
// Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection
|
// Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection
|
||||||
node.client.removeAllListeners('message');
|
node._clientRemoveAllListeners('message');
|
||||||
|
|
||||||
// Re-subscribe to stored topics
|
// Re-subscribe to stored topics
|
||||||
for (var s in node.subscriptions) {
|
for (var s in node.subscriptions) {
|
||||||
@ -704,7 +705,7 @@ module.exports = function(RED) {
|
|||||||
if (node.subscriptions[s].hasOwnProperty(r)) {
|
if (node.subscriptions[s].hasOwnProperty(r)) {
|
||||||
qos = Math.max(qos,node.subscriptions[s][r].qos);
|
qos = Math.max(qos,node.subscriptions[s][r].qos);
|
||||||
_options = node.subscriptions[s][r].options;
|
_options = node.subscriptions[s][r].options;
|
||||||
node.client.on('message',node.subscriptions[s][r].handler);
|
node._clientOn('message',node.subscriptions[s][r].handler);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_options.qos = _options.qos || qos;
|
_options.qos = _options.qos || qos;
|
||||||
@ -717,11 +718,11 @@ module.exports = function(RED) {
|
|||||||
node.publish(node.birthMessage);
|
node.publish(node.birthMessage);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
node.client.on("reconnect", function() {
|
node._clientOn("reconnect", function() {
|
||||||
setStatusConnecting(node, true);
|
setStatusConnecting(node, true);
|
||||||
});
|
});
|
||||||
//Broker Disconnect - V5 event
|
//Broker Disconnect - V5 event
|
||||||
node.client.on("disconnect", function(packet) {
|
node._clientOn("disconnect", function(packet) {
|
||||||
//Emitted after receiving disconnect packet from broker. MQTT 5.0 feature.
|
//Emitted after receiving disconnect packet from broker. MQTT 5.0 feature.
|
||||||
const rc = (packet && packet.properties && packet.reasonCode) || packet.reasonCode;
|
const rc = (packet && packet.properties && packet.reasonCode) || packet.reasonCode;
|
||||||
const rs = packet && packet.properties && packet.properties.reasonString || "";
|
const rs = packet && packet.properties && packet.properties.reasonString || "";
|
||||||
@ -735,7 +736,7 @@ module.exports = function(RED) {
|
|||||||
setStatusDisconnected(node, true);
|
setStatusDisconnected(node, true);
|
||||||
});
|
});
|
||||||
// Register disconnect handlers
|
// Register disconnect handlers
|
||||||
node.client.on('close', function () {
|
node._clientOn('close', function () {
|
||||||
if (node.connected) {
|
if (node.connected) {
|
||||||
node.connected = false;
|
node.connected = false;
|
||||||
node.log(RED._("mqtt.state.disconnected",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl}));
|
node.log(RED._("mqtt.state.disconnected",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl}));
|
||||||
@ -747,7 +748,7 @@ module.exports = function(RED) {
|
|||||||
|
|
||||||
// Register connect error handler
|
// Register connect error handler
|
||||||
// The client's own reconnect logic will take care of errors
|
// The client's own reconnect logic will take care of errors
|
||||||
node.client.on('error', function (error) {
|
node._clientOn('error', function (error) {
|
||||||
});
|
});
|
||||||
}catch(err) {
|
}catch(err) {
|
||||||
console.log(err);
|
console.log(err);
|
||||||
@ -763,6 +764,9 @@ module.exports = function(RED) {
|
|||||||
node.connecting = false;
|
node.connecting = false;
|
||||||
node.connected = false;
|
node.connected = false;
|
||||||
}
|
}
|
||||||
|
if(node.client) { node._clientRemoveAllListeners(); }
|
||||||
|
node.connecting = false;
|
||||||
|
node.connected = false;
|
||||||
callback && typeof callback == "function" && callback();
|
callback && typeof callback == "function" && callback();
|
||||||
};
|
};
|
||||||
if(!node.client) { return _callback(); }
|
if(!node.client) { return _callback(); }
|
||||||
@ -775,7 +779,7 @@ module.exports = function(RED) {
|
|||||||
resolve();
|
resolve();
|
||||||
} else {
|
} else {
|
||||||
const t = setTimeout(reject, ms);
|
const t = setTimeout(reject, ms);
|
||||||
client.end(true, () => {
|
client.end(() => {
|
||||||
clearTimeout(t);
|
clearTimeout(t);
|
||||||
resolve()
|
resolve()
|
||||||
});
|
});
|
||||||
@ -831,7 +835,7 @@ module.exports = function(RED) {
|
|||||||
};
|
};
|
||||||
node.subscriptions[topic][ref] = sub;
|
node.subscriptions[topic][ref] = sub;
|
||||||
if (node.connected) {
|
if (node.connected) {
|
||||||
node.client.on('message',sub.handler);
|
node._clientOn('message',sub.handler);
|
||||||
node.client.subscribe(topic, options);
|
node.client.subscribe(topic, options);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -908,9 +912,32 @@ module.exports = function(RED) {
|
|||||||
};
|
};
|
||||||
|
|
||||||
node.on('close', function(done) {
|
node.on('close', function(done) {
|
||||||
node.closing = true;
|
node.disconnect(function() {
|
||||||
node.disconnect(done);
|
if(node.client) {
|
||||||
|
node._clientRemoveAllListeners();
|
||||||
|
}
|
||||||
|
done();
|
||||||
});
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
// Helper functions to track the event listners we add to the
|
||||||
|
// client. The mqtt client also uses it own set of listeners
|
||||||
|
// so we can't use removeAllListeners() wothout breaking it
|
||||||
|
node._clientOn = function(event, f) {
|
||||||
|
node.clientListeners.push({event, f})
|
||||||
|
node.client.on(event, f)
|
||||||
|
}
|
||||||
|
|
||||||
|
node._clientRemoveAllListeners = function(event) {
|
||||||
|
node.clientListeners = node.clientListeners.filter((l) => {
|
||||||
|
if (!event || (event == l.e)) {
|
||||||
|
node.client.removeListener(l.event, l.f)
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
return true;
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user