diff --git a/nodes/core/io/22-websocket.js b/nodes/core/io/22-websocket.js index 09e258926..e565122d5 100644 --- a/nodes/core/io/22-websocket.js +++ b/nodes/core/io/22-websocket.js @@ -43,6 +43,8 @@ function WebSocketListenerNode(n) { } } + node._clients = {}; + RED.server.addListener('newListener',storeListener); // Create a WebSocket Server @@ -53,8 +55,14 @@ function WebSocketListenerNode(n) { RED.server.removeListener('newListener',storeListener); node.server.on('connection', function(socket){ + var id = (1+Math.random()*4294967295).toString(16); + node._clients[id] = socket; + + socket.on('close',function() { + delete node._clients[id]; + }); socket.on('message',function(data,flags){ - node.handleEvent(socket,'message',data,flags); + node.handleEvent(id,socket,'message',data,flags); }); }); @@ -80,9 +88,9 @@ WebSocketListenerNode.prototype.registerInputNode = function(/*Node*/handler){ this._inputNodes.push(handler); } -WebSocketListenerNode.prototype.handleEvent = function(/*socket*/socket,/*String*/event,/*Object*/data,/*Object*/flags){ +WebSocketListenerNode.prototype.handleEvent = function(id,/*socket*/socket,/*String*/event,/*Object*/data,/*Object*/flags){ for (var i = 0; i < this._inputNodes.length; i++) { - this._inputNodes[i].send({payload:data}); + this._inputNodes[i].send({session:{type:"websocket",id:id},payload:data}); }; } @@ -92,6 +100,13 @@ WebSocketListenerNode.prototype.broadcast = function(data){ }; } +WebSocketListenerNode.prototype.send = function(id,data){ + var session = this._clients[id]; + if (session) { + session.send(data); + } +} + function WebSocketInNode(n) { RED.nodes.createNode(this,n); this.server = n.server; @@ -114,11 +129,23 @@ function WebSocketOutNode(n) { this.error("Missing server configuration"); } this.on("input", function(msg) { - node.serverConfig.broadcast(msg.payload,function(error){ - if(!!error){ - node.warn("An error occurred while sending:" + inspect(error)); - } - }); + var payload = msg.payload; + if (Buffer.isBuffer(payload)) { + payload = payload.toString(); + } else if (typeof payload === "object") { + payload = JSON.stringify(payload); + } else if (typeof payload !== "string") { + payload = ""+payload; + } + if (msg.session && msg.session.type == "websocket") { + node.serverConfig.send(msg.session.id,payload); + } else { + node.serverConfig.broadcast(payload,function(error){ + if(!!error){ + node.warn("An error occurred while sending:" + inspect(error)); + } + }); + } }); } RED.nodes.registerType("websocket out",WebSocketOutNode);