mqttv5 progress

This commit is contained in:
Steve-Mcl 2020-10-14 23:30:03 +01:00
parent 38649de85f
commit 195aeb5caf
3 changed files with 259 additions and 133 deletions

View File

@ -57,7 +57,6 @@
width: 15px;
height: 15px;
}
</style>
<script type="text/html" data-template-name="mqtt in">
@ -77,38 +76,40 @@
<option value="2">2</option>
</select>
</div>
<div class="form-row">
<label for="node-input-userProperties"><i class="fa fa-tasks"></i> <span data-i18n="common.label.userProperties">User Properties</span></label>
<div class="form-row mqtt5">
<label for="node-input-userProperties"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.userProperties"></span></label>
<input id="node-input-userPropertiesType" type="hidden">
<input type="text" id="node-input-userProperties" style="width: 70%;" data-i18n="[placeholder]common.label.userProperties" placeholder="User Properties">
<input type="text" id="node-input-userProperties" style="width: 70%;" data-i18n="[placeholder]mqtt.label.userProperties" placeholder="">
</div>
<div class="form-row">
<label for="node-input-subscriptionIdentifier" class="mqtt-form-row-col1"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.subscriptionIdentifier">Subscription ID</span></label>
<div class="form-row mqtt5">
<label for="node-input-subscriptionIdentifier" class="mqtt-form-row-col1"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.subscriptionIdentifier"></span></label>
<input type="number" min="0" max="268435455" id="node-input-subscriptionIdentifier" class="mqtt-form-row-col1" >
</div>
<div class="form-row mqtt-flags-row">
<div class="form-row mqtt-flags-row mqtt5">
<label for="node-input-nl" ><i class="fa fa-flag"></i> <span data-i18n="mqtt.label.flags">Flags</span></label>
<div class="mqtt-flags">
<div class="mqtt-flag">
<label for="node-input-nl">
<input type="checkbox" id="node-input-nl">
<span data-i18n="mqtt.label.nl">No local</span>
<span data-i18n="mqtt.label.nl"></span>
</label>
</div>
<div class="mqtt-flag">
<label for="node-input-rap">
<input type="checkbox" id="node-input-rap">
<span data-i18n="mqtt.label.rap">Retain as Published</span>
</label>
</div>
<div class="mqtt-flag">
<label for="node-input-rh">
<input type="checkbox" id="node-input-rh">
<span data-i18n="mqtt.label.rh">Retain Handling</span>
<span data-i18n="mqtt.label.rap"></span>
</label>
</div>
</div>
</div>
<div class="form-row mqtt5">
<label for="node-input-rh" ><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.rh"></span></label>
<select id="node-input-rh" style="width:70%;">
<option value="0" data-i18n="mqtt.label.rh0"></option>
<option value="1" data-i18n="mqtt.label.rh1"></option>
<option value="2" data-i18n="mqtt.label.rh2"></option>
</select>
</div>
<div class="form-row">
<label for="node-input-datatype"><i class="fa fa-sign-out"></i> <span data-i18n="mqtt.label.output"></span></label>
<select id="node-input-datatype" style="width:70%;">
@ -133,13 +134,13 @@
topic: {value:"",required:true,validate: RED.validators.regex(/^(#$|(\+|[^+#]*)(\/(\+|[^+#]*))*(\/(\+|#|[^+#]*))?$)/)},
qos: {value: "2"},
datatype: {value:"auto",required:true},
userPropertiesType: {value:"none"},
userProperties: {value:"userProperties"},
broker: {type:"mqtt-broker", required:true},
userPropertiesType: {value:"json"},
userProperties: {value:"{}"},
subscriptionIdentifier: {value:0},
nl: {value:false},
rap: {value:false},
rap: {value:true},
rh: {value:false},
broker: {type:"mqtt-broker", required:true}
},
color:"#d8bfd8",
inputs:0,
@ -152,6 +153,15 @@
return this.name?"node_label_italic":"";
},
oneditprepare: function() {
$("#node-input-broker").change(function(d){
var confNode = RED.nodes.node($("#node-input-broker").val());
var v5 = confNode && confNode.protocolVersion == "5";
if(v5) {
$(".mqtt5").show();
} else {
$(".mqtt5").hide();
}
});
if (this.qos === undefined) {
$("#node-input-qos").val("2");
}
@ -159,8 +169,8 @@
$("#node-input-datatype").val("auto");
}
$("#node-input-userProperties").typedInput({
default: "none",
types:[{ value:"none", label: "None", hasValue:false },'flow','global','json'],
default: "json",
types:['json','flow','global'],
typeField: $("#node-input-userPropertiesType")
});
}
@ -193,46 +203,42 @@
<option value="true" data-i18n="mqtt.true"></option>
</select>
</div>
<hr>
<div class="form-row">
<label for="node-input-userProperties"><i class="fa fa-tasks"></i> <span data-i18n="common.label.userProperties">User Properties</span></label>
<input id="node-input-userPropertiesType" type="hidden">
<input type="text" id="node-input-userProperties" style="width: 70%;" data-i18n="[placeholder]common.label.userProperties" placeholder="User Properties">
<div class="form-row mqtt5">
<label for="node-input-responseTopic"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.responseTopic"></span></label>
<input type="text" id="node-input-responseTopic" data-i18n="[placeholder]mqtt.label.responseTopic" placeholder="">
</div>
<div class="form-row">
<label for="node-input-responseTopic"><i class="fa fa-tag"></i> <span data-i18n="common.label.responseTopic">Response topic</span></label>
<input type="text" id="node-input-responseTopic" data-i18n="[placeholder]common.label.responseTopic" placeholder="Response Topic">
</div>
<div class="form-row">
<label for="node-input-correlationData"><i class="fa fa-tag"></i> <span data-i18n="common.label.correlationData">correlation Data</span></label>
<div class="form-row mqtt5">
<label for="node-input-correlationData"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.correlationData"></span></label>
<input id="node-input-correlationDataType" type="hidden">
<input type="text" id="node-input-correlationData" style="width: 70%;" data-i18n="[placeholder]common.label.correlationData" placeholder="correlation Data">
<input type="text" id="node-input-correlationData" style="width: 70%;" data-i18n="[placeholder]mqtt.label.correlationData" placeholder="">
</div>
<div class="form-row mqtt5">
<label for="node-input-userProperties"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.userProperties"></span></label>
<input id="node-input-userPropertiesType" type="hidden">
<input type="text" id="node-input-userProperties" style="width: 70%;" data-i18n="[placeholder]mqtt.label.userProperties" placeholder="">
</div>
<div class="form-row mqtt5">
<label for="node-input-contentType"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.contentType"></span></label>
<input type="text" id="node-input-contentType" data-i18n="[placeholder]mqtt.label.contentType" placeholder="">
</div>
<div class="form-row">
<label for="node-input-contentType"><i class="fa fa-tag"></i> <span data-i18n="common.label.contentType">Content Type</span></label>
<input type="text" id="node-input-contentType" data-i18n="[placeholder]common.label.contentType" placeholder="Content Type">
</div>
<div class="form-row mqtt-form-row-cols2">
<label for="node-input-messageExpiryInterval" class="mqtt-form-row-col1"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.messageExpiryInterval">Expiry (secs)</span></label>
<div class="form-row mqtt-form-row-cols2 mqtt5">
<label for="node-input-messageExpiryInterval" class="mqtt-form-row-col1"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.messageExpiryInterval"></span></label>
<input type="number" min="0" id="node-input-messageExpiryInterval" class="mqtt-form-row-col1" >
<label for="node-input-topicAlias" class="mqtt-form-row-col2"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.topicAlias">Alias</span></label>
<input type="number" min="0" id="node-input-topicAlias" class="mqtt-form-row-col2" >
</div>
<div class="form-row mqtt-form-row-cols2">
<label for="node-input-subscriptionIdentifier" class="mqtt-form-row-col1"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.subscriptionIdentifier">Subscription ID</span></label>
<div class="form-row mqtt-form-row-cols2 mqtt5">
<label for="node-input-subscriptionIdentifier" class="mqtt-form-row-col1"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.subscriptionIdentifier"></span></label>
<input type="number" min="0" max="268435455" id="node-input-subscriptionIdentifier" class="mqtt-form-row-col1" >
<label for="node-input-payloadFormatIndicator" class="mqtt-form-row-col2"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.payloadFormatIndicator">Format</span></label>
<label for="node-input-payloadFormatIndicator" class="mqtt-form-row-col2"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.payloadFormatIndicator"></span></label>
<select id="node-input-payloadFormatIndicator" class="mqtt-form-row-col2" >
<option value=""></option>
<option value="false" data-i18n="mqtt.payloadFormatIndicatorFalse">unspecified bytes (Default)</option>
<option value="true" data-i18n="mqtt.payloadFormatIndicatorTrue">UTF-8 encoded payload</option>
<option value="false" data-i18n="mqtt.label.payloadFormatIndicatorFalse"></option>
<option value="true" data-i18n="mqtt.label.payloadFormatIndicatorTrue"></option>
</select>
</div>
@ -254,9 +260,9 @@
responseTopic: {value:""},
contentType: {value:""},
userProperties: {value:'userProperties'},
userPropertiesType: {value:'none'},
userPropertiesType: {value:'msg'},
correlationData: {value:'correlationData'},
correlationDataType: {value:'none'},
correlationDataType: {value:'msg'},
topicAlias: {value:0},
messageExpiryInterval: {value:0},
payloadFormatIndicator: {value:false},
@ -272,14 +278,23 @@
return this.name||this.topic||"mqtt";
},
oneditprepare: function() {
$("#node-input-broker").change(function(d){
var confNode = RED.nodes.node($("#node-input-broker").val());
var v5 = confNode && confNode.protocolVersion == "5";
if(v5) {
$(".mqtt5").show();
} else {
$(".mqtt5").hide();
}
});
$("#node-input-correlationData").typedInput({
default: "none",
types:[{ value:"none", label: "None", hasValue:false },'msg','flow','global','bin'],
types:[{ value:"none", label: "None", hasValue:false },'bin','msg','flow','global'],
typeField: $("#node-input-correlationDataType")
});
$("#node-input-userProperties").typedInput({
default: "none",
types:[{ value:"none", label: "None", hasValue:false },'msg','flow','global','json'],
default: "msg",
types:['msg','flow','global','json'],
typeField: $("#node-input-userPropertiesType")
});
},
@ -319,8 +334,17 @@
<div class="form-row">
<label for="node-config-input-keepalive" style="width: auto"><i class="fa fa-clock-o"></i> <span data-i18n="mqtt.label.keepalive"></span></label>
<input type="text" id="node-config-input-keepalive" style="width: 50px">
<input type="checkbox" id="node-config-input-cleansession" style="margin-left: 30px; height: 1em;display: inline-block; width: auto; vertical-align: middle;">
<label for="node-config-input-cleansession" style="width: auto;" data-i18n="mqtt.label.cleansession"></label>
</div>
<div class="form-row mqtt-form-row-cols2">
<label for="node-config-input-cleansession" class="mqtt-form-row-col1"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.session"></span>Session</label>
<div class="mqtt-flag mqtt-form-row-col1" style="display: inline-block;width: calc(35% - 75px);">
<label for="node-config-input-cleansession" style="display: block; padding-left: 15px; text-indent: -15px;">
<input type="checkbox" id="node-config-input-cleansession" style="position: relative;vertical-align: bottom; top: -2px; width: 15px;height: 15px;">
<span data-i18n="mqtt.label.cleansession"></span>
</label>
</div>
<label for="node-config-input-sessionExpiryInterval" class="mqtt-form-row-col2 mqtt5 sessionExpiryInterval"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.sessionExpiryInterval"></span></label>
<input type="number" min="0" id="node-config-input-sessionExpiryInterval" class="mqtt-form-row-col2 mqtt5 sessionExpiryInterval" >
</div>
<div class="form-row">
<label for="node-config-input-protocolVersion"><i class="fa fa-cog"></i> <span data-i18n="mqtt.label.protocolVersion"></span></label>
@ -329,20 +353,23 @@
<option value="4" data-i18n="mqtt.label.protocolVersion4"></option>
<option value="5" data-i18n="mqtt.label.protocolVersion5"></option>
</select>
</div>
<div class="form-row mqtt-form-row-cols2">
<label for="node-config-input-sessionExpiryInterval" class="mqtt-form-row-col1"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.sessionExpiryInterval">Session Exp</span></label>
<input type="number" min="0" id="node-config-input-sessionExpiryInterval" class="mqtt-form-row-col1" >
<label for="node-config-input-topicAliasMaximum" class="mqtt-form-row-col2"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.topicAliasMaximum">Alias Max</span></label>
<input type="number" min="0" id="node-config-input-topicAliasMaximum" class="mqtt-form-row-col2" >
</div>
<div class="form-row mqtt5">
<label for="node-config-input-userProperties"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.userProperties"></span></label>
<input id="node-config-input-userPropertiesType" type="hidden">
<input type="number" id="node-config-input-userProperties" style="width: 70%;" data-i18n="[placeholder]mqtt.label.userProperties" placeholder="">
</div>
<div class="form-row mqtt5">
<label for="node-config-input-topicAliasMaximum"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.topicAliasMaximum"></span></label>
<input type="number" min="0" id="node-config-input-topicAliasMaximum">
</div>
<div class="form-row mqtt-form-row-cols2">
<label for="node-config-input-maximumPacketSize" class="mqtt-form-row-col1"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.maximumPacketSize">Max Packet Size</span></label>
<input type="number" min="0" id="node-config-input-maximumPacketSize" class="mqtt-form-row-col1" >
<label for="node-config-input-receiveMaximum" class="mqtt-form-row-col2"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.receiveMaximum">Receive Max</span></label>
<input type="number" min="0" id="node-config-input-receiveMaximum" class="mqtt-form-row-col2" >
<div class="form-row mqtt5">
<label for="node-config-input-maximumPacketSize" ><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.maximumPacketSize"></span></label>
<input type="number" min="0" id="node-config-input-maximumPacketSize">
</div>
<div class="form-row mqtt5">
<label for="node-config-input-receiveMaximum" ><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.receiveMaximum"></span></label>
<input type="number" min="0" id="node-config-input-receiveMaximum">
</div>
</div>
<div id="mqtt-broker-tab-security" style="display:none">
@ -475,7 +502,9 @@
sessionExpiryInterval: {value:0},
topicAliasMaximum: {value:0},
maximumPacketSize: {value:0},
receiveMaximum: {value:0}
receiveMaximum: {value:0},
userPropertiesType: {value:"json"},
userProperties: {value:"{}"},
},
credentials: {
user: {type:"text"},
@ -560,8 +589,29 @@
if (typeof this.protocolVersion === 'undefined') {
this.protocolVersion = 4;
}
console.log("setting protocolVersion", this.protocolVersion)
function updateSessionExpiryIntervalVisibility() {
var hide = ($("#node-config-input-cleansession").is(":checked") || $("#node-config-input-protocolVersion").val() != "5");
if(hide) {
$(".sessionExpiryInterval").hide();
} else {
$(".sessionExpiryInterval").show();
}
}
$("#node-config-input-protocolVersion").change(function(){
var v5 = $("#node-config-input-protocolVersion").val() == "5";
if(v5) {
$(".mqtt5").show();
} else {
$(".mqtt5").hide();
}
updateSessionExpiryIntervalVisibility();
});
$("#node-config-input-protocolVersion").val(this.protocolVersion);
$("#node-config-input-userProperties").typedInput({
default: "json",
types:['json','flow','global'],
typeField: $("#node-config-input-userPropertiesType")
});
if (typeof this.keepalive === 'undefined') {
this.keepalive = 15;
$("#node-config-input-keepalive").val(this.keepalive);
@ -597,6 +647,7 @@
} else {
$("#node-config-input-clientid").attr("placeholder",node._("mqtt.placeholder.clientid-nonclean"));
}
updateSessionExpiryIntervalVisibility();
$("#node-config-input-clientid").change();
}
setTimeout(updateClientId,0);

View File

@ -129,14 +129,22 @@ module.exports = function(RED) {
}
/**
* Helper function for copying the MQTT v5 correlationData object (parameter1) to the properties object (parameter2).
* NOTE: if srcCorrelationData is not a buffer, correlationData will NOT be added to the properties object
* @param {object} srcCorrelationData An buffer containing correlationData
* @param {object} properties A properties object in which correlationData will be copied to
* Helper function for copying the MQTT v5 buffer type properties
* NOTE: if src[propName] is not a buffer, dst[propName] will NOT be assigned a value (unless def is set)
* @param {object} src Source object containing properties
* @param {object} dst Destination object to set/add properties
* @param {string} propName The property name to set in the Destination object
* @param {boolean} [def] An optional default to set in the destination object if prop is NOT present in the Source object
*/
function setCorrelationData(srcCorrelationData, properties) {
if (srcCorrelationData && typeof Buffer.isBuffer(srcCorrelationData)) {
properties.correlationData = Buffer.from(srcCorrelationData);
function setBufferProp(src, dst, propName, def) {
if(!dst) return;
if (src && dst) {
var buf = src[propName];
if (buf && typeof Buffer.isBuffer(buf)) {
dst[propName] = Buffer.from(buf);
}
} else {
if(def != undefined) dst[propName] = def;
}
}
@ -157,7 +165,10 @@ module.exports = function(RED) {
this.sessionExpiryInterval = n.sessionExpiryInterval;
this.topicAliasMaximum = n.topicAliasMaximum;
this.maximumPacketSize = n.maximumPacketSize;
this.receiveMaximum = n.receiveMaximum;
this.receiveMaximum = n.receiveMaximum;
this.userProperties = n.userProperties;//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901116
this.userPropertiesType = n.userPropertiesType;
// Config node state
this.brokerurl = "";
this.connected = false;
@ -172,7 +183,8 @@ module.exports = function(RED) {
topic: n.birthTopic,
payload: n.birthPayload || "",
qos: Number(n.birthQos||0),
retain: n.birthRetain=="true"|| n.birthRetain===true
retain: n.birthRetain=="true"|| n.birthRetain===true,
//TODO: add payloadFormatIndicator, messageExpiryInterval, contentType, responseTopic, correlationData, userProperties
};
}
@ -181,7 +193,8 @@ module.exports = function(RED) {
topic: n.closeTopic,
payload: n.closePayload || "",
qos: Number(n.closeQos||0),
retain: n.closeRetain=="true"|| n.closeRetain===true
retain: n.closeRetain=="true"|| n.closeRetain===true,
//TODO: add payloadFormatIndicator, messageExpiryInterval, contentType, responseTopic, correlationData, userProperties
};
}
@ -198,9 +211,6 @@ module.exports = function(RED) {
if (typeof this.usews === 'undefined') {
this.usews = false;
}
if (typeof this.compatmode === 'undefined') {
this.compatmode = false;
}
if (typeof this.verifyservercert === 'undefined') {
this.verifyservercert = false;
}
@ -292,11 +302,16 @@ module.exports = function(RED) {
this.options.properties = {};
this.options.properties.requestResponseInformation = true;
this.options.properties.requestProblemInformation = true;
if(this.userProperties) {
let userProperties = RED.util.evaluateNodeProperty(this.userProperties, this.userPropertiesType, this, {});
setUserProperties(userProperties, this.options.properties);
}
setIntProp(this,this.options.properties,"sessionExpiryInterval", 0);
setIntProp(this,this.options.properties,"topicAliasMaximum", 0);
setIntProp(this,this.options.properties,"maximumPacketSize", 0);
setIntProp(this,this.options.properties,"receiveMaximum", 0);
this.options.properties.userProperties = {"node-red":"v1.3"};//test
setIntProp(this,this.options.properties,"receiveMaximum", 0);
setStrProp(this,this.options.properties,"authenticationMethod"); //TODO: authenticationMethod: the name of the authentication method used for extended authentication string,
setBufferProp(this,this.options.properties,"authenticationData"); //TODO: authenticationData: Binary Data containing authentication data
}
if (this.usetls && n.tls) {
var tlsNode = RED.nodes.getNode(n.tls);
@ -318,7 +333,8 @@ module.exports = function(RED) {
topic: n.willTopic,
payload: n.willPayload || "",
qos: Number(n.willQos||0),
retain: n.willRetain=="true"|| n.willRetain===true
retain: n.willRetain=="true"|| n.willRetain===true,
//TODO: add willDelayInterval, payloadFormatIndicator, messageExpiryInterval, contentType, responseTopic, correlationData, userProperties
};
}
@ -354,25 +370,34 @@ module.exports = function(RED) {
node.connecting = true;
try {
node.serverProperties = {};
debug("MQTT: ⬆️ mqtt.connect(node.brokerurl ,node.options)", node.brokerurl, node.options);
// debug("MQTT: ⬆️ mqtt.connect(node.brokerurl ,node.options)", node.brokerurl, node.options);
node.client = mqtt.connect(node.brokerurl ,node.options);
node.client.setMaxListeners(0);
// Register successful connect or reconnect handler
node.client.on('connect', function () {
node.client.on('connect', function (connack) {
node.connecting = false;
node.connected = true;
node.log(RED._("mqtt.state.connected",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl}));
if(node.options.protocolVersion == 5 && arguments && arguments.length && arguments[0].hasOwnProperty("properties")) {
if(node.options.protocolVersion == 5 && connack && connack.hasOwnProperty("properties")) {
let mqttServerV5Properties = arguments[0].properties;
if(mqttServerV5Properties) {
//node.serverProperties = mqttServerV5Properties; //assign all props, below, square them up.
setIntProp(mqttServerV5Properties, node.serverProperties, "topicAliasMaximum", 1);
setIntProp(mqttServerV5Properties, node.serverProperties, "receiveMaximum", 0);
setIntProp(mqttServerV5Properties, node.serverProperties, "sessionExpiryInterval", 0, 0xFFFFFFFF);
setIntProp(mqttServerV5Properties, node.serverProperties, "maximumPacketSize", 22);
setIntProp(mqttServerV5Properties, node.serverProperties, "maximumQoS", 0, 2);
setBoolProp(mqttServerV5Properties, node.serverProperties, "retainAvailable");
setBoolProp(mqttServerV5Properties, node.serverProperties, "wildcardSubscriptionAvailable");
setBoolProp(mqttServerV5Properties, node.serverProperties, "subscriptionIdentifiersAvailable");
setBoolProp(mqttServerV5Properties, node.serverProperties, "sharedSubscriptionAvailable");
setIntProp(mqttServerV5Properties, node.serverProperties, "maximumPacketSize", 0);
setIntProp(mqttServerV5Properties, node.serverProperties, "serverKeepAlive");
setStrProp(mqttServerV5Properties, node.serverProperties, "responseInformation");
setStrProp(mqttServerV5Properties, node.serverProperties, "serverReference");
setStrProp(mqttServerV5Properties, node.serverProperties, "assignedClientIdentifier");
setStrProp(mqttServerV5Properties, node.serverProperties, "reasonString");
setUserProperties(mqttServerV5Properties, node.serverProperties);
node.serverProperties.requestResponseInformation = mqttServerV5Properties.requestResponseInformation;
node.serverProperties.requestProblemInformation = mqttServerV5Properties.requestProblemInformation;
debug("MQTT: ⬆️ node.client.on('connect', cb) --> node.serverProperties", node.serverProperties );
debug("MQTTBrokerNode: ⬆ CONNECTED. node.serverProperties ==> ", node.serverProperties );
}
}
for (var id in node.users) {
@ -393,11 +418,12 @@ module.exports = function(RED) {
if (node.subscriptions[s].hasOwnProperty(r)) {
qos = Math.max(qos,node.subscriptions[s][r].qos);
_options = node.subscriptions[s][r].options;
debug(`MQTT: Adding handler for ${r} to ${s}`);
debug(`MQTTBrokerNode:${node.id}: Re-subscribe - registering handler ref ${r} for ${s} `);
node.client.on('message',node.subscriptions[s][r].handler);
}
}
_options.qos = _options.qos || qos;
debug(`MQTTBrokerNode:${node.id}: Re-subscribe - subscribing to topic '${topic}'`, _options);
node.client.subscribe(topic, _options);
}
}
@ -413,9 +439,17 @@ module.exports = function(RED) {
node.users[id].status({fill:"yellow",shape:"ring",text:"node-red:common.status.connecting"});
}
}
})
});
node.client.on("disconnect", function(packet) {
//Emitted after receiving disconnect packet from broker. MQTT 5.0 feature.
console.log('MQTTBrokerNode disconnect', packet)
var rc = packet && packet.properties && packet.properties.reasonString;
var rc = packet && packet.properties && packet.reasonCode;
//TODO: what to do with this event? Anything necessary?
});
// Register disconnect handlers
node.client.on('close', function () {
console.log('MQTTBrokerNode closed', arguments)
if (node.connected) {
node.connected = false;
node.log(RED._("mqtt.state.disconnected",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl}));
@ -431,7 +465,9 @@ module.exports = function(RED) {
// Register connect error handler
// The client's own reconnect logic will take care of errors
node.client.on('error', function (error) {});
node.client.on('error', function (error) {
console.log('MQTTBrokerNode errored', error)
});
}catch(err) {
console.log(err);
}
@ -454,17 +490,22 @@ module.exports = function(RED) {
qos:qos,
options:options,
handler:function(mtopic,mpayload, mpacket) {
if (matchTopic(topic,mtopic)) {
debug(`MQTTBrokerNode:${node.id}: this.subscribe.handler - attempting to match '${topic}' to '${mtopic}' `, mpacket);
if(mpacket.properties && options.properties && mpacket.properties.subscriptionIdentifier && options.properties.subscriptionIdentifier && (mpacket.properties.subscriptionIdentifier !== options.properties.subscriptionIdentifier) ) {
//do nothing as subscriptionIdentifier does not match
debug(`MQTTBrokerNode:${node.id}: > no match - this nodes subID (${options.properties.subscriptionIdentifier}) !== packet subID (${mpacket.properties.subscriptionIdentifier})`);
} else if (matchTopic(topic,mtopic)) {
debug(`MQTTBrokerNode:${node.id}: > MATCHED '${topic}' to '${mtopic}' - performing callback`);
callback(mtopic,mpayload, mpacket);
}
} else
debug(`MQTTBrokerNode:${node.id}: > no match / no callback`);
},
ref: ref
};
node.subscriptions[topic][ref] = sub;
if (node.connected) {
debug(`MQTTBrokerNode:${node.id}: this.subscribe - registering handler ref ${ref} for ${topic} and subscribing`, options);
node.client.on('message',sub.handler);
// var options = {};
// options.qos = qos;
node.client.subscribe(topic, options);
}
};
@ -472,20 +513,30 @@ module.exports = function(RED) {
this.unsubscribe = function (topic, ref, removed) {
ref = ref||0;
var sub = node.subscriptions[topic];
var _debug = `MQTTBrokerNode ${node.id}: unsubscribe for topic ${topic} called... ` ;
if (sub) {
_debug += "sub found. "
if (sub[ref]) {
// debug(`MQTTBrokerNode:${node.id}: this.unsubscribe - removing handler ref ${ref} for ${topic} `);
_debug += `removing handler ref ${ref} for ${topic}. `
node.client.removeListener('message',sub[ref].handler);
delete sub[ref];
}
if (removed) {
//if (removed) {
if (Object.keys(sub).length === 0) {
delete node.subscriptions[topic];
if (node.connected) {
_debug += `calling client.unsubscribe to remove topic ${topic}`
// debug(`MQTTBrokerNode:${node.id}: this.unsubscribe - calling client.unsubscribe to remove topic ${topic} `);
node.client.unsubscribe(topic);
}
}
}
//}
} else {
_debug += "sub not found! "
}
debug(_debug);
};
this.publish = function (msg,done) {
@ -513,10 +564,11 @@ module.exports = function(RED) {
setIntProp(msg, options.properties, "messageExpiryInterval", 0);
setBoolProp(msg, options.properties, "payloadFormatIndicator");
setUserProperties(msg.userProperties, options.properties);
setCorrelationData(msg.correlationData, options.properties);
setBufferProp(msg, options.properties, "correlationData");
}
debug("MQTT: ➡️➡️ node.client.publish",msg,options);
debug(`MQTTBrokerNode:${node.id}: publish - sending payload to ${msg.topic ? msg.topic : (msg.topicAlias ? 'topicAlias-'+msg.topicAlias : '???') } `, options);
node.client.publish(msg.topic, msg.payload, options, function(err) {
if(err) node.error(err,msg);//catch errors
done && done();
return
});
@ -578,29 +630,23 @@ module.exports = function(RED) {
this.status({fill:"red",shape:"ring",text:"node-red:common.status.disconnected"});
if (this.topic) {
node.brokerConn.register(this);
let options = {
qos: this.qos
}
let options = { qos: this.qos };
if(v5) {
options.properties = {};
if(node.userProperties && node.userPropertiesType !== "none") {
if(node.userProperties) {
let userProperties = RED.util.evaluateNodeProperty(node.userProperties, node.userPropertiesType, node, {});
if(userProperties) options.properties.userProperties = userProperties;
}
if(node.subscriptionIdentifier) {
let sid = parseInt(node.subscriptionIdentifier);
if(sid >= 0 && !isNaN(sid) ) options.properties.subscriptionIdentifier = sid;//must not be sent if zero
setUserProperties(userProperties, options.properties);
}
setIntProp(node,options.properties,"subscriptionIdentifier", 1);
setIntProp(node, options, "rh");
if(node.nl === "true" || node.nl === true) options.nl = true;
if(node.nl === "false" || node.nl === false) options.nl = false;
else if(node.nl === "false" || node.nl === false) options.nl = false;
if(node.rap === "true" || node.rap === true) options.rap = true;
if(node.rap === "false" || node.rap === false) options.rap = false;
if(node.rh === "true" || node.rh === true || node.rh == 1) options.rh = 1;
if(node.rh === "false" || node.rh === false || node.rh == 0) options.rh = 0;
else if(node.rap === "false" || node.rap === false) options.rap = false;
}
debug("MQTT: ⬅️⬅️ this.brokerConn.subscribe",this.topic,options);
// debug("MQTT: ⬅️⬅️ this.brokerConn.subscribe",this.topic,options);
this.brokerConn.subscribe(this.topic,options,function(topic,payload,packet) {
debug("MQTT: ⬅️⬅️ ON this.brokerConn.subscribe",topic,payload,packet);
debug(`MQTTInNode:${node.id}: Broker sent ${topic}, datatype ${node.datatype}`, packet);
if (node.datatype === "buffer") {
// payload = payload;
} else if (node.datatype === "base64") {
@ -622,17 +668,17 @@ module.exports = function(RED) {
//msg.properties = packet.properties;
setStrProp(packet.properties, msg, "responseTopic");
setStrProp(packet.properties, msg, "contentType");
setIntProp(packet.properties, msg, "topicAlias", 1, node.serverProperties.topicAliasMaximum || 0);
setIntProp(packet.properties, msg, "topicAlias", 1, node.brokerConn.serverProperties.topicAliasMaximum || 0);
setIntProp(packet.properties, msg, "subscriptionIdentifier", 1, 268435455);
setIntProp(packet.properties, msg, "messageExpiryInterval", 0);
setBoolProp(packet.properties, msg, "payloadFormatIndicator");
setUserProperties(packet.properties.userProperties, options.properties);
setCorrelationData(packet.properties.correlationData, options.properties);
setStrProp(packet.properties, msg, "reasonString");
setUserProperties(packet.properties.userProperties, msg);
setBufferProp(packet.properties, msg, "correlationData");
}
if ((node.brokerConn.broker === "localhost")||(node.brokerConn.broker === "127.0.0.1")) {
msg._topic = topic;
}
debug("MQTT: ⬅️⬅️ node.send",msg);
node.send(msg);
}, this.id);
if (this.brokerConn.connected) {
@ -688,9 +734,10 @@ module.exports = function(RED) {
msg.qos = Number(node.qos || msg.qos || 0);
msg.retain = node.retain || msg.retain || false;
msg.retain = ((msg.retain === true) || (msg.retain === "true")) || false;
let msgPropOverride = function(propName) { if(node[propName]) { msg[propName] = node[propName]; } }
msgPropOverride("topic");
if(v5) {
//TODO: contain all v5 props in an object?
if(node.userProperties && node.userPropertiesType !== "none") {
if(node.userProperties) {
let userProperties = RED.util.evaluateNodeProperty(node.userProperties, node.userPropertiesType, node, msg);
if(userProperties) msg.userProperties = userProperties;
}
@ -698,14 +745,16 @@ module.exports = function(RED) {
let correlationData = RED.util.evaluateNodeProperty(node.correlationData, node.correlationDataType, node, msg);
if(correlationData) msg.correlationData = correlationData;
}
var msgPropOverride = function(propName) { if(node[propName]) { msg[propName] = node[propName]; } }
msgPropOverride("responseTopic");
msgPropOverride("contentType");
msgPropOverride("topicAlias");
msgPropOverride("messageExpiryInterval");
msgPropOverride("payloadFormatIndicator");
msgPropOverride("subscriptionIdentifier");
msgPropOverride("topic");
setIntProp(msg,msg,"topicAlias");
setIntProp(msg,msg,"messageExpiryInterval");
setBoolProp(msg,msg,"payloadFormatIndicator");
setIntProp(msg,msg,"subscriptionIdentifier");
}
if ( msg.hasOwnProperty("payload")) {
let topicOK = msg.hasOwnProperty("topic") && (typeof msg.topic === "string") && (msg.topic !== "");
@ -714,7 +763,12 @@ module.exports = function(RED) {
msg.topic = "";//must be empty string?
}
if (topicOK) { // topic must exist
this.brokerConn.publish(msg, done); // send the message
debug(`MQTTOutNode:${node.id}: sending msg to ${msg.topic}`, msg);
this.brokerConn.publish(msg, function(){
let args = arguments;
let l = args.length;
done();
}); // send the message
} else {
node.warn(RED._("mqtt.errors.invalid-topic"));
done();

View File

@ -350,16 +350,37 @@
"retain": "Retain",
"clientid": "Client ID",
"port": "Port",
"keepalive": "Keep alive time (s)",
"keepalive": "Keepalive time(s)",
"cleansession": "Use clean session",
"use-tls": "Enable secure (SSL/TLS) connection",
"tls-config":"TLS Configuration",
"verify-server-cert":"Verify server certificate",
"compatmode": "Use legacy MQTT 3.1 support",
"protocolVersion": "Version",
"protocolVersion3": "MQTT Protocol v3.1 (legacy)",
"protocolVersion4": "MQTT Protocol v3.1.1",
"protocolVersion5": "MQTT Protocol v5"
"userProperties": "User Properties",
"subscriptionIdentifier": "Subscription ID",
"flags": "Flags",
"nl": "No Local",
"rap": "Retain as Published",
"rh": "Retain Handling",
"rh0": "0, Always send retained messages",
"rh1": "1, Send retained messages for new subscription",
"rh2": "2, Never send retained messages",
"responseTopic": "Response topic",
"contentType": "Content Type",
"correlationData": "Correlation Data",
"messageExpiryInterval": "Expiry (secs)",
"topicAlias": "Alias",
"payloadFormatIndicator": "Format",
"payloadFormatIndicatorFalse": "unspecified bytes (Default)",
"payloadFormatIndicatorTrue": "UTF-8 encoded payload",
"protocolVersion": "Protocol Version",
"protocolVersion3": "MQTT V3.1 (legacy)",
"protocolVersion4": "MQTT V3.1.1",
"protocolVersion5": "MQTT V5",
"sessionExpiryInterval": "Expiry(s)",
"topicAliasMaximum": "Alias Max",
"maximumPacketSize": "Max Packet Size",
"receiveMaximum": "Receive Max"
},
"sections-label":{
"birth-message": "Message sent on connection (birth message)",
@ -967,4 +988,4 @@
"unexpected" : "unexpected mode",
"no-parts" : "no parts property in message"
}
}
}