mirror of
https://github.com/node-red/node-red.git
synced 2025-12-26 23:26:46 +01:00
Update mqtt node options to include will/cleansession/keepalive
This commit is contained in:
@@ -36,17 +36,17 @@ module.exports = function(RED) {
|
||||
this.broker = n.broker;
|
||||
this.port = n.port;
|
||||
this.clientid = n.clientid;
|
||||
this.secureconn = n.secureconn;
|
||||
this.disablecertauth = n.disablecertauth;
|
||||
this.usetls = n.usetls;
|
||||
this.verifyservercert = n.verifyservercert;
|
||||
this.compatmode = n.compatmode;
|
||||
this.mqttkeepalive = n.mqttkeepalive;
|
||||
this.keepalive = n.keepalive;
|
||||
this.cleansession = n.cleansession;
|
||||
|
||||
// Config node state
|
||||
this.brokerurl = "";
|
||||
this.connected = false;
|
||||
this.connecting = false;
|
||||
this.usecount = 0;
|
||||
this.logdisconnect = true;
|
||||
this.options = {};
|
||||
this.queue = [];
|
||||
this.subscriptions = {};
|
||||
@@ -61,22 +61,27 @@ module.exports = function(RED) {
|
||||
|
||||
// If the config node is missing certain options (it was probably deployed prior to an update to the node code),
|
||||
// select/generate sensible options for the new fields
|
||||
if (typeof this.secureconn === 'undefined'){
|
||||
this.secureconn = false;
|
||||
if (typeof this.usetls === 'undefined'){
|
||||
this.usetls = false;
|
||||
}
|
||||
if (typeof this.compatmode === 'undefined'){
|
||||
this.compatmode = true;
|
||||
}
|
||||
if (typeof this.disablecertauth === 'undefined'){
|
||||
this.disablecertauth = false;
|
||||
if (typeof this.verifyservercert === 'undefined'){
|
||||
this.verifyservercert = false;
|
||||
}
|
||||
if (typeof this.mqttkeepalive === 'undefined'){
|
||||
this.mqttkeepalive = 15;
|
||||
if (typeof this.keepalive === 'undefined'){
|
||||
this.keepalive = 15;
|
||||
} else if (typeof this.keepalive === 'string') {
|
||||
this.keepalive = Number(this.keepalive);
|
||||
}
|
||||
if (typeof this.cleansession === 'undefined') {
|
||||
this.cleansession = true;
|
||||
}
|
||||
|
||||
// Create the URL to pass in to the MQTT.js library
|
||||
if (this.brokerurl == "") {
|
||||
if (this.secureconn) {
|
||||
if (this.usetls) {
|
||||
this.brokerurl="mqtts://";
|
||||
} else {
|
||||
this.brokerurl="mqtt://";
|
||||
@@ -88,22 +93,32 @@ module.exports = function(RED) {
|
||||
}
|
||||
}
|
||||
|
||||
if (!this.cleansession && !this.clientid) {
|
||||
this.cleansession = true;
|
||||
this.warn(RED._("mqtt.errors.nonclean-missingclientid"));
|
||||
}
|
||||
|
||||
// Build options for passing to the MQTT.js API
|
||||
this.options.clientId = this.clientid || 'mqtt_' + (1+Math.random()*4294967295).toString(16);
|
||||
this.options.username = this.username;
|
||||
this.options.password = this.password;
|
||||
this.options.keepalive = this.mqttkeepalive;
|
||||
this.options.reconnectPeriod = 5000;
|
||||
this.options.keepalive = this.keepalive;
|
||||
this.options.clean = this.clean;
|
||||
this.options.reconnectPeriod = RED.settings.mqttReconnectTime||5000;
|
||||
if (this.compatmode == "true" || this.compatmode === true){
|
||||
this.log('Using compatibility mode for non-MQTT v3.1.1 brokers');
|
||||
this.options.protocolId = 'MQIsdp';
|
||||
this.options.protocolVersion = 3;
|
||||
}
|
||||
if (this.disablecertauth == "true" || this.disablecertauth === true) {
|
||||
this.log(' Warning: Certificate checking is disabled for this connection');
|
||||
this.options.rejectUnauthorized = false;
|
||||
} else {
|
||||
this.options.rejectUnauthorized = true;
|
||||
|
||||
this.options.rejectUnauthorized = (this.verifyservercert == "true" || this.verifyservercert === true)
|
||||
|
||||
if (n.willTopic) {
|
||||
this.options.will = {
|
||||
topic: n.willTopic,
|
||||
payload: n.willPayload || "",
|
||||
qos: Number(n.willQos||0),
|
||||
retain: n.willRetain=="true"|| n.willRetain===true
|
||||
};
|
||||
}
|
||||
|
||||
// Define functions called by MQTT in and out nodes
|
||||
@@ -123,12 +138,10 @@ module.exports = function(RED) {
|
||||
if (!node.connected && !node.connecting) {
|
||||
node.connecting = true;
|
||||
node.client = mqtt.connect(node.brokerurl ,node.options);
|
||||
|
||||
// Register successful connect or reconnect handler
|
||||
node.client.on('connect', function () {
|
||||
node.connected = true;
|
||||
node.logdisconnect = true;
|
||||
node.log("Connected to broker: "+(node.clientid?node.clientid+"@":"")+node.brokerurl);
|
||||
node.log(RED._("mqtt.state.connected",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl}));
|
||||
node.emit('connected');
|
||||
|
||||
// Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection
|
||||
@@ -156,26 +169,18 @@ module.exports = function(RED) {
|
||||
|
||||
// Register disconnect handlers
|
||||
node.client.on('close', function () {
|
||||
if (node.connected && node.logdisconnect ) {
|
||||
if (node.connected) {
|
||||
node.connected = false;
|
||||
node.logdisconnect = false;
|
||||
node.log("Disconnected from broker: "+(node.clientid?node.clientid+"@":"")+node.brokerurl);
|
||||
node.emit('disconnected');
|
||||
}
|
||||
});
|
||||
|
||||
node.client.on('offline', function () {
|
||||
if (node.connected && node.logdisconnect ) {
|
||||
node.connected = false;
|
||||
node.logdisconnect = false;
|
||||
node.log("Disconnected from broker: "+(node.clientid?node.clientid+"@":"")+node.brokerurl);
|
||||
node.log(RED._("mqtt.state.disconnected",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl}));
|
||||
node.emit('disconnected');
|
||||
} else if (node.connecting) {
|
||||
node.log(RED._("mqtt.state.connect-failed",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl}));
|
||||
}
|
||||
});
|
||||
|
||||
// Register connect error handler
|
||||
node.client.on('error', function (error) {
|
||||
node.log("" + error);
|
||||
console.log("ERROR",error);
|
||||
if (node.connecting) {
|
||||
node.client.end();
|
||||
node.connecting = false;
|
||||
|
||||
Reference in New Issue
Block a user