Update MQTT node to use MQTT.js 1.2 and enable secure connections

This commit is contained in:
Richard Ruston 2015-06-03 08:54:43 +02:00 committed by Nick O'Leary
parent 4ed09f6431
commit 437b2d506b
3 changed files with 314 additions and 35 deletions

View File

@ -1,12 +1,9 @@
<!--
Copyright 2013,2014 IBM Corp.
Copyright 2013,2015 IBM Corp.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -33,6 +30,8 @@
<p>MQTT input node. Connects to a broker and subscribes to the specified topic. The topic may contain MQTT wildcards.</p>
<p>Outputs an object called <b>msg</b> containing <b>msg.topic, msg.payload, msg.qos</b> and <b>msg.retain</b>.</p>
<p><b>msg.payload</b> is usually a string, but can also be a binary buffer.</p>
<p>If a secure connection is being used, certificate checking can be disabled to allow connections to brokers using a self signed or non-trusted CA certificate.</p>
<p>Compatibility mode allows connections to brokers which do not support the MQTT V3.1.1 standard.</p>
</script>
<script type="text/javascript">
@ -90,6 +89,9 @@
<p>Connects to a MQTT broker and publishes <b>msg.payload</b> either to the <b>msg.topic</b> or to the topic specified in the edit window. The value in the edit window has precedence.</p>
<p>Likewise QoS and/or retain values in the edit panel will overwrite any <b>msg.qos</b> and <b>msg.retain</b> properties. If nothing is set they default to <i>0</i> and <i>false</i> respectively.</p>
<p>If <b>msg.payload</b> contains an object it will be stringified before being sent.</p>
<p>If a secure connection is being used, certificate checking can be disabled to allow connections to brokers using a self signed or non-trusted CA certificate.</p>
<p>Compatibility mode allows connections to brokers which do not support the MQTT V3.1.1 standard.</p>
</script>
<script type="text/javascript">
@ -124,8 +126,23 @@
<input type="text" id="node-config-input-port" data-i18n="[placeholder]mqtt.label.port" style="width:45px">
</div>
<div class="form-row">
<label for="node-config-input-clientid"><i class="fa fa-tag"></i> <span data-i18n="mqtt.label.clientid"></span></label>
<input type="text" id="node-config-input-clientid" data-i18n="[placeholder]mqtt.placeholder.clientid">
<label>&nbsp;</label>
<input type="checkbox" id="node-config-input-secureconn" style="display: inline-block; width: auto; vertical-align: top;">
<label for="node-config-input-secureconn" style="width: 70%;">Enable secure (SSL/TLS) connection</label>
</div>
<div class="form-row" id="row-disablecertauth">
<label>&nbsp;</label>
<input type="checkbox" id="node-config-input-disablecertauth" style="display: inline-block; width: auto; vertical-align: top;">
<label for="node-config-input-disablecertauth" style="width: 70%;">Disable certificate authentication (secure connections only)</label>
</div>
<div class="form-row">
<label>&nbsp;</label>
<input type="checkbox" id="node-config-input-compatmode" style="display: inline-block; width: auto; vertical-align: top;">
<label for="node-config-input-compatmode" style="width: 70%;">Compatibility for brokers not supporting MQTT v3.1.1</label>
</div>
<div class="form-row">
<label for="node-config-input-clientid"><i class="fa fa-tag"></i> Client ID</label>
<input type="text" id="node-config-input-clientid" placeholder="Leave blank for auto generated">
</div>
<div class="form-row">
<label for="node-config-input-user"><i class="fa fa-user"></i> <span data-i18n="common.label.username"></span></label>
@ -143,7 +160,11 @@
defaults: {
broker: {value:"",required:true},
port: {value:1883,required:true,validate:RED.validators.number()},
clientid: { value:"" }
clientid: { value:"" },
secureconn: {value: false},
disablecertauth: { value: false},
compatmode: { value: true},
mqttkeepalive: {value:15}
},
credentials: {
user: {type:"text"},
@ -152,6 +173,35 @@
label: function() {
if (this.broker == "") { this.broker = "localhost"; }
return (this.clientid?this.clientid+"@":"")+this.broker+":"+this.port;
},
oneditprepare: function () {
if (this.broker && typeof this.secureconn === 'undefined'){
this.secureconn = false;
}
if (this.broker && typeof this.disablecertauth === 'undefined'){
this.disablecertauth = false;
}
if (this.broker && typeof this.compatmode === 'undefined'){
this.compatmode = true;
$("#node-config-input-compatmode").prop('checked', true);
}
if (this.broker && typeof this.mqttkeepalive === 'undefined'){
this.mqttkeepalive = 15;
}
if (this.secureconn == true) {
$("#row-disablecertauth").show();
} else {
$("#row-disablecertauth").hide();
}
$("#node-config-input-secureconn").on("click",function() {
if($(this).is(':checked')) {
$("#row-disablecertauth").show();
} else {
$("#row-disablecertauth").hide();
}
});
}
});
</script>

View File

@ -1,5 +1,5 @@
/**
* Copyright 2013,2014 IBM Corp.
* Copyright 2013,2015 IBM Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,19 +16,250 @@
module.exports = function(RED) {
"use strict";
var connectionPool = require("./lib/mqttConnectionPool");
var mqtt = require("mqtt");
var util = require("util");
var events = require("events");
var isUtf8 = require('is-utf8');
function matchTopic(ts,t) {
if (ts == "#") {
return true;
}
var re = new RegExp("^"+ts.replace(/([\[\]\?\(\)\\\\$\^\*\.|])/g,"\\$1").replace(/\+/g,"[^/]+").replace(/\/#$/,"(\/.*)?")+"$");
return re.test(t);
}
function MQTTBrokerNode(n) {
RED.nodes.createNode(this,n);
// Configuration options passed by Node Red
this.broker = n.broker;
this.port = n.port;
this.clientid = n.clientid;
this.secureconn = n.secureconn;
this.disablecertauth = n.disablecertauth;
this.compatmode = n.compatmode;
this.mqttkeepalive = n.mqttkeepalive;
// Config node state
this.brokerurl = "";
this.connected = false;
this.connecting = false;
this.usecount = 0;
this.logdisconnect = true;
this.options = {};
this.queue = [];
this.subscriptions = {};
events.EventEmitter.call(this);
this.setMaxListeners(0);
if (this.credentials) {
this.username = this.credentials.user;
this.password = this.credentials.password;
}
// If the config node is missing certain options (it was probably deployed prior to an update to the node code),
// select/generate sensible options for the new fields
if (typeof this.secureconn === 'undefined'){
this.secureconn = false;
}
if (typeof this.compatmode === 'undefined'){
this.compatmode = true;
}
if (typeof this.disablecertauth === 'undefined'){
this.disablecertauth = false;
}
if (typeof this.mqttkeepalive === 'undefined'){
this.mqttkeepalive = 15;
}
// Create the URL to pass in to the MQTT.js library
if (this.brokerurl == "") {
if (this.secureconn) {
this.brokerurl="mqtts://";
} else {
this.brokerurl="mqtt://";
}
if (this.broker != "") {
this.brokerurl = this.brokerurl+this.broker+":"+this.port;
} else {
this.brokerurl = this.brokerurl+"localhost:1883";
}
}
// Build options for passing to the MQTT.js API
this.options.clientId = this.clientid || 'mqtt_' + (1+Math.random()*4294967295).toString(16);
this.options.username = this.username;
this.options.password = this.password;
this.options.keepalive = this.mqttkeepalive;
this.options.reconnectPeriod = 5000;
if (this.compatmode == "true" || this.compatmode === true){
this.log('Using compatibility mode for non-MQTT v3.1.1 brokers');
this.options.protocolId = 'MQIsdp';
this.options.protocolVersion = 3;
}
if (this.disablecertauth == "true" || this.disablecertauth === true) {
this.log(' Warning: Certificate checking is disabled for this connection');
this.options.rejectUnauthorized = false;
} else {
this.options.rejectUnauthorized = true;
}
// Define functions called by MQTT in and out nodes
var node = this;
this.register = function(){
node.usecount += 1;
};
this.deregister = function(){
node.usecount -= 1;
if (node.usecount == 0) {
node.client.end();
}
};
this.connect = function () {
if (!node.connected && !node.connecting) {
node.connecting = true;
node.client = mqtt.connect(node.brokerurl ,node.options);
// Register successful connect or reconnect handler
node.client.on('connect', function () {
node.connected = true;
node.logdisconnect = true;
node.log("Connected to broker: "+(node.clientid?node.clientid+"@":"")+node.brokerurl);
node.emit('connected');
// Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection
node.client.removeAllListeners('message');
// Re-subscribe to stored topics
for (var s in node.subscriptions) {
var topic = s;
var qos = 0;
for (var r in node.subscriptions[s]) {
qos = Math.max(qos,node.subscriptions[s][r].qos);
node.client.on('message',node.subscriptions[s][r].handler);
}
var options = {qos: qos};
node.client.subscribe(topic, options);
}
// Send any queued messages
while(node.queue.length) {
var msg = node.queue.shift();
//console.log(msg);
node.publish(msg);
}
});
// Register disconnect handlers
node.client.on('close', function () {
if (node.connected && node.logdisconnect ) {
node.connected = false;
node.logdisconnect = false;
node.log("Disconnected from broker: "+(node.clientid?node.clientid+"@":"")+node.brokerurl);
node.emit('disconnected');
}
});
node.client.on('offline', function () {
if (node.connected && node.logdisconnect ) {
node.connected = false;
node.logdisconnect = false;
node.log("Disconnected from broker: "+(node.clientid?node.clientid+"@":"")+node.brokerurl);
node.emit('disconnected');
}
});
// Register connect error handler
node.client.on('error', function (error) {
node.log("" + error);
if (node.connecting) {
node.client.end();
node.connecting = false;
}
});
}
};
this.subscribe = function (topic,qos,callback,ref) {
ref = ref||0;
node.subscriptions[topic] = node.subscriptions[topic]||{};
var sub = {
topic:topic,
qos:qos,
handler:function(mtopic,mpayload, mpacket) {
if (matchTopic(topic,mtopic)) {
callback(mtopic,mpayload, mpacket);
}
},
ref: ref
};
node.subscriptions[topic][ref] = sub;
if (node.connected) {
node.client.on('message',sub.handler);
var options = {};
options.qos = qos;
node.client.subscribe(topic, options);
}
};
this.unsubscribe = function (topic, ref) {
ref = ref||0;
var sub = node.subscriptions[topic];
if (sub) {
if (sub[ref]) {
node.client.removeListener('message',sub[ref].handler);
delete sub[ref];
}
if (Object.keys(sub).length == 0) {
delete node.subscriptions[topic];
if (node.connected){
node.client.unsubscribe(topic);
}
}
}
};
this.publish = function (msg) {
if (node.connected) {
if (!Buffer.isBuffer(msg.payload)) {
if (typeof msg.payload === "object") {
msg.payload = JSON.stringify(msg.payload);
} else if (typeof msg.payload !== "string") {
msg.payload = "" + msg.payload;
}
}
var options = {
qos: msg.qos || 0,
retain: msg.retain || false
};
node.client.publish(msg.topic, msg.payload, options, function (err){return});
} else {
if (!node.connecting) {
node.connect();
}
node.queue.push(msg);
}
};
this.on('close', function(closecomplete) {
if (this.connected) {
this.on('disconnected', function() {
closecomplete();
});
this.client.end();
} else {
closecomplete();
}
});
}
util.inherits(MQTTBrokerNode, events.EventEmitter);
RED.nodes.registerType("mqtt-broker",MQTTBrokerNode,{
credentials: {
user: {type:"text"},
@ -40,30 +271,30 @@ module.exports = function(RED) {
RED.nodes.createNode(this,n);
this.topic = n.topic;
this.broker = n.broker;
this.brokerConfig = RED.nodes.getNode(this.broker);
if (this.brokerConfig) {
this.brokerConn = RED.nodes.getNode(this.broker);
if (this.brokerConn) {
this.status({fill:"red",shape:"ring",text:"common.status.disconnected"});
this.client = connectionPool.get(this.brokerConfig.broker,this.brokerConfig.port,this.brokerConfig.clientid,this.brokerConfig.username,this.brokerConfig.password);
var node = this;
node.brokerConn.register();
if (this.topic) {
this.client.subscribe(this.topic,2,function(topic,payload,qos,retain) {
this.brokerConn.subscribe(this.topic,2,function(topic,payload,packet) {
if (isUtf8(payload)) { payload = payload.toString(); }
var msg = {topic:topic,payload:payload,qos:qos,retain:retain};
if ((node.brokerConfig.broker === "localhost")||(node.brokerConfig.broker === "127.0.0.1")) {
var msg = {topic:topic,payload:payload, qos: packet.qos, retain: packet.retain};
if ((node.brokerConn.broker === "localhost")||(node.brokerConn.broker === "127.0.0.1")) {
msg._topic = topic;
}
node.send(msg);
}, this.id);
this.client.on("connectionlost",function() {
this.brokerConn.on("disconnected",function() {
node.status({fill:"red",shape:"ring",text:"common.status.disconnected"});
});
this.client.on("connect",function() {
this.brokerConn.on("connected",function() {
node.status({fill:"green",shape:"dot",text:"common.status.connected"});
});
if (this.client.isConnected()) {
if (this.brokerConn.connected) {
node.status({fill:"green",shape:"dot",text:"common.status.connected"});
} else {
this.client.connect();
this.brokerConn.connect();
}
}
else {
@ -73,9 +304,9 @@ module.exports = function(RED) {
this.error(RED._("mqtt.errors.missing-config"));
}
this.on('close', function() {
if (this.client) {
this.client.unsubscribe(this.topic,this.id);
this.client.disconnect();
if (this.brokerConn) {
this.brokerConn.unsubscribe(this.topic,this.id);
node.brokerConn.deregister();
}
});
}
@ -87,12 +318,12 @@ module.exports = function(RED) {
this.qos = n.qos || null;
this.retain = n.retain;
this.broker = n.broker;
this.brokerConfig = RED.nodes.getNode(this.broker);
this.brokerConn = RED.nodes.getNode(this.broker);
if (this.brokerConfig) {
if (this.brokerConn) {
this.status({fill:"red",shape:"ring",text:"common.status.disconnected"});
this.client = connectionPool.get(this.brokerConfig.broker,this.brokerConfig.port,this.brokerConfig.clientid,this.brokerConfig.username,this.brokerConfig.password);
var node = this;
node.brokerConn.register();
this.on("input",function(msg) {
if (msg.qos) {
msg.qos = parseInt(msg.qos);
@ -108,30 +339,28 @@ module.exports = function(RED) {
}
if ( msg.hasOwnProperty("payload")) {
if (msg.hasOwnProperty("topic") && (typeof msg.topic === "string") && (msg.topic !== "")) { // topic must exist
this.client.publish(msg); // send the message
this.brokerConn.publish(msg); // send the message
}
else { node.warn(RED._("mqtt.errors.invalid-topic")); }
}
});
this.client.on("connectionlost",function() {
this.brokerConn.on("disconnected",function() {
node.status({fill:"red",shape:"ring",text:"common.status.disconnected"});
});
this.client.on("connect",function() {
this.brokerConn.on("connected",function() {
node.status({fill:"green",shape:"dot",text:"common.status.connected"});
});
if (this.client.isConnected()) {
if (this.brokerConn.connected) {
node.status({fill:"green",shape:"dot",text:"common.status.connected"});
} else {
this.client.connect();
this.brokerConn.connect();
}
} else {
this.error(RED._("mqtt.errors.missing-config"));
}
this.on('close', function() {
if (this.client) {
this.client.disconnect();
}
node.brokerConn.deregister();
});
}
RED.nodes.registerType("mqtt out",MQTTOutNode);
}
};

View File

@ -29,7 +29,7 @@
"when": "3.7.3",
"bcryptjs": "2.2.0",
"nopt": "3.0.3",
"mqtt": "0.3.x",
"mqtt": "1.1.x",
"ws": "0.7.2",
"fs-extra": "0.22.1",
"clone": "1.0.2",