diff --git a/nodes/core/io/10-mqtt.js b/nodes/core/io/10-mqtt.js index c8bc49015..4e65bd39b 100644 --- a/nodes/core/io/10-mqtt.js +++ b/nodes/core/io/10-mqtt.js @@ -17,6 +17,7 @@ module.exports = function(RED) { "use strict"; var connectionPool = require("./lib/mqttConnectionPool"); + var isUtf8 = require('is-utf8'); function MQTTBrokerNode(n) { RED.nodes.createNode(this,n); @@ -45,11 +46,12 @@ module.exports = function(RED) { this.client = connectionPool.get(this.brokerConfig.broker,this.brokerConfig.port,this.brokerConfig.clientid,this.brokerConfig.username,this.brokerConfig.password); var node = this; this.client.subscribe(this.topic,2,function(topic,payload,qos,retain) { - var msg = {topic:topic,payload:payload,qos:qos,retain:retain}; - if ((node.brokerConfig.broker == "localhost")||(node.brokerConfig.broker == "127.0.0.1")) { - msg._topic = topic; - } - node.send(msg); + if (isUtf8(payload)) { payload = payload.toString(); } + var msg = {topic:topic,payload:payload,qos:qos,retain:retain}; + if ((node.brokerConfig.broker === "localhost")||(node.brokerConfig.broker === "127.0.0.1")) { + msg._topic = topic; + } + node.send(msg); }); this.client.on("connectionlost",function() { node.status({fill:"red",shape:"ring",text:"disconnected"}); @@ -78,7 +80,7 @@ module.exports = function(RED) { this.brokerConfig = RED.nodes.getNode(this.broker); if (this.brokerConfig) { - this.status({fill:"red",shape:"ring",text:"disconnected"},true); + this.status({fill:"red",shape:"ring",text:"disconnected"}); this.client = connectionPool.get(this.brokerConfig.broker,this.brokerConfig.port,this.brokerConfig.clientid,this.brokerConfig.username,this.brokerConfig.password); var node = this; this.on("input",function(msg) { diff --git a/nodes/core/io/lib/mqtt.js b/nodes/core/io/lib/mqtt.js index 141a88893..5f35e85de 100644 --- a/nodes/core/io/lib/mqtt.js +++ b/nodes/core/io/lib/mqtt.js @@ -19,7 +19,7 @@ var events = require("events"); //var inspect = require("sys").inspect; //var Client = module.exports.Client = function( - + var port = 1883; var host = "localhost"; @@ -32,7 +32,7 @@ function MQTTClient(port,host) { this.lastOutbound = (new Date()).getTime(); this.lastInbound = (new Date()).getTime(); this.connected = false; - + this._nextMessageId = function() { this.messageId += 1; if (this.messageId > 0xFFFF) { @@ -53,7 +53,7 @@ MQTTClient.prototype.connect = function(options) { self.options.clean = self.options.clean||true; self.options.protocolId = 'MQIsdp'; self.options.protocolVersion = 3; - + self.client = mqtt.createConnection(this.port,this.host,function(err,client) { if (err) { self.connected = false; @@ -87,9 +87,9 @@ MQTTClient.prototype.connect = function(options) { if (packet.returnCode == 0) { self.watchdog = setInterval(function(self) { var now = (new Date()).getTime(); - + //util.log('[mqtt] ['+self.uid+'] watchdog '+inspect({connected:self.connected,connectionError:self.connectionError,pingOutstanding:self.pingOutstanding,now:now,lastOutbound:self.lastOutbound,lastInbound:self.lastInbound})); - + if (now - self.lastOutbound > self.options.keepalive*500 || now - self.lastInbound > self.options.keepalive*500) { if (self.pingOutstanding) { //util.log('[mqtt] ['+self.uid+'] watchdog pingOustanding - disconnect'); @@ -105,7 +105,7 @@ MQTTClient.prototype.connect = function(options) { self.client.pingreq(); } } - + },self.options.keepalive*500,self); self.pingOutstanding = false; self.lastInbound = (new Date()).getTime() @@ -131,7 +131,7 @@ MQTTClient.prototype.connect = function(options) { delete self.pendingSubscriptions[packet.messageId]; }); client.on('publish',function(packet) { - self.lastInbound = (new Date()).getTime() + self.lastInbound = (new Date()).getTime(); if (packet.qos < 2) { var p = packet; self.emit('message',p.topic,p.payload,p.qos,p.retain); @@ -145,7 +145,7 @@ MQTTClient.prototype.connect = function(options) { self.client.puback(packet); } }); - + client.on('pubrel',function(packet) { self.lastInbound = (new Date()).getTime() var p = self.inboundMessages[packet.messageId]; @@ -156,12 +156,12 @@ MQTTClient.prototype.connect = function(options) { self.lastOutbound = (new Date()).getTime() self.client.pubcomp(packet); }); - + client.on('puback',function(packet) { self.lastInbound = (new Date()).getTime() // outbound qos-1 complete }); - + client.on('pubrec',function(packet) { self.lastInbound = (new Date()).getTime() self.lastOutbound = (new Date()).getTime() @@ -176,7 +176,7 @@ MQTTClient.prototype.connect = function(options) { self.lastInbound = (new Date()).getTime() self.pingOutstanding = false; }); - + this.lastOutbound = (new Date()).getTime() this.connectionError = false; client.connect(self.options); @@ -192,8 +192,9 @@ MQTTClient.prototype.subscribe = function(topic,qos) { messageId: self._nextMessageId() }; this.pendingSubscriptions[options.messageId] = topic; - this.lastOutbound = (new Date()).getTime() + this.lastOutbound = (new Date()).getTime(); self.client.subscribe(options); + self.client.setPacketEncoding('binary'); } } MQTTClient.prototype.unsubscribe = function(topic) { @@ -212,7 +213,7 @@ MQTTClient.prototype.unsubscribe = function(topic) { MQTTClient.prototype.publish = function(topic,payload,qos,retain) { var self = this; if (self.connected) { - + if (!Buffer.isBuffer(payload)) { if (typeof payload === "object") { payload = JSON.stringify(payload); @@ -251,4 +252,3 @@ module.exports.createClient = function(port,host) { var mqtt_client = new MQTTClient(port,host); return mqtt_client; } -