Merge pull request #3840 from Steve-Mcl/fix-mqtt-session-time

ensure sessionExpiry(Interval) is applied
This commit is contained in:
Nick O'Leary 2022-09-02 20:45:41 +01:00 committed by GitHub
commit a032c2e326
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 77 additions and 11 deletions

View File

@ -474,7 +474,6 @@ module.exports = function(RED) {
setIfHasProperty(opts, node, "protocolVersion", init); setIfHasProperty(opts, node, "protocolVersion", init);
setIfHasProperty(opts, node, "keepalive", init); setIfHasProperty(opts, node, "keepalive", init);
setIfHasProperty(opts, node, "cleansession", init); setIfHasProperty(opts, node, "cleansession", init);
setIfHasProperty(opts, node, "sessionExpiry", init);
setIfHasProperty(opts, node, "topicAliasMaximum", init); setIfHasProperty(opts, node, "topicAliasMaximum", init);
setIfHasProperty(opts, node, "maximumPacketSize", init); setIfHasProperty(opts, node, "maximumPacketSize", init);
setIfHasProperty(opts, node, "receiveMaximum", init); setIfHasProperty(opts, node, "receiveMaximum", init);
@ -484,6 +483,11 @@ module.exports = function(RED) {
} else if (hasProperty(opts, "userProps")) { } else if (hasProperty(opts, "userProps")) {
node.userProperties = opts.userProps; node.userProperties = opts.userProps;
} }
if (hasProperty(opts, "sessionExpiry")) {
node.sessionExpiryInterval = opts.sessionExpiry;
} else if (hasProperty(opts, "sessionExpiryInterval")) {
node.sessionExpiryInterval = opts.sessionExpiryInterval
}
function createLWT(topic, payload, qos, retain, v5opts, v5SubPropName) { function createLWT(topic, payload, qos, retain, v5opts, v5SubPropName) {
let message = undefined; let message = undefined;
@ -782,7 +786,9 @@ module.exports = function(RED) {
// Send any birth message // Send any birth message
if (node.birthMessage) { if (node.birthMessage) {
node.publish(node.birthMessage); setTimeout(() => {
node.publish(node.birthMessage);
}, 1);
} }
}); });
node._clientOn("reconnect", function() { node._clientOn("reconnect", function() {

View File

@ -27,7 +27,7 @@ describe('MQTT Nodes', function () {
} catch (error) { } } catch (error) { }
}); });
it('should be loaded and have default values', function (done) { it('should be loaded and have default values (MQTT V4)', function (done) {
this.timeout = 2000; this.timeout = 2000;
const { flow, nodes } = buildBasicMQTTSendRecvFlow({ id: "mqtt.broker", name: "mqtt_broker", autoConnect: false }, { id: "mqtt.in", topic: "in_topic" }, { id: "mqtt.out", topic: "out_topic" }); const { flow, nodes } = buildBasicMQTTSendRecvFlow({ id: "mqtt.broker", name: "mqtt_broker", autoConnect: false }, { id: "mqtt.in", topic: "in_topic" }, { id: "mqtt.out", topic: "out_topic" });
helper.load(mqttNodes, flow, function () { helper.load(mqttNodes, flow, function () {
@ -61,6 +61,52 @@ describe('MQTT Nodes', function () {
mqttBroker.options.clientId.should.containEql('nodered_'); mqttBroker.options.clientId.should.containEql('nodered_');
mqttBroker.options.should.have.property('keepalive').type("number"); mqttBroker.options.should.have.property('keepalive').type("number");
mqttBroker.options.should.have.property('reconnectPeriod').type("number"); mqttBroker.options.should.have.property('reconnectPeriod').type("number");
//as this is not a v5 connection, ensure v5 properties are not present
mqttBroker.options.should.not.have.property('protocolVersion', 5);
mqttBroker.options.should.not.have.property('properties');
done();
} catch (error) {
done(error)
}
});
});
it('should be loaded and have default values (MQTT V5)', function (done) {
this.timeout = 2000;
const { flow, nodes } = buildBasicMQTTSendRecvFlow({ id: "mqtt.broker", name: "mqtt_broker", autoConnect: false, cleansession: false, clientid: 'clientid', keepalive: 35, sessionExpiry: '6000', protocolVersion: '5', userProps: {"prop": "val"}}, { id: "mqtt.in", topic: "in_topic" }, { id: "mqtt.out", topic: "out_topic" });
helper.load(mqttNodes, flow, function () {
try {
const mqttIn = helper.getNode("mqtt.in");
const mqttOut = helper.getNode("mqtt.out");
const mqttBroker = helper.getNode("mqtt.broker");
should(mqttIn).be.type("object", "mqtt in node should be an object")
mqttIn.should.have.property('broker', nodes.mqtt_broker.id); //should be the id of the broker node
mqttIn.should.have.property('datatype', 'utf8'); //default: 'utf8'
mqttIn.should.have.property('isDynamic', false); //default: false
mqttIn.should.have.property('inputs', 0); //default: 0
mqttIn.should.have.property('qos', 2); //default: 2
mqttIn.should.have.property('topic', "in_topic");
mqttIn.should.have.property('wires', [["helper.node"]]);
should(mqttOut).be.type("object", "mqtt out node should be an object")
mqttOut.should.have.property('broker', nodes.mqtt_broker.id); //should be the id of the broker node
mqttOut.should.have.property('topic', "out_topic");
should(mqttBroker).be.type("object", "mqtt broker node should be an object")
mqttBroker.should.have.property('broker', BROKER_HOST);
mqttBroker.should.have.property('port', BROKER_PORT);
mqttBroker.should.have.property('brokerurl');
// mqttBroker.should.have.property('autoUnsubscribe', true);//default: true
mqttBroker.should.have.property('autoConnect', false);//Set "autoConnect:false" in brokerOptions
mqttBroker.should.have.property('options');
mqttBroker.options.should.have.property('clean', false);
mqttBroker.options.should.have.property('clientId', 'clientid');
mqttBroker.options.should.have.property('keepalive').type("number", 35);
mqttBroker.options.should.have.property('reconnectPeriod').type("number");
//as this IS a v5 connection, ensure v5 properties are not present
mqttBroker.options.should.have.property('protocolVersion', 5);
mqttBroker.options.should.have.property('properties');
mqttBroker.options.properties.should.have.property('sessionExpiryInterval');
done(); done();
} catch (error) { } catch (error) {
done(error) done(error)
@ -173,7 +219,7 @@ describe('MQTT Nodes', function () {
topic: nextTopic(), topic: nextTopic(),
payload: '{prop:"value3", "num":3}', // send invalid JSON ... payload: '{prop:"value3", "num":3}', // send invalid JSON ...
} }
const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null } const hooks = { done: null, beforeLoad: null, afterLoad: null, afterConnect: null }
hooks.afterLoad = (helperNode, mqttBroker, mqttIn, mqttOut) => { hooks.afterLoad = (helperNode, mqttBroker, mqttIn, mqttOut) => {
helperNode.on("input", function (msg) { helperNode.on("input", function (msg) {
try { try {
@ -299,7 +345,7 @@ describe('MQTT Nodes', function () {
topic: nextTopic(), topic: nextTopic(),
payload: '{prop:"value3", "num":3}', contentType: "application/json", // send invalid JSON ... payload: '{prop:"value3", "num":3}', contentType: "application/json", // send invalid JSON ...
} }
const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null } const hooks = { done: null, beforeLoad: null, afterLoad: null, afterConnect: null }
hooks.afterLoad = (helperNode, mqttBroker, mqttIn, mqttOut) => { hooks.afterLoad = (helperNode, mqttBroker, mqttIn, mqttOut) => {
helperNode.on("input", function (msg) { helperNode.on("input", function (msg) {
try { try {
@ -385,7 +431,7 @@ describe('MQTT Nodes', function () {
if (skipTests) { return this.skip() } if (skipTests) { return this.skip() }
this.timeout = 2000; this.timeout = 2000;
const options = {} const options = {}
const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null } const hooks = { beforeLoad: null, afterLoad: null, afterConnect: null }
hooks.beforeLoad = (flow) => { //add a status node pointed at MQTT Out node (to watch for connection status change) hooks.beforeLoad = (flow) => { //add a status node pointed at MQTT Out node (to watch for connection status change)
flow.push({ "id": "status.node", "type": "status", "name": "status_node", "scope": ["mqtt.out"], "wires": [["helper.node"]] });//add status node to watch mqtt_out flow.push({ "id": "status.node", "type": "status", "name": "status_node", "scope": ["mqtt.out"], "wires": [["helper.node"]] });//add status node to watch mqtt_out
} }
@ -416,18 +462,31 @@ describe('MQTT Nodes', function () {
this.timeout = 2000; this.timeout = 2000;
const baseTopic = nextTopic(); const baseTopic = nextTopic();
const brokerOptions = { const brokerOptions = {
autoConnect: false,
protocolVersion: 4, protocolVersion: 4,
birthTopic: baseTopic + "/birth", birthTopic: baseTopic + "/birth",
birthPayload: "broker connected", birthPayload: "broker birth",
birthQos: 2, birthQos: 2,
} }
const options = {}; const expectMsg = {
const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null };
options.expectMsg = {
topic: brokerOptions.birthTopic, topic: brokerOptions.birthTopic,
payload: brokerOptions.birthPayload, payload: brokerOptions.birthPayload,
qos: brokerOptions.birthQos qos: brokerOptions.birthQos
}; };
const options = { };
const hooks = { };
hooks.afterLoad = (helperNode, mqttBroker, mqttIn, mqttOut) => {
helperNode.on("input", function (msg) {
try {
compareMsgToExpected(msg, expectMsg);
done();
} catch (error) {
done(error)
}
})
mqttIn.receive({ "action": "connect" }); //now request connect action
return true; //handled
}
testSendRecv(brokerOptions, { topic: brokerOptions.birthTopic }, {}, options, hooks); testSendRecv(brokerOptions, { topic: brokerOptions.birthTopic }, {}, options, hooks);
}); });
itConditional('should publish close message', function (done) { itConditional('should publish close message', function (done) {
@ -666,6 +725,7 @@ function buildMQTTBrokerNode(id, name, brokerHost, brokerPort, options) {
node.cleansession = String(options.cleansession) == "false" ? false : true; node.cleansession = String(options.cleansession) == "false" ? false : true;
node.autoUnsubscribe = String(options.autoUnsubscribe) == "false" ? false : true; node.autoUnsubscribe = String(options.autoUnsubscribe) == "false" ? false : true;
node.autoConnect = String(options.autoConnect) == "false" ? false : true; node.autoConnect = String(options.autoConnect) == "false" ? false : true;
node.sessionExpiry = options.sessionExpiry ? options.sessionExpiry : undefined;
if (options.birthTopic) { if (options.birthTopic) {
node.birthTopic = options.birthTopic; node.birthTopic = options.birthTopic;
@ -760,8 +820,8 @@ function waitBrokerConnect(broker, timeLimit) {
let waitConnected = (broker, timeLimit) => { let waitConnected = (broker, timeLimit) => {
const brokers = Array.isArray(broker) ? broker : [broker]; const brokers = Array.isArray(broker) ? broker : [broker];
timeLimit = timeLimit || 1000; timeLimit = timeLimit || 1000;
let timer, resolved = false;
return new Promise( (resolve, reject) => { return new Promise( (resolve, reject) => {
let timer, resolved = false;
timer = wait(); timer = wait();
function wait() { function wait() {
if (brokers.every(e => e.connected == true)) { if (brokers.every(e => e.connected == true)) {