mirror of
https://github.com/node-red/node-red.git
synced 2025-03-01 10:36:34 +00:00
update from upstream
This commit is contained in:
@@ -4,6 +4,7 @@
|
||||
"use strict";
|
||||
const should = require("should");
|
||||
const helper = require("node-red-node-test-helper");
|
||||
const { doesNotThrow } = require("should");
|
||||
const mqttNodes = require("nr-test-utils").require("@node-red/nodes/core/network/10-mqtt.js");
|
||||
const BROKER_HOST = process.env.MQTT_BROKER_SERVER || "localhost";
|
||||
const BROKER_PORT = process.env.MQTT_BROKER_PORT || 1883;
|
||||
@@ -72,8 +73,7 @@ describe('MQTT Nodes', function () {
|
||||
done();
|
||||
});
|
||||
}
|
||||
|
||||
/** Conditional test runner (only run if skipTests=false) */
|
||||
// Conditional test runner (only run if skipTests=false)
|
||||
function itConditional(title, test) {
|
||||
return !skipTests ? it(title, test) : it.skip(title, test);
|
||||
}
|
||||
@@ -92,7 +92,8 @@ describe('MQTT Nodes', function () {
|
||||
options.expectMsg = Object.assign({}, options.sendMsg);
|
||||
testSendRecv({}, { datatype: "auto", topicType: "static" }, {}, options, { done: done });
|
||||
});
|
||||
itConditional('should send JSON and receive string (auto)', function (done) {
|
||||
//Prior to V3, "auto" mode would only parse to string or buffer.
|
||||
itConditional('should send JSON and receive string (auto mode)', function (done) {
|
||||
if (skipTests) { return this.skip() }
|
||||
this.timeout = 2000;
|
||||
const options = {}
|
||||
@@ -104,7 +105,44 @@ describe('MQTT Nodes', function () {
|
||||
options.expectMsg = Object.assign({}, options.sendMsg);
|
||||
testSendRecv({}, { datatype: "auto", topicType: "static" }, {}, options, { done: done });
|
||||
})
|
||||
itConditional('should send JSON and receive string (utf8)', function (done) {
|
||||
//In V3, "auto" mode should try to parse JSON, then string and fall back to buffer
|
||||
itConditional('should send JSON and receive object (auto-detect mode)', function (done) {
|
||||
if (skipTests) { return this.skip() }
|
||||
this.timeout = 2000;
|
||||
const options = {}
|
||||
options.sendMsg = {
|
||||
topic: nextTopic(),
|
||||
payload: '{"prop":"value1", "num":1}',
|
||||
qos: 1
|
||||
}
|
||||
options.expectMsg = Object.assign({}, options.sendMsg);
|
||||
options.expectMsg.payload = JSON.parse(options.sendMsg.payload);
|
||||
testSendRecv({}, { datatype: "auto-detect", topicType: "static" }, {}, options, { done: done });
|
||||
})
|
||||
itConditional('should send invalid JSON and receive string (auto mode)', function (done) {
|
||||
if (skipTests) { return this.skip() }
|
||||
this.timeout = 2000;
|
||||
const options = {}
|
||||
options.sendMsg = {
|
||||
topic: nextTopic(),
|
||||
payload: '{prop:"value3", "num":3}'// send invalid JSON ...
|
||||
}
|
||||
options.expectMsg = Object.assign({}, options.sendMsg);//expect same payload
|
||||
testSendRecv({}, { datatype: "auto", topicType: "static" }, {}, options, { done: done });
|
||||
});
|
||||
itConditional('should send invalid JSON and receive string (auto-detect mode)', function (done) {
|
||||
if (skipTests) { return this.skip() }
|
||||
this.timeout = 2000;
|
||||
const options = {}
|
||||
options.sendMsg = {
|
||||
topic: nextTopic(),
|
||||
payload: '{prop:"value3", "num":3}'// send invalid JSON ...
|
||||
}
|
||||
options.expectMsg = Object.assign({}, options.sendMsg);//expect same payload
|
||||
testSendRecv({}, { datatype: "auto-detect", topicType: "static" }, {}, options, { done: done });
|
||||
});
|
||||
|
||||
itConditional('should send JSON and receive string (utf8 mode)', function (done) {
|
||||
if (skipTests) { return this.skip() }
|
||||
this.timeout = 2000;
|
||||
const options = {}
|
||||
@@ -116,7 +154,7 @@ describe('MQTT Nodes', function () {
|
||||
options.expectMsg = Object.assign({}, options.sendMsg);
|
||||
testSendRecv({}, { datatype: "utf8", topicType: "static" }, {}, options, { done: done });
|
||||
});
|
||||
itConditional('should send JSON and receive Object (json)', function (done) {
|
||||
itConditional('should send JSON and receive Object (json mode)', function (done) {
|
||||
if (skipTests) { return this.skip() }
|
||||
this.timeout = 2000;
|
||||
const options = {}
|
||||
@@ -127,7 +165,31 @@ describe('MQTT Nodes', function () {
|
||||
options.expectMsg = Object.assign({}, options.sendMsg, { payload: { "prop": "value3", "num": 3 } });//expect an object
|
||||
testSendRecv({}, { datatype: "json", topicType: "static" }, {}, options, { done: done });
|
||||
});
|
||||
itConditional('should send String and receive Buffer (buffer)', function (done) {
|
||||
itConditional('should send invalid JSON and raise error (json mode)', function (done) {
|
||||
if (skipTests) { return this.skip() }
|
||||
this.timeout = 2000;
|
||||
const options = {}
|
||||
options.sendMsg = {
|
||||
topic: nextTopic(),
|
||||
payload: '{prop:"value3", "num":3}', // send invalid JSON ...
|
||||
}
|
||||
const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
|
||||
hooks.afterLoad = (helperNode, mqttBroker, mqttIn, mqttOut) => {
|
||||
helperNode.on("input", function (msg) {
|
||||
try {
|
||||
msg.should.have.a.property("error").type("object");
|
||||
msg.error.should.have.a.property("source").type("object");
|
||||
msg.error.source.should.have.a.property("id", mqttIn.id);
|
||||
done();
|
||||
} catch (err) {
|
||||
done(err)
|
||||
}
|
||||
});
|
||||
return true; //handled
|
||||
}
|
||||
testSendRecv({}, { datatype: "json", topicType: "static" }, {}, options, hooks);
|
||||
});
|
||||
itConditional('should send String and receive Buffer (buffer mode)', function (done) {
|
||||
if (skipTests) { return this.skip() }
|
||||
this.timeout = 2000;
|
||||
const options = {}
|
||||
@@ -138,7 +200,7 @@ describe('MQTT Nodes', function () {
|
||||
options.expectMsg = Object.assign({}, options.sendMsg, { payload: Buffer.from(options.sendMsg.payload) });//expect Buffer.from(msg.payload)
|
||||
testSendRecv({}, { datatype: "buffer", topicType: "static" }, {}, options, { done: done });
|
||||
});
|
||||
itConditional('should send utf8 Buffer and receive String (auto)', function (done) {
|
||||
itConditional('should send utf8 Buffer and receive String (auto mode)', function (done) {
|
||||
if (skipTests) { return this.skip() }
|
||||
this.timeout = 2000;
|
||||
const options = {}
|
||||
@@ -149,7 +211,7 @@ describe('MQTT Nodes', function () {
|
||||
options.expectMsg = Object.assign({}, options.sendMsg, { payload: "x y z" });//set expected payload to "x y z"
|
||||
testSendRecv({}, { datatype: "auto", topicType: "static" }, {}, options, { done: done });
|
||||
});
|
||||
itConditional('should send non utf8 Buffer and receive Buffer (auto)', function (done) {
|
||||
itConditional('should send non utf8 Buffer and receive Buffer (auto mode)', function (done) {
|
||||
if (skipTests) { return this.skip() }
|
||||
this.timeout = 2000;
|
||||
const options = {}
|
||||
@@ -158,7 +220,7 @@ describe('MQTT Nodes', function () {
|
||||
topic: nextTopic(),
|
||||
payload: Buffer.from([0xC0, 0xC1, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF]) //non valid UTF8
|
||||
}
|
||||
options.expectMsg = Object.assign({}, options.sendMsg);
|
||||
options.expectMsg = Object.assign({}, options.sendMsg, {payload: Buffer.from([0xC0, 0xC1, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF])});
|
||||
testSendRecv({}, { datatype: "auto", topicType: "static" }, {}, options, hooks);
|
||||
});
|
||||
itConditional('should send/receive all v5 flags and settings', function (done) {
|
||||
@@ -168,16 +230,16 @@ describe('MQTT Nodes', function () {
|
||||
const options = {}
|
||||
const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
|
||||
options.sendMsg = {
|
||||
topic: t + "/command", payload: Buffer.from("v5"), qos: 1, retain: true,
|
||||
topic: t + "/command", payload: Buffer.from('{"version":"v5"}'), qos: 1, retain: true,
|
||||
responseTopic: t + "/response",
|
||||
userProperties: { prop1: "val1" },
|
||||
contentType: "application/json",
|
||||
contentType: "text/plain",
|
||||
correlationData: Buffer.from([1, 2, 3]),
|
||||
payloadFormatIndicator: true,
|
||||
messageExpiryInterval: 2000,
|
||||
}
|
||||
options.expectMsg = Object.assign({}, options.sendMsg);
|
||||
options.expectMsg.payload = options.expectMsg.payload.toString(); //auto mode + payloadFormatIndicator should make a string
|
||||
options.expectMsg.payload = options.expectMsg.payload.toString(); //auto mode + payloadFormatIndicator + contentType: "text/plain" should make a string
|
||||
delete options.expectMsg.payloadFormatIndicator; //Seems mqtt.js only publishes payloadFormatIndicator the will msg
|
||||
const inOptions = {
|
||||
datatype: "auto", topicType: "static",
|
||||
@@ -185,6 +247,109 @@ describe('MQTT Nodes', function () {
|
||||
}
|
||||
testSendRecv({ protocolVersion: 5 }, inOptions, {}, options, hooks);
|
||||
});
|
||||
itConditional('should send regular string with v5 media type "text/plain" and receive a string (auto mode)', function (done) {
|
||||
if (skipTests) { return this.skip() }
|
||||
this.timeout = 2000;
|
||||
const options = {}
|
||||
const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
|
||||
options.sendMsg = {
|
||||
topic: nextTopic(), payload: "abc", contentType: "text/plain"
|
||||
}
|
||||
options.expectMsg = Object.assign({}, options.sendMsg);
|
||||
testSendRecv({ protocolVersion: 5 }, { datatype: "auto", topicType: "static" }, {}, options, hooks);
|
||||
});
|
||||
itConditional('should send JSON with v5 media type "text/plain" and receive a string (auto mode)', function (done) {
|
||||
if (skipTests) { return this.skip() }
|
||||
this.timeout = 2000;
|
||||
const options = {}
|
||||
const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
|
||||
options.sendMsg = {
|
||||
topic: nextTopic(), payload: '{"prop":"val"}', contentType: "text/plain"
|
||||
}
|
||||
options.expectMsg = Object.assign({}, options.sendMsg);
|
||||
testSendRecv({ protocolVersion: 5 }, { datatype: "auto", topicType: "static" }, {}, options, hooks);
|
||||
});
|
||||
itConditional('should send JSON with v5 media type "text/plain" and receive a string (auto-detect mode)', function (done) {
|
||||
if (skipTests) { return this.skip() }
|
||||
this.timeout = 2000;
|
||||
const options = {}
|
||||
const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
|
||||
options.sendMsg = {
|
||||
topic: nextTopic(), payload: '{"prop":"val"}', contentType: "text/plain"
|
||||
}
|
||||
options.expectMsg = Object.assign({}, options.sendMsg);
|
||||
testSendRecv({ protocolVersion: 5 }, { datatype: "auto-detect", topicType: "static" }, {}, options, hooks);
|
||||
});
|
||||
itConditional('should send JSON with v5 media type "application/json" and receive an object (auto-detect mode)', function (done) {
|
||||
if (skipTests) { return this.skip() }
|
||||
this.timeout = 2000;
|
||||
const options = {}
|
||||
const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
|
||||
options.sendMsg = {
|
||||
topic: nextTopic(), payload: '{"prop":"val"}', contentType: "application/json",
|
||||
}
|
||||
options.expectMsg = Object.assign({}, options.sendMsg, { payload: JSON.parse(options.sendMsg.payload)});
|
||||
testSendRecv({ protocolVersion: 5 }, { datatype: "auto-detect", topicType: "static" }, {}, options, hooks);
|
||||
});
|
||||
itConditional('should send invalid JSON with v5 media type "application/json" and raise an error (auto mode)', function (done) {
|
||||
if (skipTests) { return this.skip() }
|
||||
this.timeout = 2000;
|
||||
const options = {}
|
||||
options.sendMsg = {
|
||||
topic: nextTopic(),
|
||||
payload: '{prop:"value3", "num":3}', contentType: "application/json", // send invalid JSON ...
|
||||
}
|
||||
const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
|
||||
hooks.afterLoad = (helperNode, mqttBroker, mqttIn, mqttOut) => {
|
||||
helperNode.on("input", function (msg) {
|
||||
try {
|
||||
msg.should.have.a.property("error").type("object");
|
||||
msg.error.should.have.a.property("source").type("object");
|
||||
msg.error.source.should.have.a.property("id", mqttIn.id);
|
||||
done();
|
||||
} catch (err) {
|
||||
done(err)
|
||||
}
|
||||
});
|
||||
return true; //handled
|
||||
}
|
||||
testSendRecv({ protocolVersion: 5 }, { datatype: "auto", topicType: "static" }, {}, options, hooks);
|
||||
});
|
||||
|
||||
itConditional('should send buffer with v5 media type "application/json" and receive an object (auto-detect mode)', function (done) {
|
||||
if (skipTests) { return this.skip() }
|
||||
this.timeout = 2000;
|
||||
const options = {}
|
||||
const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
|
||||
options.sendMsg = {
|
||||
topic: nextTopic(), payload: Buffer.from([0x7b,0x22,0x70,0x72,0x6f,0x70,0x22,0x3a,0x22,0x76,0x61,0x6c,0x22,0x7d]), contentType: "application/json",
|
||||
}
|
||||
options.expectMsg = Object.assign({}, options.sendMsg, { payload: {"prop":"val"}});
|
||||
testSendRecv({ protocolVersion: 5 }, { datatype: "auto-detect", topicType: "static" }, {}, options, hooks);
|
||||
});
|
||||
itConditional('should send buffer with v5 media type "text/plain" and receive a string (auto mode)', function (done) {
|
||||
if (skipTests) { return this.skip() }
|
||||
this.timeout = 2000;
|
||||
const options = {}
|
||||
const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
|
||||
options.sendMsg = {
|
||||
topic: nextTopic(), payload: Buffer.from([0x7b,0x22,0x70,0x72,0x6f,0x70,0x22,0x3a,0x22,0x76,0x61,0x6c,0x22,0x7d]), contentType: "text/plain",
|
||||
}
|
||||
options.expectMsg = Object.assign({}, options.sendMsg, { payload: '{"prop":"val"}'});
|
||||
testSendRecv({ protocolVersion: 5 }, { datatype: "auto", topicType: "static" }, {}, options, hooks);
|
||||
});
|
||||
itConditional('should send buffer with v5 media type "application/zip" and receive a buffer (auto mode)', function (done) {
|
||||
if (skipTests) { return this.skip() }
|
||||
this.timeout = 2000;
|
||||
const options = {}
|
||||
const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
|
||||
options.sendMsg = {
|
||||
topic: nextTopic(), payload: Buffer.from([0x7b,0x22,0x70,0x72,0x6f,0x70,0x22,0x3a,0x22,0x76,0x61,0x6c,0x22,0x7d]), contentType: "application/zip",
|
||||
}
|
||||
options.expectMsg = Object.assign({}, options.sendMsg, { payload: Buffer.from([0x7b,0x22,0x70,0x72,0x6f,0x70,0x22,0x3a,0x22,0x76,0x61,0x6c,0x22,0x7d])});
|
||||
testSendRecv({ protocolVersion: 5 }, { datatype: "auto", topicType: "static" }, {}, options, hooks);
|
||||
});
|
||||
|
||||
itConditional('should subscribe dynamically via action', function (done) {
|
||||
if (skipTests) { return this.skip() }
|
||||
this.timeout = 2000;
|
||||
@@ -278,7 +443,8 @@ describe('MQTT Nodes', function () {
|
||||
const mqttOut = helper.getNode("mqtt.out");
|
||||
const mqttBroker1 = helper.getNode("mqtt.broker1");
|
||||
const mqttBroker2 = helper.getNode("mqtt.broker2");
|
||||
waitBrokerConnect([mqttBroker1, mqttBroker2], function connected() {
|
||||
waitBrokerConnect([mqttBroker1, mqttBroker2])
|
||||
.then(() => {
|
||||
//connected - add the on handler and call to disconnect
|
||||
helperNode.on("input", function (msg) {
|
||||
try {
|
||||
@@ -292,6 +458,7 @@ describe('MQTT Nodes', function () {
|
||||
})
|
||||
mqttOut.receive({ "action": "disconnect" });//close broker2
|
||||
})
|
||||
.catch(done);
|
||||
});
|
||||
});
|
||||
itConditional('should publish will message', function (done) {
|
||||
@@ -307,7 +474,8 @@ describe('MQTT Nodes', function () {
|
||||
const helperNode = helper.getNode("helper.node");
|
||||
const mqttBroker1 = helper.getNode("mqtt.broker1");
|
||||
const mqttBroker2 = helper.getNode("mqtt.broker2");
|
||||
waitBrokerConnect([mqttBroker1, mqttBroker2], function connected() {
|
||||
waitBrokerConnect([mqttBroker1, mqttBroker2])
|
||||
.then(() => {
|
||||
//connected - add the on handler and call to disconnect
|
||||
helperNode.on("input", function (msg) {
|
||||
try {
|
||||
@@ -321,6 +489,7 @@ describe('MQTT Nodes', function () {
|
||||
});
|
||||
mqttBroker2.client.end(true); //force closure
|
||||
})
|
||||
.catch(done);
|
||||
});
|
||||
});
|
||||
itConditional('should publish will message with V5 properties', function (done) {
|
||||
@@ -362,7 +531,8 @@ describe('MQTT Nodes', function () {
|
||||
const helperNode = helper.getNode("helper.node");
|
||||
const mqttBroker1 = helper.getNode("mqtt.broker1");
|
||||
const mqttBroker2 = helper.getNode("mqtt.broker2");
|
||||
waitBrokerConnect([mqttBroker1, mqttBroker2], function connected() {
|
||||
waitBrokerConnect([mqttBroker1, mqttBroker2])
|
||||
.then(() => {
|
||||
//connected - add the on handler and call to disconnect
|
||||
helperNode.on("input", function (msg) {
|
||||
try {
|
||||
@@ -374,6 +544,7 @@ describe('MQTT Nodes', function () {
|
||||
});
|
||||
mqttBroker2.client.end(true); //force closure
|
||||
})
|
||||
.catch(done);
|
||||
});
|
||||
});
|
||||
//#endregion ADVANCED TESTS
|
||||
@@ -431,17 +602,24 @@ function testSendRecv(brokerOptions, inNodeOptions, outNodeOptions, options, hoo
|
||||
}
|
||||
});
|
||||
}
|
||||
waitBrokerConnect(mqttBroker, function () {
|
||||
waitBrokerConnect(mqttBroker)
|
||||
.then(() => {
|
||||
//finally, connected!
|
||||
if (hooks.afterConnect) {
|
||||
let handled = hooks.afterConnect(helperNode, mqttBroker, mqttIn, mqttOut);
|
||||
if (handled) { return }
|
||||
}
|
||||
if (mqttIn.isDynamic) {
|
||||
mqttIn.receive({ "action": "subscribe", "topic": sendMsg.topic })
|
||||
if(sendMsg.topic) {
|
||||
if (mqttIn.isDynamic) {
|
||||
mqttIn.receive({ "action": "subscribe", "topic": sendMsg.topic })
|
||||
}
|
||||
mqttOut.receive(sendMsg);
|
||||
}
|
||||
mqttOut.receive(sendMsg);
|
||||
})
|
||||
.catch((e) => {
|
||||
if (hooks.done) { hooks.done(e); }
|
||||
else { throw e; }
|
||||
});
|
||||
} catch (err) {
|
||||
if (hooks.done) { hooks.done(err); }
|
||||
else { throw err; }
|
||||
@@ -463,14 +641,16 @@ function buildBasicMQTTSendRecvFlow(brokerOptions, inOptions, outOptions) {
|
||||
const inNode = buildMQTTInNode(inOptions.id, inOptions.name, inOptions.broker || broker.id, inOptions.topic, inOptions, ["helper.node"]);
|
||||
const outNode = buildMQTTOutNode(outOptions.id, outOptions.name, outOptions.broker || broker.id, outOptions.topic, outOptions);
|
||||
const helper = buildNode("helper", "helper.node", "helper_node", {});
|
||||
const catchNode = buildNode("catch", "catch.node", "catch_node", {"scope": ["mqtt.in"]}, ["helper.node"]);
|
||||
return {
|
||||
nodes: {
|
||||
[broker.name]: broker,
|
||||
[inNode.name]: inNode,
|
||||
[outNode.name]: outNode,
|
||||
[helper.name]: helper,
|
||||
[catchNode.name]: catchNode,
|
||||
},
|
||||
flow: [broker, inNode, outNode, helper]
|
||||
flow: [broker, inNode, outNode, helper, catchNode]
|
||||
}
|
||||
}
|
||||
|
||||
@@ -575,22 +755,33 @@ function compareMsgToExpected(msg, expectMsg) {
|
||||
if (hasProperty(expectMsg, "messageExpiryInterval")) { msg.should.have.property("messageExpiryInterval", expectMsg.messageExpiryInterval); }
|
||||
}
|
||||
|
||||
function waitBrokerConnect(broker, callback, timeLimit) {
|
||||
timeLimit = timeLimit || 2000;
|
||||
const brokers = Array.isArray(broker) ? broker : [broker];
|
||||
wait();
|
||||
function wait() {
|
||||
if (brokers.every(e => e.connected == true)) {
|
||||
callback(); //yey - connected!
|
||||
} else {
|
||||
timeLimit = timeLimit - 15;
|
||||
if (timeLimit <= 0) {
|
||||
throw new Error("Timeout waiting broker connect")
|
||||
function waitBrokerConnect(broker, timeLimit) {
|
||||
|
||||
let waitConnected = (broker, timeLimit) => {
|
||||
const brokers = Array.isArray(broker) ? broker : [broker];
|
||||
timeLimit = timeLimit || 1000;
|
||||
let timer, resolved = false;
|
||||
return new Promise( (resolve, reject) => {
|
||||
timer = wait();
|
||||
function wait() {
|
||||
if (brokers.every(e => e.connected == true)) {
|
||||
resolved = true;
|
||||
clearTimeout(timer);
|
||||
resolve();
|
||||
} else {
|
||||
timeLimit = timeLimit - 15;
|
||||
if (timeLimit <= 0) {
|
||||
if(!resolved) {
|
||||
reject("Timeout waiting broker connect")
|
||||
}
|
||||
}
|
||||
timer = setTimeout(wait, 15);
|
||||
return timer;
|
||||
}
|
||||
}
|
||||
setTimeout(wait, 15);
|
||||
return;
|
||||
}
|
||||
}
|
||||
});
|
||||
};
|
||||
return waitConnected(broker, timeLimit);
|
||||
}
|
||||
|
||||
function hasProperty(obj, propName) {
|
||||
|
||||
Reference in New Issue
Block a user