mirror of
https://github.com/node-red/node-red.git
synced 2023-10-10 13:36:53 +02:00
tidy up code
This commit is contained in:
parent
497d63e67e
commit
a6696733fa
@ -16,11 +16,12 @@
|
|||||||
|
|
||||||
/* These tests are only supposed to be executed at development time (for now)*/
|
/* These tests are only supposed to be executed at development time (for now)*/
|
||||||
|
|
||||||
|
"use strict";
|
||||||
const should = require("should");
|
const should = require("should");
|
||||||
const helper = require("node-red-node-test-helper");
|
const helper = require("node-red-node-test-helper");
|
||||||
const mqttNodes = require("nr-test-utils").require("@node-red/nodes/core/network/10-mqtt.js");
|
const mqttNodes = require("nr-test-utils").require("@node-red/nodes/core/network/10-mqtt.js");
|
||||||
const server = process.env.MQTT_BROKER_SERVER || "localhost";
|
const BROKER_HOST = process.env.MQTT_BROKER_SERVER || "localhost"; //"broker.emqx.io";// "mqtt.eclipse.org"; //"test.mosquitto.org"; //"localhost";
|
||||||
const port = process.env.MQTT_BROKER_PORT || 1883;
|
const BROKER_PORT = process.env.MQTT_BROKER_PORT || 1883;
|
||||||
const skipTests = process.env.MQTT_SKIP_TESTS == "true";
|
const skipTests = process.env.MQTT_SKIP_TESTS == "true";
|
||||||
|
|
||||||
describe('MQTT Nodes', function () {
|
describe('MQTT Nodes', function () {
|
||||||
@ -36,40 +37,34 @@ describe('MQTT Nodes', function () {
|
|||||||
afterEach(function () {
|
afterEach(function () {
|
||||||
try {
|
try {
|
||||||
helper.unload();
|
helper.unload();
|
||||||
} catch (error) {
|
} catch (error) { }
|
||||||
}
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should be loaded and have default values', function (done) {
|
it('should be loaded and have default values', function (done) {
|
||||||
this.timeout = 1000;
|
this.timeout = 2000;
|
||||||
const brokerName = "mqtt_broker", brokerId = "mqtt.broker", brokerOptions = { autoConnect: false };
|
const { flow, nodes } = buildBasicMQTTSendRecvFlow({ id: "mqtt.broker", name: "mqtt_broker", autoConnect: false }, { id: "mqtt.in", topic: "in_topic" }, { id: "mqtt.out", topic: "out_topic" });
|
||||||
const inName = "mqtt_in", inId = "mqtt.in";
|
|
||||||
const outName = "mqtt_out", outId = "mqtt.out";
|
|
||||||
const flow = [
|
|
||||||
buildMQTTInNode(inId, inName, brokerId, "test/in", {}, [outId]),
|
|
||||||
buildMQTTOutNode(outId, outName, brokerId, "test/out", {}),
|
|
||||||
buildMQTTBrokerNode(brokerId, brokerName, server, port, brokerOptions),
|
|
||||||
];
|
|
||||||
helper.load(mqttNodes, flow, function () {
|
helper.load(mqttNodes, flow, function () {
|
||||||
try {
|
try {
|
||||||
const mqttIn = helper.getNode(inId);
|
const mqttIn = helper.getNode("mqtt.in");
|
||||||
const mqttOut = helper.getNode(outId);
|
const mqttOut = helper.getNode("mqtt.out");
|
||||||
const mqttBroker = helper.getNode(brokerId);
|
const mqttBroker = helper.getNode("mqtt.broker");
|
||||||
|
|
||||||
should(mqttIn).be.type("object", "mqtt in node should be an object")
|
should(mqttIn).be.type("object", "mqtt in node should be an object")
|
||||||
mqttIn.should.have.property('broker', brokerId);
|
mqttIn.should.have.property('broker', nodes.mqtt_broker.id); //should be the id of the broker node
|
||||||
mqttIn.should.have.property('datatype', 'utf8'); //default: 'utf8'
|
mqttIn.should.have.property('datatype', 'utf8'); //default: 'utf8'
|
||||||
mqttIn.should.have.property('isDynamic', false); //default: false
|
mqttIn.should.have.property('isDynamic', false); //default: false
|
||||||
mqttIn.should.have.property('inputs', 0); //default: 0
|
mqttIn.should.have.property('inputs', 0); //default: 0
|
||||||
mqttIn.should.have.property('qos', 2); //default: 2
|
mqttIn.should.have.property('qos', 2); //default: 2
|
||||||
mqttIn.should.have.property('wires', [[outId]]);
|
mqttIn.should.have.property('topic', "in_topic");
|
||||||
|
mqttIn.should.have.property('wires', [["helper.node"]]);
|
||||||
|
|
||||||
should(mqttOut).be.type("object", "mqtt out node should be an object")
|
should(mqttOut).be.type("object", "mqtt out node should be an object")
|
||||||
mqttOut.should.have.property('broker', brokerId);
|
mqttOut.should.have.property('broker', nodes.mqtt_broker.id); //should be the id of the broker node
|
||||||
|
mqttOut.should.have.property('topic', "out_topic");
|
||||||
|
|
||||||
should(mqttBroker).be.type("object", "mqtt broker node should be an object")
|
should(mqttBroker).be.type("object", "mqtt broker node should be an object")
|
||||||
mqttBroker.should.have.property('broker', server);
|
mqttBroker.should.have.property('broker', BROKER_HOST);
|
||||||
mqttBroker.should.have.property('port', port);
|
mqttBroker.should.have.property('port', BROKER_PORT);
|
||||||
mqttBroker.should.have.property('brokerurl');
|
mqttBroker.should.have.property('brokerurl');
|
||||||
// mqttBroker.should.have.property('autoUnsubscribe', true);//default: true
|
// mqttBroker.should.have.property('autoUnsubscribe', true);//default: true
|
||||||
mqttBroker.should.have.property('autoConnect', false);//Set "autoConnect:false" in brokerOptions
|
mqttBroker.should.have.property('autoConnect', false);//Set "autoConnect:false" in brokerOptions
|
||||||
@ -98,88 +93,96 @@ describe('MQTT Nodes', function () {
|
|||||||
}
|
}
|
||||||
|
|
||||||
//#region ################### BASIC TESTS ################### #//
|
//#region ################### BASIC TESTS ################### #//
|
||||||
itConditional('should send and receive string (auto)', function (done) {
|
|
||||||
|
itConditional('basic send and receive tests', function (done) {
|
||||||
if (skipTests) { return this.skip() }
|
if (skipTests) { return this.skip() }
|
||||||
this.timeout = 1000;
|
this.timeout = 2000;
|
||||||
const msg = {
|
const options = {}
|
||||||
|
options.sendMsg = {
|
||||||
topic: nextTopic(),
|
topic: nextTopic(),
|
||||||
payload: "hello",
|
payload: "hello",
|
||||||
qos: 0
|
qos: 0
|
||||||
}
|
}
|
||||||
const expectMsg = Object.assign({}, msg);
|
options.expectMsg = Object.assign({}, options.sendMsg);
|
||||||
testSendRecv({}, { datatype: "auto", topicType: "static" }, {}, msg, expectMsg, done);
|
testSendRecv({}, { datatype: "auto", topicType: "static" }, {}, options, { done: done });
|
||||||
});
|
});
|
||||||
itConditional('should send JSON and receive string (auto)', function (done) {
|
itConditional('should send JSON and receive string (auto)', function (done) {
|
||||||
if (skipTests) { return this.skip() }
|
if (skipTests) { return this.skip() }
|
||||||
this.timeout = 1000;
|
this.timeout = 2000;
|
||||||
const msg = {
|
const options = {}
|
||||||
|
options.sendMsg = {
|
||||||
topic: nextTopic(),
|
topic: nextTopic(),
|
||||||
payload: '{"prop":"value1", "num":1}',
|
payload: '{"prop":"value1", "num":1}',
|
||||||
qos: 1
|
qos: 1
|
||||||
}
|
}
|
||||||
const expectMsg = Object.assign({}, msg);
|
options.expectMsg = Object.assign({}, options.sendMsg);
|
||||||
testSendRecv({}, { datatype: "auto", topicType: "static" }, {}, msg, expectMsg, done);
|
testSendRecv({}, { datatype: "auto", topicType: "static" }, {}, options, { done: done });
|
||||||
});
|
})
|
||||||
itConditional('should send JSON and receive string (utf8)', function (done) {
|
itConditional('should send JSON and receive string (utf8)', function (done) {
|
||||||
if (skipTests) { return this.skip() }
|
if (skipTests) { return this.skip() }
|
||||||
this.timeout = 1000;
|
this.timeout = 2000;
|
||||||
const msg = {
|
const options = {}
|
||||||
|
options.sendMsg = {
|
||||||
topic: nextTopic(),
|
topic: nextTopic(),
|
||||||
payload: '{"prop":"value2", "num":2}',
|
payload: '{"prop":"value2", "num":2}',
|
||||||
qos: 2
|
qos: 2
|
||||||
}
|
}
|
||||||
const expectMsg = Object.assign({}, msg);
|
options.expectMsg = Object.assign({}, options.sendMsg);
|
||||||
testSendRecv({}, { datatype: "utf8", topicType: "static" }, {}, msg, expectMsg, done);
|
testSendRecv({}, { datatype: "utf8", topicType: "static" }, {}, options, { done: done });
|
||||||
});
|
});
|
||||||
itConditional('should send JSON and receive Object (json)', function (done) {
|
itConditional('should send JSON and receive Object (json)', function (done) {
|
||||||
if (skipTests) { return this.skip() }
|
if (skipTests) { return this.skip() }
|
||||||
this.timeout = 1000;
|
this.timeout = 2000;
|
||||||
const msg = {
|
const options = {}
|
||||||
|
options.sendMsg = {
|
||||||
topic: nextTopic(),
|
topic: nextTopic(),
|
||||||
payload: '{"prop":"value3", "num":3}'// send a string ...
|
payload: '{"prop":"value3", "num":3}'// send a string ...
|
||||||
}
|
}
|
||||||
const expectMsg = Object.assign({}, msg, {payload: {"prop":"value3", "num":3}});//expect an object
|
options.expectMsg = Object.assign({}, options.sendMsg, { payload: { "prop": "value3", "num": 3 } });//expect an object
|
||||||
testSendRecv({}, { datatype: "json", topicType: "static" }, {}, msg, expectMsg, done);
|
testSendRecv({}, { datatype: "json", topicType: "static" }, {}, options, { done: done });
|
||||||
});
|
});
|
||||||
itConditional('should send String and receive Buffer (buffer)', function (done) {
|
itConditional('should send String and receive Buffer (buffer)', function (done) {
|
||||||
if (skipTests) { return this.skip() }
|
if (skipTests) { return this.skip() }
|
||||||
this.timeout = 1000;
|
this.timeout = 2000;
|
||||||
const msg = {
|
const options = {}
|
||||||
|
options.sendMsg = {
|
||||||
topic: nextTopic(),
|
topic: nextTopic(),
|
||||||
payload: "a b c" //send string ...
|
payload: "a b c" //send string ...
|
||||||
}
|
}
|
||||||
const expectMsg = Object.assign({}, msg, {payload: Buffer.from(msg.payload)});//expect Buffer.from(msg.payload)
|
options.expectMsg = Object.assign({}, options.sendMsg, { payload: Buffer.from(options.sendMsg.payload) });//expect Buffer.from(msg.payload)
|
||||||
testSendRecv({}, { datatype: "buffer", topicType: "static"}, {}, msg, expectMsg, done);
|
testSendRecv({}, { datatype: "buffer", topicType: "static" }, {}, options, { done: done });
|
||||||
});
|
});
|
||||||
itConditional('should send utf8 Buffer and receive String (auto)', function (done) {
|
itConditional('should send utf8 Buffer and receive String (auto)', function (done) {
|
||||||
if (skipTests) { return this.skip() }
|
if (skipTests) { return this.skip() }
|
||||||
this.timeout = 1000;
|
this.timeout = 2000;
|
||||||
const msg = {
|
const options = {}
|
||||||
|
options.sendMsg = {
|
||||||
topic: nextTopic(),
|
topic: nextTopic(),
|
||||||
payload: Buffer.from([0x78, 0x20, 0x79, 0x20, 0x7a]) // "x y z"
|
payload: Buffer.from([0x78, 0x20, 0x79, 0x20, 0x7a]) // "x y z"
|
||||||
}
|
}
|
||||||
const expectMsg = Object.assign({}, msg, {payload: "x y z"});//set expected payload to "x y z"
|
options.expectMsg = Object.assign({}, options.sendMsg, { payload: "x y z" });//set expected payload to "x y z"
|
||||||
testSendRecv({}, { datatype: "auto", topicType: "static"}, {}, msg, expectMsg, done);
|
testSendRecv({}, { datatype: "auto", topicType: "static" }, {}, options, { done: done });
|
||||||
});
|
});
|
||||||
itConditional('should send non utf8 Buffer and receive Buffer (auto)', function (done) {
|
itConditional('should send non utf8 Buffer and receive Buffer (auto)', function (done) {
|
||||||
if (skipTests) { return this.skip() }
|
if (skipTests) { return this.skip() }
|
||||||
this.timeout = 1000;
|
this.timeout = 2000;
|
||||||
const msg = {
|
const options = {}
|
||||||
|
const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
|
||||||
|
options.sendMsg = {
|
||||||
topic: nextTopic(),
|
topic: nextTopic(),
|
||||||
payload: Buffer.from([0xC0, 0xC1, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF]) //non valid UTF8
|
payload: Buffer.from([0xC0, 0xC1, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF]) //non valid UTF8
|
||||||
}
|
}
|
||||||
const expectMsg = Object.assign({}, msg);
|
options.expectMsg = Object.assign({}, options.sendMsg);
|
||||||
testSendRecv({}, { datatype: "auto", topicType: "static" }, {}, msg, expectMsg, done);
|
testSendRecv({}, { datatype: "auto", topicType: "static" }, {}, options, hooks);
|
||||||
});
|
});
|
||||||
itConditional('should send/receive all v5 flags and settings', function (done) {
|
itConditional('should send/receive all v5 flags and settings', function (done) {
|
||||||
if (skipTests) { return this.skip() }
|
if (skipTests) { return this.skip() }
|
||||||
this.timeout = 1000;
|
this.timeout = 2000;
|
||||||
const t = nextTopic();
|
const t = nextTopic();
|
||||||
const msg = {
|
const options = {}
|
||||||
topic: t + "/command",
|
const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
|
||||||
payload: Buffer.from("v5"),
|
options.sendMsg = {
|
||||||
qos: 1,
|
topic: t + "/command", payload: Buffer.from("v5"), qos: 1, retain: true,
|
||||||
retain: true,
|
|
||||||
responseTopic: t + "/response",
|
responseTopic: t + "/response",
|
||||||
userProperties: { prop1: "val1" },
|
userProperties: { prop1: "val1" },
|
||||||
contentType: "application/json",
|
contentType: "application/json",
|
||||||
@ -187,95 +190,79 @@ describe('MQTT Nodes', function () {
|
|||||||
payloadFormatIndicator: true,
|
payloadFormatIndicator: true,
|
||||||
messageExpiryInterval: 2000,
|
messageExpiryInterval: 2000,
|
||||||
}
|
}
|
||||||
const expectMsg = Object.assign({}, msg);
|
options.expectMsg = Object.assign({}, options.sendMsg);
|
||||||
expectMsg.payload = expectMsg.payload.toString(); //auto mode + payloadFormatIndicator should make a string
|
options.expectMsg.payload = options.expectMsg.payload.toString(); //auto mode + payloadFormatIndicator should make a string
|
||||||
delete expectMsg.payloadFormatIndicator; //Seems mqtt.js only publishes payloadFormatIndicator the will msg
|
delete options.expectMsg.payloadFormatIndicator; //Seems mqtt.js only publishes payloadFormatIndicator the will msg
|
||||||
const inOptions = {
|
const inOptions = {
|
||||||
datatype: "auto", topicType: "static",
|
datatype: "auto", topicType: "static",
|
||||||
qos: 1, nl: false, rap: true, rh: 1,
|
qos: 1, nl: false, rap: true, rh: 1
|
||||||
subscriptionIdentifier: 333
|
|
||||||
}
|
}
|
||||||
testSendRecv({ protocolVersion: 5 }, inOptions, {}, msg, expectMsg, done);
|
testSendRecv({ protocolVersion: 5 }, inOptions, {}, options, hooks);
|
||||||
});
|
});
|
||||||
itConditional('should subscribe dynamically via action', function (done) {
|
itConditional('should subscribe dynamically via action', function (done) {
|
||||||
if (skipTests) { return this.skip() }
|
if (skipTests) { return this.skip() }
|
||||||
this.timeout = 1000;
|
this.timeout = 2000;
|
||||||
const msg = {
|
const options = {}
|
||||||
topic: nextTopic(),
|
const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
|
||||||
payload: "abc"
|
options.sendMsg = {
|
||||||
|
topic: nextTopic(), payload: "abc"
|
||||||
}
|
}
|
||||||
const expectMsg = Object.assign({}, msg);
|
options.expectMsg = Object.assign({}, options.sendMsg);
|
||||||
testSendRecv({ protocolVersion: 5 }, { datatype: "utf8", topicType: "dynamic" }, {}, msg, expectMsg, done);
|
testSendRecv({ protocolVersion: 5 }, { datatype: "utf8", topicType: "dynamic" }, {}, options, hooks);
|
||||||
});
|
});
|
||||||
//#endregion BASIC TESTS
|
//#endregion BASIC TESTS
|
||||||
|
|
||||||
//#region ################### ADVANCED TESTS ################### #//
|
//#region ################### ADVANCED TESTS ################### #//
|
||||||
itConditional('should connect via "connect" action', function (done) {
|
itConditional('should connect via "connect" action', function (done) {
|
||||||
if (skipTests) { return this.skip() }
|
if (skipTests) { return this.skip() }
|
||||||
this.timeout = 1000;
|
this.timeout = 2000;
|
||||||
const {flow, nodes} = buildBasicMQTTSendRecvFlow("mqtt.broker", "mqtt_broker", { autoConnect: false }, "mqtt.in", "mqtt_in", {}, null, null, null, {});
|
const options = {}
|
||||||
flow.push({ "id": "status_node", "type": "status", "name": "status-node", "scope": ["mqtt.in"], "wires": [["helper.node"]] });
|
const hooks = { done: null, beforeLoad: null, afterLoad: null, afterConnect: null }
|
||||||
|
hooks.afterLoad = (helperNode, mqttBroker, mqttIn, mqttOut) => {
|
||||||
helper.load(mqttNodes, flow, function () {
|
mqttBroker.should.have.property("autoConnect", false);
|
||||||
const helperNode = helper.getNode("helper.node");
|
mqttBroker.should.have.property("connecting", false);//should not attempt to connect (autoConnect:false)
|
||||||
const mqttIn = helper.getNode("mqtt.in");
|
mqttIn.receive({ "action": "connect" }); //now request connect action
|
||||||
const mqttBroker = helper.getNode("mqtt.broker");
|
return true; //handled
|
||||||
try {
|
}
|
||||||
mqttBroker.should.have.property("autoConnect", false);
|
hooks.afterConnect = (helperNode, mqttBroker, mqttIn, mqttOut) => {
|
||||||
mqttBroker.should.have.property("connecting", false);//should not attempt to connect (autoConnect:false)
|
done();//if we got here, it connected :)
|
||||||
} catch (error) {
|
return true;
|
||||||
done(error)
|
}
|
||||||
}
|
testSendRecv({ protocolVersion: 5, autoConnect: false }, { datatype: "utf8", topicType: "dynamic" }, {}, options, hooks);
|
||||||
mqttIn.receive({ "action": "connect" });
|
|
||||||
waitConnection();
|
|
||||||
function waitConnection() {
|
|
||||||
if (!mqttBroker.connected) {
|
|
||||||
setTimeout(waitConnection, 15);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
done();//if we got here, it connected!
|
|
||||||
}
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
itConditional('should disconnect via "disconnect" action', function (done) {
|
itConditional('should disconnect via "disconnect" action', function (done) {
|
||||||
if (skipTests) { return this.skip() }
|
if (skipTests) { return this.skip() }
|
||||||
this.timeout = 1000;
|
this.timeout = 2000;
|
||||||
const {flow, nodes} = buildBasicMQTTSendRecvFlow("mqtt.broker", "mqtt_broker", {}, null, null, null, "mqtt.out", "mqtt_out", {}, {});
|
const options = {}
|
||||||
flow.push({ "id": "statusnode", "type": "status", "name": "status-node", "scope": ["mqtt.out"], "wires": [["helper.node"]] });//add status node to watch mqtt_out
|
const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
|
||||||
|
hooks.beforeLoad = (flow) => { //add a status node pointed at MQTT Out node (to watch for connection status change)
|
||||||
helper.load(mqttNodes, flow, function () {
|
flow.push({ "id": "status.node", "type": "status", "name": "status_node", "scope": ["mqtt.out"], "wires": [["helper.node"]] });//add status node to watch mqtt_out
|
||||||
const helperNode = helper.getNode("helper.node");
|
}
|
||||||
const mqttOut = helper.getNode("mqtt.out");
|
hooks.afterLoad = (helperNode, mqttBroker, mqttIn, mqttOut) => {
|
||||||
const mqttBroker = helper.getNode("mqtt.broker");
|
mqttBroker.should.have.property("autoConnect", true);
|
||||||
try {
|
mqttBroker.should.have.property("connecting", true);//should be trying to connect (autoConnect:true)
|
||||||
mqttBroker.should.have.property("autoConnect", true);
|
return true; //handled
|
||||||
mqttBroker.should.have.property("connecting", true);//should be trying to connect (autoConnect:true)
|
}
|
||||||
} catch (error) {
|
hooks.afterConnect = (helperNode, mqttBroker, mqttIn, mqttOut) => {
|
||||||
done(error)
|
//connected - now add the "on" handler then send "disconnect" action
|
||||||
}
|
helperNode.on("input", function (msg) {
|
||||||
waitConnection();
|
try {
|
||||||
function waitConnection() {
|
msg.should.have.property("status");
|
||||||
if (!mqttBroker.connected) {
|
msg.status.should.have.property("text");
|
||||||
setTimeout(waitConnection, 15);
|
msg.status.text.should.containEql('disconnect');
|
||||||
return;
|
done(); //it disconnected - yey!
|
||||||
|
} catch (error) {
|
||||||
|
done(error)
|
||||||
}
|
}
|
||||||
//connected - add the on handler and call to disconnect
|
})
|
||||||
helperNode.on("input", function (msg) {
|
mqttOut.receive({ "action": "disconnect" });
|
||||||
try {
|
return true; //handed
|
||||||
msg.status.should.have.property("text");
|
}
|
||||||
msg.status.text.should.containEql('disconnect');
|
testSendRecv({ protocolVersion: 5 }, null, {}, options, hooks);
|
||||||
done();
|
|
||||||
} catch (error) {
|
|
||||||
done(error)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
mqttOut.receive({ "action": "disconnect" });
|
|
||||||
}
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
itConditional('should publish birth message', function (done) {
|
itConditional('should publish birth message', function (done) {
|
||||||
if (skipTests) { return this.skip() }
|
if (skipTests) { return this.skip() }
|
||||||
this.timeout = 1000;
|
this.timeout = 2000;
|
||||||
const baseTopic = nextTopic();
|
const baseTopic = nextTopic();
|
||||||
const brokerOptions = {
|
const brokerOptions = {
|
||||||
protocolVersion: 4,
|
protocolVersion: 4,
|
||||||
@ -283,107 +270,80 @@ describe('MQTT Nodes', function () {
|
|||||||
birthPayload: "broker connected",
|
birthPayload: "broker connected",
|
||||||
birthQos: 2,
|
birthQos: 2,
|
||||||
}
|
}
|
||||||
const msg = { topic: baseTopic + "/ignoreme"}
|
const options = {};
|
||||||
const expectMsg ={
|
const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null };
|
||||||
|
options.expectMsg = {
|
||||||
topic: brokerOptions.birthTopic,
|
topic: brokerOptions.birthTopic,
|
||||||
payload: brokerOptions.birthPayload,
|
payload: brokerOptions.birthPayload,
|
||||||
qos: brokerOptions.birthQos
|
qos: brokerOptions.birthQos
|
||||||
};
|
};
|
||||||
testSendRecv(brokerOptions, { topic: brokerOptions.birthTopic }, {}, msg, expectMsg, done);
|
testSendRecv(brokerOptions, { topic: brokerOptions.birthTopic }, {}, options, hooks);
|
||||||
});
|
});
|
||||||
itConditional('should publish close message', function (done) {
|
itConditional('should publish close message', function (done) {
|
||||||
if (skipTests) { return this.skip() }
|
if (skipTests) { return this.skip() }
|
||||||
this.timeout = 1000;
|
this.timeout = 2000;
|
||||||
const baseTopic = nextTopic();
|
const baseTopic = nextTopic();
|
||||||
const broker1Options = { id: "mqtt.broker1", name: "mqtt_broker1" }
|
const broker1Options = { id: "mqtt.broker1" }//Broker 1 - stays connected to receive the close message
|
||||||
const broker2Options = { id: "mqtt.broker2", name: "mqtt_broker2", closeTopic: baseTopic + "/close", closePayload: "broker disconnected", closeQos: 2,}
|
const broker2Options = { id: "mqtt.broker2", closeTopic: baseTopic + "/close", closePayload: '{"msg":"close"}', closeQos: 1, }//Broker 2 - connects to same broker but has a LWT message.
|
||||||
|
const { flow } = buildBasicMQTTSendRecvFlow(broker1Options, { broker: broker1Options.id, topic: broker2Options.closeTopic, datatype: "json" }, { broker: broker2Options.id })
|
||||||
const {flow} = buildBasicMQTTSendRecvFlow(
|
flow.push(buildMQTTBrokerNode(broker2Options.id, broker2Options.name, BROKER_HOST, BROKER_PORT, broker2Options)); //add second broker
|
||||||
broker1Options.id, broker1Options.name || "mqtt_broker", broker1Options,
|
|
||||||
"mqtt.in", "mqtt_in", {broker: broker1Options.id, topic: broker2Options.closeTopic}, //should receive close msg of broker2
|
|
||||||
"mqtt.out", "mqtt_out", {broker: broker2Options.id},
|
|
||||||
)
|
|
||||||
flow.push(buildMQTTBrokerNode(broker2Options.id, broker2Options.name, server, port, broker2Options))
|
|
||||||
|
|
||||||
helper.load(mqttNodes, flow, function () {
|
helper.load(mqttNodes, flow, function () {
|
||||||
const helperNode = helper.getNode("helper.node");
|
const helperNode = helper.getNode("helper.node");
|
||||||
const mqttOut = helper.getNode("mqtt.out");
|
const mqttOut = helper.getNode("mqtt.out");
|
||||||
const mqttIn = helper.getNode("mqtt.in");
|
|
||||||
const mqttBroker1 = helper.getNode("mqtt.broker1");
|
const mqttBroker1 = helper.getNode("mqtt.broker1");
|
||||||
const mqttBroker2 = helper.getNode("mqtt.broker2");
|
const mqttBroker2 = helper.getNode("mqtt.broker2");
|
||||||
waitConnection();
|
waitBrokerConnect([mqttBroker1, mqttBroker2], function connected() {
|
||||||
function waitConnection() {
|
|
||||||
if (!mqttBroker1.connected || !mqttBroker2.connected) {
|
|
||||||
setTimeout(waitConnection, 15);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
//connected - add the on handler and call to disconnect
|
//connected - add the on handler and call to disconnect
|
||||||
helperNode.on("input", function (msg) {
|
helperNode.on("input", function (msg) {
|
||||||
try {
|
try {
|
||||||
msg.should.have.property("topic", broker2Options.closeTopic);
|
msg.should.have.property("topic", broker2Options.closeTopic);
|
||||||
msg.should.have.property('payload', broker2Options.closePayload);
|
msg.should.have.property('payload', JSON.parse(broker2Options.closePayload));
|
||||||
msg.should.have.property('qos', broker2Options.closeQos);
|
msg.should.have.property('qos', broker2Options.closeQos);
|
||||||
done();
|
done();
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
done(error)
|
done(error)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
mqttOut.receive({ "action": "disconnect" });
|
mqttOut.receive({ "action": "disconnect" });//close broker2
|
||||||
}
|
})
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
itConditional('should publish will message', function (done) {
|
itConditional('should publish will message', function (done) {
|
||||||
if (skipTests) { return this.skip() }
|
if (skipTests) { return this.skip() }
|
||||||
this.timeout = 1000;
|
this.timeout = 2000;
|
||||||
const baseTopic = nextTopic();
|
const baseTopic = nextTopic();
|
||||||
//Broker 1 - stays connected to receive the will message when broker 2 is killed
|
const broker1Options = { id: "mqtt.broker1" }//Broker 1 - stays connected to receive the will message
|
||||||
const broker1Options = { id: "mqtt.broker1", name: "mqtt_broker1" }
|
const broker2Options = { id: "mqtt.broker2", willTopic: baseTopic + "/will", willPayload: '{"msg":"will"}', willQos: 2, }//Broker 2 - connects to same broker but has a LWT message.
|
||||||
//Broker 2 - connects to same broker but has a LWT message. Broker 2 gets killed shortly after connection so that the will message is sent from broker
|
const { flow } = buildBasicMQTTSendRecvFlow(broker1Options, { broker: broker1Options.id, topic: broker2Options.willTopic, datatype: "utf8" }, { broker: broker2Options.id })
|
||||||
const broker2Options = { id: "mqtt.broker2", name: "mqtt_broker2", willTopic: baseTopic + "/will", willPayload: '{"msg":"will"}', willQos: 1,}
|
flow.push(buildMQTTBrokerNode(broker2Options.id, broker2Options.name, BROKER_HOST, BROKER_PORT, broker2Options)); //add second broker
|
||||||
const expectMsg ={
|
|
||||||
topic: broker2Options.willTopic,
|
|
||||||
payload: JSON.parse(broker2Options.willPayload),
|
|
||||||
qos: broker2Options.willQos
|
|
||||||
};
|
|
||||||
|
|
||||||
const {flow} = buildBasicMQTTSendRecvFlow(
|
|
||||||
broker1Options.id, broker1Options.name || "mqtt_broker", broker1Options,
|
|
||||||
"mqtt.in", "mqtt_in", {broker: broker1Options.id, topic: broker2Options.willTopic, datatype: "json"}, //should receive will msg of broker2
|
|
||||||
"mqtt.out", "mqtt_out", {broker: broker2Options.id},
|
|
||||||
)
|
|
||||||
//add second broker
|
|
||||||
flow.push(buildMQTTBrokerNode(broker2Options.id, broker2Options.name, server, port, broker2Options))
|
|
||||||
|
|
||||||
helper.load(mqttNodes, flow, function () {
|
helper.load(mqttNodes, flow, function () {
|
||||||
const helperNode = helper.getNode("helper.node");
|
const helperNode = helper.getNode("helper.node");
|
||||||
const mqttBroker1 = helper.getNode("mqtt.broker1");
|
const mqttBroker1 = helper.getNode("mqtt.broker1");
|
||||||
const mqttBroker2 = helper.getNode("mqtt.broker2");
|
const mqttBroker2 = helper.getNode("mqtt.broker2");
|
||||||
waitConnection();
|
waitBrokerConnect([mqttBroker1, mqttBroker2], function connected() {
|
||||||
function waitConnection() {
|
|
||||||
if (!mqttBroker1.connected || !mqttBroker2.connected) {
|
|
||||||
setTimeout(waitConnection, 15);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
//connected - add the on handler and call to disconnect
|
//connected - add the on handler and call to disconnect
|
||||||
helperNode.on("input", function (msg) {
|
helperNode.on("input", function (msg) {
|
||||||
try {
|
try {
|
||||||
compareMsgToExpected(msg, expectMsg);
|
msg.should.have.property("topic", broker2Options.willTopic);
|
||||||
|
msg.should.have.property('payload', broker2Options.willPayload);
|
||||||
|
msg.should.have.property('qos', broker2Options.willQos);
|
||||||
done();
|
done();
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
done(error)
|
done(error)
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
mqttBroker2.client.end(true); //force closure
|
mqttBroker2.client.end(true); //force closure
|
||||||
}
|
})
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
itConditional('should publish will message with V5 properties', function (done) {
|
itConditional('should publish will message with V5 properties', function (done) {
|
||||||
if (skipTests) { return this.skip() }
|
if (skipTests) { return this.skip() }
|
||||||
// return this.skip(); //Issue receiving v5 props on will msg. Issue raised here: https://github.com/mqttjs/MQTT.js/issues/1455
|
// return this.skip(); //Issue receiving v5 props on will msg. Issue raised here: https://github.com/mqttjs/MQTT.js/issues/1455
|
||||||
this.timeout = 1000;
|
this.timeout = 2000;
|
||||||
const baseTopic = nextTopic();
|
const baseTopic = nextTopic();
|
||||||
//Broker 1 - stays connected to receive the will message when broker 2 is killed
|
//Broker 1 - stays connected to receive the will message when broker 2 is killed
|
||||||
const broker1Options = { id: "mqtt.broker1", name: "mqtt_broker1", protocolVersion: 5, datatype: "utf8"}
|
const broker1Options = { id: "mqtt.broker1", name: "mqtt_broker1", protocolVersion: 5, datatype: "utf8" }
|
||||||
//Broker 2 - connects to same broker but has a LWT message. Broker 2 gets killed shortly after connection so that the will message is sent from broker
|
//Broker 2 - connects to same broker but has a LWT message. Broker 2 gets killed shortly after connection so that the will message is sent from broker
|
||||||
const broker2Options = {
|
const broker2Options = {
|
||||||
id: "mqtt.broker2", name: "mqtt_broker2", protocolVersion: 5,
|
id: "mqtt.broker2", name: "mqtt_broker2", protocolVersion: 5,
|
||||||
@ -391,15 +351,15 @@ describe('MQTT Nodes', function () {
|
|||||||
willPayload: '{"msg":"will"}',
|
willPayload: '{"msg":"will"}',
|
||||||
willQos: 2,
|
willQos: 2,
|
||||||
willMsg: {
|
willMsg: {
|
||||||
contentType: 'application/json' ,
|
contentType: 'application/json',
|
||||||
userProps: {"will":"value"},
|
userProps: { "will": "value" },
|
||||||
respTopic: baseTopic+"/resp",
|
respTopic: baseTopic + "/resp",
|
||||||
correl: Buffer.from("abc"),
|
correl: Buffer.from("abc"),
|
||||||
expiry: 2000,
|
expiry: 2000,
|
||||||
payloadFormatIndicator: true
|
payloadFormatIndicator: true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
const expectMsg ={
|
const expectMsg = {
|
||||||
topic: broker2Options.willTopic,
|
topic: broker2Options.willTopic,
|
||||||
payload: broker2Options.willPayload,
|
payload: broker2Options.willPayload,
|
||||||
qos: broker2Options.willQos,
|
qos: broker2Options.willQos,
|
||||||
@ -410,24 +370,13 @@ describe('MQTT Nodes', function () {
|
|||||||
messageExpiryInterval: broker2Options.willMsg.expiry,
|
messageExpiryInterval: broker2Options.willMsg.expiry,
|
||||||
// payloadFormatIndicator: broker2Options.willMsg.payloadFormatIndicator,
|
// payloadFormatIndicator: broker2Options.willMsg.payloadFormatIndicator,
|
||||||
};
|
};
|
||||||
const {flow} = buildBasicMQTTSendRecvFlow(
|
const { flow, nodes } = buildBasicMQTTSendRecvFlow(broker1Options, { broker: broker1Options.id, topic: broker2Options.willTopic, datatype: "utf8" }, { broker: broker2Options.id })
|
||||||
broker1Options.id, broker1Options.name || "mqtt_broker", broker1Options,
|
flow.push(buildMQTTBrokerNode(broker2Options.id, broker2Options.name, nodes.mqtt_broker1.broker, nodes.mqtt_broker1.port, broker2Options)) //add second broker with will msg set
|
||||||
"mqtt.in", "mqtt_in", {broker: broker1Options.id, topic: broker2Options.willTopic}, //should receive will msg of broker2
|
|
||||||
"mqtt.out", "mqtt_out", {broker: broker2Options.id},
|
|
||||||
)
|
|
||||||
//add second broker with will msg set
|
|
||||||
flow.push(buildMQTTBrokerNode(broker2Options.id, broker2Options.name, server, port, broker2Options))
|
|
||||||
|
|
||||||
helper.load(mqttNodes, flow, function () {
|
helper.load(mqttNodes, flow, function () {
|
||||||
const helperNode = helper.getNode("helper.node");
|
const helperNode = helper.getNode("helper.node");
|
||||||
const mqttBroker1 = helper.getNode("mqtt.broker1");
|
const mqttBroker1 = helper.getNode("mqtt.broker1");
|
||||||
const mqttBroker2 = helper.getNode("mqtt.broker2");
|
const mqttBroker2 = helper.getNode("mqtt.broker2");
|
||||||
waitConnection();
|
waitBrokerConnect([mqttBroker1, mqttBroker2], function connected() {
|
||||||
function waitConnection() {
|
|
||||||
if (!mqttBroker1.connected || !mqttBroker2.connected) {
|
|
||||||
setTimeout(waitConnection, 15);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
//connected - add the on handler and call to disconnect
|
//connected - add the on handler and call to disconnect
|
||||||
helperNode.on("input", function (msg) {
|
helperNode.on("input", function (msg) {
|
||||||
try {
|
try {
|
||||||
@ -437,8 +386,8 @@ describe('MQTT Nodes', function () {
|
|||||||
done(error)
|
done(error)
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
mqttBroker2.client.end(true); //force closure of broker 2 to cause will msg
|
mqttBroker2.client.end(true); //force closure
|
||||||
}
|
})
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
//#endregion ADVANCED TESTS
|
//#endregion ADVANCED TESTS
|
||||||
@ -447,144 +396,106 @@ describe('MQTT Nodes', function () {
|
|||||||
//#region ################### HELPERS ################### #//
|
//#region ################### HELPERS ################### #//
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A basic unit test that builds a flow containg 1 broker, 1 mqtt-in, one mqtt-out and a helper.
|
* A basic unit test that builds a flow containing 1 broker, 1 mqtt-in, one mqtt-out and a helper.
|
||||||
* It performs the following steps: builds flow, loads flow, waits for connection, sends `sendMsg`,
|
* It performs the following steps: builds flow, loads flow, waits for connection, sends `sendMsg`,
|
||||||
* waits for msg then compares `sendMsg` to `expectMsg`, and finally calls `done`
|
* waits for msg then compares `sendMsg` to `expectMsg`, and finally calls `done`
|
||||||
* @param {object} brokerOptions anything that can be set in an MQTTBrokerNode (e.g. id, name, url, broker, server, port, protocolVersion, ...)
|
* @param {object} brokerOptions anything that can be set in an MQTTBrokerNode (e.g. id, name, url, broker, server, port, protocolVersion, ...)
|
||||||
* @param {object} inNodeOptions anything that can be set in an MQTTInNode (e.g. id, name, broker, topic, rh, nl, rap, ... )
|
* @param {object} inNodeOptions anything that can be set in an MQTTInNode (e.g. id, name, broker, topic, rh, nl, rap, ... )
|
||||||
* @param {object} outNodeOptions anything that can be set in an MQTTOutNode (e.g. id, name, broker, ...)
|
* @param {object} outNodeOptions anything that can be set in an MQTTOutNode (e.g. id, name, broker, ...)
|
||||||
* @param {object} sendMsg the msg to send to broker
|
* @param {object} options an object for passing in test properties like `sendMsg` and `expectMsg`
|
||||||
* @param {object} expectMsg the msg to send to broker
|
* @param {object} hooks an object containing hook functions...
|
||||||
* @param {function} done the test runner `done` callback
|
* * [fn] `done()` - the tests done function. If excluded, an error will be thrown upon test error
|
||||||
|
* * [fn] `beforeLoad(flow)` - provides opportunity to adjust the flow JSON before loading into runtime
|
||||||
|
* * [fn] `afterLoad(helperNode, mqttBroker, mqttIn, mqttOut)` - called before connection attempt
|
||||||
|
* * [fn] `afterConnect(helperNode, mqttBroker, mqttIn, mqttOut)` - called before connection attempt
|
||||||
*/
|
*/
|
||||||
function testSendRecv(brokerOptions, inNodeOptions, outNodeOptions, sendMsg, expectMsg, done, customTests) {
|
function testSendRecv(brokerOptions, inNodeOptions, outNodeOptions, options, hooks) {
|
||||||
sendMsg = sendMsg || {};
|
options = options || {};
|
||||||
brokerOptions = brokerOptions || {};
|
brokerOptions = brokerOptions || {};
|
||||||
inNodeOptions = inNodeOptions || {};
|
inNodeOptions = inNodeOptions || {};
|
||||||
outNodeOptions = outNodeOptions || {};
|
outNodeOptions = outNodeOptions || {};
|
||||||
brokerOptions.id = brokerOptions.id || "mqtt.broker";
|
const sendMsg = options.sendMsg || {};
|
||||||
inNodeOptions.id = inNodeOptions.id || "mqtt.in";
|
|
||||||
outNodeOptions.id = outNodeOptions.id || "mqtt.out";
|
|
||||||
inNodeOptions.brokerId = inNodeOptions.brokerId || brokerOptions.id;
|
|
||||||
outNodeOptions.id = outNodeOptions.id || brokerOptions.id;
|
|
||||||
sendMsg.topic = sendMsg.topic || nextTopic();
|
sendMsg.topic = sendMsg.topic || nextTopic();
|
||||||
|
const expectMsg = options.expectMsg || Object.assign({}, sendMsg);
|
||||||
if(inNodeOptions.topicType != "dynamic" ) {
|
expectMsg.payload = inNodeOptions.payload === undefined ? expectMsg.payload : inNodeOptions.payload;
|
||||||
|
if (inNodeOptions.topicType != "dynamic") {
|
||||||
inNodeOptions.topic = inNodeOptions.topic || sendMsg.topic;
|
inNodeOptions.topic = inNodeOptions.topic || sendMsg.topic;
|
||||||
}
|
}
|
||||||
outNodeOptions.topic = outNodeOptions.topic ? outNodeOptions.topic : sendMsg.topic;
|
|
||||||
|
|
||||||
const {flow} = buildBasicMQTTSendRecvFlow(
|
|
||||||
brokerOptions.id, brokerOptions.name || "mqtt_broker", brokerOptions,
|
|
||||||
inNodeOptions.id, inNodeOptions.name, inNodeOptions,
|
|
||||||
outNodeOptions.id, outNodeOptions.name, outNodeOptions,
|
|
||||||
)
|
|
||||||
|
|
||||||
expectMsg = expectMsg || Object.assign({}, sendMsg);
|
|
||||||
expectMsg.payload = inNodeOptions.payload === undefined ? expectMsg.payload : inNodeOptions.payload;
|
|
||||||
|
|
||||||
|
const { flow, nodes } = buildBasicMQTTSendRecvFlow(brokerOptions, inNodeOptions, outNodeOptions);
|
||||||
|
if (hooks.beforeLoad) { hooks.beforeLoad(flow) }
|
||||||
helper.load(mqttNodes, flow, function () {
|
helper.load(mqttNodes, flow, function () {
|
||||||
try {
|
try {
|
||||||
const helperNode = helper.getNode("helper.node");
|
const helperNode = helper.getNode("helper.node");
|
||||||
const mqttBroker = helper.getNode(brokerOptions.id);
|
const mqttBroker = helper.getNode(brokerOptions.id);
|
||||||
const mqttIn = helper.getNode(inNodeOptions.id);
|
const mqttIn = helper.getNode(nodes.mqtt_in.id);
|
||||||
const mqttOut = helper.getNode(outNodeOptions.id);
|
const mqttOut = helper.getNode(nodes.mqtt_out.id);
|
||||||
|
let afterLoadHandled = false;
|
||||||
helperNode.on("input", function (msg) {
|
if (hooks.afterLoad) {
|
||||||
try {
|
afterLoadHandled = hooks.afterLoad(helperNode, mqttBroker, mqttIn, mqttOut)
|
||||||
if (customTests) {
|
}
|
||||||
customTests(msg, helperNode, mqttBroker, mqttIn, mqttOut)
|
if (!afterLoadHandled) {
|
||||||
} else {
|
helperNode.on("input", function (msg) {
|
||||||
|
try {
|
||||||
compareMsgToExpected(msg, expectMsg);
|
compareMsgToExpected(msg, expectMsg);
|
||||||
|
if (hooks.done) { hooks.done(); }
|
||||||
|
} catch (err) {
|
||||||
|
if (hooks.done) { hooks.done(err); }
|
||||||
|
else { throw err; }
|
||||||
}
|
}
|
||||||
done();
|
});
|
||||||
} catch (err) {
|
}
|
||||||
done(err);
|
waitBrokerConnect(mqttBroker, function () {
|
||||||
}
|
|
||||||
});
|
|
||||||
waitConnection();
|
|
||||||
function waitConnection() {
|
|
||||||
if (!mqttBroker.connected) {
|
|
||||||
setTimeout(waitConnection, 15);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
//finally, connected!
|
//finally, connected!
|
||||||
|
if (hooks.afterConnect) {
|
||||||
|
let handled = hooks.afterConnect(helperNode, mqttBroker, mqttIn, mqttOut);
|
||||||
|
if (handled) { return }
|
||||||
|
}
|
||||||
if (mqttIn.isDynamic) {
|
if (mqttIn.isDynamic) {
|
||||||
mqttIn.receive({ "action": "subscribe", "topic": sendMsg.topic })
|
mqttIn.receive({ "action": "subscribe", "topic": sendMsg.topic })
|
||||||
}
|
}
|
||||||
mqttOut.receive(sendMsg);
|
mqttOut.receive(sendMsg);
|
||||||
}
|
})
|
||||||
} catch (error) {
|
} catch (err) {
|
||||||
done(error)
|
if (hooks.done) { hooks.done(err); }
|
||||||
|
else { throw err; }
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Builds a flow from an array of {type:string, id:string, name:string, options:object, wires:[string]}
|
|
||||||
* @param {[{type:string, id:string, name:string, options:object, wires:[string]}]} nodes
|
|
||||||
* @returns {{[flow: [], nodes: {}]}} Returns `{[flow: [], nodes: {}]}`
|
|
||||||
*/
|
|
||||||
function buildFlow(nodes) {
|
|
||||||
const result = {flow: [], nodes: {}};
|
|
||||||
nodes.forEach(node => {
|
|
||||||
//const flow = [ { "id": "helper.node", "type": "helper", "wires": [] } ];
|
|
||||||
node.options = node.options || {};
|
|
||||||
switch (node.type) {
|
|
||||||
case "mqtt-broker":
|
|
||||||
result.nodes[node.id] = buildMQTTBrokerNode(node.id, node.name, node.options.server, node.options.port, node.options);
|
|
||||||
break;
|
|
||||||
case "mqtt in":
|
|
||||||
result.nodes[node.id] = buildMQTTInNode(node.id, node.name, node.options.brokerId, node.options.topic, node.options);
|
|
||||||
break;
|
|
||||||
case "mqtt out":
|
|
||||||
result.nodes[node.id] = buildMQTTOutNode(node.id, node.name, node.options.brokerId, node.options.topic, node.options);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
result.nodes[node.id] = buildNode(node.type, node.id, node.name, node.options);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (node.wires && Array.isArray(node.wires)) {
|
|
||||||
result.nodes[node.id].wires[0] = [...node.wires];
|
|
||||||
}
|
|
||||||
result.flow.push(result.nodes[node.id]);
|
|
||||||
})
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Builds a flow containing 2 parts. Part1: MQTT Out node. Part2: MQTT In node --> helper node
|
* Builds a flow containing 2 parts.
|
||||||
* If inXxx is excluded, there will be no in node
|
* * 1: MQTT Out node (with broker configured).
|
||||||
* If outXxx is excluded, there will be no out node
|
* * 2: MQTT In node (with broker configured) --> helper node `id:helper.node`
|
||||||
*/
|
*/
|
||||||
function buildBasicMQTTSendRecvFlow(brokerId, brokerName, brokerOptions, inId, inName, inOptions, outId, outName, outOptions) {
|
function buildBasicMQTTSendRecvFlow(brokerOptions, inOptions, outOptions) {
|
||||||
var nodes = [];
|
|
||||||
brokerOptions = brokerOptions || {};
|
brokerOptions = brokerOptions || {};
|
||||||
outOptions = outOptions || {};
|
brokerOptions.broker = brokerOptions.broker || BROKER_HOST;
|
||||||
inOptions = inOptions || {};
|
brokerOptions.port = brokerOptions.port || BROKER_PORT;
|
||||||
|
|
||||||
brokerOptions.server = brokerOptions.server || server;
|
|
||||||
brokerOptions.port = brokerOptions.port || port;
|
|
||||||
brokerOptions.autoConnect = String(brokerOptions.autoConnect) == "false" ? false : true;
|
brokerOptions.autoConnect = String(brokerOptions.autoConnect) == "false" ? false : true;
|
||||||
|
const broker = buildMQTTBrokerNode(brokerOptions.id, brokerOptions.name, brokerOptions.broker, brokerOptions.port, brokerOptions);
|
||||||
outOptions.broker = outOptions.broker || brokerId;
|
const inNode = buildMQTTInNode(inOptions.id, inOptions.name, inOptions.broker || broker.id, inOptions.topic, inOptions, ["helper.node"]);
|
||||||
inOptions.broker = inOptions.broker || brokerId;
|
const outNode = buildMQTTOutNode(outOptions.id, outOptions.name, outOptions.broker || broker.id, outOptions.topic, outOptions);
|
||||||
|
const helper = buildNode("helper", "helper.node", "helper_node", {});
|
||||||
nodes.push({type:"mqtt-broker", id: brokerId, name: brokerName, options: brokerOptions});
|
return {
|
||||||
if(inId) { nodes.push({type:"mqtt in", id: inId, name: inName, options: inOptions, wires: ["helper.node"]}); }
|
nodes: {
|
||||||
if(outId) { nodes.push({type:"mqtt out", id: outId, name: outName, options: outOptions}); }
|
[broker.name]: broker,
|
||||||
nodes.push({type:"helper", id: "helper.node", name: "helper_node", options: {}});
|
[inNode.name]: inNode,
|
||||||
return buildFlow(nodes);
|
[outNode.name]: outNode,
|
||||||
|
[helper.name]: helper,
|
||||||
|
},
|
||||||
|
flow: [broker, inNode, outNode, helper]
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function buildMQTTBrokerNode(id, name, server, port, options) {
|
function buildMQTTBrokerNode(id, name, brokerHost, brokerPort, options) {
|
||||||
// url,broker,port,clientid,autoConnect,usetls,usews,verifyservercert,compatmode,protocolVersion,keepalive,
|
// url,broker,port,clientid,autoConnect,usetls,usews,verifyservercert,compatmode,protocolVersion,keepalive,
|
||||||
//cleansession,sessionExpiry,topicAliasMaximum,maximumPacketSize,receiveMaximum,userProperties,userPropertiesType,autoUnsubscribe
|
//cleansession,sessionExpiry,topicAliasMaximum,maximumPacketSize,receiveMaximum,userProperties,userPropertiesType,autoUnsubscribe
|
||||||
options = options || {};
|
options = options || {};
|
||||||
const node = buildNode("mqtt-broker", id, name, options);
|
const node = buildNode("mqtt-broker", id || "mqtt.broker", name || "mqtt_broker", options);
|
||||||
node.broker = server;
|
|
||||||
node.port = port;
|
|
||||||
node.url = options.url;
|
node.url = options.url;
|
||||||
|
node.broker = brokerHost || options.broker || BROKER_HOST;
|
||||||
|
node.port = brokerPort || options.port || BROKER_PORT;
|
||||||
node.clientid = options.clientid || "";
|
node.clientid = options.clientid || "";
|
||||||
node.cleansession = String(options.cleansession) == "false" ? false : true;
|
node.cleansession = String(options.cleansession) == "false" ? false : true;
|
||||||
node.autoUnsubscribe = String(options.autoUnsubscribe) == "false" ? false : true;
|
node.autoUnsubscribe = String(options.autoUnsubscribe) == "false" ? false : true;
|
||||||
@ -612,19 +523,21 @@ function buildMQTTBrokerNode(id, name, server, port, options) {
|
|||||||
function buildMQTTInNode(id, name, brokerId, topic, options, wires) {
|
function buildMQTTInNode(id, name, brokerId, topic, options, wires) {
|
||||||
//{ "id": "mqtt.in", "type": "mqtt in", "name": "mqtt_in", "topic": "test/in", "qos": "2", "datatype": "auto", "broker": "mqtt.broker", "nl": false, "rap": true, "rh": 0, "inputs": 0, "wires": [["mqtt.out"]] }
|
//{ "id": "mqtt.in", "type": "mqtt in", "name": "mqtt_in", "topic": "test/in", "qos": "2", "datatype": "auto", "broker": "mqtt.broker", "nl": false, "rap": true, "rh": 0, "inputs": 0, "wires": [["mqtt.out"]] }
|
||||||
options = options || {};
|
options = options || {};
|
||||||
const node = buildNode("mqtt in", id, name, options);
|
options.broker = options.broker || "mqtt.broker";
|
||||||
|
const node = buildNode("mqtt in", id || "mqtt.in", name || "mqtt_in", options);
|
||||||
node.topic = topic || "";
|
node.topic = topic || "";
|
||||||
node.broker = brokerId;
|
node.broker = brokerId;
|
||||||
node.topicType = options.topicType == "dynamic" ? "dynamic" : "static",
|
node.topicType = options.topicType == "dynamic" ? "dynamic" : "static",
|
||||||
node.inputs = options.topicType == "dynamic" ? 1 : 0,
|
node.inputs = options.topicType == "dynamic" ? 1 : 0,
|
||||||
updateNodeOptions(node, options, wires);
|
updateNodeOptions(node, options, wires);
|
||||||
return node;
|
return node;
|
||||||
}
|
}
|
||||||
|
|
||||||
function buildMQTTOutNode(id, name, brokerId, topic, options) {
|
function buildMQTTOutNode(id, name, brokerId, topic, options) {
|
||||||
//{ "id": "mqtt.out", "type": "mqtt out", "name": "mqtt_out", "topic": "test/out", "qos": "", "retain": "", "respTopic": "", "contentType": "", "userProps": "", "correl": "", "expiry": "", "broker": brokerId, "wires": [] },
|
//{ "id": "mqtt.out", "type": "mqtt out", "name": "mqtt_out", "topic": "test/out", "qos": "", "retain": "", "respTopic": "", "contentType": "", "userProps": "", "correl": "", "expiry": "", "broker": brokerId, "wires": [] },
|
||||||
options = options || {};
|
options = options || {};
|
||||||
const node = buildNode("mqtt out", id, name, options);
|
options.broker = options.broker || "mqtt.broker";
|
||||||
|
const node = buildNode("mqtt out", id || "mqtt.out", name || "mqtt_out", options);
|
||||||
node.topic = topic || "";
|
node.topic = topic || "";
|
||||||
node.broker = brokerId;
|
node.broker = brokerId;
|
||||||
updateNodeOptions(node, options, null);
|
updateNodeOptions(node, options, null);
|
||||||
@ -634,13 +547,13 @@ function buildMQTTOutNode(id, name, brokerId, topic, options) {
|
|||||||
function buildNode(type, id, name, options, wires) {
|
function buildNode(type, id, name, options, wires) {
|
||||||
//{ "id": "mqtt.in", "type": "mqtt in", "name": "mqtt_in", "topic": "test/in", "qos": "2", "datatype": "auto", "broker": "mqtt.broker", "nl": false, "rap": true, "rh": 0, "inputs": 0, "wires": [["mqtt.out"]] }
|
//{ "id": "mqtt.in", "type": "mqtt in", "name": "mqtt_in", "topic": "test/in", "qos": "2", "datatype": "auto", "broker": "mqtt.broker", "nl": false, "rap": true, "rh": 0, "inputs": 0, "wires": [["mqtt.out"]] }
|
||||||
options = options || {};
|
options = options || {};
|
||||||
const ts = String(Date.now());
|
|
||||||
const node = {
|
const node = {
|
||||||
"id": id || (type + "." + ts),
|
"id": id || (type.replace(/[\W]/g, ".")),
|
||||||
"type": type,
|
"type": type,
|
||||||
"name": name || (type.replace(/[\W]/g,"_") + "_" + ts),
|
"name": name || (type.replace(/[\W]/g, "_")),
|
||||||
"wires": []
|
"wires": []
|
||||||
}
|
}
|
||||||
|
if (node.id.indexOf(".") == -1) { node.is += ".node" }
|
||||||
updateNodeOptions(node, options, wires);
|
updateNodeOptions(node, options, wires);
|
||||||
return node;
|
return node;
|
||||||
}
|
}
|
||||||
@ -676,6 +589,24 @@ function compareMsgToExpected(msg, expectMsg) {
|
|||||||
if (hasProperty(expectMsg, "messageExpiryInterval")) { msg.should.have.property("messageExpiryInterval", expectMsg.messageExpiryInterval); }
|
if (hasProperty(expectMsg, "messageExpiryInterval")) { msg.should.have.property("messageExpiryInterval", expectMsg.messageExpiryInterval); }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function waitBrokerConnect(broker, callback, timeLimit) {
|
||||||
|
timeLimit = timeLimit || 2000;
|
||||||
|
const brokers = Array.isArray(broker) ? broker : [broker];
|
||||||
|
wait();
|
||||||
|
function wait() {
|
||||||
|
if (brokers.every(e => e.connected == true)) {
|
||||||
|
callback(); //yey - connected!
|
||||||
|
} else {
|
||||||
|
timeLimit = timeLimit - 15;
|
||||||
|
if (timeLimit <= 0) {
|
||||||
|
throw new Error("Timeout waiting broker connect")
|
||||||
|
}
|
||||||
|
setTimeout(wait, 15);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
function hasProperty(obj, propName) {
|
function hasProperty(obj, propName) {
|
||||||
return Object.prototype.hasOwnProperty.call(obj, propName);
|
return Object.prototype.hasOwnProperty.call(obj, propName);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user