diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.html b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.html index c1b02e836..6a2b324fe 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.html +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.html @@ -10,6 +10,51 @@ See the License for the specific language governing permissions and limitations under the License. --> + - - - - + RED.nodes.registerType('mqtt in',{ + category: 'network', + defaults: { + name: {value:""}, + topic: {value:"",required:true,validate: RED.validators.regex(/^(#$|(\+|[^+#]*)(\/(\+|[^+#]*))*(\/(\+|#|[^+#]*))?$)/)}, + qos: {value: "2"}, + datatype: {value:"auto",required:true}, + broker: {type:"mqtt-broker", required:true}, + // subscriptionIdentifier: {value:0}, + nl: {value:false}, + rap: {value:true}, + rh: {value:0}, + }, + color:"#d8bfd8", + inputs:0, + outputs:1, + icon: "bridge.svg", + label: function() { + return this.name||this.topic||"mqtt"; + }, + labelStyle: function() { + return this.name?"node_label_italic":""; + }, + oneditprepare: function() { + $("#node-input-broker").on("change",function(d){ + var confNode = RED.nodes.node($("#node-input-broker").val()); + var v5 = confNode && confNode.protocolVersion == "5"; + if(v5) { + $("div.form-row.mqtt5").show(); + } else { + $("div.form-row.mqtt5").hide(); + } + }); + if (this.qos === undefined) { + $("#node-input-qos").val("2"); + } + if (this.datatype === undefined) { + $("#node-input-datatype").val("auto"); + } + } + }); + + RED.nodes.registerType('mqtt out',{ + category: 'network', + defaults: { + name: {value:""}, + topic: {value:""}, + qos: {value:""}, + retain: {value:""}, + respTopic: {value:""}, + contentType: {value:""}, + userProps: {value:''}, + correl: {value:''}, + expiry: {value:''}, + broker: {type:"mqtt-broker", required:true} + }, + color:"#d8bfd8", + inputs:1, + outputs:0, + icon: "bridge.svg", + align: "right", + label: function() { + return this.name||this.topic||"mqtt"; + }, + oneditprepare: function() { + var that = this; + + function showHideDynamicFields() { + var confNode = RED.nodes.node($("#node-input-broker").val()); + var v5 = confNode && confNode.protocolVersion == "5"; + if(v5) { + $("div.form-row.mqtt5").show(); + var t = $("#node-input-respTopic").typedInput("type"); + if (t == 'none') { + $("#node-input-correl").parent().hide(); + } else { + $("#node-input-correl").parent().show(); + } + } else { + $("div.form-row.mqtt5").hide(); + } + } + + $("#node-input-broker").on("change",function(d){ + showHideDynamicFields(); + }); + + var respTopicTI = $("#node-input-respTopic").typedInput({ + default: !this.respTopic ? 'none':'str', + types: [typedInputNoneOpt, 'str'], + }); + + var correlTI = $("#node-input-correl").typedInput({ + default: !this.correl ? 'none':'str', + types: [typedInputNoneOpt, 'str'] + }); + //show / hide correlation data depending on respTopic + respTopicTI.on("change", showHideDynamicFields); + respTopicTI.triggerHandler("change"); + + $("#node-input-userProps").typedInput({ + default: !this.userProps ? 'none':'json', + types: [typedInputNoneOpt, 'json'], + }); + $("#node-input-expiry").typedInput({ + default: !this.expiry ? 'none':'num', + types: [typedInputNoneOpt, 'num'] + }); + $("#node-input-contentType").on('change', function (event, type, value, urg) { + console.log(event); + console.log("ct change",type,value, urg); + }).typedInput({ + default: getDefaultContentType(this.contentType), + types: contentTypeOpts + }) + }, + oneditsave: function() { + + var contentType = $("#node-input-contentType").val().trim(); + if (contentType === '') { + contentType = $("#node-input-contentType").typedInput('type'); + if (contentType === 'none' || contentType === 'other') { + contentType = ""; + } + } + $("#node-input-contentType").val(contentType) + + }, + labelStyle: function() { + return this.name?"node_label_italic":""; + } + }); +})(); + \ No newline at end of file diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index 7ab624222..c9d0c8246 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -43,6 +43,109 @@ module.exports = function(RED) { return re.test(t); } + /** + * Helper function for setting integer property values in the MQTT V5 properties object + * @param {object} src Source object containing properties + * @param {object} dst Destination object to set/add properties + * @param {string} propName The property name to set in the Destination object + * @param {integer} [minVal] The minimum value. If the src value is less than minVal, it will NOT be set in the destination + * @param {integer} [maxVal] The maximum value. If the src value is greater than maxVal, it will NOT be set in the destination + * @param {integer} [def] An optional default to set in the destination object if prop is NOT present in the soruce object + */ + function setIntProp(src, dst, propName, minVal, maxVal, def) { + if (src.hasOwnProperty(propName)) { + var v = parseInt(src[propName]); + if(isNaN(v)) return; + if(minVal != null) { + if(v < minVal) return; + } + if(maxVal != null) { + if(v > maxVal) return; + } + dst[propName] = v; + } else { + if(def != undefined) dst[propName] = def; + } + } + + /** + * Helper function for setting string property values in the MQTT V5 properties object + * @param {object} src Source object containing properties + * @param {object} dst Destination object to set/add properties + * @param {string} propName The property name to set in the Destination object + * @param {string} [def] An optional default to set in the destination object if prop is NOT present in the soruce object + */ + function setStrProp(src, dst, propName, def) { + if (src[propName] && typeof src[propName] == "string") { + dst[propName] = src[propName]; + } else { + if(def != undefined) dst[propName] = def; + } + } + + /** + * Helper function for setting boolean property values in the MQTT V5 properties object + * @param {object} src Source object containing properties + * @param {object} dst Destination object to set/add properties + * @param {string} propName The property name to set in the Destination object + * @param {boolean} [def] An optional default to set in the destination object if prop is NOT present in the soruce object + */ + function setBoolProp(src, dst, propName, def) { + if (src[propName] != null) { + if(src[propName] === "true" || src[propName] === true) { + dst[propName] = true; + } else if(src[propName] === "false" || src[propName] === false) { + dst[propName] = true; + } + } else { + if(def != undefined) dst[propName] = def; + } + } + + /** + * Helper function for copying the MQTT v5 srcUserProperties object (parameter1) to the properties object (parameter2). + * Any property in srcUserProperties that is NOT a key/string pair will be silently discarded. + * NOTE: if no sutable properties are present, the userProperties object will NOT be added to the properties object + * @param {object} srcUserProperties An object with key/value string pairs + * @param {object} properties A properties object in which userProperties will be copied to + */ + function setUserProperties(srcUserProperties, properties) { + if (srcUserProperties && typeof srcUserProperties == "object") { + let _clone = {}; + let count = 0; + let keys = Object.keys(srcUserProperties); + if(!keys || !keys.length) return null; + keys.forEach(key => { + let val = srcUserProperties[key]; + if(typeof val == "string") { + count++; + _clone[key] = val; + } + }); + if(count) properties.userProperties = _clone; + } + } + + /** + * Helper function for copying the MQTT v5 buffer type properties + * NOTE: if src[propName] is not a buffer, dst[propName] will NOT be assigned a value (unless def is set) + * @param {object} src Source object containing properties + * @param {object} dst Destination object to set/add properties + * @param {string} propName The property name to set in the Destination object + * @param {boolean} [def] An optional default to set in the destination object if prop is NOT present in the Source object + */ + function setBufferProp(src, dst, propName, def) { + if(!dst) return; + if (src && dst) { + var buf = src[propName]; + if (buf && typeof Buffer.isBuffer(buf)) { + dst[propName] = Buffer.from(buf); + } + } else { + if(def != undefined) dst[propName] = def; + } + } + function MQTTBrokerNode(n) { RED.nodes.createNode(this,n); @@ -54,8 +157,15 @@ module.exports = function(RED) { this.usews = n.usews; this.verifyservercert = n.verifyservercert; this.compatmode = n.compatmode; + this.protocolVersion = n.protocolVersion; this.keepalive = n.keepalive; this.cleansession = n.cleansession; + this.sessionExpiryInterval = n.sessionExpiry; + this.topicAliasMaximum = n.topicAliasMaximum; + this.maximumPacketSize = n.maximumPacketSize; + this.receiveMaximum = n.receiveMaximum; + this.userProperties = n.userProperties;//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901116 + this.userPropertiesType = n.userPropertiesType; // Config node state this.brokerurl = ""; @@ -71,8 +181,23 @@ module.exports = function(RED) { topic: n.birthTopic, payload: n.birthPayload || "", qos: Number(n.birthQos||0), - retain: n.birthRetain=="true"|| n.birthRetain===true + retain: n.birthRetain=="true"|| n.birthRetain===true, + //TODO: add payloadFormatIndicator, messageExpiryInterval, contentType, responseTopic, correlationData, userProperties }; + if (n.birthMsg) { + setStrProp(n.birthMsg, this.birthMessage, "contentType"); + if(n.birthMsg.userProps && /^ *{/.test(n.birthMsg.userProps)) { + try { + this.birthMessage.userProperties = JSON.parse(n.birthMsg.userProps) + } catch(err) {} + } + n.birthMsg.responseTopic = n.birthMsg.respTopic; + setStrProp(n.birthMsg, this.birthMessage, "responseTopic"); + n.birthMsg.correlationData = n.birthMsg.correl; + setBufferProp(n.birthMsg, this.birthMessage, "correlationData"); + n.birthMsg.messageExpiryInterval = n.birthMsg.expiry + setIntProp(n.willMsg,this.birthMessage, "messageExpiryInterval") + } } if (n.closeTopic) { @@ -80,8 +205,23 @@ module.exports = function(RED) { topic: n.closeTopic, payload: n.closePayload || "", qos: Number(n.closeQos||0), - retain: n.closeRetain=="true"|| n.closeRetain===true + retain: n.closeRetain=="true"|| n.closeRetain===true, + //TODO: add payloadFormatIndicator, messageExpiryInterval, contentType, responseTopic, correlationData, userProperties }; + if (n.closeMsg) { + setStrProp(n.closeMsg, this.closeMessage, "contentType"); + if(n.closeMsg.userProps && /^ *{/.test(n.closeMsg.userProps)) { + try { + this.closeMessage.userProperties = JSON.parse(n.closeMsg.userProps) + } catch(err) {} + } + n.closeMsg.responseTopic = n.closeMsg.respTopic; + setStrProp(n.closeMsg, this.closeMessage, "responseTopic"); + n.closeMsg.correlationData = n.closeMsg.correl; + setBufferProp(n.closeMsg, this.closeMessage, "correlationData"); + n.closeMsg.messageExpiryInterval = n.closeMsg.expiry + setIntProp(n.willMsg,this.closeMessage, "messageExpiryInterval") + } } if (this.credentials) { @@ -97,9 +237,6 @@ module.exports = function(RED) { if (typeof this.usews === 'undefined') { this.usews = false; } - if (typeof this.compatmode === 'undefined') { - this.compatmode = false; - } if (typeof this.verifyservercert === 'undefined') { this.verifyservercert = false; } @@ -183,9 +320,22 @@ module.exports = function(RED) { this.options.keepalive = this.keepalive; this.options.clean = this.cleansession; this.options.reconnectPeriod = RED.settings.mqttReconnectTime||5000; - if (this.compatmode == "true" || this.compatmode === true) { + if (this.compatmode == "true" || this.compatmode === true || this.protocolVersion == 3) { this.options.protocolId = 'MQIsdp'; this.options.protocolVersion = 3; + } else if ( this.protocolVersion == 5 ) { + this.options.protocolVersion = 5; + this.options.properties = {}; + this.options.properties.requestResponseInformation = true; + this.options.properties.requestProblemInformation = true; + if(this.userProperties && /^ *{/.test(this.userProperties)) { + try { + setUserProperties(JSON.parse(this.userProperties), this.options.properties); + } catch(err) {} + } + if (this.sessionExpiryInterval && this.sessionExpiryInterval !== "0") { + setIntProp(this,this.options.properties,"sessionExpiryInterval"); + } } if (this.usetls && n.tls) { var tlsNode = RED.nodes.getNode(n.tls); @@ -193,7 +343,6 @@ module.exports = function(RED) { tlsNode.addTLSOptions(this.options); } } - // console.log(this.brokerurl,this.options); // If there's no rejectUnauthorized already, then this could be an // old config where this option was provided on the broker node and @@ -207,10 +356,32 @@ module.exports = function(RED) { topic: n.willTopic, payload: n.willPayload || "", qos: Number(n.willQos||0), - retain: n.willRetain=="true"|| n.willRetain===true + retain: n.willRetain=="true"|| n.willRetain===true, + //TODO: add willDelayInterval, payloadFormatIndicator, messageExpiryInterval, contentType, responseTopic, correlationData, userProperties }; + if (n.willMsg) { + this.options.will.properties = {}; + + setStrProp(n.willMsg, this.options.will.properties, "contentType"); + if(n.willMsg.userProps && /^ *{/.test(n.willMsg.userProps)) { + try { + this.options.will.properties.userProperties = JSON.parse(n.willMsg.userProps) + } catch(err) {} + } + n.willMsg.responseTopic = n.willMsg.respTopic; + setStrProp(n.willMsg, this.options.will.properties, "responseTopic"); + n.willMsg.correlationData = n.willMsg.correl; + setBufferProp(n.willMsg, this.options.will.properties, "correlationData"); + n.willMsg.willDelayInterval = n.willMsg.delay + setIntProp(n.willMsg,this.options.will.properties, "willDelayInterval") + n.willMsg.messageExpiryInterval = n.willMsg.expiry + setIntProp(n.willMsg,this.options.will.properties, "messageExpiryInterval") + this.options.will.payloadFormatIndicator = true; + } } + // console.log(this.brokerurl,this.options); + // Define functions called by MQTT in and out nodes var node = this; this.users = {}; @@ -242,13 +413,36 @@ module.exports = function(RED) { if (!node.connected && !node.connecting) { node.connecting = true; try { + node.serverProperties = {}; node.client = mqtt.connect(node.brokerurl ,node.options); node.client.setMaxListeners(0); // Register successful connect or reconnect handler - node.client.on('connect', function () { + node.client.on('connect', function (connack) { node.connecting = false; node.connected = true; + node.topicAliases = {}; node.log(RED._("mqtt.state.connected",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl})); + if(node.options.protocolVersion == 5 && connack && connack.hasOwnProperty("properties")) { + if(typeof connack.properties == "object") { + //clean & assign all props sent from server. + setIntProp(connack.properties, node.serverProperties, "topicAliasMaximum", 0); + setIntProp(connack.properties, node.serverProperties, "receiveMaximum", 0); + setIntProp(connack.properties, node.serverProperties, "sessionExpiryInterval", 0, 0xFFFFFFFF); + setIntProp(connack.properties, node.serverProperties, "maximumQoS", 0, 2); + setBoolProp(connack.properties, node.serverProperties, "retainAvailable",true); + setBoolProp(connack.properties, node.serverProperties, "wildcardSubscriptionAvailable", true); + setBoolProp(connack.properties, node.serverProperties, "subscriptionIdentifiersAvailable", true); + setBoolProp(connack.properties, node.serverProperties, "sharedSubscriptionAvailable"); + setIntProp(connack.properties, node.serverProperties, "maximumPacketSize", 0); + setIntProp(connack.properties, node.serverProperties, "serverKeepAlive"); + setStrProp(connack.properties, node.serverProperties, "responseInformation"); + setStrProp(connack.properties, node.serverProperties, "serverReference"); + setStrProp(connack.properties, node.serverProperties, "assignedClientIdentifier"); + setStrProp(connack.properties, node.serverProperties, "reasonString"); + setUserProperties(connack.properties, node.serverProperties); + // node.debug("CONNECTED. node.serverProperties ==> "+JSON.stringify(node.serverProperties));//TODO: remove + } + } for (var id in node.users) { if (node.users.hasOwnProperty(id)) { node.users[id].status({fill:"green",shape:"dot",text:"node-red:common.status.connected"}); @@ -260,16 +454,18 @@ module.exports = function(RED) { // Re-subscribe to stored topics for (var s in node.subscriptions) { if (node.subscriptions.hasOwnProperty(s)) { - var topic = s; - var qos = 0; + let topic = s; + let qos = 0; + let _options = {}; for (var r in node.subscriptions[s]) { if (node.subscriptions[s].hasOwnProperty(r)) { qos = Math.max(qos,node.subscriptions[s][r].qos); + _options = node.subscriptions[s][r].options; node.client.on('message',node.subscriptions[s][r].handler); } } - var options = {qos: qos}; - node.client.subscribe(topic, options); + _options.qos = _options.qos || qos; + node.client.subscribe(topic, _options); } } @@ -284,7 +480,14 @@ module.exports = function(RED) { node.users[id].status({fill:"yellow",shape:"ring",text:"node-red:common.status.connecting"}); } } - }) + }); + //TODO: what to do with this event? Anything? Necessary? + node.client.on("disconnect", function(packet) { + //Emitted after receiving disconnect packet from broker. MQTT 5.0 feature. + var rc = packet && packet.properties && packet.properties.reasonString; + var rc = packet && packet.properties && packet.reasonCode; + //TODO: If keeping this event, do we use these? log these? + }); // Register disconnect handlers node.client.on('close', function () { if (node.connected) { @@ -302,31 +505,54 @@ module.exports = function(RED) { // Register connect error handler // The client's own reconnect logic will take care of errors - node.client.on('error', function (error) {}); + node.client.on('error', function (error) { + }); }catch(err) { console.log(err); } } }; - this.subscribe = function (topic,qos,callback,ref) { + this.subscriptionIds = {}; + this.subid = 1; + this.subscribe = function (topic,options,callback,ref) { ref = ref||0; + var qos; + if(typeof options == "object") { + qos = options.qos; + } else { + qos = options; + options = {}; + } + options.qos = qos; + if (!node.subscriptionIds[topic]) { + node.subscriptionIds[topic] = node.subid++; + } + options.properties = options.properties || {}; + options.properties.subscriptionIdentifier = node.subscriptionIds[topic]; + node.subscriptions[topic] = node.subscriptions[topic]||{}; var sub = { topic:topic, qos:qos, + options:options, handler:function(mtopic,mpayload, mpacket) { - if (matchTopic(topic,mtopic)) { + if(mpacket.properties && options.properties && mpacket.properties.subscriptionIdentifier && options.properties.subscriptionIdentifier && (mpacket.properties.subscriptionIdentifier !== options.properties.subscriptionIdentifier) ) { + //do nothing as subscriptionIdentifier does not match + // node.debug(`> no match - this nodes subID (${options.properties.subscriptionIdentifier}) !== packet subID (${mpacket.properties.subscriptionIdentifier})`); //TODO: remove + } else if (matchTopic(topic,mtopic)) { + // node.debug(`> MATCHED '${topic}' to '${mtopic}' - performing callback`); //TODO: remove callback(mtopic,mpayload, mpacket); + } else { + // node.debug(`> no match / no callback`); //TODO: remove } }, ref: ref }; node.subscriptions[topic][ref] = sub; if (node.connected) { + // node.debug(`this.subscribe - registering handler ref ${ref} for ${topic} and subscribing `+JSON.stringify(options)); //TODO: remove node.client.on('message',sub.handler); - var options = {}; - options.qos = qos; node.client.subscribe(topic, options); } }; @@ -334,21 +560,35 @@ module.exports = function(RED) { this.unsubscribe = function (topic, ref, removed) { ref = ref||0; var sub = node.subscriptions[topic]; + // var _debug = `unsubscribe for topic ${topic} called... ` ; //TODO: remove if (sub) { + // _debug += "sub found. " //TODO: remove if (sub[ref]) { + // debug(`this.unsubscribe - removing handler ref ${ref} for ${topic} `); //TODO: remove + // _debug += `removing handler ref ${ref} for ${topic}. ` node.client.removeListener('message',sub[ref].handler); delete sub[ref]; } - if (removed) { + //TODO: Review. The `if(removed)` was commented out to always delete and remove subscriptions. + // if we dont then property changes dont get applied and old subs still trigger + //if (removed) { + if (Object.keys(sub).length === 0) { delete node.subscriptions[topic]; + delete node.subscriptionIds[topic]; if (node.connected) { + // _debug += `calling client.unsubscribe to remove topic ${topic}` //TODO: remove node.client.unsubscribe(topic); } } - } + //} + } else { + // _debug += "sub not found! "; //TODO: remove } + // node.debug(_debug); //TODO: remove + }; + this.topicAliases = {}; this.publish = function (msg,done) { if (node.connected) { @@ -361,13 +601,39 @@ module.exports = function(RED) { msg.payload = "" + msg.payload; } } - var options = { qos: msg.qos || 0, retain: msg.retain || false }; + //https://github.com/mqttjs/MQTT.js/blob/master/README.md#mqttclientpublishtopic-message-options-callback + if(node.options.protocolVersion == 5) { + options.properties = options.properties || {}; + setStrProp(msg, options.properties, "responseTopic"); + setBufferProp(msg, options.properties, "correlationData"); + setStrProp(msg, options.properties, "contentType"); + setIntProp(msg, options.properties, "messageExpiryInterval", 0); + if (msg.userProperties) { + options.properties.userProperties = msg.userProperties; + } + setUserProperties(msg.userProperties, options.properties); + setIntProp(msg, options.properties, "topicAlias", 1, node.serverProperties.topicAliasMaximum || 0); + setBoolProp(msg, options.properties, "payloadFormatIndicator"); + //FUTURE setIntProp(msg, options.properties, "subscriptionIdentifier", 1, 268435455); + if (options.properties.topicAlias) { + if (!node.topicAliases.hasOwnProperty(options.properties.topicAlias) && msg.topic == "") { + done("Invalid topicAlias"); + return + } + if (node.topicAliases[options.properties.topicAlias] === msg.topic) { + msg.topic = "" + } else { + node.topicAliases[options.properties.topicAlias] = msg.topic + } + } + } + node.client.publish(msg.topic, msg.payload, options, function(err) { - done && done(); + done && done(err); return }); } @@ -405,6 +671,12 @@ module.exports = function(RED) { RED.nodes.createNode(this,n); this.topic = n.topic; this.qos = parseInt(n.qos); + this.subscriptionIdentifier = n.subscriptionIdentifier;//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901117 + this.nl = n.nl; + this.rap = n.rap; + this.rh = n.rh; + + if (isNaN(this.qos) || this.qos < 0 || this.qos > 2) { this.qos = 2; } @@ -416,10 +688,27 @@ module.exports = function(RED) { this.datatype = n.datatype || "utf8"; var node = this; if (this.brokerConn) { + let v5 = this.brokerConn.options && this.brokerConn.options.protocolVersion == 5; this.status({fill:"red",shape:"ring",text:"node-red:common.status.disconnected"}); if (this.topic) { node.brokerConn.register(this); - this.brokerConn.subscribe(this.topic,this.qos,function(topic,payload,packet) { + let options = { qos: this.qos }; + if(v5) { + // options.properties = {}; + // if(node.userProperties) { + // let userProperties = RED.util.evaluateNodeProperty(node.userProperties, node.userPropertiesType, node, {}); + // setUserProperties(userProperties, options.properties); + // } + // setIntProp(node,options.properties,"subscriptionIdentifier", 1); + setIntProp(node, options, "rh"); + if(node.nl === "true" || node.nl === true) options.nl = true; + else if(node.nl === "false" || node.nl === false) options.nl = false; + if(node.rap === "true" || node.rap === true) options.rap = true; + else if(node.rap === "false" || node.rap === false) options.rap = false; + } + + this.brokerConn.subscribe(this.topic,options,function(topic,payload,packet) { + // node.debug(`Sent ${topic}, datatype ${node.datatype} `+JSON.stringify(packet));//TODO: remove if (node.datatype === "buffer") { // payload = payload; } else if (node.datatype === "base64") { @@ -437,6 +726,18 @@ module.exports = function(RED) { if (isUtf8(payload)) { payload = payload.toString(); } } var msg = {topic:topic, payload:payload, qos:packet.qos, retain:packet.retain}; + if(v5 && packet.properties) { + //msg.properties = packet.properties; + setStrProp(packet.properties, msg, "responseTopic"); + setBufferProp(packet.properties, msg, "correlationData"); + setStrProp(packet.properties, msg, "contentType"); + // setIntProp(packet.properties, msg, "topicAlias", 1, node.brokerConn.serverProperties.topicAliasMaximum || 0); + // setIntProp(packet.properties, msg, "subscriptionIdentifier", 1, 268435455); + setIntProp(packet.properties, msg, "messageExpiryInterval", 0); + setBoolProp(packet.properties, msg, "payloadFormatIndicator"); + setStrProp(packet.properties, msg, "reasonString"); + setUserProperties(packet.properties.userProperties, msg); + } if ((node.brokerConn.broker === "localhost")||(node.brokerConn.broker === "127.0.0.1")) { msg._topic = topic; } @@ -467,11 +768,25 @@ module.exports = function(RED) { this.qos = n.qos || null; this.retain = n.retain; this.broker = n.broker; + this.responseTopic = n.respTopic;//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901114 + this.correlationData = n.correl;//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901115 + this.contentType = n.contentType;//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901118 + this.messageExpiryInterval = n.expiry; //https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901112 + try { + if (/^ *{/.test(n.userProps)) { + this.userProperties = JSON.parse(n.userProps);//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901116 + } + } catch(err) {} + // this.topicAlias = n.topicAlias; //https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901113 + // this.payloadFormatIndicator = n.payloadFormatIndicator; //https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901111 + // this.subscriptionIdentifier = n.subscriptionIdentifier;//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901117 + this.brokerConn = RED.nodes.getNode(this.broker); var node = this; var chk = /[\+#]/; if (this.brokerConn) { + let v5 = this.brokerConn.options && this.brokerConn.options.protocolVersion == 5; this.status({fill:"red",shape:"ring",text:"node-red:common.status.disconnected"}); this.on("input",function(msg,send,done) { if (msg.qos) { @@ -483,13 +798,63 @@ module.exports = function(RED) { msg.qos = Number(node.qos || msg.qos || 0); msg.retain = node.retain || msg.retain || false; msg.retain = ((msg.retain === true) || (msg.retain === "true")) || false; - if (node.topic) { - msg.topic = node.topic; + /** If node property exists, override/set that to property in msg */ + let msgPropOverride = function(propName) { if(node[propName]) { msg[propName] = node[propName]; } } + msgPropOverride("topic"); + if(v5) { + if(node.userProperties) { + msg.userProperties = node.userProperties; + } + if(node.responseTopic) { + msg.responseTopic = node.responseTopic; + } + if(node.correlationData) { + msg.correlationData = node.correlationData; + } + if(node.contentType) { + msg.contentType = node.contentType; + } + if(node.messageExpiryInterval) { + msg.messageExpiryInterval = node.messageExpiryInterval; + } + //Next, update/override the msg.xxxx properties from node config + //TODO: Should we be expecting msg.properties.xxxx instead of msg.xxxx? + // setStrProp(node,msg,"responseTopic"); + // setBufferProp(node,msg,"correlationData"); + // setStrProp(node,msg,"contentType"); + // setIntProp(node,msg,"messageExpiryInterval"); + //FUTURE setStrProp(node,msg,"topicAlias"); + //FUTURE setBoolProp(node,msg,"payloadFormatIndicator"); + //FUTURE setIntProp(node,msg,"subscriptionIdentifier"); } + if (msg.userProperties && typeof msg.userProperties !== "object") { + delete msg.userProperties; + } + if (msg.hasOwnProperty("topicAlias") && !isNaN(msg.topicAlias) && (msg.topicAlias === 0 || node.brokerConn.serverProperties.topicAliasMaximum === 0 || msg.topicAlias > node.brokerConn.serverProperties.topicAliasMaximum)) { + delete msg.topicAlias; + } + if ( msg.hasOwnProperty("payload")) { - if (msg.hasOwnProperty("topic") && (typeof msg.topic === "string") && (msg.topic !== "")) { // topic must exist - if (chk.test(msg.topic)) { node.warn(RED._("mqtt.errors.invalid-topic")); } - this.brokerConn.publish(msg, done); // send the message + let topicOK = msg.hasOwnProperty("topic") && (typeof msg.topic === "string") && (msg.topic !== ""); + if (!topicOK && v5) { + //NOTE: A value of 0 (in server props topicAliasMaximum) indicates that the Server does not accept any Topic Aliases on this connection + if (msg.hasOwnProperty("topicAlias") && !isNaN(msg.topicAlias) && msg.topicAlias >= 0 && node.brokerConn.serverProperties.topicAliasMaximum && node.brokerConn.serverProperties.topicAliasMaximum >= msg.topicAlias) { + topicOK = true; + msg.topic = ""; //must be empty string + } else if (msg.hasOwnProperty("responseTopic") && (typeof msg.responseTopic === "string") && (msg.responseTopic !== "")) { + //TODO: if topic is empty but responseTopic has a string value, use that instead. Is this desirable? + topicOK = true; + msg.topic = msg.responseTopic; + //TODO: delete msg.responseTopic - to prevent it being resent? + } + } + if (topicOK) { // topic must exist + // node.debug(`sending msg to ${msg.topic} `+JSON.stringify(msg));//TODO: remove + this.brokerConn.publish(msg, function(err) { + let args = arguments; + let l = args.length; + done(err); + }); // send the message } else { node.warn(RED._("mqtt.errors.invalid-topic")); done(); @@ -510,4 +875,4 @@ module.exports = function(RED) { } } RED.nodes.registerType("mqtt out",MQTTOutNode); -}; +}; \ No newline at end of file diff --git a/packages/node_modules/@node-red/nodes/locales/en-US/messages.json b/packages/node_modules/@node-red/nodes/locales/en-US/messages.json index eb623ce20..b6c8e77ae 100755 --- a/packages/node_modules/@node-red/nodes/locales/en-US/messages.json +++ b/packages/node_modules/@node-red/nodes/locales/en-US/messages.json @@ -350,12 +350,38 @@ "retain": "Retain", "clientid": "Client ID", "port": "Port", - "keepalive": "Keep alive time (s)", + "keepalive": "Keep Alive", "cleansession": "Use clean session", + "cleanstart": "Use clean start", "use-tls": "Enable secure (SSL/TLS) connection", "tls-config":"TLS Configuration", "verify-server-cert":"Verify server certificate", - "compatmode": "Use legacy MQTT 3.1 support" + "compatmode": "Use legacy MQTT 3.1 support", + "userProperties": "User Properties", + "subscriptionIdentifier": "Subscription ID", + "flags": "Flags", + "nl": "Do not receive messages published by this client", + "rap": "Keep retain flag of original publish", + "rh": "Retained message handling ", + "rh0": "Send retained messages", + "rh1": "Only send for new subscriptions", + "rh2": "Do not send", + "responseTopic": "Response topic", + "contentType": "Content Type", + "correlationData": "Correlation Data", + "expiry": "Expiry (secs)", + "sessionExpiry": "Session Expiry (secs)", + "topicAlias": "Alias", + "payloadFormatIndicator": "Format", + "payloadFormatIndicatorFalse": "unspecified bytes (Default)", + "payloadFormatIndicatorTrue": "UTF-8 encoded payload", + "protocolVersion": "Protocol", + "protocolVersion3": "MQTT V3.1 (legacy)", + "protocolVersion4": "MQTT V3.1.1", + "protocolVersion5": "MQTT V5", + "topicAliasMaximum": "Alias Max", + "maximumPacketSize": "Max Packet Size", + "receiveMaximum": "Receive Max" }, "sections-label":{ "birth-message": "Message sent on connection (birth message)", diff --git a/packages/node_modules/@node-red/nodes/locales/en-US/network/10-mqtt.html b/packages/node_modules/@node-red/nodes/locales/en-US/network/10-mqtt.html index 1c6d1a081..558635ec7 100644 --- a/packages/node_modules/@node-red/nodes/locales/en-US/network/10-mqtt.html +++ b/packages/node_modules/@node-red/nodes/locales/en-US/network/10-mqtt.html @@ -23,6 +23,17 @@
msg.payload
is used as the payload of the published message.