1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00

Fix MQTT client reconnect logic

This commit is contained in:
Nick O'Leary 2014-04-21 20:40:56 +01:00
parent 775297d625
commit 0b7fa1ab5c
2 changed files with 136 additions and 135 deletions

View File

@ -45,141 +45,143 @@ function MQTTClient(port,host) {
util.inherits(MQTTClient, events.EventEmitter); util.inherits(MQTTClient, events.EventEmitter);
MQTTClient.prototype.connect = function(options) { MQTTClient.prototype.connect = function(options) {
var self = this; if (!this.connected) {
options = options||{}; var self = this;
self.options = options; options = options||{};
self.options.keepalive = options.keepalive||15; self.options = options;
self.options.clean = self.options.clean||true; self.options.keepalive = options.keepalive||15;
self.options.protocolId = 'MQIsdp'; self.options.clean = self.options.clean||true;
self.options.protocolVersion = 3; self.options.protocolId = 'MQIsdp';
self.options.protocolVersion = 3;
self.client = mqtt.createConnection(this.port,this.host,function(err,client) {
if (err) { self.client = mqtt.createConnection(this.port,this.host,function(err,client) {
self.connected = false; if (err) {
clearInterval(self.watchdog); self.connected = false;
self.connectionError = true; clearInterval(self.watchdog);
//util.log('[mqtt] ['+self.uid+'] connection error 1 : '+inspect(err)); self.connectionError = true;
self.emit('connectionlost',err); //util.log('[mqtt] ['+self.uid+'] connection error 1 : '+inspect(err));
return; self.emit('connectionlost',err);
} return;
client.on('close',function(e) { }
//util.log('[mqtt] ['+self.uid+'] on close'); client.on('close',function(e) {
clearInterval(self.watchdog); //util.log('[mqtt] ['+self.uid+'] on close');
if (!self.connectionError) { clearInterval(self.watchdog);
if (!self.connectionError) {
if (self.connected) {
self.connected = false;
self.emit('connectionlost',e);
} else {
self.emit('disconnect');
}
}
});
client.on('error',function(e) {
//util.log('[mqtt] ['+self.uid+'] on error : '+inspect(e));
clearInterval(self.watchdog);
if (self.connected) { if (self.connected) {
self.connected = false; self.connected = false;
self.emit('connectionlost',e); self.emit('connectionlost',e);
} else {
self.emit('disconnect');
} }
} });
}); client.on('connack',function(packet) {
client.on('error',function(e) { if (packet.returnCode == 0) {
//util.log('[mqtt] ['+self.uid+'] on error : '+inspect(e)); self.watchdog = setInterval(function(self) {
clearInterval(self.watchdog); var now = (new Date()).getTime();
if (self.connected) {
self.connected = false; //util.log('[mqtt] ['+self.uid+'] watchdog '+inspect({connected:self.connected,connectionError:self.connectionError,pingOutstanding:self.pingOutstanding,now:now,lastOutbound:self.lastOutbound,lastInbound:self.lastInbound}));
self.emit('connectionlost',e);
} if (now - self.lastOutbound > self.options.keepalive*500 || now - self.lastInbound > self.options.keepalive*500) {
}); if (self.pingOutstanding) {
client.on('connack',function(packet) { //util.log('[mqtt] ['+self.uid+'] watchdog pingOustanding - disconnect');
if (packet.returnCode == 0) { try {
self.watchdog = setInterval(function(self) { self.client.disconnect();
var now = (new Date()).getTime(); } catch (err) {
}
//util.log('[mqtt] ['+self.uid+'] watchdog '+inspect({connected:self.connected,connectionError:self.connectionError,pingOutstanding:self.pingOutstanding,now:now,lastOutbound:self.lastOutbound,lastInbound:self.lastInbound})); } else {
//util.log('[mqtt] ['+self.uid+'] watchdog pinging');
if (now - self.lastOutbound > self.options.keepalive*500 || now - self.lastInbound > self.options.keepalive*500) { self.lastOutbound = (new Date()).getTime();
if (self.pingOutstanding) { self.lastInbound = (new Date()).getTime();
//util.log('[mqtt] ['+self.uid+'] watchdog pingOustanding - disconnect'); self.pingOutstanding = true;
try { self.client.pingreq();
self.client.disconnect(); }
} catch (err) { }
}
} else { },self.options.keepalive*500,self);
//util.log('[mqtt] ['+self.uid+'] watchdog pinging'); self.pingOutstanding = false;
self.lastOutbound = (new Date()).getTime(); self.lastInbound = (new Date()).getTime()
self.lastInbound = (new Date()).getTime(); self.lastOutbound = (new Date()).getTime()
self.pingOutstanding = true; self.connected = true;
self.client.pingreq(); self.connectionError = false;
} self.emit('connect');
} } else {
self.connected = false;
},self.options.keepalive*500,self); self.emit('connectionlost');
self.pingOutstanding = false; }
self.lastInbound = (new Date()).getTime() });
self.lastOutbound = (new Date()).getTime() client.on('suback',function(packet) {
self.connected = true; self.lastInbound = (new Date()).getTime()
self.connectionError = false; var topic = self.pendingSubscriptions[packet.messageId];
self.emit('connect'); self.emit('subscribe',topic,packet.granted[0]);
} else { delete self.pendingSubscriptions[packet.messageId];
self.connected = false; });
self.emit('connectionlost'); client.on('unsuback',function(packet) {
} self.lastInbound = (new Date()).getTime()
}); var topic = self.pendingSubscriptions[packet.messageId];
client.on('suback',function(packet) { self.emit('unsubscribe',topic,packet.granted[0]);
self.lastInbound = (new Date()).getTime() delete self.pendingSubscriptions[packet.messageId];
var topic = self.pendingSubscriptions[packet.messageId]; });
self.emit('subscribe',topic,packet.granted[0]); client.on('publish',function(packet) {
delete self.pendingSubscriptions[packet.messageId]; self.lastInbound = (new Date()).getTime()
}); if (packet.qos < 2) {
client.on('unsuback',function(packet) { var p = packet;
self.lastInbound = (new Date()).getTime() self.emit('message',p.topic,p.payload,p.qos,p.retain);
var topic = self.pendingSubscriptions[packet.messageId]; } else {
self.emit('unsubscribe',topic,packet.granted[0]); self.inboundMessages[packet.messageId] = packet;
delete self.pendingSubscriptions[packet.messageId]; this.lastOutbound = (new Date()).getTime()
}); self.client.pubrec(packet);
client.on('publish',function(packet) { }
self.lastInbound = (new Date()).getTime() if (packet.qos == 1) {
if (packet.qos < 2) { this.lastOutbound = (new Date()).getTime()
var p = packet; self.client.puback(packet);
self.emit('message',p.topic,p.payload,p.qos,p.retain); }
} else { });
self.inboundMessages[packet.messageId] = packet;
this.lastOutbound = (new Date()).getTime() client.on('pubrel',function(packet) {
self.client.pubrec(packet); self.lastInbound = (new Date()).getTime()
} var p = self.inboundMessages[packet.messageId];
if (packet.qos == 1) { if (p) {
this.lastOutbound = (new Date()).getTime() self.emit('message',p.topic,p.payload,p.qos,p.retain);
self.client.puback(packet); delete self.inboundMessages[packet.messageId];
} }
}); self.lastOutbound = (new Date()).getTime()
self.client.pubcomp(packet);
client.on('pubrel',function(packet) { });
self.lastInbound = (new Date()).getTime()
var p = self.inboundMessages[packet.messageId]; client.on('puback',function(packet) {
if (p) { self.lastInbound = (new Date()).getTime()
self.emit('message',p.topic,p.payload,p.qos,p.retain); // outbound qos-1 complete
delete self.inboundMessages[packet.messageId]; });
}
self.lastOutbound = (new Date()).getTime() client.on('pubrec',function(packet) {
self.client.pubcomp(packet); self.lastInbound = (new Date()).getTime()
}); self.lastOutbound = (new Date()).getTime()
self.client.pubrel(packet);
client.on('puback',function(packet) { });
self.lastInbound = (new Date()).getTime() client.on('pubcomp',function(packet) {
// outbound qos-1 complete self.lastInbound = (new Date()).getTime()
}); // outbound qos-2 complete
});
client.on('pubrec',function(packet) { client.on('pingresp',function(packet) {
self.lastInbound = (new Date()).getTime() //util.log('[mqtt] ['+self.uid+'] received pingresp');
self.lastOutbound = (new Date()).getTime() self.lastInbound = (new Date()).getTime()
self.client.pubrel(packet); self.pingOutstanding = false;
}); });
client.on('pubcomp',function(packet) {
self.lastInbound = (new Date()).getTime() this.lastOutbound = (new Date()).getTime()
// outbound qos-2 complete this.connectionError = false;
}); client.connect(self.options);
client.on('pingresp',function(packet) { });
//util.log('[mqtt] ['+self.uid+'] received pingresp'); }
self.lastInbound = (new Date()).getTime()
self.pingOutstanding = false;
});
this.lastOutbound = (new Date()).getTime()
this.connectionError = false;
client.connect(self.options);
});
} }
MQTTClient.prototype.subscribe = function(topic,qos) { MQTTClient.prototype.subscribe = function(topic,qos) {

View File

@ -91,7 +91,6 @@ module.exports = {
client.on('connect',function() { client.on('connect',function() {
if (client) { if (client) {
util.log('[mqtt] ['+uid+'] connected to broker tcp://'+broker+':'+port); util.log('[mqtt] ['+uid+'] connected to broker tcp://'+broker+':'+port);
connecting = false; connecting = false;
for (var s in subscriptions) { for (var s in subscriptions) {
var topic = subscriptions[s].topic; var topic = subscriptions[s].topic;
@ -109,13 +108,13 @@ module.exports = {
}); });
client.on('connectionlost', function(err) { client.on('connectionlost', function(err) {
util.log('[mqtt] ['+uid+'] connection lost to broker tcp://'+broker+':'+port); util.log('[mqtt] ['+uid+'] connection lost to broker tcp://'+broker+':'+port);
connecting = false;
setTimeout(function() { setTimeout(function() {
if (client) { obj.connect();
client.connect(options);
}
}, settings.mqttReconnectTime||5000); }, settings.mqttReconnectTime||5000);
}); });
client.on('disconnect', function() { client.on('disconnect', function() {
connecting = false;
util.log('[mqtt] ['+uid+'] disconnected from broker tcp://'+broker+':'+port); util.log('[mqtt] ['+uid+'] disconnected from broker tcp://'+broker+':'+port);
}); });