2013-09-05 16:02:48 +02:00
/**
 * Copyright JS Foundation and other contributors, http://js.foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 **/
2014-05-04 00:32:04 +02:00
module . exports = function ( RED ) {
2014-05-29 10:00:28 +02:00
"use strict" ;
2015-06-03 08:54:43 +02:00
var mqtt = require ( "mqtt" ) ;
var util = require ( "util" ) ;
2014-11-04 22:56:15 +01:00
var isUtf8 = require ( 'is-utf8' ) ;
2018-03-01 17:22:39 +01:00
var HttpsProxyAgent = require ( 'https-proxy-agent' ) ;
var url = require ( 'url' ) ;
2014-05-29 10:00:28 +02:00
2015-06-03 08:54:43 +02:00
/**
 * Test whether a concrete topic matches an MQTT topic filter.
 *
 * Supports the `+` (single level) and trailing `/#` (multi level) wildcards
 * as well as MQTT v5 shared subscriptions:
 * http://docs.oasis-open.org/mqtt/mqtt/v5.0/cs02/mqtt-v5.0-cs02.html#_Toc514345522
 *
 * 4.8.2 describes shares like:
 *   $share/{ShareName}/{filter}
 *   $share is a literal string that marks the Topic Filter as being a Shared Subscription Topic Filter.
 *   {ShareName} is a character string that does not include "/", "+" or "#"
 *   {filter} The remainder of the string has the same syntax and semantics as a Topic Filter
 *   in a non-shared subscription. Refer to section 4.7.
 *
 * @param {string} ts The topic filter (subscription) to match against
 * @param {string} t The concrete topic of a received message
 * @returns {boolean} true if `t` is matched by `ts`
 */
function matchTopic(ts, t) {
    // Strip the "$share/{ShareName}/" prefix first so that what remains is
    // handled exactly like a non-shared filter (including a bare "#").
    if (ts.startsWith("$share")) {
        ts = ts.replace(/^\$share\/[^#+/]+\/(.*)/g, "$1");
    }
    if (ts == "#") {
        return true;
    }
    // Escape regex metacharacters (including "{" and "}", which would otherwise
    // act as quantifiers), then translate the MQTT wildcards into regex form.
    var pattern = ts.replace(/([\[\]\?\(\)\\\\$\^\*\.|{}])/g, "\\$1")
        .replace(/\+/g, "[^/]+")
        .replace(/\/#$/, "(\/.*)?");
    var re = new RegExp("^" + pattern + "$");
    return re.test(t);
}
2020-10-08 21:24:35 +02:00
/**
 * Helper function for setting integer property values in the MQTT V5 properties object
 * @param {object} src Source object containing properties (null/undefined is treated as "property not present")
 * @param {object} dst Destination object to set/add properties
 * @param {string} propName The property name to set in the Destination object
 * @param {integer} [minVal] The minimum value. If the src value is less than minVal, it will NOT be set in the destination
 * @param {integer} [maxVal] The maximum value. If the src value is greater than maxVal, it will NOT be set in the destination
 * @param {integer} [def] An optional default to set in the destination object if prop is NOT present in the source object
 */
function setIntProp(src, dst, propName, minVal, maxVal, def) {
    // Use the prototype's hasOwnProperty so that a missing source or an object
    // created without a prototype does not throw.
    if (src != null && Object.prototype.hasOwnProperty.call(src, propName)) {
        // Explicit radix: values are always interpreted as decimal.
        var v = parseInt(src[propName], 10);
        if (isNaN(v)) return;
        if (minVal != null && v < minVal) return;
        if (maxVal != null && v > maxVal) return;
        dst[propName] = v;
    } else if (def != undefined) {
        dst[propName] = def;
    }
}
2021-02-25 16:49:56 +01:00
2020-10-08 21:24:35 +02:00
/**
 * Helper function for setting string property values in the MQTT V5 properties object
 * @param {object} src Source object containing properties (null/undefined is treated as "property not present")
 * @param {object} dst Destination object to set/add properties
 * @param {string} propName The property name to set in the Destination object
 * @param {string} [def] An optional default to set in the destination object if prop is NOT a non-empty string in the source object
 */
function setStrProp(src, dst, propName, def) {
    // A missing source behaves like a missing property instead of throwing.
    var value = src == null ? undefined : src[propName];
    if (value && typeof value == "string") {
        dst[propName] = value;
    } else if (def != undefined) {
        dst[propName] = def;
    }
}
/**
 * Helper function for setting boolean property values in the MQTT V5 properties object
 *
 * Accepts real booleans as well as the strings "true" / "false". Any other
 * non-null value is ignored (nothing is written to the destination).
 * @param {object} src Source object containing properties (null/undefined is treated as "property not present")
 * @param {object} dst Destination object to set/add properties
 * @param {string} propName The property name to set in the Destination object
 * @param {boolean} [def] An optional default to set in the destination object if prop is NOT present in the source object
 */
function setBoolProp(src, dst, propName, def) {
    var value = src == null ? undefined : src[propName];
    if (value != null) {
        if (value === "true" || value === true) {
            dst[propName] = true;
        } else if (value === "false" || value === false) {
            // false must be stored as false (not true)
            dst[propName] = false;
        }
    } else if (def != undefined) {
        dst[propName] = def;
    }
}
/**
 * Helper function for copying the MQTT v5 srcUserProperties object (parameter1) to the properties object (parameter2).
 * Any property in srcUserProperties that is NOT a key/string pair will be silently discarded.
 * NOTE: if no suitable properties are present, the userProperties object will NOT be added to the properties object
 * @param {object} srcUserProperties An object with key/value string pairs (arrays and non-objects are ignored)
 * @param {object} properties A properties object in which userProperties will be copied to
 * @returns {undefined} Nothing is returned; the result is written to `properties.userProperties`
 */
function setUserProperties(srcUserProperties, properties) {
    if (!properties) return;
    // Only plain key/value containers qualify; an array would otherwise be
    // copied with its indices as keys.
    if (!srcUserProperties || typeof srcUserProperties != "object" || Array.isArray(srcUserProperties)) return;
    let _clone = {};
    let count = 0;
    Object.keys(srcUserProperties).forEach(key => {
        let val = srcUserProperties[key];
        if (typeof val == "string") {
            count++;
            _clone[key] = val;
        }
    });
    if (count) properties.userProperties = _clone;
}
/**
 * Helper function for copying the MQTT v5 buffer type properties
 *
 * The source value is copied into a new Buffer when it is a Buffer/Uint8Array,
 * a non-empty string or an array of bytes. Strings are accepted because the
 * node configuration supplies values such as correlationData as text.
 * NOTE: if src[propName] is none of the above, dst[propName] will NOT be assigned a value (unless def is set)
 * @param {object} src Source object containing properties
 * @param {object} dst Destination object to set/add properties
 * @param {string} propName The property name to set in the Destination object
 * @param {*} [def] An optional default to set in the destination object if prop is NOT usable in the Source object
 */
function setBufferProp(src, dst, propName, def) {
    if (!dst) return;
    var buf = src ? src[propName] : undefined;
    // Only hand Buffer.from() types it can convert; numbers and plain objects
    // would make it throw.
    var convertible = typeof buf == "string" || Array.isArray(buf) || buf instanceof Uint8Array;
    if (buf && convertible) {
        dst[propName] = Buffer.from(buf);
    } else if (def != undefined) {
        dst[propName] = def;
    }
}
2014-05-04 00:32:04 +02:00
function MQTTBrokerNode ( n ) {
RED . nodes . createNode ( this , n ) ;
2015-06-03 08:54:43 +02:00
// Configuration options passed by Node Red
2014-05-04 00:32:04 +02:00
this . broker = n . broker ;
this . port = n . port ;
this . clientid = n . clientid ;
2015-09-01 23:30:15 +02:00
this . usetls = n . usetls ;
2018-01-11 22:22:02 +01:00
this . usews = n . usews ;
2015-09-01 23:30:15 +02:00
this . verifyservercert = n . verifyservercert ;
2015-06-03 08:54:43 +02:00
this . compatmode = n . compatmode ;
2020-10-08 21:24:35 +02:00
this . protocolVersion = n . protocolVersion ;
2015-09-01 23:30:15 +02:00
this . keepalive = n . keepalive ;
this . cleansession = n . cleansession ;
2021-02-25 16:49:56 +01:00
this . sessionExpiryInterval = n . sessionExpiry ;
2020-10-08 21:24:35 +02:00
this . topicAliasMaximum = n . topicAliasMaximum ;
this . maximumPacketSize = n . maximumPacketSize ;
2020-10-15 00:30:03 +02:00
this . receiveMaximum = n . receiveMaximum ;
this . userProperties = n . userProperties ; //https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901116
this . userPropertiesType = n . userPropertiesType ;
2015-06-03 08:54:43 +02:00
// Config node state
this . brokerurl = "" ;
this . connected = false ;
this . connecting = false ;
2015-12-23 00:25:29 +01:00
this . closing = false ;
2015-06-03 08:54:43 +02:00
this . options = { } ;
this . queue = [ ] ;
this . subscriptions = { } ;
2015-09-02 12:18:59 +02:00
if ( n . birthTopic ) {
this . birthMessage = {
topic : n . birthTopic ,
payload : n . birthPayload || "" ,
qos : Number ( n . birthQos || 0 ) ,
2020-10-15 00:30:03 +02:00
retain : n . birthRetain == "true" || n . birthRetain === true ,
//TODO: add payloadFormatIndicator, messageExpiryInterval, contentType, responseTopic, correlationData, userProperties
2015-09-02 12:18:59 +02:00
} ;
2021-02-25 16:49:56 +01:00
if ( n . birthMsg ) {
setStrProp ( n . birthMsg , this . birthMessage , "contentType" ) ;
if ( n . birthMsg . userProps && /^ *{/ . test ( n . birthMsg . userProps ) ) {
try {
2021-02-25 20:58:59 +01:00
setUserProperties ( JSON . parse ( n . birthMsg . userProps ) , this . birthMessage ) ;
2021-02-25 16:49:56 +01:00
} catch ( err ) { }
}
n . birthMsg . responseTopic = n . birthMsg . respTopic ;
setStrProp ( n . birthMsg , this . birthMessage , "responseTopic" ) ;
n . birthMsg . correlationData = n . birthMsg . correl ;
setBufferProp ( n . birthMsg , this . birthMessage , "correlationData" ) ;
n . birthMsg . messageExpiryInterval = n . birthMsg . expiry
2021-02-25 20:58:59 +01:00
setIntProp ( n . birthMsg , this . birthMessage , "messageExpiryInterval" )
2021-02-25 16:49:56 +01:00
}
2015-09-02 12:18:59 +02:00
}
2015-06-03 08:54:43 +02:00
2018-05-03 21:42:39 +02:00
if ( n . closeTopic ) {
this . closeMessage = {
topic : n . closeTopic ,
payload : n . closePayload || "" ,
qos : Number ( n . closeQos || 0 ) ,
2020-10-15 00:30:03 +02:00
retain : n . closeRetain == "true" || n . closeRetain === true ,
//TODO: add payloadFormatIndicator, messageExpiryInterval, contentType, responseTopic, correlationData, userProperties
2018-11-08 17:03:41 +01:00
} ;
2021-02-25 16:49:56 +01:00
if ( n . closeMsg ) {
setStrProp ( n . closeMsg , this . closeMessage , "contentType" ) ;
if ( n . closeMsg . userProps && /^ *{/ . test ( n . closeMsg . userProps ) ) {
try {
2021-02-25 20:58:59 +01:00
setUserProperties ( JSON . parse ( n . closeMsg . userProps ) , this . closeMessage ) ;
2021-02-25 16:49:56 +01:00
} catch ( err ) { }
}
n . closeMsg . responseTopic = n . closeMsg . respTopic ;
setStrProp ( n . closeMsg , this . closeMessage , "responseTopic" ) ;
n . closeMsg . correlationData = n . closeMsg . correl ;
setBufferProp ( n . closeMsg , this . closeMessage , "correlationData" ) ;
n . closeMsg . messageExpiryInterval = n . closeMsg . expiry
2021-02-25 20:58:59 +01:00
setIntProp ( n . birthMsg , this . closeMessage , "messageExpiryInterval" )
2021-02-25 16:49:56 +01:00
}
2018-05-03 21:42:39 +02:00
}
2014-07-18 15:47:58 +02:00
if ( this . credentials ) {
this . username = this . credentials . user ;
this . password = this . credentials . password ;
2014-05-29 10:00:28 +02:00
}
2015-06-03 08:54:43 +02:00
// If the config node is missing certain options (it was probably deployed prior to an update to the node code),
// select/generate sensible options for the new fields
2017-03-10 21:12:52 +01:00
if ( typeof this . usetls === 'undefined' ) {
2015-09-01 23:30:15 +02:00
this . usetls = false ;
2015-06-03 08:54:43 +02:00
}
2018-01-11 22:22:02 +01:00
if ( typeof this . usews === 'undefined' ) {
this . usews = false ;
}
2017-03-10 21:12:52 +01:00
if ( typeof this . verifyservercert === 'undefined' ) {
2015-09-01 23:30:15 +02:00
this . verifyservercert = false ;
2015-06-03 08:54:43 +02:00
}
2017-03-10 21:12:52 +01:00
if ( typeof this . keepalive === 'undefined' ) {
2016-02-19 23:52:43 +01:00
this . keepalive = 60 ;
2015-09-01 23:30:15 +02:00
} else if ( typeof this . keepalive === 'string' ) {
this . keepalive = Number ( this . keepalive ) ;
}
if ( typeof this . cleansession === 'undefined' ) {
this . cleansession = true ;
2015-06-03 08:54:43 +02:00
}
2019-11-03 10:54:23 +01:00
var prox , noprox ;
if ( process . env . http _proxy ) { prox = process . env . http _proxy ; }
if ( process . env . HTTP _PROXY ) { prox = process . env . HTTP _PROXY ; }
if ( process . env . no _proxy ) { noprox = process . env . no _proxy . split ( "," ) ; }
if ( process . env . NO _PROXY ) { noprox = process . env . NO _PROXY . split ( "," ) ; }
2015-06-03 08:54:43 +02:00
// Create the URL to pass in to the MQTT.js library
2016-07-26 22:33:00 +02:00
if ( this . brokerurl === "" ) {
2018-01-11 23:46:18 +01:00
// if the broker may be ws:// or wss:// or even tcp://
2017-04-12 19:31:49 +02:00
if ( this . broker . indexOf ( "://" ) > - 1 ) {
this . brokerurl = this . broker ;
2018-03-01 17:22:39 +01:00
// Only for ws or wss, check if proxy env var for additional configuration
2019-11-03 10:54:23 +01:00
if ( this . brokerurl . indexOf ( "wss://" ) > - 1 || this . brokerurl . indexOf ( "ws://" ) > - 1 ) {
// check if proxy is set in env
var noproxy ;
if ( noprox ) {
2019-11-18 18:14:38 +01:00
for ( var i = 0 ; i < noprox . length ; i += 1 ) {
2019-11-03 10:54:23 +01:00
if ( this . brokerurl . indexOf ( noprox [ i ] . trim ( ) ) !== - 1 ) { noproxy = true ; }
}
}
if ( prox && ! noproxy ) {
2018-03-01 17:22:39 +01:00
var parsedUrl = url . parse ( this . brokerurl ) ;
var proxyOpts = url . parse ( prox ) ;
// true for wss
proxyOpts . secureEndpoint = parsedUrl . protocol ? parsedUrl . protocol === 'wss:' : true ;
// Set Agent for wsOption in MQTT
var agent = new HttpsProxyAgent ( proxyOpts ) ;
this . options . wsOptions = {
agent : agent
}
}
2019-11-03 10:54:23 +01:00
}
2015-06-03 08:54:43 +02:00
} else {
2017-04-12 19:31:49 +02:00
// construct the std mqtt:// url
2018-01-11 22:22:02 +01:00
if ( this . usetls ) {
this . brokerurl = "mqtts://" ;
} else {
2017-04-12 19:31:49 +02:00
this . brokerurl = "mqtt://" ;
}
if ( this . broker !== "" ) {
2020-02-24 22:49:58 +01:00
//Check for an IPv6 address
if ( /(?:^|(?<=\s))(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))(?=\s|$)/ . test ( this . broker ) ) {
this . brokerurl = this . brokerurl + "[" + this . broker + "]:" ;
} else {
this . brokerurl = this . brokerurl + this . broker + ":" ;
}
2017-04-12 19:31:49 +02:00
// port now defaults to 1883 if unset.
if ( ! this . port ) {
this . brokerurl = this . brokerurl + "1883" ;
} else {
this . brokerurl = this . brokerurl + this . port ;
}
} else {
this . brokerurl = this . brokerurl + "localhost:1883" ;
2018-01-11 22:22:02 +01:00
}
2015-06-03 08:54:43 +02:00
}
}
2015-09-01 23:30:15 +02:00
if ( ! this . cleansession && ! this . clientid ) {
this . cleansession = true ;
this . warn ( RED . _ ( "mqtt.errors.nonclean-missingclientid" ) ) ;
}
2015-06-03 08:54:43 +02:00
// Build options for passing to the MQTT.js API
this . options . clientId = this . clientid || 'mqtt_' + ( 1 + Math . random ( ) * 4294967295 ) . toString ( 16 ) ;
this . options . username = this . username ;
this . options . password = this . password ;
2015-09-01 23:30:15 +02:00
this . options . keepalive = this . keepalive ;
2015-09-01 23:58:26 +02:00
this . options . clean = this . cleansession ;
2015-09-01 23:30:15 +02:00
this . options . reconnectPeriod = RED . settings . mqttReconnectTime || 5000 ;
2020-10-08 21:24:35 +02:00
if ( this . compatmode == "true" || this . compatmode === true || this . protocolVersion == 3 ) {
2015-06-03 08:54:43 +02:00
this . options . protocolId = 'MQIsdp' ;
this . options . protocolVersion = 3 ;
2020-10-08 21:24:35 +02:00
} else if ( this . protocolVersion == 5 ) {
this . options . protocolVersion = 5 ;
this . options . properties = { } ;
this . options . properties . requestResponseInformation = true ;
this . options . properties . requestProblemInformation = true ;
2021-02-25 16:49:56 +01:00
if ( this . userProperties && /^ *{/ . test ( this . userProperties ) ) {
try {
setUserProperties ( JSON . parse ( this . userProperties ) , this . options . properties ) ;
} catch ( err ) { }
}
if ( this . sessionExpiryInterval && this . sessionExpiryInterval !== "0" ) {
setIntProp ( this , this . options . properties , "sessionExpiryInterval" ) ;
2020-10-15 00:30:03 +02:00
}
2015-06-03 08:54:43 +02:00
}
2016-04-27 13:31:54 +02:00
if ( this . usetls && n . tls ) {
var tlsNode = RED . nodes . getNode ( n . tls ) ;
if ( tlsNode ) {
tlsNode . addTLSOptions ( this . options ) ;
}
}
2017-09-20 11:30:07 +02:00
2016-04-27 13:31:54 +02:00
// If there's no rejectUnauthorized already, then this could be an
// old config where this option was provided on the broker node and
// not the tls node
if ( typeof this . options . rejectUnauthorized === 'undefined' ) {
this . options . rejectUnauthorized = ( this . verifyservercert == "true" || this . verifyservercert === true ) ;
}
2015-09-01 23:30:15 +02:00
if ( n . willTopic ) {
this . options . will = {
topic : n . willTopic ,
payload : n . willPayload || "" ,
qos : Number ( n . willQos || 0 ) ,
2020-10-15 00:30:03 +02:00
retain : n . willRetain == "true" || n . willRetain === true ,
//TODO: add willDelayInterval, payloadFormatIndicator, messageExpiryInterval, contentType, responseTopic, correlationData, userProperties
2015-09-01 23:30:15 +02:00
} ;
2021-02-25 16:49:56 +01:00
if ( n . willMsg ) {
this . options . will . properties = { } ;
setStrProp ( n . willMsg , this . options . will . properties , "contentType" ) ;
if ( n . willMsg . userProps && /^ *{/ . test ( n . willMsg . userProps ) ) {
try {
2021-02-25 20:58:59 +01:00
setUserProperties ( JSON . parse ( n . willMsg . userProps ) , this . options . will . properties ) ;
2021-02-25 16:49:56 +01:00
} catch ( err ) { }
}
n . willMsg . responseTopic = n . willMsg . respTopic ;
setStrProp ( n . willMsg , this . options . will . properties , "responseTopic" ) ;
n . willMsg . correlationData = n . willMsg . correl ;
setBufferProp ( n . willMsg , this . options . will . properties , "correlationData" ) ;
n . willMsg . willDelayInterval = n . willMsg . delay
setIntProp ( n . willMsg , this . options . will . properties , "willDelayInterval" )
n . willMsg . messageExpiryInterval = n . willMsg . expiry
setIntProp ( n . willMsg , this . options . will . properties , "messageExpiryInterval" )
this . options . will . payloadFormatIndicator = true ;
}
2015-06-03 08:54:43 +02:00
}
2021-02-25 16:49:56 +01:00
// console.log(this.brokerurl,this.options);
2015-06-03 08:54:43 +02:00
// Define functions called by MQTT in and out nodes
var node = this ;
2015-11-17 23:19:56 +01:00
this . users = { } ;
2017-03-10 21:12:52 +01:00
this . register = function ( mqttNode ) {
2015-11-17 23:19:56 +01:00
node . users [ mqttNode . id ] = mqttNode ;
if ( Object . keys ( node . users ) . length === 1 ) {
node . connect ( ) ;
}
2015-06-03 08:54:43 +02:00
} ;
2017-03-10 21:12:52 +01:00
this . deregister = function ( mqttNode , done ) {
2015-11-17 23:19:56 +01:00
delete node . users [ mqttNode . id ] ;
2015-12-23 00:25:29 +01:00
if ( node . closing ) {
return done ( ) ;
}
2015-11-17 23:19:56 +01:00
if ( Object . keys ( node . users ) . length === 0 ) {
2016-04-18 14:54:05 +02:00
if ( node . client && node . client . connected ) {
2015-12-23 00:25:29 +01:00
return node . client . end ( done ) ;
2016-04-18 14:54:05 +02:00
} else {
node . client . end ( ) ;
2016-07-26 22:33:00 +02:00
return done ( ) ;
2015-12-23 00:25:29 +01:00
}
2015-06-03 08:54:43 +02:00
}
2015-12-23 00:25:29 +01:00
done ( ) ;
2015-06-03 08:54:43 +02:00
} ;
this . connect = function ( ) {
if ( ! node . connected && ! node . connecting ) {
node . connecting = true ;
2018-01-25 17:58:42 +01:00
try {
2020-10-08 21:24:35 +02:00
node . serverProperties = { } ;
2018-01-25 17:58:42 +01:00
node . client = mqtt . connect ( node . brokerurl , node . options ) ;
node . client . setMaxListeners ( 0 ) ;
// Register successful connect or reconnect handler
2020-10-15 00:30:03 +02:00
node . client . on ( 'connect' , function ( connack ) {
2018-01-25 17:58:42 +01:00
node . connecting = false ;
node . connected = true ;
2021-02-25 16:49:56 +01:00
node . topicAliases = { } ;
2018-01-25 17:58:42 +01:00
node . log ( RED . _ ( "mqtt.state.connected" , { broker : ( node . clientid ? node . clientid + "@" : "" ) + node . brokerurl } ) ) ;
2020-10-15 00:30:03 +02:00
if ( node . options . protocolVersion == 5 && connack && connack . hasOwnProperty ( "properties" ) ) {
2020-12-07 13:25:51 +01:00
if ( typeof connack . properties == "object" ) {
//clean & assign all props sent from server.
2021-02-25 16:49:56 +01:00
setIntProp ( connack . properties , node . serverProperties , "topicAliasMaximum" , 0 ) ;
2020-12-07 13:25:51 +01:00
setIntProp ( connack . properties , node . serverProperties , "receiveMaximum" , 0 ) ;
setIntProp ( connack . properties , node . serverProperties , "sessionExpiryInterval" , 0 , 0xFFFFFFFF ) ;
setIntProp ( connack . properties , node . serverProperties , "maximumQoS" , 0 , 2 ) ;
2021-02-25 16:49:56 +01:00
setBoolProp ( connack . properties , node . serverProperties , "retainAvailable" , true ) ;
setBoolProp ( connack . properties , node . serverProperties , "wildcardSubscriptionAvailable" , true ) ;
setBoolProp ( connack . properties , node . serverProperties , "subscriptionIdentifiersAvailable" , true ) ;
2020-12-07 13:25:51 +01:00
setBoolProp ( connack . properties , node . serverProperties , "sharedSubscriptionAvailable" ) ;
2021-02-25 16:49:56 +01:00
setIntProp ( connack . properties , node . serverProperties , "maximumPacketSize" , 0 ) ;
setIntProp ( connack . properties , node . serverProperties , "serverKeepAlive" ) ;
setStrProp ( connack . properties , node . serverProperties , "responseInformation" ) ;
setStrProp ( connack . properties , node . serverProperties , "serverReference" ) ;
setStrProp ( connack . properties , node . serverProperties , "assignedClientIdentifier" ) ;
setStrProp ( connack . properties , node . serverProperties , "reasonString" ) ;
2020-12-07 13:25:51 +01:00
setUserProperties ( connack . properties , node . serverProperties ) ;
2021-02-25 16:49:56 +01:00
// node.debug("CONNECTED. node.serverProperties ==> "+JSON.stringify(node.serverProperties));//TODO: remove
2020-10-08 21:24:35 +02:00
}
}
2018-01-25 17:58:42 +01:00
for ( var id in node . users ) {
if ( node . users . hasOwnProperty ( id ) ) {
node . users [ id ] . status ( { fill : "green" , shape : "dot" , text : "node-red:common.status.connected" } ) ;
}
2015-11-17 23:19:56 +01:00
}
2018-01-25 17:58:42 +01:00
// Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection
node . client . removeAllListeners ( 'message' ) ;
2015-06-03 08:54:43 +02:00
2018-01-25 17:58:42 +01:00
// Re-subscribe to stored topics
for ( var s in node . subscriptions ) {
if ( node . subscriptions . hasOwnProperty ( s ) ) {
2020-10-08 21:24:35 +02:00
let topic = s ;
let qos = 0 ;
let _options = { } ;
2018-01-25 17:58:42 +01:00
for ( var r in node . subscriptions [ s ] ) {
if ( node . subscriptions [ s ] . hasOwnProperty ( r ) ) {
qos = Math . max ( qos , node . subscriptions [ s ] [ r ] . qos ) ;
2020-10-08 21:24:35 +02:00
_options = node . subscriptions [ s ] [ r ] . options ;
2018-01-25 17:58:42 +01:00
node . client . on ( 'message' , node . subscriptions [ s ] [ r ] . handler ) ;
}
2016-07-26 22:33:00 +02:00
}
2020-10-08 21:24:35 +02:00
_options . qos = _options . qos || qos ;
node . client . subscribe ( topic , _options ) ;
2016-07-26 22:33:00 +02:00
}
2015-06-03 08:54:43 +02:00
}
2018-01-25 17:58:42 +01:00
// Send any birth message
if ( node . birthMessage ) {
node . publish ( node . birthMessage ) ;
2016-02-19 23:52:43 +01:00
}
2018-01-25 17:58:42 +01:00
} ) ;
node . client . on ( "reconnect" , function ( ) {
2015-11-17 23:19:56 +01:00
for ( var id in node . users ) {
if ( node . users . hasOwnProperty ( id ) ) {
2018-01-25 17:58:42 +01:00
node . users [ id ] . status ( { fill : "yellow" , shape : "ring" , text : "node-red:common.status.connecting" } ) ;
2015-11-17 23:19:56 +01:00
}
}
2020-10-15 00:30:03 +02:00
} ) ;
2020-12-07 13:26:27 +01:00
//TODO: what to do with this event? Anything? Necessary?
2020-10-15 00:30:03 +02:00
node . client . on ( "disconnect" , function ( packet ) {
//Emitted after receiving disconnect packet from broker. MQTT 5.0 feature.
var rc = packet && packet . properties && packet . properties . reasonString ;
var rc = packet && packet . properties && packet . reasonCode ;
2020-12-07 13:48:33 +01:00
//TODO: If keeping this event, do we use these? log these?
2020-10-15 00:30:03 +02:00
} ) ;
2018-01-25 17:58:42 +01:00
// Register disconnect handlers
node . client . on ( 'close' , function ( ) {
if ( node . connected ) {
node . connected = false ;
node . log ( RED . _ ( "mqtt.state.disconnected" , { broker : ( node . clientid ? node . clientid + "@" : "" ) + node . brokerurl } ) ) ;
for ( var id in node . users ) {
if ( node . users . hasOwnProperty ( id ) ) {
node . users [ id ] . status ( { fill : "red" , shape : "ring" , text : "node-red:common.status.disconnected" } ) ;
}
}
} else if ( node . connecting ) {
node . log ( RED . _ ( "mqtt.state.connect-failed" , { broker : ( node . clientid ? node . clientid + "@" : "" ) + node . brokerurl } ) ) ;
}
} ) ;
2015-06-03 08:54:43 +02:00
2018-01-25 17:58:42 +01:00
// Register connect error handler
// The client's own reconnect logic will take care of errors
2020-10-15 00:30:03 +02:00
node . client . on ( 'error' , function ( error ) {
} ) ;
2018-01-25 17:58:42 +01:00
} catch ( err ) {
console . log ( err ) ;
}
2015-06-03 08:54:43 +02:00
}
} ;
2021-02-25 16:49:56 +01:00
this . subscriptionIds = { } ;
this . subid = 1 ;
2020-10-08 21:24:35 +02:00
this . subscribe = function ( topic , options , callback , ref ) {
2015-06-03 08:54:43 +02:00
ref = ref || 0 ;
2020-10-08 21:24:35 +02:00
var qos ;
if ( typeof options == "object" ) {
qos = options . qos ;
} else {
qos = options ;
options = { } ;
}
options . qos = qos ;
2021-02-25 16:49:56 +01:00
if ( ! node . subscriptionIds [ topic ] ) {
node . subscriptionIds [ topic ] = node . subid ++ ;
}
options . properties = options . properties || { } ;
options . properties . subscriptionIdentifier = node . subscriptionIds [ topic ] ;
2015-06-03 08:54:43 +02:00
node . subscriptions [ topic ] = node . subscriptions [ topic ] || { } ;
var sub = {
topic : topic ,
qos : qos ,
2020-10-08 21:24:35 +02:00
options : options ,
2015-06-03 08:54:43 +02:00
handler : function ( mtopic , mpayload , mpacket ) {
2020-10-15 00:30:03 +02:00
if ( mpacket . properties && options . properties && mpacket . properties . subscriptionIdentifier && options . properties . subscriptionIdentifier && ( mpacket . properties . subscriptionIdentifier !== options . properties . subscriptionIdentifier ) ) {
//do nothing as subscriptionIdentifier does not match
2021-02-25 16:49:56 +01:00
// node.debug(`> no match - this nodes subID (${options.properties.subscriptionIdentifier}) !== packet subID (${mpacket.properties.subscriptionIdentifier})`); //TODO: remove
2020-10-15 00:30:03 +02:00
} else if ( matchTopic ( topic , mtopic ) ) {
2021-02-25 16:49:56 +01:00
// node.debug(`> MATCHED '${topic}' to '${mtopic}' - performing callback`); //TODO: remove
2015-06-03 08:54:43 +02:00
callback ( mtopic , mpayload , mpacket ) ;
2021-02-25 16:49:56 +01:00
} else {
// node.debug(`> no match / no callback`); //TODO: remove
}
2015-06-03 08:54:43 +02:00
} ,
ref : ref
} ;
node . subscriptions [ topic ] [ ref ] = sub ;
if ( node . connected ) {
2021-02-25 16:49:56 +01:00
// node.debug(`this.subscribe - registering handler ref ${ref} for ${topic} and subscribing `+JSON.stringify(options)); //TODO: remove
2015-06-03 08:54:43 +02:00
node . client . on ( 'message' , sub . handler ) ;
node . client . subscribe ( topic , options ) ;
}
} ;
2019-03-29 11:30:11 +01:00
this . unsubscribe = function ( topic , ref , removed ) {
2015-06-03 08:54:43 +02:00
ref = ref || 0 ;
var sub = node . subscriptions [ topic ] ;
2021-02-25 16:49:56 +01:00
// var _debug = `unsubscribe for topic ${topic} called... ` ; //TODO: remove
2015-06-03 08:54:43 +02:00
if ( sub ) {
2021-02-25 16:49:56 +01:00
// _debug += "sub found. " //TODO: remove
2015-06-03 08:54:43 +02:00
if ( sub [ ref ] ) {
2021-02-25 16:49:56 +01:00
// debug(`this.unsubscribe - removing handler ref ${ref} for ${topic} `); //TODO: remove
// _debug += `removing handler ref ${ref} for ${topic}. `
2015-06-03 08:54:43 +02:00
node . client . removeListener ( 'message' , sub [ ref ] . handler ) ;
delete sub [ ref ] ;
}
2021-02-25 16:49:56 +01:00
//TODO: Review. The `if(removed)` was commented out to always delete and remove subscriptions.
2020-12-07 13:48:33 +01:00
// if we dont then property changes dont get applied and old subs still trigger
2020-10-15 00:30:03 +02:00
//if (removed) {
2021-02-25 16:49:56 +01:00
2019-03-29 11:30:11 +01:00
if ( Object . keys ( sub ) . length === 0 ) {
delete node . subscriptions [ topic ] ;
2021-02-25 16:49:56 +01:00
delete node . subscriptionIds [ topic ] ;
2019-03-29 11:30:11 +01:00
if ( node . connected ) {
2021-02-25 16:49:56 +01:00
// _debug += `calling client.unsubscribe to remove topic ${topic}` //TODO: remove
2019-03-29 11:30:11 +01:00
node . client . unsubscribe ( topic ) ;
}
2015-06-03 08:54:43 +02:00
}
2020-10-15 00:30:03 +02:00
//}
} else {
2021-02-25 16:49:56 +01:00
// _debug += "sub not found! "; //TODO: remove
2015-06-03 08:54:43 +02:00
}
2021-02-25 16:49:56 +01:00
// node.debug(_debug); //TODO: remove
2020-10-15 00:30:03 +02:00
2015-06-03 08:54:43 +02:00
} ;
2021-02-25 16:49:56 +01:00
this . topicAliases = { } ;
2015-06-03 08:54:43 +02:00
2019-08-14 16:54:06 +02:00
/**
 * Publish a message through this broker connection.
 *
 * Nothing is sent (and `done` is not invoked) while the broker is not
 * connected. The payload is normalised in place on `msg`: null/undefined
 * becomes "", a non-Buffer object is JSON encoded and any other non-string
 * value is converted to a string.
 *
 * For MQTT v5 connections the publish properties (responseTopic,
 * correlationData, contentType, messageExpiryInterval, userProperties,
 * topicAlias, payloadFormatIndicator) are copied from `msg` into the packet
 * options, and `msg.topic` may be blanked when a known topic alias is reused.
 *
 * @param {object} msg - message to send; `topic`, `payload`, `qos` and `retain` are read from it.
 * @param {function} [done] - optional callback, invoked with the publish error (if any)
 *        or with "Invalid topicAlias" when an unknown alias is used with an empty topic.
 *        It is optional: the 'close' handler publishes the close message without it.
 */
this.publish = function (msg, done) {
    if (!node.connected) {
        return;
    }

    if (msg.payload === null || msg.payload === undefined) {
        msg.payload = "";
    } else if (!Buffer.isBuffer(msg.payload)) {
        if (typeof msg.payload === "object") {
            msg.payload = JSON.stringify(msg.payload);
        } else if (typeof msg.payload !== "string") {
            msg.payload = "" + msg.payload;
        }
    }

    var options = {
        qos: msg.qos || 0,
        retain: msg.retain || false
    };

    //https://github.com/mqttjs/MQTT.js/blob/master/README.md#mqttclientpublishtopic-message-options-callback
    if (node.options.protocolVersion == 5) {
        // serverProperties may not have been populated; a missing value is
        // treated like 0 rather than throwing on the property lookup.
        var topicAliasMaximum = (node.serverProperties && node.serverProperties.topicAliasMaximum) || 0;

        options.properties = {};
        setStrProp(msg, options.properties, "responseTopic");
        setBufferProp(msg, options.properties, "correlationData");
        setStrProp(msg, options.properties, "contentType");
        setIntProp(msg, options.properties, "messageExpiryInterval", 0);
        setUserProperties(msg.userProperties, options.properties);
        // NOTE(review): the trailing arguments look like min/max bounds - confirm against setIntProp.
        setIntProp(msg, options.properties, "topicAlias", 1, topicAliasMaximum);
        setBoolProp(msg, options.properties, "payloadFormatIndicator");
        //FUTURE setIntProp(msg, options.properties, "subscriptionIdentifier", 1, 268435455);

        if (options.properties.topicAlias) {
            var alias = options.properties.topicAlias;
            if (!node.topicAliases.hasOwnProperty(alias) && msg.topic == "") {
                // An alias that was never registered cannot stand in for the topic.
                // `done` is optional, so guard it exactly like the success path does.
                done && done("Invalid topicAlias");
                return;
            }
            if (node.topicAliases[alias] === msg.topic) {
                // Alias already registered for this topic: send the alias with an empty topic.
                msg.topic = "";
            } else {
                // First use (or re-mapping) of this alias: remember the topic it stands for.
                node.topicAliases[alias] = msg.topic;
            }
        }
    }

    node.client.publish(msg.topic, msg.payload, options, function (err) {
        done && done(err);
    });
};
2015-11-17 23:19:56 +01:00
/**
 * 'close' handler for the broker node.
 *
 * Marks the node as closing and shuts the client down:
 *  - connected: publishes the configured close message (if any), ends the
 *    client and calls `done` once the client has emitted 'close';
 *  - connecting / reconnecting: ends the client and calls `done` immediately;
 *  - otherwise: just calls `done`.
 *
 * @param {function} done - completion callback, always invoked exactly once.
 */
this.on('close', function (done) {
    this.closing = true;
    if (this.connected) {
        // Send close message
        if (node.closeMessage) {
            node.publish(node.closeMessage);
        }
        this.client.once('close', function () {
            done();
        });
        this.client.end();
    } else if (node.client && (this.connecting || node.client.reconnecting)) {
        // node.client is checked first: if no client was ever created there is
        // nothing to end, and dereferencing it would throw before done() runs.
        node.client.end();
        done();
    } else {
        done();
    }
});
2013-11-25 23:50:08 +01:00
}
2015-06-03 08:54:43 +02:00
2014-07-18 15:47:58 +02:00
// Register the broker configuration node type. `user` is declared as a text
// credential and `password` as a password credential.
var mqttBrokerCredentials = {
    user: { type: "text" },
    password: { type: "password" }
};
RED.nodes.registerType("mqtt-broker", MQTTBrokerNode, { credentials: mqttBrokerCredentials });
2014-05-29 10:00:28 +02:00
2014-05-04 00:32:04 +02:00
/**
 * MQTT in node: subscribes to a topic on the configured broker and sends one
 * message downstream for every packet received.
 *
 * @param {object} n - node configuration; reads topic, qos, broker, datatype,
 *        subscriptionIdentifier and the MQTT v5 subscription options nl, rap and rh.
 */
function MQTTInNode(n) {
    RED.nodes.createNode(this, n);
    var node = this;

    node.topic = n.topic;
    node.qos = parseInt(n.qos);
    node.subscriptionIdentifier = n.subscriptionIdentifier; //https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901117
    node.nl = n.nl;
    node.rap = n.rap;
    node.rh = n.rh;

    // Anything that is not a QoS level in 0..2 falls back to 2.
    var qosIsValid = !isNaN(node.qos) && node.qos >= 0 && node.qos <= 2;
    if (!qosIsValid) {
        node.qos = 2;
    }

    node.broker = n.broker;
    node.brokerConn = RED.nodes.getNode(node.broker);

    var topicFilterPattern = /^(#$|(\+|[^+#]*)(\/(\+|[^+#]*))*(\/(\+|#|[^+#]*))?$)/;
    if (!topicFilterPattern.test(node.topic)) {
        return node.warn(RED._("mqtt.errors.invalid-topic"));
    }

    node.datatype = n.datatype || "utf8";

    if (!node.brokerConn) {
        node.error(RED._("mqtt.errors.missing-config"));
        return;
    }

    var v5 = node.brokerConn.options && node.brokerConn.options.protocolVersion == 5;

    // Maps a configured "true"/"false"/boolean to a boolean; undefined means "leave the option unset".
    function asFlag(value) {
        if (value === "true" || value === true) {
            return true;
        }
        if (value === "false" || value === false) {
            return false;
        }
        return undefined;
    }

    // Builds the subscribe options: always qos, plus rh / nl / rap on MQTT v5 connections.
    function buildSubscribeOptions() {
        var subscribeOptions = { qos: node.qos };
        if (v5) {
            setIntProp(node, subscribeOptions, "rh");
            var noLocal = asFlag(node.nl);
            if (noLocal !== undefined) {
                subscribeOptions.nl = noLocal;
            }
            var retainAsPublished = asFlag(node.rap);
            if (retainAsPublished !== undefined) {
                subscribeOptions.rap = retainAsPublished;
            }
        }
        return subscribeOptions;
    }

    // Handles one received packet: decodes the payload according to node.datatype,
    // copies the MQTT v5 properties onto the message and sends it.
    function onMessage(topic, payload, packet) {
        // Context object attached to decode errors; built lazily so it carries the current payload.
        function errorContext() {
            return { payload: payload, topic: topic, qos: packet.qos, retain: packet.retain };
        }

        switch (node.datatype) {
            case "buffer":
                // payload is passed through untouched
                break;
            case "base64":
                payload = payload.toString('base64');
                break;
            case "utf8":
                payload = payload.toString('utf8');
                break;
            case "json":
                if (!isUtf8(payload)) {
                    node.error((RED._("mqtt.errors.invalid-json-string")), errorContext());
                    return;
                }
                payload = payload.toString();
                try {
                    payload = JSON.parse(payload);
                } catch (e) {
                    // payload is still the undecoded string here
                    node.error(RED._("mqtt.errors.invalid-json-parse"), errorContext());
                    return;
                }
                break;
            default:
                // any other datatype: string when the bytes are valid UTF-8, Buffer otherwise
                if (isUtf8(payload)) {
                    payload = payload.toString();
                }
        }

        var msg = { topic: topic, payload: payload, qos: packet.qos, retain: packet.retain };

        if (v5 && packet.properties) {
            var received = packet.properties;
            setStrProp(received, msg, "responseTopic");
            setBufferProp(received, msg, "correlationData");
            setStrProp(received, msg, "contentType");
            setIntProp(received, msg, "messageExpiryInterval", 0);
            setBoolProp(received, msg, "payloadFormatIndicator");
            setStrProp(received, msg, "reasonString");
            setUserProperties(received.userProperties, msg);
        }

        var brokerHost = node.brokerConn.broker;
        if ((brokerHost === "localhost") || (brokerHost === "127.0.0.1")) {
            msg._topic = topic;
        }
        node.send(msg);
    }

    node.status({ fill: "red", shape: "ring", text: "node-red:common.status.disconnected" });

    if (node.topic) {
        node.brokerConn.register(node);
        node.brokerConn.subscribe(node.topic, buildSubscribeOptions(), onMessage, node.id);
        if (node.brokerConn.connected) {
            node.status({ fill: "green", shape: "dot", text: "node-red:common.status.connected" });
        }
    } else {
        node.error(RED._("mqtt.errors.not-defined"));
    }

    node.on('close', function (removed, done) {
        if (!node.brokerConn) {
            return;
        }
        node.brokerConn.unsubscribe(node.topic, node.id, removed);
        node.brokerConn.deregister(node, done);
    });
}
2014-05-04 00:32:04 +02:00
// Register MQTTInNode as the implementation of the "mqtt in" node type.
RED . nodes . registerType ( "mqtt in" , MQTTInNode ) ;
2014-05-29 10:00:28 +02:00
2014-05-04 00:32:04 +02:00
/**
 * MQTT out node: publishes every incoming message that has a payload through
 * the configured broker.
 *
 * Values configured on the node (topic, qos, retain and, on MQTT v5
 * connections, userProperties, responseTopic, correlationData, contentType and
 * messageExpiryInterval) take precedence over the same properties on the
 * incoming message.
 *
 * @param {object} n - node configuration; reads topic, qos, retain, broker,
 *        respTopic, correl, contentType, expiry and userProps (a JSON object string).
 */
function MQTTOutNode(n) {
    RED.nodes.createNode(this, n);
    this.topic = n.topic;
    this.qos = n.qos || null;
    this.retain = n.retain;
    this.broker = n.broker;
    this.responseTopic = n.respTopic; //https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901114
    this.correlationData = n.correl; //https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901115
    this.contentType = n.contentType; //https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901118
    this.messageExpiryInterval = n.expiry; //https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901112
    try {
        // Only attempt to parse something that looks like a JSON object.
        if (/^ *{/.test(n.userProps)) {
            //setup this.userProperties
            setUserProperties(JSON.parse(n.userProps), this); //https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901116
        }
    } catch (err) {
        // Best effort: a malformed userProps value leaves userProperties unset,
        // but the reason is logged instead of being silently discarded.
        this.debug("Ignoring invalid userProps configuration: " + err);
    }
    // this.topicAlias = n.topicAlias; //https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901113
    // this.payloadFormatIndicator = n.payloadFormatIndicator; //https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901111
    // this.subscriptionIdentifier = n.subscriptionIdentifier;//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901117

    this.brokerConn = RED.nodes.getNode(this.broker);
    var node = this;

    if (this.brokerConn) {
        let v5 = this.brokerConn.options && this.brokerConn.options.protocolVersion == 5;
        this.status({ fill: "red", shape: "ring", text: "node-red:common.status.disconnected" });

        this.on("input", function (msg, send, done) {
            if (msg.qos) {
                msg.qos = parseInt(msg.qos, 10);
                if ((msg.qos !== 0) && (msg.qos !== 1) && (msg.qos !== 2)) {
                    msg.qos = null;
                }
            }
            msg.qos = Number(node.qos || msg.qos || 0);
            msg.retain = node.retain || msg.retain || false;
            msg.retain = ((msg.retain === true) || (msg.retain === "true")) || false;

            /** If node property exists, override/set that to property in msg */
            let msgPropOverride = function (propName) {
                if (node[propName]) {
                    msg[propName] = node[propName];
                }
            };
            msgPropOverride("topic");
            if (v5) {
                //Next, update/override the msg.xxxx properties from node config
                //TODO: Should we be expecting msg.properties.xxxx instead of msg.xxxx?
                msgPropOverride("userProperties");
                msgPropOverride("responseTopic");
                msgPropOverride("correlationData");
                msgPropOverride("contentType");
                msgPropOverride("messageExpiryInterval");
                //FUTURE topicAlias, payloadFormatIndicator, subscriptionIdentifier
            }

            if (msg.userProperties && typeof msg.userProperties !== "object") {
                delete msg.userProperties;
            }

            // Read at message time because the broker connection may change it.
            // The `|| {}` guard keeps a broker without serverProperties from
            // throwing on the lookup; the limit is then simply undefined.
            let serverProperties = node.brokerConn.serverProperties || {};
            let topicAliasMaximum = serverProperties.topicAliasMaximum;

            // Drop an alias the server cannot accept: 0 is never valid, a maximum
            // of 0 means no aliases at all, and the alias must not exceed the maximum.
            if (msg.hasOwnProperty("topicAlias") && !isNaN(msg.topicAlias) && (msg.topicAlias === 0 || topicAliasMaximum === 0 || msg.topicAlias > topicAliasMaximum)) {
                delete msg.topicAlias;
            }

            if (!msg.hasOwnProperty("payload")) {
                // Nothing to publish.
                done();
                return;
            }

            let topicOK = msg.hasOwnProperty("topic") && (typeof msg.topic === "string") && (msg.topic !== "");
            if (!topicOK && v5) {
                //NOTE: A value of 0 (in server props topicAliasMaximum) indicates that the Server does not accept any Topic Aliases on this connection
                if (msg.hasOwnProperty("topicAlias") && !isNaN(msg.topicAlias) && msg.topicAlias >= 0 && topicAliasMaximum && topicAliasMaximum >= msg.topicAlias) {
                    topicOK = true;
                    msg.topic = ""; //must be empty string
                } else if (msg.hasOwnProperty("responseTopic") && (typeof msg.responseTopic === "string") && (msg.responseTopic !== "")) {
                    //TODO: if topic is empty but responseTopic has a string value, use that instead. Is this desirable?
                    topicOK = true;
                    msg.topic = msg.responseTopic;
                    //TODO: delete msg.responseTopic - to prevent it being resent?
                }
            }

            if (!topicOK) { // topic must exist
                node.warn(RED._("mqtt.errors.invalid-topic"));
                done();
                return;
            }

            // send the message
            node.brokerConn.publish(msg, function (err) {
                done(err);
            });
        });

        if (this.brokerConn.connected) {
            node.status({ fill: "green", shape: "dot", text: "node-red:common.status.connected" });
        }
        node.brokerConn.register(node);
        this.on('close', function (done) {
            node.brokerConn.deregister(node, done);
        });
    } else {
        this.error(RED._("mqtt.errors.missing-config"));
    }
}
2014-05-04 00:32:04 +02:00
// Register MQTTOutNode as the implementation of the "mqtt out" node type.
RED . nodes . registerType ( "mqtt out" , MQTTOutNode ) ;
2020-10-08 21:24:35 +02:00
} ;