2022-03-19 18:29:31 +01:00
/* These tests are only supposed to be executed at development time (for now)*/
2022-03-21 14:50:24 +01:00
"use strict" ;
2022-03-19 18:29:31 +01:00
const should = require ( "should" ) ;
const helper = require ( "node-red-node-test-helper" ) ;
2022-04-15 19:21:36 +02:00
const { doesNotThrow } = require ( "should" ) ;
2022-03-19 18:29:31 +01:00
const mqttNodes = require ( "nr-test-utils" ) . require ( "@node-red/nodes/core/network/10-mqtt.js" ) ;
2022-03-21 15:59:10 +01:00
// Location of the broker used by the integration tests; override with the
// MQTT_BROKER_SERVER / MQTT_BROKER_PORT environment variables.
// NOTE: when MQTT_BROKER_PORT is set, BROKER_PORT is a string (env values are never numbers).
const BROKER_HOST = process.env.MQTT_BROKER_SERVER || "localhost";
const BROKER_PORT = process.env.MQTT_BROKER_PORT || 1883;
//By default, MQTT tests are disabled. Set ENV VAR NR_MQTT_TESTS to "1" or "true" to enable
// Environment values are strings or undefined, so strict comparison is sufficient here.
const skipTests = process.env.NR_MQTT_TESTS !== "true" && process.env.NR_MQTT_TESTS !== "1";
2022-03-19 18:29:31 +01:00
describe ( 'MQTT Nodes' , function ( ) {
// Suite-level lifecycle: the test helper's server is started once before any
// test runs and stopped once after the last one; Mocha is signalled via `done`.
before(function startHelperServer(done) {
    helper.startServer(done);
});
after(function stopHelperServer(done) {
    helper.stopServer(done);
});
// Unload the flow after every test. Teardown is deliberately best-effort: a
// failure to unload must not fail the test that just ran.
afterEach(function unloadFlows() {
    try {
        const pending = helper.unload();
        if (pending && typeof pending.then === "function") {
            // Hand the promise to Mocha so the next test only starts once the
            // flow has stopped; rejections are intentionally ignored (best effort).
            return pending.catch(function ignoreUnloadFailure() { });
        }
    } catch (error) {
        // Synchronous unload failure: intentionally ignored (best effort).
    }
    return undefined;
});
// Loads the basic broker/in/out flow without connecting (autoConnect:false)
// and checks the configuration each node ends up with.
it('should be loaded and have default values', function (done) {
    // timeout() is a method of the Mocha context; assigning to it would only
    // overwrite the method and leave the timeout unchanged.
    this.timeout(2000);
    const { flow, nodes } = buildBasicMQTTSendRecvFlow(
        { id: "mqtt.broker", name: "mqtt_broker", autoConnect: false },
        { id: "mqtt.in", topic: "in_topic" },
        { id: "mqtt.out", topic: "out_topic" }
    );
    helper.load(mqttNodes, flow, function () {
        try {
            const mqttIn = helper.getNode("mqtt.in");
            const mqttOut = helper.getNode("mqtt.out");
            const mqttBroker = helper.getNode("mqtt.broker");

            should(mqttIn).be.type("object", "mqtt in node should be an object");
            mqttIn.should.have.property('broker', nodes.mqtt_broker.id); //should be the id of the broker node
            mqttIn.should.have.property('datatype', 'utf8'); //default: 'utf8'
            mqttIn.should.have.property('isDynamic', false); //default: false
            mqttIn.should.have.property('inputs', 0); //default: 0
            mqttIn.should.have.property('qos', 2); //default: 2
            mqttIn.should.have.property('topic', "in_topic");
            mqttIn.should.have.property('wires', [["helper.node"]]);

            should(mqttOut).be.type("object", "mqtt out node should be an object");
            mqttOut.should.have.property('broker', nodes.mqtt_broker.id); //should be the id of the broker node
            mqttOut.should.have.property('topic', "out_topic");

            should(mqttBroker).be.type("object", "mqtt broker node should be an object");
            mqttBroker.should.have.property('broker', BROKER_HOST);
            mqttBroker.should.have.property('port', BROKER_PORT);
            mqttBroker.should.have.property('brokerurl');
            // mqttBroker.should.have.property('autoUnsubscribe', true);//default: true
            mqttBroker.should.have.property('autoConnect', false); //Set "autoConnect:false" in brokerOptions
            mqttBroker.should.have.property('options');
            mqttBroker.options.should.have.property('clean', true);
            mqttBroker.options.should.have.property('clientId');
            mqttBroker.options.clientId.should.containEql('nodered_');
            mqttBroker.options.should.have.property('keepalive').type("number");
            mqttBroker.options.should.have.property('reconnectPeriod').type("number");
            done();
        } catch (error) {
            // report assertion failures to Mocha instead of throwing inside the load callback
            done(error);
        }
    });
});
if (skipTests) {
    // Placeholder test whose only purpose is to show, in the test report, why
    // the broker-dependent tests were skipped and how to enable them.
    it('skipping MQTT tests. Set env var "NR_MQTT_TESTS=true" to enable. Requires a v5 capable broker running on localhost:1883.', function (done) {
        done();
    });
}
2022-04-29 20:56:37 +02:00
// Conditional test runner (only run if skipTests=false)
2022-03-19 18:29:31 +01:00
/**
 * Registers a test that only runs when the MQTT tests are enabled.
 * When `skipTests` is set the test is registered through `it.skip` instead,
 * so it is still listed (as pending) in the report.
 * @param {string} title test title
 * @param {Function} test test body
 * @returns whatever `it` / `it.skip` returns
 */
function itConditional(title, test) {
    if (skipTests) {
        return it.skip(title, test);
    }
    return it(title, test);
}
//#region ################### BASIC TESTS ################### #//
2022-03-21 14:50:24 +01:00
// Publishes a plain string at QoS 0 and expects the identical message back.
itConditional('basic send and receive tests', function (done) {
    if (skipTests) { return this.skip(); }
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const options = {};
    options.sendMsg = {
        topic: nextTopic(),
        payload: "hello",
        qos: 0
    };
    options.expectMsg = Object.assign({}, options.sendMsg);
    testSendRecv({}, { datatype: "auto", topicType: "static" }, {}, options, { done: done });
});
2022-04-15 19:21:36 +02:00
//Prior to V3, "auto" mode would only parse to string or buffer.
2022-04-21 22:32:24 +02:00
// "auto" mode must hand a JSON-looking payload back as the original string (no parsing).
itConditional('should send JSON and receive string (auto mode)', function (done) {
    if (skipTests) { return this.skip(); }
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const options = {};
    options.sendMsg = {
        topic: nextTopic(),
        payload: '{"prop":"value1", "num":1}',
        qos: 1
    };
    options.expectMsg = Object.assign({}, options.sendMsg);
    testSendRecv({}, { datatype: "auto", topicType: "static" }, {}, options, { done: done });
});
2022-04-15 19:21:36 +02:00
//In V3, "auto" mode should try to parse JSON, then string and fall back to buffer
2022-04-21 22:32:24 +02:00
// "auto-detect" mode must parse a valid JSON string payload into an object.
itConditional('should send JSON and receive object (auto-detect mode)', function (done) {
    if (skipTests) { return this.skip(); }
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const options = {};
    options.sendMsg = {
        topic: nextTopic(),
        payload: '{"prop":"value1", "num":1}',
        qos: 1
    };
    options.expectMsg = Object.assign({}, options.sendMsg);
    options.expectMsg.payload = JSON.parse(options.sendMsg.payload); //expect the parsed object
    testSendRecv({}, { datatype: "auto-detect", topicType: "static" }, {}, options, { done: done });
});
2022-04-15 19:21:36 +02:00
// Invalid JSON in "auto" mode must be delivered unchanged as a string.
itConditional('should send invalid JSON and receive string (auto mode)', function (done) {
    if (skipTests) { return this.skip(); }
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const options = {};
    options.sendMsg = {
        topic: nextTopic(),
        payload: '{prop:"value3", "num":3}' // send invalid JSON ...
    };
    options.expectMsg = Object.assign({}, options.sendMsg); //expect same payload
    testSendRecv({}, { datatype: "auto", topicType: "static" }, {}, options, { done: done });
});
2022-04-21 22:32:24 +02:00
// Invalid JSON in "auto-detect" mode must fall back to the unchanged string.
itConditional('should send invalid JSON and receive string (auto-detect mode)', function (done) {
    if (skipTests) { return this.skip(); }
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const options = {};
    options.sendMsg = {
        topic: nextTopic(),
        payload: '{prop:"value3", "num":3}' // send invalid JSON ...
    };
    options.expectMsg = Object.assign({}, options.sendMsg); //expect same payload
    testSendRecv({}, { datatype: "auto-detect", topicType: "static" }, {}, options, { done: done });
});
2022-04-15 19:21:36 +02:00
// "utf8" mode must never parse: a JSON string is delivered as the same string.
itConditional('should send JSON and receive string (utf8 mode)', function (done) {
    if (skipTests) { return this.skip(); }
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const options = {};
    options.sendMsg = {
        topic: nextTopic(),
        payload: '{"prop":"value2", "num":2}',
        qos: 2
    };
    options.expectMsg = Object.assign({}, options.sendMsg);
    testSendRecv({}, { datatype: "utf8", topicType: "static" }, {}, options, { done: done });
});
2022-04-15 19:21:36 +02:00
// "json" mode must parse the JSON string payload into an object.
itConditional('should send JSON and receive Object (json mode)', function (done) {
    if (skipTests) { return this.skip(); }
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const options = {};
    options.sendMsg = {
        topic: nextTopic(),
        payload: '{"prop":"value3", "num":3}' // send a string ...
    };
    options.expectMsg = Object.assign({}, options.sendMsg, { payload: { "prop": "value3", "num": 3 } }); //expect an object
    testSendRecv({}, { datatype: "json", topicType: "static" }, {}, options, { done: done });
});
2022-04-15 19:21:36 +02:00
// Invalid JSON in "json" mode must not be delivered as a payload; the helper
// node is expected to receive a msg carrying an error whose source is the MQTT In node.
itConditional('should send invalid JSON and raise error (json mode)', function (done) {
    if (skipTests) { return this.skip(); }
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const options = {};
    options.sendMsg = {
        topic: nextTopic(),
        payload: '{prop:"value3", "num":3}', // send invalid JSON ...
    };
    const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null };
    hooks.afterLoad = (helperNode, mqttBroker, mqttIn, mqttOut) => {
        helperNode.on("input", function (msg) {
            try {
                msg.should.have.a.property("error").type("object");
                msg.error.should.have.a.property("source").type("object");
                msg.error.source.should.have.a.property("id", mqttIn.id);
                done();
            } catch (err) {
                done(err);
            }
        });
        return true; //handled - replaces testSendRecv's default input handler
    };
    testSendRecv({}, { datatype: "json", topicType: "static" }, {}, options, hooks);
});
// "buffer" mode must deliver the raw bytes of a published string as a Buffer.
itConditional('should send String and receive Buffer (buffer mode)', function (done) {
    if (skipTests) { return this.skip(); }
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const options = {};
    options.sendMsg = {
        topic: nextTopic(),
        payload: "a b c" //send string ...
    };
    options.expectMsg = Object.assign({}, options.sendMsg, { payload: Buffer.from(options.sendMsg.payload) }); //expect Buffer.from(msg.payload)
    testSendRecv({}, { datatype: "buffer", topicType: "static" }, {}, options, { done: done });
});
2022-04-15 19:21:36 +02:00
// A Buffer holding valid UTF-8 must be delivered as a string in "auto" mode.
itConditional('should send utf8 Buffer and receive String (auto mode)', function (done) {
    if (skipTests) { return this.skip(); }
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const options = {};
    options.sendMsg = {
        topic: nextTopic(),
        payload: Buffer.from([0x78, 0x20, 0x79, 0x20, 0x7a]) // "x y z"
    };
    options.expectMsg = Object.assign({}, options.sendMsg, { payload: "x y z" }); //set expected payload to "x y z"
    testSendRecv({}, { datatype: "auto", topicType: "static" }, {}, options, { done: done });
});
2022-04-15 19:21:36 +02:00
// Bytes that are not valid UTF-8 must stay a Buffer in "auto" mode.
itConditional('should send non utf8 Buffer and receive Buffer (auto mode)', function (done) {
    if (skipTests) { return this.skip(); }
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const options = {};
    const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null };
    options.sendMsg = {
        topic: nextTopic(),
        payload: Buffer.from([0xC0, 0xC1, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF]) //non valid UTF8
    };
    // a separate Buffer instance with the same bytes is expected back
    options.expectMsg = Object.assign({}, options.sendMsg, { payload: Buffer.from([0xC0, 0xC1, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF]) });
    testSendRecv({}, { datatype: "auto", topicType: "static" }, {}, options, hooks);
});
// Publishes a message carrying the MQTT v5 properties and checks that they
// arrive on the subscribing side (protocolVersion 5 broker config).
itConditional('should send/receive all v5 flags and settings', function (done) {
    if (skipTests) { return this.skip(); }
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const t = nextTopic();
    const options = {};
    const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null };
    options.sendMsg = {
        topic: t + "/command", payload: Buffer.from('{"version":"v5"}'), qos: 1, retain: true,
        responseTopic: t + "/response",
        userProperties: { prop1: "val1" },
        contentType: "text/plain",
        correlationData: Buffer.from([1, 2, 3]),
        payloadFormatIndicator: true,
        messageExpiryInterval: 2000,
    };
    options.expectMsg = Object.assign({}, options.sendMsg);
    options.expectMsg.payload = options.expectMsg.payload.toString(); //auto mode + payloadFormatIndicator + contentType: "text/plain" should make a string
    delete options.expectMsg.payloadFormatIndicator; //Seems mqtt.js only publishes payloadFormatIndicator on the will msg
    const inOptions = {
        datatype: "auto", topicType: "static",
        qos: 1, nl: false, rap: true, rh: 1
    };
    testSendRecv({ protocolVersion: 5 }, inOptions, {}, options, hooks);
});
2022-04-15 19:21:36 +02:00
// v5 contentType "text/plain" with a plain string: expect the same string back.
itConditional('should send regular string with v5 media type "text/plain" and receive a string (auto mode)', function (done) {
    if (skipTests) { return this.skip(); }
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const options = {};
    const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null };
    options.sendMsg = {
        topic: nextTopic(), payload: "abc", contentType: "text/plain"
    };
    options.expectMsg = Object.assign({}, options.sendMsg);
    testSendRecv({ protocolVersion: 5 }, { datatype: "auto", topicType: "static" }, {}, options, hooks);
});
// v5 contentType "text/plain" with JSON text in "auto" mode: expect the unparsed string.
itConditional('should send JSON with v5 media type "text/plain" and receive a string (auto mode)', function (done) {
    if (skipTests) { return this.skip(); }
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const options = {};
    const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null };
    options.sendMsg = {
        topic: nextTopic(), payload: '{"prop":"val"}', contentType: "text/plain"
    };
    options.expectMsg = Object.assign({}, options.sendMsg);
    testSendRecv({ protocolVersion: 5 }, { datatype: "auto", topicType: "static" }, {}, options, hooks);
});
2022-04-21 22:32:24 +02:00
// v5 contentType "text/plain" takes precedence in "auto-detect" mode: JSON text stays a string.
itConditional('should send JSON with v5 media type "text/plain" and receive a string (auto-detect mode)', function (done) {
    if (skipTests) { return this.skip(); }
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const options = {};
    const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null };
    options.sendMsg = {
        topic: nextTopic(), payload: '{"prop":"val"}', contentType: "text/plain"
    };
    options.expectMsg = Object.assign({}, options.sendMsg);
    testSendRecv({ protocolVersion: 5 }, { datatype: "auto-detect", topicType: "static" }, {}, options, hooks);
});
// v5 contentType "application/json" in "auto-detect" mode: expect the parsed object.
itConditional('should send JSON with v5 media type "application/json" and receive an object (auto-detect mode)', function (done) {
    if (skipTests) { return this.skip(); }
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const options = {};
    const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null };
    options.sendMsg = {
        topic: nextTopic(), payload: '{"prop":"val"}', contentType: "application/json",
    };
    options.expectMsg = Object.assign({}, options.sendMsg, { payload: JSON.parse(options.sendMsg.payload) });
    testSendRecv({ protocolVersion: 5 }, { datatype: "auto-detect", topicType: "static" }, {}, options, hooks);
});
// Invalid JSON announced as "application/json" must surface as an error msg
// whose source is the MQTT In node, rather than as a normal payload.
itConditional('should send invalid JSON with v5 media type "application/json" and raise an error (auto mode)', function (done) {
    if (skipTests) { return this.skip(); }
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const options = {};
    options.sendMsg = {
        topic: nextTopic(),
        payload: '{prop:"value3", "num":3}', contentType: "application/json", // send invalid JSON ...
    };
    const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null };
    hooks.afterLoad = (helperNode, mqttBroker, mqttIn, mqttOut) => {
        helperNode.on("input", function (msg) {
            try {
                msg.should.have.a.property("error").type("object");
                msg.error.should.have.a.property("source").type("object");
                msg.error.source.should.have.a.property("id", mqttIn.id);
                done();
            } catch (err) {
                done(err);
            }
        });
        return true; //handled - replaces testSendRecv's default input handler
    };
    testSendRecv({ protocolVersion: 5 }, { datatype: "auto", topicType: "static" }, {}, options, hooks);
});
2022-04-21 22:32:24 +02:00
// A Buffer containing JSON text, announced as "application/json", must be parsed to an object.
itConditional('should send buffer with v5 media type "application/json" and receive an object (auto-detect mode)', function (done) {
    if (skipTests) { return this.skip(); }
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const options = {};
    const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null };
    options.sendMsg = {
        // bytes of the text {"prop":"val"}
        topic: nextTopic(), payload: Buffer.from([0x7b, 0x22, 0x70, 0x72, 0x6f, 0x70, 0x22, 0x3a, 0x22, 0x76, 0x61, 0x6c, 0x22, 0x7d]), contentType: "application/json",
    };
    options.expectMsg = Object.assign({}, options.sendMsg, { payload: { "prop": "val" } });
    testSendRecv({ protocolVersion: 5 }, { datatype: "auto-detect", topicType: "static" }, {}, options, hooks);
});
// A Buffer announced as "text/plain" must be delivered as a string in "auto" mode.
itConditional('should send buffer with v5 media type "text/plain" and receive a string (auto mode)', function (done) {
    if (skipTests) { return this.skip(); }
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const options = {};
    const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null };
    options.sendMsg = {
        // bytes of the text {"prop":"val"}
        topic: nextTopic(), payload: Buffer.from([0x7b, 0x22, 0x70, 0x72, 0x6f, 0x70, 0x22, 0x3a, 0x22, 0x76, 0x61, 0x6c, 0x22, 0x7d]), contentType: "text/plain",
    };
    options.expectMsg = Object.assign({}, options.sendMsg, { payload: '{"prop":"val"}' });
    testSendRecv({ protocolVersion: 5 }, { datatype: "auto", topicType: "static" }, {}, options, hooks);
});
// A Buffer announced with a binary media type must stay a Buffer in "auto" mode,
// even though its bytes happen to be valid UTF-8 text.
itConditional('should send buffer with v5 media type "application/zip" and receive a buffer (auto mode)', function (done) {
    if (skipTests) { return this.skip(); }
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const options = {};
    const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null };
    options.sendMsg = {
        topic: nextTopic(), payload: Buffer.from([0x7b, 0x22, 0x70, 0x72, 0x6f, 0x70, 0x22, 0x3a, 0x22, 0x76, 0x61, 0x6c, 0x22, 0x7d]), contentType: "application/zip",
    };
    options.expectMsg = Object.assign({}, options.sendMsg, { payload: Buffer.from([0x7b, 0x22, 0x70, 0x72, 0x6f, 0x70, 0x22, 0x3a, 0x22, 0x76, 0x61, 0x6c, 0x22, 0x7d]) });
    testSendRecv({ protocolVersion: 5 }, { datatype: "auto", topicType: "static" }, {}, options, hooks);
});
2022-03-19 18:29:31 +01:00
// Uses a dynamic-topic MQTT In node (topicType "dynamic"); the shared
// testSendRecv helper drives the subscription and the send/receive round trip.
itConditional('should subscribe dynamically via action', function (done) {
    if (skipTests) { return this.skip(); }
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const options = {};
    const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null };
    options.sendMsg = {
        topic: nextTopic(), payload: "abc"
    };
    options.expectMsg = Object.assign({}, options.sendMsg);
    testSendRecv({ protocolVersion: 5 }, { datatype: "utf8", topicType: "dynamic" }, {}, options, hooks);
});
//#endregion BASIC TESTS
//#region ################### ADVANCED TESTS ################### #//
// With autoConnect:false the broker must stay idle until an MQTT In node
// receives a {"action":"connect"} message; reaching afterConnect proves it connected.
itConditional('should connect via "connect" action', function (done) {
    if (skipTests) { return this.skip(); }
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const options = {};
    // hooks.done is left null on purpose: `done` is called from afterConnect below
    const hooks = { done: null, beforeLoad: null, afterLoad: null, afterConnect: null };
    hooks.afterLoad = (helperNode, mqttBroker, mqttIn, mqttOut) => {
        mqttBroker.should.have.property("autoConnect", false);
        mqttBroker.should.have.property("connecting", false); //should not attempt to connect (autoConnect:false)
        mqttIn.receive({ "action": "connect" }); //now request connect action
        return true; //handled
    };
    hooks.afterConnect = (helperNode, mqttBroker, mqttIn, mqttOut) => {
        done(); //if we got here, it connected :)
        return true;
    };
    testSendRecv({ protocolVersion: 5, autoConnect: false }, { datatype: "utf8", topicType: "dynamic" }, {}, options, hooks);
});
// Once connected, sending {"action":"disconnect"} to the MQTT Out node must
// produce a status event whose text mentions "disconnect".
itConditional('should disconnect via "disconnect" action', function (done) {
    if (skipTests) { return this.skip(); }
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const options = {};
    const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null };
    hooks.beforeLoad = (flow) => { //add a status node pointed at MQTT Out node (to watch for connection status change)
        flow.push({ "id": "status.node", "type": "status", "name": "status_node", "scope": ["mqtt.out"], "wires": [["helper.node"]] }); //add status node to watch mqtt_out
    };
    hooks.afterLoad = (helperNode, mqttBroker, mqttIn, mqttOut) => {
        mqttBroker.should.have.property("autoConnect", true);
        mqttBroker.should.have.property("connecting", true); //should be trying to connect (autoConnect:true)
        return true; //handled
    };
    hooks.afterConnect = (helperNode, mqttBroker, mqttIn, mqttOut) => {
        //connected - now add the "on" handler then send "disconnect" action
        helperNode.on("input", function (msg) {
            try {
                msg.should.have.property("status");
                msg.status.should.have.property("text");
                msg.status.text.should.containEql('disconnect');
                done(); //it disconnected - yey!
            } catch (error) {
                done(error);
            }
        });
        mqttOut.receive({ "action": "disconnect" });
        return true; //handled
    };
    testSendRecv({ protocolVersion: 5 }, null, {}, options, hooks);
});
// Configures a birth message on the broker node and subscribes the MQTT In
// node to the birth topic; no sendMsg is set, only the expected birth message.
itConditional('should publish birth message', function (done) {
    if (skipTests) { return this.skip(); }
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const baseTopic = nextTopic();
    const brokerOptions = {
        protocolVersion: 4,
        birthTopic: baseTopic + "/birth",
        birthPayload: "broker connected",
        birthQos: 2,
    };
    const options = {};
    const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null };
    options.expectMsg = {
        topic: brokerOptions.birthTopic,
        payload: brokerOptions.birthPayload,
        qos: brokerOptions.birthQos
    };
    testSendRecv(brokerOptions, { topic: brokerOptions.birthTopic }, {}, options, hooks);
});
// Two broker config nodes point at the same server: broker 1 keeps the
// subscription alive, broker 2 carries a close message and is disconnected via
// the MQTT Out node so that its close message gets published.
itConditional('should publish close message', function (done) {
    if (skipTests) { return this.skip(); }
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const baseTopic = nextTopic();
    const broker1Options = { id: "mqtt.broker1" }; //Broker 1 - stays connected to receive the close message
    const broker2Options = { id: "mqtt.broker2", closeTopic: baseTopic + "/close", closePayload: '{"msg":"close"}', closeQos: 1, }; //Broker 2 - connects to same broker but has a close message.
    const { flow } = buildBasicMQTTSendRecvFlow(broker1Options, { broker: broker1Options.id, topic: broker2Options.closeTopic, datatype: "json" }, { broker: broker2Options.id });
    flow.push(buildMQTTBrokerNode(broker2Options.id, broker2Options.name, BROKER_HOST, BROKER_PORT, broker2Options)); //add second broker
    helper.load(mqttNodes, flow, function () {
        const helperNode = helper.getNode("helper.node");
        const mqttOut = helper.getNode("mqtt.out");
        const mqttBroker1 = helper.getNode("mqtt.broker1");
        const mqttBroker2 = helper.getNode("mqtt.broker2");
        waitBrokerConnect([mqttBroker1, mqttBroker2])
            .then(() => {
                //connected - add the on handler and call to disconnect
                helperNode.on("input", function (msg) {
                    try {
                        msg.should.have.property("topic", broker2Options.closeTopic);
                        msg.should.have.property('payload', JSON.parse(broker2Options.closePayload));
                        msg.should.have.property('qos', broker2Options.closeQos);
                        done();
                    } catch (error) {
                        done(error);
                    }
                });
                mqttOut.receive({ "action": "disconnect" }); //close broker2
            })
            .catch(done);
    });
});
// Two broker config nodes point at the same server: broker 1 keeps the
// subscription alive, broker 2 carries a will (LWT) message and has its client
// forcibly closed so that the server publishes the will.
itConditional('should publish will message', function (done) {
    if (skipTests) { return this.skip(); }
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const baseTopic = nextTopic();
    const broker1Options = { id: "mqtt.broker1" }; //Broker 1 - stays connected to receive the will message
    const broker2Options = { id: "mqtt.broker2", willTopic: baseTopic + "/will", willPayload: '{"msg":"will"}', willQos: 2, }; //Broker 2 - connects to same broker but has a LWT message.
    const { flow } = buildBasicMQTTSendRecvFlow(broker1Options, { broker: broker1Options.id, topic: broker2Options.willTopic, datatype: "utf8" }, { broker: broker2Options.id });
    flow.push(buildMQTTBrokerNode(broker2Options.id, broker2Options.name, BROKER_HOST, BROKER_PORT, broker2Options)); //add second broker
    helper.load(mqttNodes, flow, function () {
        const helperNode = helper.getNode("helper.node");
        const mqttBroker1 = helper.getNode("mqtt.broker1");
        const mqttBroker2 = helper.getNode("mqtt.broker2");
        waitBrokerConnect([mqttBroker1, mqttBroker2])
            .then(() => {
                //connected - add the on handler and call to disconnect
                helperNode.on("input", function (msg) {
                    try {
                        msg.should.have.property("topic", broker2Options.willTopic);
                        msg.should.have.property('payload', broker2Options.willPayload);
                        msg.should.have.property('qos', broker2Options.willQos);
                        done();
                    } catch (error) {
                        done(error);
                    }
                });
                mqttBroker2.client.end(true); //force closure
            })
            .catch(done);
    });
});
// Same scenario as the plain will-message test, but both brokers use protocol
// version 5 and the will carries v5 properties that must arrive with the message.
itConditional('should publish will message with V5 properties', function (done) {
    if (skipTests) { return this.skip(); }
    // return this.skip(); //Issue receiving v5 props on will msg. Issue raised here: https://github.com/mqttjs/MQTT.js/issues/1455
    this.timeout(2000); // timeout() is a Mocha context method, not a property
    const baseTopic = nextTopic();
    //Broker 1 - stays connected to receive the will message when broker 2 is killed
    const broker1Options = { id: "mqtt.broker1", name: "mqtt_broker1", protocolVersion: 5, datatype: "utf8" };
    //Broker 2 - connects to same broker but has a LWT message. Broker 2 gets killed shortly after connection so that the will message is sent from broker
    const broker2Options = {
        id: "mqtt.broker2", name: "mqtt_broker2", protocolVersion: 5,
        willTopic: baseTopic + "/will",
        willPayload: '{"msg":"will"}',
        willQos: 2,
        willMsg: {
            contentType: 'application/json',
            userProps: { "will": "value" },
            respTopic: baseTopic + "/resp",
            correl: Buffer.from("abc"),
            expiry: 2000,
            payloadFormatIndicator: true
        }
    };
    // payloadFormatIndicator is deliberately not part of the expectation (see the issue linked above)
    const expectMsg = {
        topic: broker2Options.willTopic,
        payload: broker2Options.willPayload,
        qos: broker2Options.willQos,
        contentType: broker2Options.willMsg.contentType,
        userProperties: broker2Options.willMsg.userProps,
        responseTopic: broker2Options.willMsg.respTopic,
        correlationData: broker2Options.willMsg.correl,
        messageExpiryInterval: broker2Options.willMsg.expiry,
        // payloadFormatIndicator: broker2Options.willMsg.payloadFormatIndicator,
    };
    const { flow, nodes } = buildBasicMQTTSendRecvFlow(broker1Options, { broker: broker1Options.id, topic: broker2Options.willTopic, datatype: "utf8" }, { broker: broker2Options.id });
    // the second broker reuses the host/port of the first broker node
    flow.push(buildMQTTBrokerNode(broker2Options.id, broker2Options.name, nodes.mqtt_broker1.broker, nodes.mqtt_broker1.port, broker2Options)); //add second broker with will msg set
    helper.load(mqttNodes, flow, function () {
        const helperNode = helper.getNode("helper.node");
        const mqttBroker1 = helper.getNode("mqtt.broker1");
        const mqttBroker2 = helper.getNode("mqtt.broker2");
        waitBrokerConnect([mqttBroker1, mqttBroker2])
            .then(() => {
                //connected - add the on handler and call to disconnect
                helperNode.on("input", function (msg) {
                    try {
                        compareMsgToExpected(msg, expectMsg);
                        done();
                    } catch (error) {
                        done(error);
                    }
                });
                mqttBroker2.client.end(true); //force closure
            })
            .catch(done);
    });
});
//#endregion ADVANCED TESTS
} ) ;
//#region ################### HELPERS ################### #//
/**
 * A basic unit test that builds a flow containing 1 broker, 1 mqtt-in, 1 mqtt-out and a helper.
 * It performs the following steps: builds flow, loads flow, waits for connection, sends `sendMsg`,
 * waits for msg then compares the received msg to `expectMsg`, and finally calls `done`
 * @param {object} brokerOptions anything that can be set in an MQTTBrokerNode (e.g. id, name, url, broker, server, port, protocolVersion, ...)
 * @param {object} inNodeOptions anything that can be set in an MQTTInNode (e.g. id, name, broker, topic, rh, nl, rap, ...)
 * @param {object} outNodeOptions anything that can be set in an MQTTOutNode (e.g. id, name, broker, ...)
 * @param {object} options an object for passing in test properties like `sendMsg` and `expectMsg`
 * @param {object} hooks an object containing hook functions...
 *  * [fn] `done()` - the test's done function. If excluded, an error will be thrown upon test error
 *  * [fn] `beforeLoad(flow)` - provides opportunity to adjust the flow JSON before loading into runtime
 *  * [fn] `afterLoad(helperNode, mqttBroker, mqttIn, mqttOut)` - called once the flow is loaded, before waiting for the broker connection. Return `true` to suppress the default input handler that compares the received msg to `expectMsg`
 *  * [fn] `afterConnect(helperNode, mqttBroker, mqttIn, mqttOut)` - called after the broker connection has been established
 */
2022-03-21 14:50:24 +01:00
function testSendRecv ( brokerOptions , inNodeOptions , outNodeOptions , options , hooks ) {
options = options || { } ;
2022-03-19 18:29:31 +01:00
brokerOptions = brokerOptions || { } ;
inNodeOptions = inNodeOptions || { } ;
outNodeOptions = outNodeOptions || { } ;
2022-03-21 14:50:24 +01:00
const sendMsg = options . sendMsg || { } ;
2022-03-19 18:29:31 +01:00
sendMsg . topic = sendMsg . topic || nextTopic ( ) ;
2022-03-21 14:50:24 +01:00
const expectMsg = options . expectMsg || Object . assign ( { } , sendMsg ) ;
expectMsg . payload = inNodeOptions . payload === undefined ? expectMsg . payload : inNodeOptions . payload ;
if ( inNodeOptions . topicType != "dynamic" ) {
2022-03-19 18:29:31 +01:00
inNodeOptions . topic = inNodeOptions . topic || sendMsg . topic ;
}
2022-03-21 14:50:24 +01:00
const { flow , nodes } = buildBasicMQTTSendRecvFlow ( brokerOptions , inNodeOptions , outNodeOptions ) ;
if ( hooks . beforeLoad ) { hooks . beforeLoad ( flow ) }
2022-03-19 18:29:31 +01:00
helper . load ( mqttNodes , flow , function ( ) {
try {
const helperNode = helper . getNode ( "helper.node" ) ;
const mqttBroker = helper . getNode ( brokerOptions . id ) ;
2022-03-21 14:50:24 +01:00
const mqttIn = helper . getNode ( nodes . mqtt _in . id ) ;
const mqttOut = helper . getNode ( nodes . mqtt _out . id ) ;
let afterLoadHandled = false ;
if ( hooks . afterLoad ) {
afterLoadHandled = hooks . afterLoad ( helperNode , mqttBroker , mqttIn , mqttOut )
}
if ( ! afterLoadHandled ) {
helperNode . on ( "input" , function ( msg ) {
try {
2022-03-19 18:29:31 +01:00
compareMsgToExpected ( msg , expectMsg ) ;
2022-03-21 14:50:24 +01:00
if ( hooks . done ) { hooks . done ( ) ; }
} catch ( err ) {
if ( hooks . done ) { hooks . done ( err ) ; }
else { throw err ; }
2022-03-19 18:29:31 +01:00
}
2022-03-21 14:50:24 +01:00
} ) ;
}
2022-04-29 20:56:37 +02:00
waitBrokerConnect ( mqttBroker )
. then ( ( ) => {
2022-03-19 18:29:31 +01:00
//finally, connected!
2022-03-21 14:50:24 +01:00
if ( hooks . afterConnect ) {
let handled = hooks . afterConnect ( helperNode , mqttBroker , mqttIn , mqttOut ) ;
if ( handled ) { return }
}
2022-04-29 20:56:37 +02:00
if ( sendMsg . topic ) {
if ( mqttIn . isDynamic ) {
mqttIn . receive ( { "action" : "subscribe" , "topic" : sendMsg . topic } )
}
mqttOut . receive ( sendMsg ) ;
2022-03-19 18:29:31 +01:00
}
2022-03-21 14:50:24 +01:00
} )
2022-04-29 20:56:37 +02:00
. catch ( ( e ) => {
if ( hooks . done ) { hooks . done ( e ) ; }
else { throw e ; }
} ) ;
2022-03-21 14:50:24 +01:00
} catch ( err ) {
if ( hooks . done ) { hooks . done ( err ) ; }
else { throw err ; }
2022-03-19 18:29:31 +01:00
}
} ) ;
}
/**
 * Builds a flow containing 2 parts.
 * * 1: MQTT Out node (with broker configured).
 * * 2: MQTT In node (with broker configured) --> helper node `id:helper.node`
 *
 * A catch node scoped to the MQTT In node is also added and wired to the helper node.
 * Note: `brokerOptions` is updated in place with the defaulted `broker`, `port` and `autoConnect`.
 * @param {object} [brokerOptions] options for the broker config node
 * @param {object} [inOptions] options for the MQTT In node
 * @param {object} [outOptions] options for the MQTT Out node
 * @returns {{nodes: object, flow: object[]}} `nodes` maps each node's name to its config object,
 * `flow` is the array of node configs ready to be loaded
 */
function buildBasicMQTTSendRecvFlow(brokerOptions, inOptions, outOptions) {
    brokerOptions = brokerOptions || {};
    inOptions = inOptions || {};
    outOptions = outOptions || {};
    brokerOptions.broker = brokerOptions.broker || BROKER_HOST;
    brokerOptions.port = brokerOptions.port || BROKER_PORT;
    brokerOptions.autoConnect = String(brokerOptions.autoConnect) == "false" ? false : true;
    const broker = buildMQTTBrokerNode(brokerOptions.id, brokerOptions.name, brokerOptions.broker, brokerOptions.port, brokerOptions);
    const inNode = buildMQTTInNode(inOptions.id, inOptions.name, inOptions.broker || broker.id, inOptions.topic, inOptions, ["helper.node"]);
    const outNode = buildMQTTOutNode(outOptions.id, outOptions.name, outOptions.broker || broker.id, outOptions.topic, outOptions);
    // named helperNode so the module-level test `helper` is not shadowed
    const helperNode = buildNode("helper", "helper.node", "helper_node", {});
    // scope the catch node to the actual in-node id so a customised id is still caught
    const catchNode = buildNode("catch", "catch.node", "catch_node", { "scope": [inNode.id] }, ["helper.node"]);
    return {
        nodes: {
            [broker.name]: broker,
            [inNode.name]: inNode,
            [outNode.name]: outNode,
            [helperNode.name]: helperNode,
            [catchNode.name]: catchNode,
        },
        flow: [broker, inNode, outNode, helperNode, catchNode]
    };
}
2022-03-21 14:50:24 +01:00
/**
 * Creates the config object for an `mqtt-broker` node.
 * @param {string} [id] node id, "mqtt.broker" when omitted
 * @param {string} [name] node name, "mqtt_broker" when omitted
 * @param {string} [brokerHost] broker host; falls back to `options.broker`, then BROKER_HOST
 * @param {number|string} [brokerPort] broker port; falls back to `options.port`, then BROKER_PORT
 * @param {object} [options] further node properties, e.g.
 * url,broker,port,clientid,autoConnect,usetls,usews,verifyservercert,compatmode,protocolVersion,keepalive,
 * cleansession,sessionExpiry,topicAliasMaximum,maximumPacketSize,receiveMaximum,userProperties,userPropertiesType,autoUnsubscribe
 * @returns {object} the broker node config
 */
function buildMQTTBrokerNode(id, name, brokerHost, brokerPort, options) {
    options = options || {};
    const node = buildNode("mqtt-broker", id || "mqtt.broker", name || "mqtt_broker", options);

    // These flags are on unless explicitly given as false / "false".
    const enabledUnlessFalse = (value) => String(value) != "false";

    node.url = options.url;
    node.broker = brokerHost || options.broker || BROKER_HOST;
    node.port = brokerPort || options.port || BROKER_PORT;
    node.clientid = options.clientid || "";
    node.cleansession = enabledUnlessFalse(options.cleansession);
    node.autoUnsubscribe = enabledUnlessFalse(options.autoUnsubscribe);
    node.autoConnect = enabledUnlessFalse(options.autoConnect);

    // birth / close / will messages are only configured when their topic is supplied
    for (const kind of ["birth", "close", "will"]) {
        const topicKey = kind + "Topic";
        if (!options[topicKey]) { continue; }
        node[topicKey] = options[topicKey];
        node[kind + "Qos"] = options[kind + "Qos"] || "0";
        node[kind + "Payload"] = options[kind + "Payload"] || "";
    }

    // NOTE(review): the arguments are (options, node) although updateNodeOptions is declared
    // as (node, options, wires). As written this copies node properties that `options` lacks
    // (e.g. `id`) back onto `options`; testSendRecv reads `brokerOptions.id` after building,
    // so the order is kept as-is. Confirm the intent before changing it.
    updateNodeOptions(options, node);
    return node;
}
/**
 * Creates the config object for an `mqtt in` node.
 * Note: `options.broker` is defaulted in place to "mqtt.broker" when not set.
 * @param {string} [id] node id, "mqtt.in" when omitted
 * @param {string} [name] node name, "mqtt_in" when omitted
 * @param {string} [brokerId] id of the broker config node; falls back to `options.broker`
 * @param {string} [topic] subscription topic, "" when omitted
 * @param {object} [options] further node properties; `topicType: "dynamic"` gives the node 1 input
 * @param {string[]} [wires] ids of the nodes wired to the first output
 * @returns {object} the node config
 */
function buildMQTTInNode(id, name, brokerId, topic, options, wires) {
    //{ "id": "mqtt.in", "type": "mqtt in", "name": "mqtt_in", "topic": "test/in", "qos": "2", "datatype": "auto", "broker": "mqtt.broker", "nl": false, "rap": true, "rh": 0, "inputs": 0, "wires": [["mqtt.out"]] }
    options = options || {};
    options.broker = options.broker || "mqtt.broker";
    const node = buildNode("mqtt in", id || "mqtt.in", name || "mqtt_in", options);
    const isDynamic = options.topicType == "dynamic";
    node.topic = topic || "";
    // keep the default broker when the caller passes no explicit broker id
    node.broker = brokerId || options.broker;
    node.topicType = isDynamic ? "dynamic" : "static";
    node.inputs = isDynamic ? 1 : 0;
    updateNodeOptions(node, options, wires);
    return node;
}
/**
 * Creates the config object for an `mqtt out` node.
 * Note: `options.broker` is defaulted in place to "mqtt.broker" when not set.
 * @param {string} [id] node id, "mqtt.out" when omitted
 * @param {string} [name] node name, "mqtt_out" when omitted
 * @param {string} [brokerId] id of the broker config node; falls back to `options.broker`
 * @param {string} [topic] publish topic, "" when omitted
 * @param {object} [options] further node properties copied onto the node when not already set
 * @returns {object} the node config
 */
function buildMQTTOutNode(id, name, brokerId, topic, options) {
    //{ "id": "mqtt.out", "type": "mqtt out", "name": "mqtt_out", "topic": "test/out", "qos": "", "retain": "", "respTopic": "", "contentType": "", "userProps": "", "correl": "", "expiry": "", "broker": brokerId, "wires": [] },
    options = options || {};
    options.broker = options.broker || "mqtt.broker";
    const node = buildNode("mqtt out", id || "mqtt.out", name || "mqtt_out", options);
    node.topic = topic || "";
    // keep the default broker when the caller passes no explicit broker id
    node.broker = brokerId || options.broker;
    updateNodeOptions(node, options, null);
    return node;
}
/**
 * Creates a generic node config object of the given type.
 * @param {string} type the node type, e.g. "mqtt in"
 * @param {string} [id] node id; when omitted it is derived from `type` with every non-word
 * character replaced by "."
 * @param {string} [name] node name; when omitted it is derived from `type` with every non-word
 * character replaced by "_"
 * @param {object} [options] further properties copied onto the node when not already set
 * @param {string[]} [wires] ids of the nodes wired to the first output
 * @returns {object} the node config
 */
function buildNode(type, id, name, options, wires) {
    //{ "id": "mqtt.in", "type": "mqtt in", "name": "mqtt_in", "topic": "test/in", "qos": "2", "datatype": "auto", "broker": "mqtt.broker", "nl": false, "rap": true, "rh": 0, "inputs": 0, "wires": [["mqtt.out"]] }
    options = options || {};
    const node = {
        "id": id || (type.replace(/[\W]/g, ".")),
        "type": type,
        "name": name || (type.replace(/[\W]/g, "_")),
        "wires": []
    };
    // ids follow the "<thing>.<suffix>" convention; suffix bare ids (previously a typo wrote to `node.is`)
    if (node.id.indexOf(".") == -1) { node.id += ".node"; }
    updateNodeOptions(node, options, wires);
    return node;
}
/**
 * Fills in `node` from `options`: every own enumerable key of `options` whose value on
 * `node` is currently `undefined` is copied across; existing values are never overwritten.
 * When `wires` is an array, a copy of it becomes the node's first output wiring.
 * @param {object} node the node config to update in place
 * @param {object} options the properties to copy from
 * @param {string[]} [wires] ids of the nodes wired to the first output
 */
function updateNodeOptions(node, options, wires) {
    Object.keys(options).forEach((prop) => {
        const candidate = options[prop];
        if (node[prop] !== undefined) {
            return; // already set on the node - leave untouched
        }
        node[prop] = candidate;
    });
    if (Array.isArray(wires)) {
        node.wires[0] = Array.from(wires);
    }
}
/**
 * Asserts that a received message matches the expected message.
 * `topic` and `payload` are always compared. `qos` is compared against the expected value,
 * or against 0 when the expectation does not specify one. `retain` and the remaining
 * properties are compared only when they are own properties of `expectMsg`.
 * @param {object} msg the message that was received
 * @param {object} expectMsg the message that was expected
 */
function compareMsgToExpected(msg, expectMsg) {
    msg.should.have.property("topic", expectMsg.topic);
    msg.should.have.property("payload", expectMsg.payload);

    if (hasProperty(expectMsg, "retain")) {
        msg.retain.should.eql(expectMsg.retain);
    }

    const expectedQos = hasProperty(expectMsg, "qos") ? expectMsg.qos : 0;
    msg.qos.should.eql(expectedQos);

    const optionalProps = [
        "userProperties",
        "contentType",
        "correlationData",
        "responseTopic",
        "payloadFormatIndicator",
        "messageExpiryInterval"
    ];
    for (const prop of optionalProps) {
        if (hasProperty(expectMsg, prop)) {
            msg.should.have.property(prop, expectMsg[prop]);
        }
    }
}
2022-04-29 20:56:37 +02:00
/**
 * Waits until every given broker reports `connected`, polling every 15ms.
 * Polling stops as soon as the promise settles, so no timer outlives the wait.
 * @param {object|object[]} broker a broker node, or an array of broker nodes
 * @param {number} [timeLimit=1000] approximate time budget in milliseconds
 * @returns {Promise<void>} resolves once all brokers are connected; rejects with an Error
 * ("Timeout waiting broker connect") when the time budget is used up
 */
function waitBrokerConnect(broker, timeLimit) {
    const POLL_INTERVAL = 15;
    const brokers = Array.isArray(broker) ? broker : [broker];
    let remaining = timeLimit || 1000;
    return new Promise((resolve, reject) => {
        const poll = () => {
            if (brokers.every((b) => b.connected == true)) {
                resolve();
                return;
            }
            remaining -= POLL_INTERVAL;
            if (remaining <= 0) {
                // give up: do NOT schedule another poll once rejected
                reject(new Error("Timeout waiting broker connect"));
                return;
            }
            setTimeout(poll, POLL_INTERVAL);
        };
        poll();
    });
}
2022-03-19 18:29:31 +01:00
/**
 * Tells whether `propName` is an own (non-inherited) property of `obj`.
 * The check goes through `Object.prototype` so it also works for objects that
 * lack or override their own `hasOwnProperty`.
 * @param {object} obj the object to inspect
 * @param {string} propName the property name to look for
 * @returns {boolean} true when `obj` itself defines `propName`
 */
function hasProperty(obj, propName) {
    const { hasOwnProperty } = Object.prototype;
    return hasOwnProperty.call(obj, propName);
}
// Topic prefix unique to this test run: "nr" + the current timestamp + "/"
const base_topic = `nr${Date.now().toString()}/`;
// Running counter appended to every generated topic
let topicNo = 0;
/**
 * Generates the next unique topic for a test.
 * A leading "/" is removed from `topic`; the run prefix is prepended unless `topic`
 * already starts with it, and the incremented counter is appended.
 * @param {string} [topic] the topic stem, "unittest" when omitted
 * @returns {string} the prefixed, numbered topic
 */
function nextTopic(topic) {
    topicNo += 1;
    const suffix = String(topicNo);
    let stem = topic ? topic : "unittest";
    if (stem.startsWith("/")) {
        stem = stem.slice(1);
    }
    return stem.startsWith(base_topic) ? stem + suffix : base_topic + stem + suffix;
}
//#endregion HELPERS