diff --git a/nodes/core/io/10-mqtt.html b/nodes/core/io/10-mqtt.html index da3c5e347..96ff22053 100644 --- a/nodes/core/io/10-mqtt.html +++ b/nodes/core/io/10-mqtt.html @@ -30,8 +30,9 @@ diff --git a/nodes/core/io/10-mqtt.js b/nodes/core/io/10-mqtt.js index af3d77e94..f729d286b 100644 --- a/nodes/core/io/10-mqtt.js +++ b/nodes/core/io/10-mqtt.js @@ -22,9 +22,54 @@ function MQTTBrokerNode(n) { RED.nodes.createNode(this,n); this.broker = n.broker; this.port = n.port; + this.clientid = n.clientid; + var credentials = RED.nodes.getCredentials(n.id); + if (credentials) { + this.username = credentials.user; + this.password = credentials.password; + } } RED.nodes.registerType("mqtt-broker",MQTTBrokerNode); +var querystring = require('querystring'); + +RED.app.get('/mqtt-broker/:id',function(req,res) { + var credentials = RED.nodes.getCredentials(req.params.id); + if (credentials) { + res.send(JSON.stringify({user:credentials.user,hasPassword:(credentials.password&&credentials.password!="")})); + } else { + res.send(JSON.stringify({})); + } +}); + +RED.app.delete('/mqtt-broker/:id',function(req,res) { + RED.nodes.deleteCredentials(req.params.id); + res.send(200); +}); + +RED.app.post('/mqtt-broker/:id',function(req,res) { + var body = ""; + req.on('data', function(chunk) { + body+=chunk; + }); + req.on('end', function(){ + var newCreds = querystring.parse(body); + var credentials = RED.nodes.getCredentials(req.params.id)||{}; + if (newCreds.user == null || newCreds.user == "") { + delete credentials.user; + } else { + credentials.user = newCreds.user; + } + if (newCreds.password == "") { + delete credentials.password; + } else { + credentials.password = newCreds.password||credentials.password; + } + RED.nodes.addCredentials(req.params.id,credentials); + res.send(200); + }); +}); + function MQTTInNode(n) { RED.nodes.createNode(this,n); @@ -32,7 +77,7 @@ function MQTTInNode(n) { this.broker = n.broker; this.brokerConfig = RED.nodes.getNode(this.broker); if (this.brokerConfig) { - this.client = connectionPool.get(this.brokerConfig.broker,this.brokerConfig.port); + this.client = connectionPool.get(this.brokerConfig.broker,this.brokerConfig.port,this.brokerConfig.clientid,this.brokerConfig.username,this.brokerConfig.password); var node = this; this.client.subscribe(this.topic,2,function(topic,payload,qos,retain) { var msg = {topic:topic,payload:payload,qos:qos,retain:retain}; @@ -65,7 +110,7 @@ function MQTTOutNode(n) { this.brokerConfig = RED.nodes.getNode(this.broker); if (this.brokerConfig) { - this.client = connectionPool.get(this.brokerConfig.broker,this.brokerConfig.port); + this.client = connectionPool.get(this.brokerConfig.broker,this.brokerConfig.port,this.brokerConfig.clientid,this.brokerConfig.username,this.brokerConfig.password); this.on("input",function(msg) { if (msg != null) { if (this.topic) { diff --git a/nodes/core/io/22-websocket.js b/nodes/core/io/22-websocket.js index 09e258926..e565122d5 100644 --- a/nodes/core/io/22-websocket.js +++ b/nodes/core/io/22-websocket.js @@ -43,6 +43,8 @@ function WebSocketListenerNode(n) { } } + node._clients = {}; + RED.server.addListener('newListener',storeListener); // Create a WebSocket Server @@ -53,8 +55,14 @@ function WebSocketListenerNode(n) { RED.server.removeListener('newListener',storeListener); node.server.on('connection', function(socket){ + var id = (1+Math.random()*4294967295).toString(16); + node._clients[id] = socket; + + socket.on('close',function() { + delete node._clients[id]; + }); socket.on('message',function(data,flags){ - node.handleEvent(socket,'message',data,flags); + node.handleEvent(id,socket,'message',data,flags); }); }); @@ -80,9 +88,9 @@ WebSocketListenerNode.prototype.registerInputNode = function(/*Node*/handler){ this._inputNodes.push(handler); } -WebSocketListenerNode.prototype.handleEvent = function(/*socket*/socket,/*String*/event,/*Object*/data,/*Object*/flags){ +WebSocketListenerNode.prototype.handleEvent = function(id,/*socket*/socket,/*String*/event,/*Object*/data,/*Object*/flags){ for (var i = 0; i < this._inputNodes.length; i++) { - this._inputNodes[i].send({payload:data}); + this._inputNodes[i].send({session:{type:"websocket",id:id},payload:data}); }; } @@ -92,6 +100,13 @@ WebSocketListenerNode.prototype.broadcast = function(data){ }; } +WebSocketListenerNode.prototype.send = function(id,data){ + var session = this._clients[id]; + if (session) { + session.send(data); + } +} + function WebSocketInNode(n) { RED.nodes.createNode(this,n); this.server = n.server; @@ -114,11 +129,23 @@ function WebSocketOutNode(n) { this.error("Missing server configuration"); } this.on("input", function(msg) { - node.serverConfig.broadcast(msg.payload,function(error){ - if(!!error){ - node.warn("An error occurred while sending:" + inspect(error)); - } - }); + var payload = msg.payload; + if (Buffer.isBuffer(payload)) { + payload = payload.toString(); + } else if (typeof payload === "object") { + payload = JSON.stringify(payload); + } else if (typeof payload !== "string") { + payload = ""+payload; + } + if (msg.session && msg.session.type == "websocket") { + node.serverConfig.send(msg.session.id,payload); + } else { + node.serverConfig.broadcast(payload,function(error){ + if(!!error){ + node.warn("An error occurred while sending:" + inspect(error)); + } + }); + } }); } RED.nodes.registerType("websocket out",WebSocketOutNode); diff --git a/nodes/core/io/lib/mqttConnectionPool.js b/nodes/core/io/lib/mqttConnectionPool.js index 1e4b681d4..6b31dea65 100644 --- a/nodes/core/io/lib/mqttConnectionPool.js +++ b/nodes/core/io/lib/mqttConnectionPool.js @@ -25,13 +25,16 @@ function matchTopic(ts,t) { } module.exports = { - get: function(broker,port) { - var id = broker+":"+port; + get: function(broker,port,clientid,username,password) { + var id = "["+(username||"")+":"+(password||"")+"]["+(clientid||"")+"]@"+broker+":"+port; if (!connections[id]) { connections[id] = function() { var client = mqtt.createClient(port,broker); client.setMaxListeners(0); - var options = {keepalive:15,clientId:'mqtt_' + (1+Math.random()*4294967295).toString(16)}; + var options = {keepalive:15}; + options.clientId = clientid || 'mqtt_' + (1+Math.random()*4294967295).toString(16); + options.username = username; + options.password = password; var queue = []; var subscriptions = []; var connecting = false; diff --git a/nodes/core/logic/10-switch.html b/nodes/core/logic/10-switch.html index bc59f7c94..e717de868 100644 --- a/nodes/core/logic/10-switch.html +++ b/nodes/core/logic/10-switch.html @@ -60,7 +60,7 @@ outputs: 1, icon: "switch.png", label: function() { - return this.name; + return this.name||"switch"; }, oneditprepare: function() { diff --git a/nodes/core/logic/15-change.html b/nodes/core/logic/15-change.html new file mode 100644 index 000000000..ea360908f --- /dev/null +++ b/nodes/core/logic/15-change.html @@ -0,0 +1,111 @@ + + + + + + + diff --git a/nodes/core/logic/15-change.js b/nodes/core/logic/15-change.js new file mode 100644 index 000000000..4866239c0 --- /dev/null +++ b/nodes/core/logic/15-change.js @@ -0,0 +1,66 @@ +/** + * Copyright 2013 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +var RED = require(process.env.NODE_RED_HOME + "/red/red"); + +function ChangeNode(n) { + RED.nodes.createNode(this, n); + this.action = n.action; + this.property = n.property || ""; + this.from = n.from || " "; + this.to = n.to || " "; + var node = this; + + var makeNew = function( stem, path, value ) { + var lastPart = (arguments.length === 3) ? path.pop() : false; + for (var i = 0; i < path.length; i++) { + stem = stem[path[i]] = stem[path[i]] || {}; + } + if (lastPart) { stem = stem[lastPart] = value; } + return stem; + }; + + this.on('input', function (msg) { + if (node.action == "change") { + node.re = new RegExp(this.from, "g"); + if (typeof msg[node.property] === "string") { + msg[node.property] = (msg[node.property]).replace(node.re, node.to); + } + } + //else if (node.action == "replace") { + //if (node.to.indexOf("msg.") == 0) { + //msg[node.property] = eval(node.to); + //} + //else { + //msg[node.property] = node.to; + //} + //} + else if (node.action == "replace") { + if (node.to.indexOf("msg.") == 0) { + makeNew( msg, node.property.split("."), eval(node.to) ); + } + else { + makeNew( msg, node.property.split("."), node.to ); + } + //makeNew( msg, node.property.split("."), node.to ); + } + else if (node.action == "delete") { + delete(msg[node.property]); + } + node.send(msg); + }); +} +RED.nodes.registerType("change", ChangeNode); diff --git a/nodes/core/social/27-twitter.html b/nodes/core/social/27-twitter.html index 2515a3541..66619c5ea 100644 --- a/nodes/core/social/27-twitter.html +++ b/nodes/core/social/27-twitter.html @@ -129,11 +129,19 @@