2013-09-05 16:02:48 +02:00
|
|
|
/**
|
2015-06-03 08:54:43 +02:00
|
|
|
* Copyright 2013,2015 IBM Corp.
|
2013-09-05 16:02:48 +02:00
|
|
|
*
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
* you may not use this file except in compliance with the License.
|
|
|
|
* You may obtain a copy of the License at
|
|
|
|
*
|
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
*
|
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
* See the License for the specific language governing permissions and
|
|
|
|
* limitations under the License.
|
|
|
|
**/
|
|
|
|
|
2014-05-04 00:32:04 +02:00
|
|
|
module.exports = function(RED) {
|
2014-05-29 10:00:28 +02:00
|
|
|
"use strict";
|
2015-06-03 08:54:43 +02:00
|
|
|
var mqtt = require("mqtt");
|
|
|
|
var util = require("util");
|
2014-11-04 22:56:15 +01:00
|
|
|
var isUtf8 = require('is-utf8');
|
2014-05-29 10:00:28 +02:00
|
|
|
|
2015-06-03 08:54:43 +02:00
|
|
|
function matchTopic(ts,t) {
|
|
|
|
if (ts == "#") {
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
var re = new RegExp("^"+ts.replace(/([\[\]\?\(\)\\\\$\^\*\.|])/g,"\\$1").replace(/\+/g,"[^/]+").replace(/\/#$/,"(\/.*)?")+"$");
|
|
|
|
return re.test(t);
|
|
|
|
}
|
|
|
|
|
2014-05-04 00:32:04 +02:00
|
|
|
function MQTTBrokerNode(n) {
|
|
|
|
RED.nodes.createNode(this,n);
|
2015-06-03 08:54:43 +02:00
|
|
|
|
|
|
|
// Configuration options passed by Node Red
|
2014-05-04 00:32:04 +02:00
|
|
|
this.broker = n.broker;
|
|
|
|
this.port = n.port;
|
|
|
|
this.clientid = n.clientid;
|
2015-09-01 23:30:15 +02:00
|
|
|
this.usetls = n.usetls;
|
|
|
|
this.verifyservercert = n.verifyservercert;
|
2015-06-03 08:54:43 +02:00
|
|
|
this.compatmode = n.compatmode;
|
2015-09-01 23:30:15 +02:00
|
|
|
this.keepalive = n.keepalive;
|
|
|
|
this.cleansession = n.cleansession;
|
2015-06-03 08:54:43 +02:00
|
|
|
|
|
|
|
// Config node state
|
|
|
|
this.brokerurl = "";
|
|
|
|
this.connected = false;
|
|
|
|
this.connecting = false;
|
|
|
|
this.options = {};
|
|
|
|
this.queue = [];
|
|
|
|
this.subscriptions = {};
|
|
|
|
|
2015-09-02 12:18:59 +02:00
|
|
|
if (n.birthTopic) {
|
|
|
|
this.birthMessage = {
|
|
|
|
topic: n.birthTopic,
|
|
|
|
payload: n.birthPayload || "",
|
|
|
|
qos: Number(n.birthQos||0),
|
|
|
|
retain: n.birthRetain=="true"|| n.birthRetain===true
|
|
|
|
};
|
|
|
|
}
|
2015-06-03 08:54:43 +02:00
|
|
|
|
2014-07-18 15:47:58 +02:00
|
|
|
if (this.credentials) {
|
|
|
|
this.username = this.credentials.user;
|
|
|
|
this.password = this.credentials.password;
|
2014-05-29 10:00:28 +02:00
|
|
|
}
|
2015-06-03 08:54:43 +02:00
|
|
|
|
|
|
|
// If the config node is missing certain options (it was probably deployed prior to an update to the node code),
|
|
|
|
// select/generate sensible options for the new fields
|
2015-09-01 23:30:15 +02:00
|
|
|
if (typeof this.usetls === 'undefined'){
|
|
|
|
this.usetls = false;
|
2015-06-03 08:54:43 +02:00
|
|
|
}
|
|
|
|
if (typeof this.compatmode === 'undefined'){
|
|
|
|
this.compatmode = true;
|
|
|
|
}
|
2015-09-01 23:30:15 +02:00
|
|
|
if (typeof this.verifyservercert === 'undefined'){
|
|
|
|
this.verifyservercert = false;
|
2015-06-03 08:54:43 +02:00
|
|
|
}
|
2015-09-01 23:30:15 +02:00
|
|
|
if (typeof this.keepalive === 'undefined'){
|
|
|
|
this.keepalive = 15;
|
|
|
|
} else if (typeof this.keepalive === 'string') {
|
|
|
|
this.keepalive = Number(this.keepalive);
|
|
|
|
}
|
|
|
|
if (typeof this.cleansession === 'undefined') {
|
|
|
|
this.cleansession = true;
|
2015-06-03 08:54:43 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// Create the URL to pass in to the MQTT.js library
|
|
|
|
if (this.brokerurl == "") {
|
2015-09-01 23:30:15 +02:00
|
|
|
if (this.usetls) {
|
2015-06-03 08:54:43 +02:00
|
|
|
this.brokerurl="mqtts://";
|
|
|
|
} else {
|
|
|
|
this.brokerurl="mqtt://";
|
|
|
|
}
|
|
|
|
if (this.broker != "") {
|
|
|
|
this.brokerurl = this.brokerurl+this.broker+":"+this.port;
|
|
|
|
} else {
|
|
|
|
this.brokerurl = this.brokerurl+"localhost:1883";
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-09-01 23:30:15 +02:00
|
|
|
if (!this.cleansession && !this.clientid) {
|
|
|
|
this.cleansession = true;
|
|
|
|
this.warn(RED._("mqtt.errors.nonclean-missingclientid"));
|
|
|
|
}
|
|
|
|
|
2015-06-03 08:54:43 +02:00
|
|
|
// Build options for passing to the MQTT.js API
|
|
|
|
this.options.clientId = this.clientid || 'mqtt_' + (1+Math.random()*4294967295).toString(16);
|
|
|
|
this.options.username = this.username;
|
|
|
|
this.options.password = this.password;
|
2015-09-01 23:30:15 +02:00
|
|
|
this.options.keepalive = this.keepalive;
|
2015-09-01 23:58:26 +02:00
|
|
|
this.options.clean = this.cleansession;
|
2015-09-01 23:30:15 +02:00
|
|
|
this.options.reconnectPeriod = RED.settings.mqttReconnectTime||5000;
|
2015-06-03 08:54:43 +02:00
|
|
|
if (this.compatmode == "true" || this.compatmode === true){
|
|
|
|
this.options.protocolId = 'MQIsdp';
|
|
|
|
this.options.protocolVersion = 3;
|
|
|
|
}
|
2015-09-01 23:30:15 +02:00
|
|
|
|
|
|
|
this.options.rejectUnauthorized = (this.verifyservercert == "true" || this.verifyservercert === true)
|
|
|
|
|
|
|
|
if (n.willTopic) {
|
|
|
|
this.options.will = {
|
|
|
|
topic: n.willTopic,
|
|
|
|
payload: n.willPayload || "",
|
|
|
|
qos: Number(n.willQos||0),
|
|
|
|
retain: n.willRetain=="true"|| n.willRetain===true
|
|
|
|
};
|
2015-06-03 08:54:43 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// Define functions called by MQTT in and out nodes
|
|
|
|
var node = this;
|
2015-11-17 23:19:56 +01:00
|
|
|
this.users = {};
|
|
|
|
|
|
|
|
this.register = function(mqttNode){
|
|
|
|
node.users[mqttNode.id] = mqttNode;
|
|
|
|
if (Object.keys(node.users).length === 1) {
|
|
|
|
node.connect();
|
|
|
|
}
|
2015-06-03 08:54:43 +02:00
|
|
|
};
|
|
|
|
|
2015-11-17 23:19:56 +01:00
|
|
|
this.deregister = function(mqttNode){
|
|
|
|
delete node.users[mqttNode.id];
|
|
|
|
if (Object.keys(node.users).length === 0) {
|
2015-06-03 08:54:43 +02:00
|
|
|
node.client.end();
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
this.connect = function () {
|
|
|
|
if (!node.connected && !node.connecting) {
|
|
|
|
node.connecting = true;
|
|
|
|
node.client = mqtt.connect(node.brokerurl ,node.options);
|
2015-11-17 23:19:56 +01:00
|
|
|
node.client.setMaxListeners(0);
|
2015-06-03 08:54:43 +02:00
|
|
|
// Register successful connect or reconnect handler
|
|
|
|
node.client.on('connect', function () {
|
2015-11-17 23:19:56 +01:00
|
|
|
node.connecting = false;
|
2015-06-03 08:54:43 +02:00
|
|
|
node.connected = true;
|
2015-09-01 23:30:15 +02:00
|
|
|
node.log(RED._("mqtt.state.connected",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl}));
|
2015-11-17 23:19:56 +01:00
|
|
|
for (var id in node.users) {
|
|
|
|
if (node.users.hasOwnProperty(id)) {
|
|
|
|
node.users[id].status({fill:"green",shape:"dot",text:"common.status.connected"});
|
|
|
|
}
|
|
|
|
}
|
2015-06-03 08:54:43 +02:00
|
|
|
// Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection
|
|
|
|
node.client.removeAllListeners('message');
|
|
|
|
|
|
|
|
// Re-subscribe to stored topics
|
|
|
|
for (var s in node.subscriptions) {
|
|
|
|
var topic = s;
|
|
|
|
var qos = 0;
|
|
|
|
for (var r in node.subscriptions[s]) {
|
|
|
|
qos = Math.max(qos,node.subscriptions[s][r].qos);
|
|
|
|
node.client.on('message',node.subscriptions[s][r].handler);
|
|
|
|
}
|
|
|
|
var options = {qos: qos};
|
|
|
|
node.client.subscribe(topic, options);
|
|
|
|
}
|
|
|
|
|
2015-09-02 12:18:59 +02:00
|
|
|
// Send any birth message
|
|
|
|
if (node.birthMessage) {
|
|
|
|
node.publish(node.birthMessage);
|
|
|
|
}
|
|
|
|
|
2015-06-03 08:54:43 +02:00
|
|
|
// Send any queued messages
|
|
|
|
while(node.queue.length) {
|
|
|
|
var msg = node.queue.shift();
|
|
|
|
//console.log(msg);
|
|
|
|
node.publish(msg);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
// Register disconnect handlers
|
|
|
|
node.client.on('close', function () {
|
2015-09-01 23:30:15 +02:00
|
|
|
if (node.connected) {
|
2015-06-03 08:54:43 +02:00
|
|
|
node.connected = false;
|
2015-09-01 23:30:15 +02:00
|
|
|
node.log(RED._("mqtt.state.disconnected",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl}));
|
2015-11-17 23:19:56 +01:00
|
|
|
for (var id in node.users) {
|
|
|
|
if (node.users.hasOwnProperty(id)) {
|
|
|
|
node.users[id].status({fill:"red",shape:"ring",text:"common.status.disconnected"});
|
|
|
|
}
|
|
|
|
}
|
2015-09-01 23:30:15 +02:00
|
|
|
} else if (node.connecting) {
|
|
|
|
node.log(RED._("mqtt.state.connect-failed",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl}));
|
2015-06-03 08:54:43 +02:00
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
// Register connect error handler
|
|
|
|
node.client.on('error', function (error) {
|
|
|
|
if (node.connecting) {
|
|
|
|
node.client.end();
|
|
|
|
node.connecting = false;
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
this.subscribe = function (topic,qos,callback,ref) {
|
|
|
|
ref = ref||0;
|
|
|
|
node.subscriptions[topic] = node.subscriptions[topic]||{};
|
|
|
|
var sub = {
|
|
|
|
topic:topic,
|
|
|
|
qos:qos,
|
|
|
|
handler:function(mtopic,mpayload, mpacket) {
|
|
|
|
if (matchTopic(topic,mtopic)) {
|
|
|
|
callback(mtopic,mpayload, mpacket);
|
|
|
|
}
|
|
|
|
},
|
|
|
|
ref: ref
|
|
|
|
};
|
|
|
|
node.subscriptions[topic][ref] = sub;
|
|
|
|
if (node.connected) {
|
|
|
|
node.client.on('message',sub.handler);
|
|
|
|
var options = {};
|
|
|
|
options.qos = qos;
|
|
|
|
node.client.subscribe(topic, options);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
this.unsubscribe = function (topic, ref) {
|
|
|
|
ref = ref||0;
|
|
|
|
var sub = node.subscriptions[topic];
|
|
|
|
if (sub) {
|
|
|
|
if (sub[ref]) {
|
|
|
|
node.client.removeListener('message',sub[ref].handler);
|
|
|
|
delete sub[ref];
|
|
|
|
}
|
|
|
|
if (Object.keys(sub).length == 0) {
|
|
|
|
delete node.subscriptions[topic];
|
|
|
|
if (node.connected){
|
|
|
|
node.client.unsubscribe(topic);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
this.publish = function (msg) {
|
|
|
|
if (node.connected) {
|
|
|
|
if (!Buffer.isBuffer(msg.payload)) {
|
|
|
|
if (typeof msg.payload === "object") {
|
|
|
|
msg.payload = JSON.stringify(msg.payload);
|
|
|
|
} else if (typeof msg.payload !== "string") {
|
|
|
|
msg.payload = "" + msg.payload;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
var options = {
|
|
|
|
qos: msg.qos || 0,
|
|
|
|
retain: msg.retain || false
|
|
|
|
};
|
|
|
|
node.client.publish(msg.topic, msg.payload, options, function (err){return});
|
|
|
|
} else {
|
|
|
|
if (!node.connecting) {
|
|
|
|
node.connect();
|
|
|
|
}
|
|
|
|
node.queue.push(msg);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2015-11-17 23:19:56 +01:00
|
|
|
this.on('close', function(done) {
|
2015-06-03 08:54:43 +02:00
|
|
|
if (this.connected) {
|
2015-11-17 23:19:56 +01:00
|
|
|
this.client.on('close', function() {
|
|
|
|
done();
|
2015-06-03 08:54:43 +02:00
|
|
|
});
|
|
|
|
this.client.end();
|
|
|
|
} else {
|
2015-11-17 23:19:56 +01:00
|
|
|
done();
|
2015-06-03 08:54:43 +02:00
|
|
|
}
|
|
|
|
});
|
|
|
|
|
2013-11-25 23:50:08 +01:00
|
|
|
}
|
2015-06-03 08:54:43 +02:00
|
|
|
|
2014-07-18 15:47:58 +02:00
|
|
|
RED.nodes.registerType("mqtt-broker",MQTTBrokerNode,{
|
|
|
|
credentials: {
|
|
|
|
user: {type:"text"},
|
|
|
|
password: {type: "password"}
|
2013-11-25 23:50:08 +01:00
|
|
|
}
|
2014-05-04 00:32:04 +02:00
|
|
|
});
|
2014-05-29 10:00:28 +02:00
|
|
|
|
2014-05-04 00:32:04 +02:00
|
|
|
function MQTTInNode(n) {
|
|
|
|
RED.nodes.createNode(this,n);
|
|
|
|
this.topic = n.topic;
|
|
|
|
this.broker = n.broker;
|
2015-06-03 08:54:43 +02:00
|
|
|
this.brokerConn = RED.nodes.getNode(this.broker);
|
2015-11-17 23:19:56 +01:00
|
|
|
var node = this;
|
2015-06-03 08:54:43 +02:00
|
|
|
if (this.brokerConn) {
|
2015-07-02 00:02:50 +02:00
|
|
|
this.status({fill:"red",shape:"ring",text:"common.status.disconnected"});
|
2015-03-16 18:07:46 +01:00
|
|
|
if (this.topic) {
|
2015-06-03 08:54:43 +02:00
|
|
|
this.brokerConn.subscribe(this.topic,2,function(topic,payload,packet) {
|
2015-03-16 18:07:46 +01:00
|
|
|
if (isUtf8(payload)) { payload = payload.toString(); }
|
2015-06-03 08:54:43 +02:00
|
|
|
var msg = {topic:topic,payload:payload, qos: packet.qos, retain: packet.retain};
|
|
|
|
if ((node.brokerConn.broker === "localhost")||(node.brokerConn.broker === "127.0.0.1")) {
|
2015-03-16 18:07:46 +01:00
|
|
|
msg._topic = topic;
|
|
|
|
}
|
|
|
|
node.send(msg);
|
2015-04-09 21:10:34 +02:00
|
|
|
}, this.id);
|
2015-06-03 08:54:43 +02:00
|
|
|
if (this.brokerConn.connected) {
|
2015-07-02 00:02:50 +02:00
|
|
|
node.status({fill:"green",shape:"dot",text:"common.status.connected"});
|
2015-04-09 21:10:34 +02:00
|
|
|
}
|
2015-11-17 23:19:56 +01:00
|
|
|
node.brokerConn.register(this);
|
2015-03-16 18:07:46 +01:00
|
|
|
}
|
|
|
|
else {
|
2015-05-10 22:47:22 +02:00
|
|
|
this.error(RED._("mqtt.errors.not-defined"));
|
2015-03-16 18:07:46 +01:00
|
|
|
}
|
2015-11-17 23:19:56 +01:00
|
|
|
this.on('close', function() {
|
|
|
|
if (node.brokerConn) {
|
|
|
|
node.brokerConn.unsubscribe(node.topic,node.id);
|
|
|
|
node.brokerConn.deregister(node);
|
|
|
|
}
|
|
|
|
});
|
2014-05-04 00:32:04 +02:00
|
|
|
} else {
|
2015-05-10 22:47:22 +02:00
|
|
|
this.error(RED._("mqtt.errors.missing-config"));
|
2014-05-04 00:32:04 +02:00
|
|
|
}
|
2013-09-05 16:02:48 +02:00
|
|
|
}
|
2014-05-04 00:32:04 +02:00
|
|
|
RED.nodes.registerType("mqtt in",MQTTInNode);
|
2014-05-29 10:00:28 +02:00
|
|
|
|
2014-05-04 00:32:04 +02:00
|
|
|
function MQTTOutNode(n) {
|
|
|
|
RED.nodes.createNode(this,n);
|
|
|
|
this.topic = n.topic;
|
2014-08-22 13:42:07 +02:00
|
|
|
this.qos = n.qos || null;
|
|
|
|
this.retain = n.retain;
|
2014-05-04 00:32:04 +02:00
|
|
|
this.broker = n.broker;
|
2015-06-03 08:54:43 +02:00
|
|
|
this.brokerConn = RED.nodes.getNode(this.broker);
|
2015-11-17 23:19:56 +01:00
|
|
|
var node = this;
|
2014-05-29 10:00:28 +02:00
|
|
|
|
2015-06-03 08:54:43 +02:00
|
|
|
if (this.brokerConn) {
|
2015-07-02 00:02:50 +02:00
|
|
|
this.status({fill:"red",shape:"ring",text:"common.status.disconnected"});
|
2014-05-04 00:32:04 +02:00
|
|
|
this.on("input",function(msg) {
|
2014-09-08 22:10:06 +02:00
|
|
|
if (msg.qos) {
|
|
|
|
msg.qos = parseInt(msg.qos);
|
|
|
|
if ((msg.qos !== 0) && (msg.qos !== 1) && (msg.qos !== 2)) {
|
|
|
|
msg.qos = null;
|
2014-08-22 13:42:07 +02:00
|
|
|
}
|
2013-09-05 16:02:48 +02:00
|
|
|
}
|
2014-09-08 22:10:06 +02:00
|
|
|
msg.qos = Number(node.qos || msg.qos || 0);
|
|
|
|
msg.retain = node.retain || msg.retain || false;
|
|
|
|
msg.retain = ((msg.retain === true) || (msg.retain === "true")) || false;
|
|
|
|
if (node.topic) {
|
|
|
|
msg.topic = node.topic;
|
|
|
|
}
|
2015-04-14 19:39:42 +02:00
|
|
|
if ( msg.hasOwnProperty("payload")) {
|
|
|
|
if (msg.hasOwnProperty("topic") && (typeof msg.topic === "string") && (msg.topic !== "")) { // topic must exist
|
2015-06-03 08:54:43 +02:00
|
|
|
this.brokerConn.publish(msg); // send the message
|
2015-04-14 19:39:42 +02:00
|
|
|
}
|
2015-05-10 22:47:22 +02:00
|
|
|
else { node.warn(RED._("mqtt.errors.invalid-topic")); }
|
2014-09-08 22:10:06 +02:00
|
|
|
}
|
2014-05-04 00:32:04 +02:00
|
|
|
});
|
2015-06-03 08:54:43 +02:00
|
|
|
if (this.brokerConn.connected) {
|
2015-07-02 00:02:50 +02:00
|
|
|
node.status({fill:"green",shape:"dot",text:"common.status.connected"});
|
2015-04-27 21:08:00 +02:00
|
|
|
}
|
2015-11-17 23:19:56 +01:00
|
|
|
node.brokerConn.register(node);
|
|
|
|
this.on('close', function() {
|
|
|
|
node.brokerConn.deregister(node);
|
|
|
|
});
|
2014-05-04 00:32:04 +02:00
|
|
|
} else {
|
2015-05-10 22:47:22 +02:00
|
|
|
this.error(RED._("mqtt.errors.missing-config"));
|
2014-05-04 00:32:04 +02:00
|
|
|
}
|
2013-09-05 16:02:48 +02:00
|
|
|
}
|
2014-05-04 00:32:04 +02:00
|
|
|
RED.nodes.registerType("mqtt out",MQTTOutNode);
|
2015-06-03 08:54:43 +02:00
|
|
|
};
|