mirror of
https://github.com/node-red/node-red.git
synced 2025-03-01 10:36:34 +00:00
Merge remote-tracking branch 'upstream/dev' into proxy-logiv-dev-v4
This commit is contained in:
@@ -31,6 +31,9 @@ var multer = require("multer");
|
||||
var RED = require("nr-test-utils").require("node-red/lib/red");
|
||||
var fs = require('fs-extra');
|
||||
var auth = require('basic-auth');
|
||||
var crypto = require("crypto");
|
||||
const { version } = require("os");
|
||||
const net = require('net')
|
||||
|
||||
describe('HTTP Request Node', function() {
|
||||
var testApp;
|
||||
@@ -161,6 +164,100 @@ describe('HTTP Request Node', function() {
|
||||
delete process.env.NO_PROXY;
|
||||
}
|
||||
|
||||
function getDigestPassword() {
|
||||
return 'digest-test-password';
|
||||
}
|
||||
|
||||
function getDigest(algorithm, value) {
|
||||
var hash;
|
||||
if (algorithm === 'SHA-256') {
|
||||
hash = crypto.createHash('sha256');
|
||||
} else if (algorithm === 'SHA-512-256') {
|
||||
hash = crypto.createHash('sha512');
|
||||
} else {
|
||||
hash = crypto.createHash('md5');
|
||||
}
|
||||
|
||||
var hex = hash.update(value).digest('hex');
|
||||
if (algorithm === 'SHA-512-256') {
|
||||
hex = hex.slice(0, 64);
|
||||
}
|
||||
return hex;
|
||||
}
|
||||
|
||||
function getDigestResponse(req, algorithm, sess, realm, username, nonce, nc, cnonce, qop) {
|
||||
var ha1 = getDigest(algorithm, username + ':' + realm + ':' + getDigestPassword());
|
||||
if (sess) {
|
||||
ha1 = getDigest(algorithm, ha1 + ':' + nonce + ':' + cnonce)
|
||||
}
|
||||
let ha2 = getDigest(algorithm, req.method + ':' + req.path);
|
||||
return qop
|
||||
? getDigest(algorithm, ha1 + ':' + nonce + ':' + nc + ':' + cnonce + ':' + qop + ':' + ha2)
|
||||
: getDigest(algorithm, ha1 + ':' + nonce + ':' + ha2);
|
||||
}
|
||||
|
||||
function handleDigestResponse(req, res, algorithm, sess, qop) {
|
||||
let realm = "node-red";
|
||||
let nonce = "123456";
|
||||
let nc = '00000001';
|
||||
let algorithmValue = sess ? `${algorithm}-sess` : algorithm;
|
||||
|
||||
let authHeader = req.headers['authorization'];
|
||||
if (!authHeader) {
|
||||
let qopField = qop ? `qop="${qop}", ` : '';
|
||||
|
||||
res.setHeader(
|
||||
'WWW-Authenticate',
|
||||
`Digest ${qopField}realm="${realm}", nonce="${nonce}", algorithm="${algorithmValue}"`
|
||||
);
|
||||
res.status(401).end();
|
||||
return;
|
||||
}
|
||||
|
||||
var authFields = {};
|
||||
let re = /([a-z0-9_-]+)=(?:"([^"]+)"|([a-z0-9_-]+))/gi;
|
||||
for (;;) {
|
||||
var match = re.exec(authHeader);
|
||||
if (!match) {
|
||||
break;
|
||||
}
|
||||
authFields[match[1]] = match[2] || match[3];
|
||||
}
|
||||
// console.log(JSON.stringify(authFields));
|
||||
|
||||
if (qop && authFields['qop'] != qop) {
|
||||
console.log('test1');
|
||||
res.status(401).end();
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
!authFields['username'] ||
|
||||
!authFields['response'] ||
|
||||
authFields['realm'] != realm ||
|
||||
authFields['nonce'] != nonce ||
|
||||
authFields['algorithm'] != algorithmValue
|
||||
) {
|
||||
console.log('test2');
|
||||
res.status(401).end();
|
||||
return;
|
||||
}
|
||||
|
||||
let username = authFields['username'];
|
||||
let response = authFields['response'];
|
||||
let cnonce = authFields['cnonce'] || '';
|
||||
let expectedResponse = getDigestResponse(
|
||||
req, algorithm, sess, realm, username, nonce, nc, cnonce, qop
|
||||
);
|
||||
if (!response || expectedResponse.toLowerCase() !== response.toLowerCase()) {
|
||||
console.log('test3', response, expectedResponse);
|
||||
res.status(401).end();
|
||||
return;
|
||||
}
|
||||
|
||||
res.status(201).end();
|
||||
}
|
||||
|
||||
before(function(done) {
|
||||
|
||||
testApp = express();
|
||||
@@ -220,6 +317,21 @@ describe('HTTP Request Node', function() {
|
||||
}
|
||||
res.json(result);
|
||||
});
|
||||
testApp.get('/authenticate-digest-md5', function(req, res){
|
||||
handleDigestResponse(req, res, "MD5", false, false);
|
||||
});
|
||||
testApp.get('/authenticate-digest-md5-sess', function(req, res){
|
||||
handleDigestResponse(req, res, "MD5", true, 'auth');
|
||||
});
|
||||
testApp.get('/authenticate-digest-md5-qop', function(req, res){
|
||||
handleDigestResponse(req, res, "MD5", false, 'auth');
|
||||
});
|
||||
testApp.get('/authenticate-digest-sha-256', function(req, res){
|
||||
handleDigestResponse(req, res, "SHA-256", false, 'auth');
|
||||
});
|
||||
testApp.get('/authenticate-digest-sha-512-256', function(req, res){
|
||||
handleDigestResponse(req, res, "SHA-512-256", false, 'auth');
|
||||
});
|
||||
testApp.get('/proxyAuthenticate', function(req, res){
|
||||
// var user = auth.parse(req.headers['proxy-authorization']);
|
||||
var result = {
|
||||
@@ -555,7 +667,7 @@ describe('HTTP Request Node', function() {
|
||||
var n2 = helper.getNode("n2");
|
||||
n2.on("input", function(msg) {
|
||||
try {
|
||||
msg.should.have.property('payload','');
|
||||
msg.should.have.property('payload',{});
|
||||
msg.should.have.property('statusCode',204);
|
||||
done();
|
||||
} catch(err) {
|
||||
@@ -1527,7 +1639,7 @@ describe('HTTP Request Node', function() {
|
||||
msg.payload.headers.should.have.property('Content-Type').which.startWith('application/json');
|
||||
//msg.dynamicHeaderName should be present in headers with the value of msg.dynamicHeaderValue
|
||||
msg.payload.headers.should.have.property('dyn-header-name').which.startWith('dyn-header-value');
|
||||
//static (custom) header set in Flow UI should be present
|
||||
//static (custom) header set in Flow UI should be present
|
||||
msg.payload.headers.should.have.property('static-header-name').which.startWith('static-header-value');
|
||||
//msg.headers['location'] should be deleted because Flow UI "Location" header has a blank value
|
||||
//ensures headers with matching characters but different case are eliminated
|
||||
@@ -2016,6 +2128,100 @@ describe('HTTP Request Node', function() {
|
||||
});
|
||||
*/
|
||||
|
||||
it('should authenticate on server - digest MD5', function(done) {
|
||||
var flow = [{id:"n1",type:"http request",wires:[["n2"]],method:"GET",ret:"obj",authType:"digest", url:getTestURL('/authenticate-digest-md5')},
|
||||
{id:"n2", type:"helper"}];
|
||||
helper.load(httpRequestNode, flow, function() {
|
||||
var n1 = helper.getNode("n1");
|
||||
var n2 = helper.getNode("n2");
|
||||
n1.credentials = {user:'xxxuser', password:getDigestPassword()};
|
||||
n2.on("input", function(msg) {
|
||||
try {
|
||||
msg.should.have.property('statusCode',201);
|
||||
done();
|
||||
} catch(err) {
|
||||
done(err);
|
||||
}
|
||||
});
|
||||
n1.receive({payload:"foo"});
|
||||
});
|
||||
});
|
||||
|
||||
it('should authenticate on server - digest MD5 sess', function(done) {
|
||||
var flow = [{id:"n1",type:"http request",wires:[["n2"]],method:"GET",ret:"obj",authType:"digest", url:getTestURL('/authenticate-digest-md5-sess')},
|
||||
{id:"n2", type:"helper"}];
|
||||
helper.load(httpRequestNode, flow, function() {
|
||||
var n1 = helper.getNode("n1");
|
||||
var n2 = helper.getNode("n2");
|
||||
n1.credentials = {user:'xxxuser', password:getDigestPassword()};
|
||||
n2.on("input", function(msg) {
|
||||
try {
|
||||
msg.should.have.property('statusCode',201);
|
||||
done();
|
||||
} catch(err) {
|
||||
done(err);
|
||||
}
|
||||
});
|
||||
n1.receive({payload:"foo"});
|
||||
});
|
||||
});
|
||||
|
||||
it('should authenticate on server - digest MD5 qop', function(done) {
|
||||
var flow = [{id:"n1",type:"http request",wires:[["n2"]],method:"GET",ret:"obj",authType:"digest", url:getTestURL('/authenticate-digest-md5-qop')},
|
||||
{id:"n2", type:"helper"}];
|
||||
helper.load(httpRequestNode, flow, function() {
|
||||
var n1 = helper.getNode("n1");
|
||||
var n2 = helper.getNode("n2");
|
||||
n1.credentials = {user:'xxxuser', password:getDigestPassword()};
|
||||
n2.on("input", function(msg) {
|
||||
try {
|
||||
msg.should.have.property('statusCode',201);
|
||||
done();
|
||||
} catch(err) {
|
||||
done(err);
|
||||
}
|
||||
});
|
||||
n1.receive({payload:"foo"});
|
||||
});
|
||||
});
|
||||
|
||||
it('should authenticate on server - digest SHA-256', function(done) {
|
||||
var flow = [{id:"n1",type:"http request",wires:[["n2"]],method:"GET",ret:"obj",authType:"digest", url:getTestURL('/authenticate-digest-sha-256')},
|
||||
{id:"n2", type:"helper"}];
|
||||
helper.load(httpRequestNode, flow, function() {
|
||||
var n1 = helper.getNode("n1");
|
||||
var n2 = helper.getNode("n2");
|
||||
n1.credentials = {user:'xxxuser', password:getDigestPassword()};
|
||||
n2.on("input", function(msg) {
|
||||
try {
|
||||
msg.should.have.property('statusCode',201);
|
||||
done();
|
||||
} catch(err) {
|
||||
done(err);
|
||||
}
|
||||
});
|
||||
n1.receive({payload:"foo"});
|
||||
});
|
||||
});
|
||||
|
||||
it('should authenticate on server - digest SHA-512-256', function(done) {
|
||||
var flow = [{id:"n1",type:"http request",wires:[["n2"]],method:"GET",ret:"obj",authType:"digest", url:getTestURL('/authenticate-digest-sha-512-256')},
|
||||
{id:"n2", type:"helper"}];
|
||||
helper.load(httpRequestNode, flow, function() {
|
||||
var n1 = helper.getNode("n1");
|
||||
var n2 = helper.getNode("n2");
|
||||
n1.credentials = {user:'xxxuser', password:getDigestPassword()};
|
||||
n2.on("input", function(msg) {
|
||||
try {
|
||||
msg.should.have.property('statusCode',201);
|
||||
done();
|
||||
} catch(err) {
|
||||
done(err);
|
||||
}
|
||||
});
|
||||
n1.receive({payload:"foo"});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('file-upload', function() {
|
||||
@@ -2265,4 +2471,71 @@ describe('HTTP Request Node', function() {
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('should parse broken headers', function() {
|
||||
|
||||
const versions = process.versions.node.split('.')
|
||||
|
||||
if (( versions[0] == 14 && versions[1] >= 20 ) ||
|
||||
( versions[0] == 16 && versions[1] >= 16 ) ||
|
||||
( versions[0] == 18 && versions[1] >= 5 ) ||
|
||||
( versions[0] > 18)) {
|
||||
// only test if on new enough NodeJS version
|
||||
|
||||
let port = testPort++
|
||||
|
||||
let server;
|
||||
|
||||
before(function() {
|
||||
server = net.createServer(function (socket) {
|
||||
socket.write("HTTP/1.0 200\nContent-Type: text/plain\n\nHelloWorld")
|
||||
socket.end()
|
||||
})
|
||||
|
||||
server.listen(port,'127.0.0.1', function(err) {
|
||||
})
|
||||
});
|
||||
|
||||
after(function() {
|
||||
server.close()
|
||||
});
|
||||
|
||||
it('should accept broken headers', function (done) {
|
||||
var flow = [{id:'n1',type:'http request',wires:[['n2']],method:'GET',ret:'obj',url:`http://localhost:${port}/`, insecureHTTPParser: true},
|
||||
{id:"n2", type:"helper"}];
|
||||
helper.load(httpRequestNode, flow, function() {
|
||||
var n1 = helper.getNode("n1");
|
||||
var n2 = helper.getNode("n2");
|
||||
n2.on('input', function(msg) {
|
||||
try {
|
||||
msg.payload.should.equal('HelloWorld')
|
||||
done()
|
||||
} catch (err) {
|
||||
done(err)
|
||||
}
|
||||
})
|
||||
n1.receive({payload: 'foo'})
|
||||
});
|
||||
});
|
||||
|
||||
it('should reject broken headers', function (done) {
|
||||
var flow = [{id:'n1',type:'http request',wires:[['n2']],method:'GET',ret:'obj',url:`http://localhost:${port}/`},
|
||||
{id:"n2", type:"helper"}];
|
||||
helper.load(httpRequestNode, flow, function() {
|
||||
var n1 = helper.getNode("n1");
|
||||
var n2 = helper.getNode("n2");
|
||||
n2.on('input', function(msg) {
|
||||
try{
|
||||
msg.payload.should.match(/RequestError: Parse Error/)
|
||||
done()
|
||||
} catch (err) {
|
||||
done(err)
|
||||
}
|
||||
})
|
||||
n1.receive({payload: 'foo'})
|
||||
|
||||
});
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
|
@@ -27,7 +27,7 @@ describe('MQTT Nodes', function () {
|
||||
} catch (error) { }
|
||||
});
|
||||
|
||||
it('should be loaded and have default values', function (done) {
|
||||
it('should be loaded and have default values (MQTT V4)', function (done) {
|
||||
this.timeout = 2000;
|
||||
const { flow, nodes } = buildBasicMQTTSendRecvFlow({ id: "mqtt.broker", name: "mqtt_broker", autoConnect: false }, { id: "mqtt.in", topic: "in_topic" }, { id: "mqtt.out", topic: "out_topic" });
|
||||
helper.load(mqttNodes, flow, function () {
|
||||
@@ -53,7 +53,7 @@ describe('MQTT Nodes', function () {
|
||||
mqttBroker.should.have.property('broker', BROKER_HOST);
|
||||
mqttBroker.should.have.property('port', BROKER_PORT);
|
||||
mqttBroker.should.have.property('brokerurl');
|
||||
// mqttBroker.should.have.property('autoUnsubscribe', true);//default: true
|
||||
mqttBroker.should.have.property('autoUnsubscribe', true); //default: true
|
||||
mqttBroker.should.have.property('autoConnect', false);//Set "autoConnect:false" in brokerOptions
|
||||
mqttBroker.should.have.property('options');
|
||||
mqttBroker.options.should.have.property('clean', true);
|
||||
@@ -61,6 +61,52 @@ describe('MQTT Nodes', function () {
|
||||
mqttBroker.options.clientId.should.containEql('nodered_');
|
||||
mqttBroker.options.should.have.property('keepalive').type("number");
|
||||
mqttBroker.options.should.have.property('reconnectPeriod').type("number");
|
||||
//as this is not a v5 connection, ensure v5 properties are not present
|
||||
mqttBroker.options.should.not.have.property('protocolVersion', 5);
|
||||
mqttBroker.options.should.not.have.property('properties');
|
||||
done();
|
||||
} catch (error) {
|
||||
done(error)
|
||||
}
|
||||
});
|
||||
});
|
||||
it('should be loaded and have default values (MQTT V5)', function (done) {
|
||||
this.timeout = 2000;
|
||||
const { flow, nodes } = buildBasicMQTTSendRecvFlow({ id: "mqtt.broker", name: "mqtt_broker", autoConnect: false, cleansession: false, clientid: 'clientid', keepalive: 35, sessionExpiry: '6000', protocolVersion: '5', userProps: {"prop": "val"}}, { id: "mqtt.in", topic: "in_topic" }, { id: "mqtt.out", topic: "out_topic" });
|
||||
helper.load(mqttNodes, flow, function () {
|
||||
try {
|
||||
const mqttIn = helper.getNode("mqtt.in");
|
||||
const mqttOut = helper.getNode("mqtt.out");
|
||||
const mqttBroker = helper.getNode("mqtt.broker");
|
||||
|
||||
should(mqttIn).be.type("object", "mqtt in node should be an object")
|
||||
mqttIn.should.have.property('broker', nodes.mqtt_broker.id); //should be the id of the broker node
|
||||
mqttIn.should.have.property('datatype', 'utf8'); //default: 'utf8'
|
||||
mqttIn.should.have.property('isDynamic', false); //default: false
|
||||
mqttIn.should.have.property('inputs', 0); //default: 0
|
||||
mqttIn.should.have.property('qos', 2); //default: 2
|
||||
mqttIn.should.have.property('topic', "in_topic");
|
||||
mqttIn.should.have.property('wires', [["helper.node"]]);
|
||||
|
||||
should(mqttOut).be.type("object", "mqtt out node should be an object")
|
||||
mqttOut.should.have.property('broker', nodes.mqtt_broker.id); //should be the id of the broker node
|
||||
mqttOut.should.have.property('topic', "out_topic");
|
||||
|
||||
should(mqttBroker).be.type("object", "mqtt broker node should be an object")
|
||||
mqttBroker.should.have.property('broker', BROKER_HOST);
|
||||
mqttBroker.should.have.property('port', BROKER_PORT);
|
||||
mqttBroker.should.have.property('brokerurl');
|
||||
mqttBroker.should.have.property('autoUnsubscribe', true);
|
||||
mqttBroker.should.have.property('autoConnect', false); //Set "autoConnect:false" in brokerOptions
|
||||
mqttBroker.should.have.property('options');
|
||||
mqttBroker.options.should.have.property('clean', false);
|
||||
mqttBroker.options.should.have.property('clientId', 'clientid');
|
||||
mqttBroker.options.should.have.property('keepalive').type("number", 35);
|
||||
mqttBroker.options.should.have.property('reconnectPeriod').type("number");
|
||||
//as this IS a v5 connection, ensure v5 properties are not present
|
||||
mqttBroker.options.should.have.property('protocolVersion', 5);
|
||||
mqttBroker.options.should.have.property('properties');
|
||||
mqttBroker.options.properties.should.have.property('sessionExpiryInterval');
|
||||
done();
|
||||
} catch (error) {
|
||||
done(error)
|
||||
@@ -173,7 +219,7 @@ describe('MQTT Nodes', function () {
|
||||
topic: nextTopic(),
|
||||
payload: '{prop:"value3", "num":3}', // send invalid JSON ...
|
||||
}
|
||||
const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
|
||||
const hooks = { done: null, beforeLoad: null, afterLoad: null, afterConnect: null }
|
||||
hooks.afterLoad = (helperNode, mqttBroker, mqttIn, mqttOut) => {
|
||||
helperNode.on("input", function (msg) {
|
||||
try {
|
||||
@@ -299,7 +345,7 @@ describe('MQTT Nodes', function () {
|
||||
topic: nextTopic(),
|
||||
payload: '{prop:"value3", "num":3}', contentType: "application/json", // send invalid JSON ...
|
||||
}
|
||||
const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
|
||||
const hooks = { done: null, beforeLoad: null, afterLoad: null, afterConnect: null }
|
||||
hooks.afterLoad = (helperNode, mqttBroker, mqttIn, mqttOut) => {
|
||||
helperNode.on("input", function (msg) {
|
||||
try {
|
||||
@@ -385,7 +431,7 @@ describe('MQTT Nodes', function () {
|
||||
if (skipTests) { return this.skip() }
|
||||
this.timeout = 2000;
|
||||
const options = {}
|
||||
const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
|
||||
const hooks = { beforeLoad: null, afterLoad: null, afterConnect: null }
|
||||
hooks.beforeLoad = (flow) => { //add a status node pointed at MQTT Out node (to watch for connection status change)
|
||||
flow.push({ "id": "status.node", "type": "status", "name": "status_node", "scope": ["mqtt.out"], "wires": [["helper.node"]] });//add status node to watch mqtt_out
|
||||
}
|
||||
@@ -416,20 +462,66 @@ describe('MQTT Nodes', function () {
|
||||
this.timeout = 2000;
|
||||
const baseTopic = nextTopic();
|
||||
const brokerOptions = {
|
||||
autoConnect: false,
|
||||
protocolVersion: 4,
|
||||
birthTopic: baseTopic + "/birth",
|
||||
birthPayload: "broker connected",
|
||||
birthPayload: "broker birth",
|
||||
birthQos: 2,
|
||||
}
|
||||
const options = {};
|
||||
const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null };
|
||||
options.expectMsg = {
|
||||
const expectMsg = {
|
||||
topic: brokerOptions.birthTopic,
|
||||
payload: brokerOptions.birthPayload,
|
||||
qos: brokerOptions.birthQos
|
||||
};
|
||||
const options = { };
|
||||
const hooks = { };
|
||||
hooks.afterLoad = (helperNode, mqttBroker, mqttIn, mqttOut) => {
|
||||
helperNode.on("input", function (msg) {
|
||||
try {
|
||||
compareMsgToExpected(msg, expectMsg);
|
||||
done();
|
||||
} catch (error) {
|
||||
done(error)
|
||||
}
|
||||
})
|
||||
mqttIn.receive({ "action": "connect" }); //now request connect action
|
||||
return true; //handled
|
||||
}
|
||||
testSendRecv(brokerOptions, { topic: brokerOptions.birthTopic }, {}, options, hooks);
|
||||
});
|
||||
itConditional('should safely discard bad birth topic', function (done) {
|
||||
if (skipTests) { return this.skip() }
|
||||
this.timeout = 2000;
|
||||
const baseTopic = nextTopic();
|
||||
const brokerOptions = {
|
||||
protocolVersion: 4,
|
||||
birthTopic: baseTopic + "#", // a publish topic should never have a wildcard
|
||||
birthPayload: "broker connected",
|
||||
birthQos: 2,
|
||||
}
|
||||
const options = {};
|
||||
const hooks = { done: null, beforeLoad: null, afterLoad: null, afterConnect: null };
|
||||
hooks.afterLoad = (helperNode, mqttBroker, mqttIn, mqttOut) => {
|
||||
helperNode.on("input", function (msg) {
|
||||
try {
|
||||
msg.should.have.a.property("error").type("object");
|
||||
msg.error.should.have.a.property("source").type("object");
|
||||
msg.error.source.should.have.a.property("id", mqttIn.id);
|
||||
done();
|
||||
} catch (err) {
|
||||
done(err)
|
||||
}
|
||||
});
|
||||
return true; //handled
|
||||
}
|
||||
options.expectMsg = null;
|
||||
try {
|
||||
testSendRecv(brokerOptions, { topic: brokerOptions.birthTopic }, {}, options, hooks);
|
||||
done()
|
||||
} catch(err) {
|
||||
done(e)
|
||||
}
|
||||
});
|
||||
itConditional('should publish close message', function (done) {
|
||||
if (skipTests) { return this.skip() }
|
||||
this.timeout = 2000;
|
||||
@@ -587,12 +679,13 @@ function testSendRecv(brokerOptions, inNodeOptions, outNodeOptions, options, hoo
|
||||
const mqttBroker = helper.getNode(brokerOptions.id);
|
||||
const mqttIn = helper.getNode(nodes.mqtt_in.id);
|
||||
const mqttOut = helper.getNode(nodes.mqtt_out.id);
|
||||
let afterLoadHandled = false;
|
||||
let afterLoadHandled = false, finished = false;
|
||||
if (hooks.afterLoad) {
|
||||
afterLoadHandled = hooks.afterLoad(helperNode, mqttBroker, mqttIn, mqttOut)
|
||||
}
|
||||
if (!afterLoadHandled) {
|
||||
helperNode.on("input", function (msg) {
|
||||
finished = true
|
||||
try {
|
||||
compareMsgToExpected(msg, expectMsg);
|
||||
if (hooks.done) { hooks.done(); }
|
||||
@@ -617,10 +710,12 @@ function testSendRecv(brokerOptions, inNodeOptions, outNodeOptions, options, hoo
|
||||
}
|
||||
})
|
||||
.catch((e) => {
|
||||
if(finished) { return }
|
||||
if (hooks.done) { hooks.done(e); }
|
||||
else { throw e; }
|
||||
});
|
||||
} catch (err) {
|
||||
if(finished) { return }
|
||||
if (hooks.done) { hooks.done(err); }
|
||||
else { throw err; }
|
||||
}
|
||||
@@ -666,6 +761,7 @@ function buildMQTTBrokerNode(id, name, brokerHost, brokerPort, options) {
|
||||
node.cleansession = String(options.cleansession) == "false" ? false : true;
|
||||
node.autoUnsubscribe = String(options.autoUnsubscribe) == "false" ? false : true;
|
||||
node.autoConnect = String(options.autoConnect) == "false" ? false : true;
|
||||
node.sessionExpiry = options.sessionExpiry ? options.sessionExpiry : undefined;
|
||||
|
||||
if (options.birthTopic) {
|
||||
node.birthTopic = options.birthTopic;
|
||||
@@ -760,8 +856,8 @@ function waitBrokerConnect(broker, timeLimit) {
|
||||
let waitConnected = (broker, timeLimit) => {
|
||||
const brokers = Array.isArray(broker) ? broker : [broker];
|
||||
timeLimit = timeLimit || 1000;
|
||||
let timer, resolved = false;
|
||||
return new Promise( (resolve, reject) => {
|
||||
let timer, resolved = false;
|
||||
timer = wait();
|
||||
function wait() {
|
||||
if (brokers.every(e => e.connected == true)) {
|
||||
|
Reference in New Issue
Block a user