mqtt5 1st draft

This commit is contained in:
Steve-Mcl 2020-10-08 20:24:35 +01:00
parent 086f0f8450
commit 33bb86cbcf
3 changed files with 444 additions and 28 deletions

View File

@ -10,6 +10,55 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
<style>
.form-row label:first-child {
width: 125px;
}
.mqtt-form-row-cols2 > input.mqtt-form-row-col1 {
width: calc(35% - 75px);
}
.mqtt-form-row-cols2 > select.mqtt-form-row-col1 {
width: calc(35% - 75px);
}
.mqtt-form-row-cols2 > label.mqtt-form-row-col2 {
width: 100px;
margin-left: 42px;
display: inline-block;
}
.mqtt-form-row-cols2 > input.mqtt-form-row-col2 {
width: calc(35% - 75px);
display: inline-block;
}
.mqtt-form-row-cols2 > select.mqtt-form-row-col2 {
width: calc(35% - 75px);
display: inline-block;
}
.form-row.mqtt-flags-row > label {
vertical-align: top;
}
.form-row.mqtt-flags-row > .mqtt-flags {
display: inline-block;
width: 70%
}
.form-row.mqtt-flags-row > .mqtt-flags > .mqtt-flag > label {
display: block;
padding-left: 15px;
text-indent: -15px;
width: 70%;
}
.form-row.mqtt-flags-row > .mqtt-flags > .mqtt-flag > label > input {
position: relative;
vertical-align: bottom;
top: -2px;
width: 15px;
height: 15px;
}
</style>
<script type="text/html" data-template-name="mqtt in">
<div class="form-row">
@ -28,6 +77,38 @@
<option value="2">2</option>
</select>
</div>
<div class="form-row">
<label for="node-input-userProperties"><i class="fa fa-tasks"></i> <span data-i18n="common.label.userProperties">User Properties</span></label>
<input id="node-input-userPropertiesType" type="hidden">
<input type="text" id="node-input-userProperties" style="width: 70%;" data-i18n="[placeholder]common.label.userProperties" placeholder="User Properties">
</div>
<div class="form-row">
<label for="node-input-subscriptionIdentifier" class="mqtt-form-row-col1"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.subscriptionIdentifier">Subscription ID</span></label>
<input type="number" min="0" max="268435455" id="node-input-subscriptionIdentifier" class="mqtt-form-row-col1" >
</div>
<div class="form-row mqtt-flags-row">
<label for="node-input-nl" ><i class="fa fa-flag"></i> <span data-i18n="mqtt.label.flags">Flags</span></label>
<div class="mqtt-flags">
<div class="mqtt-flag">
<label for="node-input-nl">
<input type="checkbox" id="node-input-nl">
<span data-i18n="mqtt.label.nl">No local</span>
</label>
</div>
<div class="mqtt-flag">
<label for="node-input-rap">
<input type="checkbox" id="node-input-rap">
<span data-i18n="mqtt.label.rap">Retain as Published</span>
</label>
</div>
<div class="mqtt-flag">
<label for="node-input-rh">
<input type="checkbox" id="node-input-rh">
<span data-i18n="mqtt.label.rh">Retain Handling</span>
</label>
</div>
</div>
</div>
<div class="form-row">
<label for="node-input-datatype"><i class="fa fa-sign-out"></i> <span data-i18n="mqtt.label.output"></span></label>
<select id="node-input-datatype" style="width:70%;">
@ -52,6 +133,12 @@
topic: {value:"",required:true,validate: RED.validators.regex(/^(#$|(\+|[^+#]*)(\/(\+|[^+#]*))*(\/(\+|#|[^+#]*))?$)/)},
qos: {value: "2"},
datatype: {value:"auto",required:true},
userPropertiesType: {value:"none"},
userProperties: {value:"userProperties"},
subscriptionIdentifier: {value:0},
nl: {value:false},
rap: {value:false},
rh: {value:false},
broker: {type:"mqtt-broker", required:true}
},
color:"#d8bfd8",
@ -71,6 +158,11 @@
if (this.datatype === undefined) {
$("#node-input-datatype").val("auto");
}
$("#node-input-userProperties").typedInput({
default: "none",
types:[{ value:"none", label: "None", hasValue:false },'flow','global','json'],
typeField: $("#node-input-userPropertiesType")
});
}
});
</script>
@ -84,20 +176,66 @@
<label for="node-input-topic"><i class="fa fa-tasks"></i> <span data-i18n="common.label.topic"></span></label>
<input type="text" id="node-input-topic" data-i18n="[placeholder]common.label.topic">
</div>
<div class="form-row">
<label for="node-input-qos"><i class="fa fa-empire"></i> <span data-i18n="mqtt.label.qos"></span></label>
<select id="node-input-qos" style="width:125px !important">
<div class="form-row mqtt-form-row-cols2">
<label for="node-input-qos" class="mqtt-form-row-col1"><i class="fa fa-empire"></i> <span data-i18n="mqtt.label.qos"></span></label>
<select id="node-input-qos" class="mqtt-form-row-col1">
<option value=""></option>
<option value="0">0</option>
<option value="1">1</option>
<option value="2">2</option>
</select>
&nbsp;&nbsp;<i class="fa fa-history"></i>&nbsp;<span data-i18n="mqtt.retain"></span> &nbsp;<select id="node-input-retain" style="width:125px !important">
<label for="node-input-retain" class="mqtt-form-row-col2"><i class="fa fa-history"></i> <span data-i18n="mqtt.retain"></span></label>
<select id="node-input-retain" class="mqtt-form-row-col2" >
<option value=""></option>
<option value="false" data-i18n="mqtt.false"></option>
<option value="true" data-i18n="mqtt.true"></option>
</select>
</div>
<hr>
<div class="form-row">
<label for="node-input-userProperties"><i class="fa fa-tasks"></i> <span data-i18n="common.label.userProperties">User Properties</span></label>
<input id="node-input-userPropertiesType" type="hidden">
<input type="text" id="node-input-userProperties" style="width: 70%;" data-i18n="[placeholder]common.label.userProperties" placeholder="User Properties">
</div>
<div class="form-row">
<label for="node-input-responseTopic"><i class="fa fa-tag"></i> <span data-i18n="common.label.responseTopic">Response topic</span></label>
<input type="text" id="node-input-responseTopic" data-i18n="[placeholder]common.label.responseTopic" placeholder="Response Topic">
</div>
<div class="form-row">
<label for="node-input-correlationData"><i class="fa fa-tag"></i> <span data-i18n="common.label.correlationData">correlation Data</span></label>
<input id="node-input-correlationDataType" type="hidden">
<input type="text" id="node-input-correlationData" style="width: 70%;" data-i18n="[placeholder]common.label.correlationData" placeholder="correlation Data">
</div>
<div class="form-row">
<label for="node-input-contentType"><i class="fa fa-tag"></i> <span data-i18n="common.label.contentType">Content Type</span></label>
<input type="text" id="node-input-contentType" data-i18n="[placeholder]common.label.contentType" placeholder="Content Type">
</div>
<div class="form-row mqtt-form-row-cols2">
<label for="node-input-messageExpiryInterval" class="mqtt-form-row-col1"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.messageExpiryInterval">Expiry (secs)</span></label>
<input type="number" min="0" id="node-input-messageExpiryInterval" class="mqtt-form-row-col1" >
<label for="node-input-topicAlias" class="mqtt-form-row-col2"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.topicAlias">Alias</span></label>
<input type="number" min="0" id="node-input-topicAlias" class="mqtt-form-row-col2" >
</div>
<div class="form-row mqtt-form-row-cols2">
<label for="node-input-subscriptionIdentifier" class="mqtt-form-row-col1"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.subscriptionIdentifier">Subscription ID</span></label>
<input type="number" min="0" max="268435455" id="node-input-subscriptionIdentifier" class="mqtt-form-row-col1" >
<label for="node-input-payloadFormatIndicator" class="mqtt-form-row-col2"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.payloadFormatIndicator">Format</span></label>
<select id="node-input-payloadFormatIndicator" class="mqtt-form-row-col2" >
<option value=""></option>
<option value="false" data-i18n="mqtt.payloadFormatIndicatorFalse">unspecified bytes (Default)</option>
<option value="true" data-i18n="mqtt.payloadFormatIndicatorTrue">UTF-8 encoded payload</option>
</select>
</div>
<div class="form-row">
<label for="node-input-name"><i class="fa fa-tag"></i> <span data-i18n="common.label.name"></span></label>
<input type="text" id="node-input-name" data-i18n="[placeholder]common.label.name">
@ -113,6 +251,16 @@
topic: {value:""},
qos: {value:""},
retain: {value:""},
responseTopic: {value:""},
contentType: {value:""},
userProperties: {value:'userProperties'},
userPropertiesType: {value:'none'},
correlationData: {value:'correlationData'},
correlationDataType: {value:'none'},
topicAlias: {value:0},
messageExpiryInterval: {value:0},
payloadFormatIndicator: {value:false},
subscriptionIdentifier: {value:0},
broker: {type:"mqtt-broker", required:true}
},
color:"#d8bfd8",
@ -123,6 +271,18 @@
label: function() {
return this.name||this.topic||"mqtt";
},
oneditprepare: function() {
$("#node-input-correlationData").typedInput({
default: "none",
types:[{ value:"none", label: "None", hasValue:false },'msg','flow','global','bin'],
typeField: $("#node-input-correlationDataType")
});
$("#node-input-userProperties").typedInput({
default: "none",
types:[{ value:"none", label: "None", hasValue:false },'msg','flow','global','json'],
typeField: $("#node-input-userPropertiesType")
});
},
labelStyle: function() {
return this.name?"node_label_italic":"";
}
@ -163,9 +323,27 @@
<label for="node-config-input-cleansession" style="width: auto;" data-i18n="mqtt.label.cleansession"></label>
</div>
<div class="form-row">
<input type="checkbox" id="node-config-input-compatmode" style="display: inline-block; width: auto; vertical-align: top;">
<label for="node-config-input-compatmode" style="width: auto;" data-i18n="mqtt.label.compatmode"></label>
<label for="node-config-input-protocolVersion"><i class="fa fa-cog"></i> <span data-i18n="mqtt.label.protocolVersion"></span></label>
<select id="node-config-input-protocolVersion" style="width:70%;">
<option value="3" data-i18n="mqtt.label.protocolVersion3"></option>
<option value="4" data-i18n="mqtt.label.protocolVersion4"></option>
<option value="5" data-i18n="mqtt.label.protocolVersion5"></option>
</select>
</div>
<div class="form-row mqtt-form-row-cols2">
<label for="node-config-input-sessionExpiryInterval" class="mqtt-form-row-col1"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.sessionExpiryInterval">Session Exp</span></label>
<input type="number" min="0" id="node-config-input-sessionExpiryInterval" class="mqtt-form-row-col1" >
<label for="node-config-input-topicAliasMaximum" class="mqtt-form-row-col2"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.topicAliasMaximum">Alias Max</span></label>
<input type="number" min="0" id="node-config-input-topicAliasMaximum" class="mqtt-form-row-col2" >
</div>
<div class="form-row mqtt-form-row-cols2">
<label for="node-config-input-maximumPacketSize" class="mqtt-form-row-col1"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.maximumPacketSize">Max Packet Size</span></label>
<input type="number" min="0" id="node-config-input-maximumPacketSize" class="mqtt-form-row-col1" >
<label for="node-config-input-receiveMaximum" class="mqtt-form-row-col2"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.receiveMaximum">Receive Max</span></label>
<input type="number" min="0" id="node-config-input-receiveMaximum" class="mqtt-form-row-col2" >
</div>
</div>
<div id="mqtt-broker-tab-security" style="display:none">
<div class="form-row">
@ -279,6 +457,7 @@
usetls: {value: false},
verifyservercert: { value: false},
compatmode: { value: false},
protocolVersion: { value: 4},
keepalive: {value:60,validate:RED.validators.number()},
cleansession: {value: true},
birthTopic: {value:""},
@ -292,7 +471,11 @@
willTopic: {value:""},
willQos: {value:"0"},
willRetain: {value:false},
willPayload: {value:""}
willPayload: {value:""},
sessionExpiryInterval: {value:0},
topicAliasMaximum: {value:0},
maximumPacketSize: {value:0},
receiveMaximum: {value:0}
},
credentials: {
user: {type:"text"},
@ -370,10 +553,15 @@
this.usetls = false;
$("#node-config-input-usetls").prop("checked",false);
}
if (typeof this.compatmode === 'undefined') {
this.compatmode = false;
$("#node-config-input-compatmode").prop('checked', false);
if (this.compatmode === 'true' || this.compatmode === true) {
delete this.compatmode;
this.protocolVersion = 4;
}
if (typeof this.protocolVersion === 'undefined') {
this.protocolVersion = 4;
}
console.log("setting protocolVersion", this.protocolVersion)
$("#node-config-input-protocolVersion").val(this.protocolVersion);
if (typeof this.keepalive === 'undefined') {
this.keepalive = 15;
$("#node-config-input-keepalive").val(this.keepalive);
@ -444,4 +632,4 @@
}
}
});
</script>
</script>

View File

@ -14,6 +14,8 @@
* limitations under the License.
**/
const { debug } = require("console");
module.exports = function(RED) {
"use strict";
var mqtt = require("mqtt");
@ -43,6 +45,101 @@ module.exports = function(RED) {
return re.test(t);
}
/**
* Helper function for setting integer property values in the MQTT V5 properties object
* @param {object} src Source object containing properties
* @param {object} dst Destination object to set/add properties
* @param {string} propName The property name to set in the Destination object
* @param {integer} [minVal] The minimum value. If the src value is less than minVal, it will NOT be set in the destination
* @param {integer} [maxVal] The maximum value. If the src value is greater than maxVal, it will NOT be set in the destination
* @param {integer} [def] An optional default to set in the destination object if prop is NOT present in the soruce object
*/
function setIntProp(src, dst, propName, minVal, maxVal, def) {
if (src.hasOwnProperty(propName)) {
var v = parseInt(src[propName]);
if(isNaN(v)) return;
if(minVal != null) {
if(v < minVal) return;
}
if(maxVal != null) {
if(v > maxVal) return;
}
dst[propName] = v;
} else {
if(def != undefined) dst[propName] = def;
}
}
/**
* Helper function for setting string property values in the MQTT V5 properties object
* @param {object} src Source object containing properties
* @param {object} dst Destination object to set/add properties
* @param {string} propName The property name to set in the Destination object
* @param {string} [def] An optional default to set in the destination object if prop is NOT present in the soruce object
*/
function setStrProp(src, dst, propName, def) {
if (src[propName] && typeof src[propName] == "string") {
dst[propName] = src[propName];
} else {
if(def != undefined) dst[propName] = def;
}
}
/**
* Helper function for setting boolean property values in the MQTT V5 properties object
* @param {object} src Source object containing properties
* @param {object} dst Destination object to set/add properties
* @param {string} propName The property name to set in the Destination object
* @param {boolean} [def] An optional default to set in the destination object if prop is NOT present in the soruce object
*/
function setBoolProp(src, dst, propName, def) {
if (src[propName] != null) {
if(src[propName] === "true" || src[propName] === true) {
dst[propName] = true;
} else if(src[propName] === "false" || src[propName] === false) {
dst[propName] = true;
}
} else {
if(def != undefined) dst[propName] = def;
}
}
/**
* Helper function for copying the MQTT v5 srcUserProperties object (parameter1) to the properties object (parameter2).
* Any property in srcUserProperties that is NOT a key/string pair will be silently discarded.
* NOTE: if no sutable properties are present, the userProperties object will NOT be added to the properties object
* @param {object} srcUserProperties An object with key/value string pairs
* @param {object} properties A properties object in which userProperties will be copied to
*/
function setUserProperties(srcUserProperties, properties) {
if (srcUserProperties && typeof srcUserProperties == "object") {
let _clone = {};
let count = 0;
let keys = Object.keys(srcUserProperties);
if(!keys || !keys.length) return null;
keys.forEach(key => {
let val = srcUserProperties[key];
if(typeof val == "string") {
count++;
_clone[key] = val;
}
});
if(count) properties.userProperties = _clone;
}
}
/**
* Helper function for copying the MQTT v5 correlationData object (parameter1) to the properties object (parameter2).
* NOTE: if srcCorrelationData is not a buffer, correlationData will NOT be added to the properties object
* @param {object} srcCorrelationData An buffer containing correlationData
* @param {object} properties A properties object in which correlationData will be copied to
*/
function setCorrelationData(srcCorrelationData, properties) {
if (srcCorrelationData && typeof Buffer.isBuffer(srcCorrelationData)) {
properties.correlationData = Buffer.from(srcCorrelationData);
}
}
function MQTTBrokerNode(n) {
RED.nodes.createNode(this,n);
@ -54,9 +151,13 @@ module.exports = function(RED) {
this.usews = n.usews;
this.verifyservercert = n.verifyservercert;
this.compatmode = n.compatmode;
this.protocolVersion = n.protocolVersion;
this.keepalive = n.keepalive;
this.cleansession = n.cleansession;
this.sessionExpiryInterval = n.sessionExpiryInterval;
this.topicAliasMaximum = n.topicAliasMaximum;
this.maximumPacketSize = n.maximumPacketSize;
this.receiveMaximum = n.receiveMaximum;
// Config node state
this.brokerurl = "";
this.connected = false;
@ -183,9 +284,19 @@ module.exports = function(RED) {
this.options.keepalive = this.keepalive;
this.options.clean = this.cleansession;
this.options.reconnectPeriod = RED.settings.mqttReconnectTime||5000;
if (this.compatmode == "true" || this.compatmode === true) {
if (this.compatmode == "true" || this.compatmode === true || this.protocolVersion == 3) {
this.options.protocolId = 'MQIsdp';
this.options.protocolVersion = 3;
} else if ( this.protocolVersion == 5 ) {
this.options.protocolVersion = 5;
this.options.properties = {};
this.options.properties.requestResponseInformation = true;
this.options.properties.requestProblemInformation = true;
setIntProp(this,this.options.properties,"sessionExpiryInterval", 0);
setIntProp(this,this.options.properties,"topicAliasMaximum", 0);
setIntProp(this,this.options.properties,"maximumPacketSize", 0);
setIntProp(this,this.options.properties,"receiveMaximum", 0);
this.options.properties.userProperties = {"node-red":"v1.3"};//test
}
if (this.usetls && n.tls) {
var tlsNode = RED.nodes.getNode(n.tls);
@ -242,6 +353,8 @@ module.exports = function(RED) {
if (!node.connected && !node.connecting) {
node.connecting = true;
try {
node.serverProperties = {};
debug("MQTT: ⬆️ mqtt.connect(node.brokerurl ,node.options)", node.brokerurl, node.options);
node.client = mqtt.connect(node.brokerurl ,node.options);
node.client.setMaxListeners(0);
// Register successful connect or reconnect handler
@ -249,6 +362,19 @@ module.exports = function(RED) {
node.connecting = false;
node.connected = true;
node.log(RED._("mqtt.state.connected",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl}));
if(node.options.protocolVersion == 5 && arguments && arguments.length && arguments[0].hasOwnProperty("properties")) {
let mqttServerV5Properties = arguments[0].properties;
if(mqttServerV5Properties) {
setIntProp(mqttServerV5Properties, node.serverProperties, "topicAliasMaximum", 1);
setIntProp(mqttServerV5Properties, node.serverProperties, "receiveMaximum", 0);
setIntProp(mqttServerV5Properties, node.serverProperties, "sessionExpiryInterval", 0, 0xFFFFFFFF);
setIntProp(mqttServerV5Properties, node.serverProperties, "maximumPacketSize", 22);
setUserProperties(mqttServerV5Properties, node.serverProperties);
node.serverProperties.requestResponseInformation = mqttServerV5Properties.requestResponseInformation;
node.serverProperties.requestProblemInformation = mqttServerV5Properties.requestProblemInformation;
debug("MQTT: ⬆️ node.client.on('connect', cb) --> node.serverProperties", node.serverProperties );
}
}
for (var id in node.users) {
if (node.users.hasOwnProperty(id)) {
node.users[id].status({fill:"green",shape:"dot",text:"node-red:common.status.connected"});
@ -260,16 +386,18 @@ module.exports = function(RED) {
// Re-subscribe to stored topics
for (var s in node.subscriptions) {
if (node.subscriptions.hasOwnProperty(s)) {
var topic = s;
var qos = 0;
let topic = s;
let qos = 0;
let _options = {};
for (var r in node.subscriptions[s]) {
if (node.subscriptions[s].hasOwnProperty(r)) {
qos = Math.max(qos,node.subscriptions[s][r].qos);
_options = node.subscriptions[s][r].options;
node.client.on('message',node.subscriptions[s][r].handler);
}
}
var options = {qos: qos};
node.client.subscribe(topic, options);
_options.qos = _options.qos || qos;
node.client.subscribe(topic, _options);
}
}
@ -309,12 +437,21 @@ module.exports = function(RED) {
}
};
this.subscribe = function (topic,qos,callback,ref) {
this.subscribe = function (topic,options,callback,ref) {
ref = ref||0;
var qos;
if(typeof options == "object") {
qos = options.qos;
} else {
qos = options;
options = {};
}
options.qos = qos;
node.subscriptions[topic] = node.subscriptions[topic]||{};
var sub = {
topic:topic,
qos:qos,
options:options,
handler:function(mtopic,mpayload, mpacket) {
if (matchTopic(topic,mtopic)) {
callback(mtopic,mpayload, mpacket);
@ -325,8 +462,8 @@ module.exports = function(RED) {
node.subscriptions[topic][ref] = sub;
if (node.connected) {
node.client.on('message',sub.handler);
var options = {};
options.qos = qos;
// var options = {};
// options.qos = qos;
node.client.subscribe(topic, options);
}
};
@ -361,11 +498,23 @@ module.exports = function(RED) {
msg.payload = "" + msg.payload;
}
}
var options = {
qos: msg.qos || 0,
retain: msg.retain || false
};
//https://github.com/mqttjs/MQTT.js/blob/master/README.md#mqttclientpublishtopic-message-options-callback
if(node.options.protocolVersion == 5) {
options.properties = options.properties || {};
setStrProp(msg, options.properties, "responseTopic");
setStrProp(msg, options.properties, "contentType");
setIntProp(msg, options.properties, "topicAlias", 1, node.serverProperties.topicAliasMaximum || 0);
setIntProp(msg, options.properties, "subscriptionIdentifier", 1, 268435455);
setIntProp(msg, options.properties, "messageExpiryInterval", 0);
setBoolProp(msg, options.properties, "payloadFormatIndicator");
setUserProperties(msg.userProperties, options.properties);
setCorrelationData(msg.correlationData, options.properties);
}
debug("MQTT: ➡️➡️ node.client.publish",msg,options);
node.client.publish(msg.topic, msg.payload, options, function(err) {
done && done();
return
@ -405,6 +554,14 @@ module.exports = function(RED) {
RED.nodes.createNode(this,n);
this.topic = n.topic;
this.qos = parseInt(n.qos);
this.subscriptionIdentifier = n.subscriptionIdentifier;//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901117
this.userProperties = n.userProperties;//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901116
this.userPropertiesType = n.userPropertiesType;
this.nl = n.nl;
this.rap = n.rap;
this.rh = n.rh;
if (isNaN(this.qos) || this.qos < 0 || this.qos > 2) {
this.qos = 2;
}
@ -416,10 +573,33 @@ module.exports = function(RED) {
this.datatype = n.datatype || "utf8";
var node = this;
if (this.brokerConn) {
let v5 = this.brokerConn.options && this.brokerConn.options.protocolVersion == 5;
this.status({fill:"red",shape:"ring",text:"node-red:common.status.disconnected"});
if (this.topic) {
node.brokerConn.register(this);
this.brokerConn.subscribe(this.topic,this.qos,function(topic,payload,packet) {
let options = {
qos: this.qos
}
if(v5) {
options.properties = {};
if(node.userProperties && node.userPropertiesType !== "none") {
let userProperties = RED.util.evaluateNodeProperty(node.userProperties, node.userPropertiesType, node, {});
if(userProperties) options.properties.userProperties = userProperties;
}
if(node.subscriptionIdentifier) {
let sid = parseInt(node.subscriptionIdentifier);
if(sid >= 0 && !isNaN(sid) ) options.properties.subscriptionIdentifier = sid;//must not be sent if zero
}
if(node.nl === "true" || node.nl === true) options.nl = true;
if(node.nl === "false" || node.nl === false) options.nl = false;
if(node.rap === "true" || node.rap === true) options.rap = true;
if(node.rap === "false" || node.rap === false) options.rap = false;
if(node.rh === "true" || node.rh === true || node.rh == 1) options.rh = 1;
if(node.rh === "false" || node.rh === false || node.rh == 0) options.rh = 0;
}
debug("MQTT: ⬅️⬅️ this.brokerConn.subscribe",this.topic,options);
this.brokerConn.subscribe(this.topic,options,function(topic,payload,packet) {
debug("MQTT: ⬅️⬅️ ON this.brokerConn.subscribe",topic,payload,packet);
if (node.datatype === "buffer") {
// payload = payload;
} else if (node.datatype === "base64") {
@ -437,9 +617,21 @@ module.exports = function(RED) {
if (isUtf8(payload)) { payload = payload.toString(); }
}
var msg = {topic:topic, payload:payload, qos:packet.qos, retain:packet.retain};
if(v5 && packet.properties) {
//msg.properties = packet.properties;
setStrProp(packet.properties, msg, "responseTopic");
setStrProp(packet.properties, msg, "contentType");
setIntProp(packet.properties, msg, "topicAlias", 1, node.serverProperties.topicAliasMaximum || 0);
setIntProp(packet.properties, msg, "subscriptionIdentifier", 1, 268435455);
setIntProp(packet.properties, msg, "messageExpiryInterval", 0);
setBoolProp(packet.properties, msg, "payloadFormatIndicator");
setUserProperties(packet.properties.userProperties, options.properties);
setCorrelationData(packet.properties.correlationData, options.properties);
}
if ((node.brokerConn.broker === "localhost")||(node.brokerConn.broker === "127.0.0.1")) {
msg._topic = topic;
}
debug("MQTT: ⬅️⬅️ node.send",msg);
node.send(msg);
}, this.id);
if (this.brokerConn.connected) {
@ -467,11 +659,23 @@ module.exports = function(RED) {
this.qos = n.qos || null;
this.retain = n.retain;
this.broker = n.broker;
this.responseTopic = n.responseTopic;//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901114
this.correlationData = n.correlationData;//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901115
this.correlationDataType = n.correlationDataType;
this.contentType = n.contentType;//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901118
this.topicAlias = n.topicAlias; //https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901113
this.messageExpiryInterval = n.messageExpiryInterval; //https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901112
this.payloadFormatIndicator = n.payloadFormatIndicator; //https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901111
this.subscriptionIdentifier = n.subscriptionIdentifier;//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901117
this.userProperties = n.userProperties;//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901116
this.userPropertiesType = n.userPropertiesType;
this.brokerConn = RED.nodes.getNode(this.broker);
var node = this;
var chk = /[\+#]/;
if (this.brokerConn) {
let v5 = this.brokerConn.options && this.brokerConn.options.protocolVersion == 5;
this.status({fill:"red",shape:"ring",text:"node-red:common.status.disconnected"});
this.on("input",function(msg,send,done) {
if (msg.qos) {
@ -483,12 +687,32 @@ module.exports = function(RED) {
msg.qos = Number(node.qos || msg.qos || 0);
msg.retain = node.retain || msg.retain || false;
msg.retain = ((msg.retain === true) || (msg.retain === "true")) || false;
if (node.topic) {
msg.topic = node.topic;
if(v5) {
//TODO: contain all v5 props in an object?
if(node.userProperties && node.userPropertiesType !== "none") {
let userProperties = RED.util.evaluateNodeProperty(node.userProperties, node.userPropertiesType, node, msg);
if(userProperties) msg.userProperties = userProperties;
}
if(node.correlationData && node.correlationDataType !== "none") {
let correlationData = RED.util.evaluateNodeProperty(node.correlationData, node.correlationDataType, node, msg);
if(correlationData) msg.correlationData = correlationData;
}
var msgPropOverride = function(propName) { if(node[propName]) { msg[propName] = node[propName]; } }
msgPropOverride("responseTopic");
msgPropOverride("contentType");
msgPropOverride("topicAlias");
msgPropOverride("messageExpiryInterval");
msgPropOverride("payloadFormatIndicator");
msgPropOverride("subscriptionIdentifier");
msgPropOverride("topic");
}
if ( msg.hasOwnProperty("payload")) {
if (msg.hasOwnProperty("topic") && (typeof msg.topic === "string") && (msg.topic !== "")) { // topic must exist
if (chk.test(msg.topic)) { node.warn(RED._("mqtt.errors.invalid-topic")); }
let topicOK = msg.hasOwnProperty("topic") && (typeof msg.topic === "string") && (msg.topic !== "");
if(!topicOK && v5 && msg.topicAlias && node.brokerConn.serverProperties.topicAliasMaximum) {
topicOK = typeof msg.topicAlias === "number" && msg.topicAlias >= 0 && node.brokerConn.serverProperties.topicAliasMaximum >= msg.topicAlias
msg.topic = "";//must be empty string?
}
if (topicOK) { // topic must exist
this.brokerConn.publish(msg, done); // send the message
} else {
node.warn(RED._("mqtt.errors.invalid-topic"));
@ -510,4 +734,4 @@ module.exports = function(RED) {
}
}
RED.nodes.registerType("mqtt out",MQTTOutNode);
};
};

View File

@ -355,7 +355,11 @@
"use-tls": "Enable secure (SSL/TLS) connection",
"tls-config":"TLS Configuration",
"verify-server-cert":"Verify server certificate",
"compatmode": "Use legacy MQTT 3.1 support"
"compatmode": "Use legacy MQTT 3.1 support",
"protocolVersion": "Version",
"protocolVersion3": "MQTT Protocol v3.1 (legacy)",
"protocolVersion4": "MQTT Protocol v3.1.1",
"protocolVersion5": "MQTT Protocol v5"
},
"sections-label":{
"birth-message": "Message sent on connection (birth message)",