2013-09-05 16:02:48 +02:00
/ * *
* Copyright 2013 IBM Corp .
*
* Licensed under the Apache License , Version 2.0 ( the "License" ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
*
* http : //www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an "AS IS" BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
* * /
var util = require ( "util" ) ;
var mqtt = require ( "./mqtt" ) ;
2013-11-14 16:44:54 +01:00
var settings = require ( process . env . NODE _RED _HOME + "/red/red" ) . settings ;
2013-09-05 16:02:48 +02:00
2016-03-16 12:15:30 +01:00
util . log ( "[warn] nodes/core/io/lib/mqttConnectionPool.js is deprecated and will be removed in a future release of Node-RED. Please report this usage to the Node-RED mailing list." ) ;
2013-09-05 16:02:48 +02:00
var connections = { } ;
2014-02-23 21:14:27 +01:00
function matchTopic ( ts , t ) {
if ( ts == "#" ) {
return true ;
}
var re = new RegExp ( "^" + ts . replace ( /([\[\]\?\(\)\\\\$\^\*\.|])/g , "\\$1" ) . replace ( /\+/g , "[^/]+" ) . replace ( /\/#$/ , "(\/.*)?" ) + "$" ) ;
2013-09-05 16:02:48 +02:00
return re . test ( t ) ;
}
module . exports = {
2014-05-29 10:00:28 +02:00
get : function ( broker , port , clientid , username , password , will ) {
2013-11-25 23:50:08 +01:00
var id = "[" + ( username || "" ) + ":" + ( password || "" ) + "][" + ( clientid || "" ) + "]@" + broker + ":" + port ;
2013-09-05 16:02:48 +02:00
if ( ! connections [ id ] ) {
connections [ id ] = function ( ) {
2014-02-19 23:17:20 +01:00
var uid = ( 1 + Math . random ( ) * 4294967295 ) . toString ( 16 ) ;
2013-09-05 16:02:48 +02:00
var client = mqtt . createClient ( port , broker ) ;
2014-04-19 23:17:14 +02:00
client . uid = uid ;
2013-11-13 16:00:55 +01:00
client . setMaxListeners ( 0 ) ;
2013-11-25 23:50:08 +01:00
var options = { keepalive : 15 } ;
options . clientId = clientid || 'mqtt_' + ( 1 + Math . random ( ) * 4294967295 ) . toString ( 16 ) ;
options . username = username ;
options . password = password ;
2014-05-29 10:00:28 +02:00
options . will = will ;
2013-09-05 16:02:48 +02:00
var queue = [ ] ;
2015-04-09 21:10:34 +02:00
var subscriptions = { } ;
2013-09-05 16:02:48 +02:00
var connecting = false ;
var obj = {
_instances : 0 ,
publish : function ( msg ) {
if ( client . isConnected ( ) ) {
client . publish ( msg . topic , msg . payload , msg . qos , msg . retain ) ;
} else {
if ( ! connecting ) {
connecting = true ;
client . connect ( options ) ;
}
queue . push ( msg ) ;
}
} ,
2015-04-09 21:10:34 +02:00
subscribe : function ( topic , qos , callback , ref ) {
ref = ref || 0 ;
subscriptions [ topic ] = subscriptions [ topic ] || { } ;
2016-03-16 12:15:30 +01:00
2015-04-09 21:10:34 +02:00
var sub = {
topic : topic ,
qos : qos ,
handler : function ( mtopic , mpayload , mqos , mretain ) {
2013-09-05 16:02:48 +02:00
if ( matchTopic ( topic , mtopic ) ) {
callback ( mtopic , mpayload , mqos , mretain ) ;
}
2015-04-09 21:10:34 +02:00
} ,
ref : ref
} ;
subscriptions [ topic ] [ ref ] = sub ;
client . on ( 'message' , sub . handler ) ;
2013-09-05 16:02:48 +02:00
if ( client . isConnected ( ) ) {
client . subscribe ( topic , qos ) ;
}
} ,
2015-04-09 21:10:34 +02:00
unsubscribe : function ( topic , ref ) {
ref = ref || 0 ;
var sub = subscriptions [ topic ] ;
if ( sub ) {
if ( sub [ ref ] ) {
client . removeListener ( 'message' , sub [ ref ] . handler ) ;
delete sub [ ref ] ;
}
if ( Object . keys ( sub ) . length == 0 ) {
delete subscriptions [ topic ] ;
client . unsubscribe ( topic ) ;
}
}
} ,
2013-09-05 16:02:48 +02:00
on : function ( a , b ) {
client . on ( a , b ) ;
} ,
once : function ( a , b ) {
client . once ( a , b ) ;
} ,
connect : function ( ) {
2014-04-21 22:14:03 +02:00
if ( client && ! client . isConnected ( ) && ! connecting ) {
2013-09-05 16:02:48 +02:00
connecting = true ;
client . connect ( options ) ;
}
} ,
2015-04-09 21:10:34 +02:00
disconnect : function ( ref ) {
2016-03-16 12:15:30 +01:00
2013-09-05 16:02:48 +02:00
this . _instances -= 1 ;
if ( this . _instances == 0 ) {
client . disconnect ( ) ;
client = null ;
delete connections [ id ] ;
}
2015-04-09 21:10:34 +02:00
} ,
isConnected : function ( ) {
return client . isConnected ( ) ;
2013-09-05 16:02:48 +02:00
}
} ;
client . on ( 'connect' , function ( ) {
2014-01-13 12:27:09 +01:00
if ( client ) {
2014-02-19 23:17:20 +01:00
util . log ( '[mqtt] [' + uid + '] connected to broker tcp://' + broker + ':' + port ) ;
2014-01-13 12:27:09 +01:00
connecting = false ;
for ( var s in subscriptions ) {
2015-04-09 21:10:34 +02:00
var topic = s ;
var qos = 0 ;
for ( var r in subscriptions [ s ] ) {
qos = Math . max ( qos , subscriptions [ s ] [ r ] . qos ) ;
}
2014-01-13 12:27:09 +01:00
client . subscribe ( topic , qos ) ;
}
//console.log("connected - publishing",queue.length,"messages");
while ( queue . length ) {
var msg = queue . shift ( ) ;
//console.log(msg);
client . publish ( msg . topic , msg . payload , msg . qos , msg . retain ) ;
}
2013-09-05 16:02:48 +02:00
}
} ) ;
client . on ( 'connectionlost' , function ( err ) {
2014-02-19 23:17:20 +01:00
util . log ( '[mqtt] [' + uid + '] connection lost to broker tcp://' + broker + ':' + port ) ;
2014-04-21 21:40:56 +02:00
connecting = false ;
2013-09-05 16:02:48 +02:00
setTimeout ( function ( ) {
2014-04-21 21:40:56 +02:00
obj . connect ( ) ;
2013-09-05 16:02:48 +02:00
} , settings . mqttReconnectTime || 5000 ) ;
} ) ;
client . on ( 'disconnect' , function ( ) {
2014-04-21 21:40:56 +02:00
connecting = false ;
2014-02-19 23:17:20 +01:00
util . log ( '[mqtt] [' + uid + '] disconnected from broker tcp://' + broker + ':' + port ) ;
2013-09-05 16:02:48 +02:00
} ) ;
return obj
} ( ) ;
}
connections [ id ] . _instances += 1 ;
return connections [ id ] ;
}
} ;