2013-09-05 16:02:48 +02:00
/ * *
* Copyright 2013 IBM Corp .
*
* Licensed under the Apache License , Version 2.0 ( the "License" ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
*
* http : //www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an "AS IS" BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
* * /
var util = require ( "util" ) ;
var mqtt = require ( "mqtt" ) ;
var events = require ( "events" ) ;
2016-03-16 12:15:30 +01:00
util . log ( "[warn] nodes/core/io/lib/mqtt.js is deprecated and will be removed in a future release of Node-RED. Please report this usage to the Node-RED mailing list." ) ;
2015-10-09 14:36:35 +02:00
//var inspect = require("util").inspect;
2013-09-05 16:02:48 +02:00
//var Client = module.exports.Client = function(
2014-11-04 22:56:15 +01:00
2013-09-05 16:02:48 +02:00
var port = 1883 ;
var host = "localhost" ;
function MQTTClient ( port , host ) {
this . port = port || 1883 ;
this . host = host || "localhost" ;
this . messageId = 1 ;
this . pendingSubscriptions = { } ;
this . inboundMessages = { } ;
this . lastOutbound = ( new Date ( ) ) . getTime ( ) ;
this . lastInbound = ( new Date ( ) ) . getTime ( ) ;
this . connected = false ;
2014-11-04 22:56:15 +01:00
2013-09-05 16:02:48 +02:00
this . _nextMessageId = function ( ) {
this . messageId += 1 ;
if ( this . messageId > 0xFFFF ) {
this . messageId = 1 ;
}
return this . messageId ;
}
events . EventEmitter . call ( this ) ;
}
util . inherits ( MQTTClient , events . EventEmitter ) ;
MQTTClient . prototype . connect = function ( options ) {
2014-04-21 21:40:56 +02:00
if ( ! this . connected ) {
var self = this ;
options = options || { } ;
self . options = options ;
self . options . keepalive = options . keepalive || 15 ;
self . options . clean = self . options . clean || true ;
self . options . protocolId = 'MQIsdp' ;
self . options . protocolVersion = 3 ;
2014-11-04 22:56:15 +01:00
2014-04-21 21:40:56 +02:00
self . client = mqtt . createConnection ( this . port , this . host , function ( err , client ) {
if ( err ) {
self . connected = false ;
clearInterval ( self . watchdog ) ;
self . connectionError = true ;
//util.log('[mqtt] ['+self.uid+'] connection error 1 : '+inspect(err));
self . emit ( 'connectionlost' , err ) ;
return ;
}
client . on ( 'close' , function ( e ) {
//util.log('[mqtt] ['+self.uid+'] on close');
clearInterval ( self . watchdog ) ;
if ( ! self . connectionError ) {
if ( self . connected ) {
self . connected = false ;
self . emit ( 'connectionlost' , e ) ;
} else {
self . emit ( 'disconnect' ) ;
}
}
} ) ;
client . on ( 'error' , function ( e ) {
//util.log('[mqtt] ['+self.uid+'] on error : '+inspect(e));
clearInterval ( self . watchdog ) ;
2014-02-20 22:53:21 +01:00
if ( self . connected ) {
self . connected = false ;
self . emit ( 'connectionlost' , e ) ;
2014-04-21 21:40:56 +02:00
}
} ) ;
client . on ( 'connack' , function ( packet ) {
2016-07-26 22:33:00 +02:00
if ( packet . returnCode === 0 ) {
2014-04-21 21:40:56 +02:00
self . watchdog = setInterval ( function ( self ) {
var now = ( new Date ( ) ) . getTime ( ) ;
2014-11-04 22:56:15 +01:00
2014-04-21 21:40:56 +02:00
//util.log('[mqtt] ['+self.uid+'] watchdog '+inspect({connected:self.connected,connectionError:self.connectionError,pingOutstanding:self.pingOutstanding,now:now,lastOutbound:self.lastOutbound,lastInbound:self.lastInbound}));
2014-11-04 22:56:15 +01:00
2014-04-21 21:40:56 +02:00
if ( now - self . lastOutbound > self . options . keepalive * 500 || now - self . lastInbound > self . options . keepalive * 500 ) {
if ( self . pingOutstanding ) {
//util.log('[mqtt] ['+self.uid+'] watchdog pingOustanding - disconnect');
try {
self . client . disconnect ( ) ;
} catch ( err ) {
}
} else {
//util.log('[mqtt] ['+self.uid+'] watchdog pinging');
self . lastOutbound = ( new Date ( ) ) . getTime ( ) ;
self . lastInbound = ( new Date ( ) ) . getTime ( ) ;
self . pingOutstanding = true ;
self . client . pingreq ( ) ;
}
}
2014-11-04 22:56:15 +01:00
2014-04-21 21:40:56 +02:00
} , self . options . keepalive * 500 , self ) ;
self . pingOutstanding = false ;
self . lastInbound = ( new Date ( ) ) . getTime ( )
self . lastOutbound = ( new Date ( ) ) . getTime ( )
self . connected = true ;
self . connectionError = false ;
self . emit ( 'connect' ) ;
2014-02-20 22:53:21 +01:00
} else {
2014-04-21 21:40:56 +02:00
self . connected = false ;
self . emit ( 'connectionlost' ) ;
2014-02-20 22:53:21 +01:00
}
2014-04-21 21:40:56 +02:00
} ) ;
client . on ( 'suback' , function ( packet ) {
self . lastInbound = ( new Date ( ) ) . getTime ( )
var topic = self . pendingSubscriptions [ packet . messageId ] ;
self . emit ( 'subscribe' , topic , packet . granted [ 0 ] ) ;
delete self . pendingSubscriptions [ packet . messageId ] ;
} ) ;
client . on ( 'unsuback' , function ( packet ) {
self . lastInbound = ( new Date ( ) ) . getTime ( )
var topic = self . pendingSubscriptions [ packet . messageId ] ;
2015-04-09 21:10:34 +02:00
self . emit ( 'unsubscribe' , topic ) ;
2014-04-21 21:40:56 +02:00
delete self . pendingSubscriptions [ packet . messageId ] ;
} ) ;
client . on ( 'publish' , function ( packet ) {
2014-11-04 22:56:15 +01:00
self . lastInbound = ( new Date ( ) ) . getTime ( ) ;
2014-04-21 21:40:56 +02:00
if ( packet . qos < 2 ) {
var p = packet ;
self . emit ( 'message' , p . topic , p . payload , p . qos , p . retain ) ;
} else {
self . inboundMessages [ packet . messageId ] = packet ;
this . lastOutbound = ( new Date ( ) ) . getTime ( )
self . client . pubrec ( packet ) ;
}
if ( packet . qos == 1 ) {
this . lastOutbound = ( new Date ( ) ) . getTime ( )
self . client . puback ( packet ) ;
}
} ) ;
2014-11-04 22:56:15 +01:00
2014-04-21 21:40:56 +02:00
client . on ( 'pubrel' , function ( packet ) {
self . lastInbound = ( new Date ( ) ) . getTime ( )
var p = self . inboundMessages [ packet . messageId ] ;
if ( p ) {
self . emit ( 'message' , p . topic , p . payload , p . qos , p . retain ) ;
delete self . inboundMessages [ packet . messageId ] ;
}
self . lastOutbound = ( new Date ( ) ) . getTime ( )
self . client . pubcomp ( packet ) ;
} ) ;
2014-11-04 22:56:15 +01:00
2014-04-21 21:40:56 +02:00
client . on ( 'puback' , function ( packet ) {
self . lastInbound = ( new Date ( ) ) . getTime ( )
// outbound qos-1 complete
} ) ;
2014-11-04 22:56:15 +01:00
2014-04-21 21:40:56 +02:00
client . on ( 'pubrec' , function ( packet ) {
self . lastInbound = ( new Date ( ) ) . getTime ( )
self . lastOutbound = ( new Date ( ) ) . getTime ( )
self . client . pubrel ( packet ) ;
} ) ;
client . on ( 'pubcomp' , function ( packet ) {
self . lastInbound = ( new Date ( ) ) . getTime ( )
// outbound qos-2 complete
} ) ;
client . on ( 'pingresp' , function ( packet ) {
//util.log('[mqtt] ['+self.uid+'] received pingresp');
self . lastInbound = ( new Date ( ) ) . getTime ( )
self . pingOutstanding = false ;
} ) ;
2014-11-04 22:56:15 +01:00
2014-04-21 21:40:56 +02:00
this . lastOutbound = ( new Date ( ) ) . getTime ( )
this . connectionError = false ;
client . connect ( self . options ) ;
} ) ;
}
2013-09-05 16:02:48 +02:00
}
MQTTClient . prototype . subscribe = function ( topic , qos ) {
var self = this ;
if ( self . connected ) {
var options = {
subscriptions : [ { topic : topic , qos : qos } ] ,
messageId : self . _nextMessageId ( )
} ;
this . pendingSubscriptions [ options . messageId ] = topic ;
2014-11-04 22:56:15 +01:00
this . lastOutbound = ( new Date ( ) ) . getTime ( ) ;
2013-09-05 16:02:48 +02:00
self . client . subscribe ( options ) ;
2014-11-04 22:56:15 +01:00
self . client . setPacketEncoding ( 'binary' ) ;
2013-09-05 16:02:48 +02:00
}
}
MQTTClient . prototype . unsubscribe = function ( topic ) {
var self = this ;
if ( self . connected ) {
var options = {
2015-04-09 21:10:34 +02:00
unsubscriptions : [ topic ] ,
2013-09-05 16:02:48 +02:00
messageId : self . _nextMessageId ( )
} ;
this . pendingSubscriptions [ options . messageId ] = topic ;
this . lastOutbound = ( new Date ( ) ) . getTime ( )
self . client . unsubscribe ( options ) ;
}
}
MQTTClient . prototype . publish = function ( topic , payload , qos , retain ) {
var self = this ;
if ( self . connected ) {
2014-11-04 22:56:15 +01:00
2014-08-18 12:27:52 +02:00
if ( ! Buffer . isBuffer ( payload ) ) {
if ( typeof payload === "object" ) {
payload = JSON . stringify ( payload ) ;
} else if ( typeof payload !== "string" ) {
payload = "" + payload ;
}
2013-09-05 16:02:48 +02:00
}
var options = {
topic : topic ,
payload : payload ,
qos : qos || 0 ,
retain : retain || false
} ;
2016-07-26 22:33:00 +02:00
if ( options . qos !== 0 ) {
2013-09-05 16:02:48 +02:00
options . messageId = self . _nextMessageId ( ) ;
}
this . lastOutbound = ( new Date ( ) ) . getTime ( )
self . client . publish ( options ) ;
}
}
MQTTClient . prototype . disconnect = function ( ) {
var self = this ;
if ( this . connected ) {
this . connected = false ;
2014-02-19 22:30:46 +01:00
try {
this . client . disconnect ( ) ;
} catch ( err ) {
}
2013-09-05 16:02:48 +02:00
}
}
MQTTClient . prototype . isConnected = function ( ) {
return this . connected ;
}
module . exports . createClient = function ( port , host ) {
var mqtt _client = new MQTTClient ( port , host ) ;
return mqtt _client ;
}