From 518358d9dcc00091b8af193c292476fcd5e2601f Mon Sep 17 00:00:00 2001 From: dceejay Date: Sun, 10 May 2015 00:12:52 +0100 Subject: [PATCH] Websocket - add reconnect capability when running as a client. to close #643 Also adds node.status to nodes. --- nodes/core/io/22-websocket.js | 80 +++++++++++++++++++++++------------ 1 file changed, 52 insertions(+), 28 deletions(-) diff --git a/nodes/core/io/22-websocket.js b/nodes/core/io/22-websocket.js index b7f291ded..fe347dedd 100644 --- a/nodes/core/io/22-websocket.js +++ b/nodes/core/io/22-websocket.js @@ -16,14 +16,13 @@ module.exports = function(RED) { "use strict"; - var ws = require("ws"), - inspect = require("sys").inspect; + var ws = require("ws"); + var inspect = require("sys").inspect; // A node red node that sets up a local websocket server function WebSocketListenerNode(n) { // Create a RED node RED.nodes.createNode(this,n); - var node = this; // Store local copies of the node configuration (as defined in the .html) @@ -32,25 +31,41 @@ module.exports = function(RED) { node._inputNodes = []; // collection of nodes that want to receive events node._clients = {}; + // match absolute url + node.isServer = !/^ws{1,2}:\/\//i.test(node.path); + node.closing = false; + + function startconn() { // Connect to remote endpoint + var socket = new ws(node.path); + node.server = socket; // keep for closing + handleConnection(socket); + } function handleConnection(/*socket*/socket) { var id = (1+Math.random()*4294967295).toString(16); - node._clients[id] = socket; + if (node.isServer) { node._clients[id] = socket; node.emit('opened',Object.keys(node._clients).length); } + socket.on('open',function() { + if (!node.isServer) { node.emit('opened',''); } + }); socket.on('close',function() { - delete node._clients[id]; + if (node.isServer) { delete node._clients[id]; node.emit('opened',Object.keys(node._clients).length); } + else { node.emit('closed'); } + if (!node.closing && !node.isServer) { + node.tout = setTimeout(function(){ startconn(); }, 3000); // try to reconnect every 3 secs... bit fast ? + } }); socket.on('message',function(data,flags){ node.handleEvent(id,socket,'message',data,flags); }); socket.on('error', function(err) { - node.warn("An error occured on the ws connection: "+inspect(err)); + node.emit('erro'); + if (!node.closing && !node.isServer) { + node.tout = setTimeout(function(){ startconn(); }, 3000); // try to reconnect every 3 secs... bit fast ? + } }); } - // match absolute url - node.isServer = !/^ws{1,2}:\/\//i.test(node.path); - if(node.isServer) - { + if (node.isServer) { var path = RED.settings.httpNodeRoot || "/"; path = path + (path.slice(-1) == "/" ? "":"/") + (node.path.charAt(0) == "/" ? node.path.substring(1) : node.path); @@ -75,35 +90,39 @@ module.exports = function(RED) { node.server.on('connection', handleConnection); } - else - { - // Connect to remote endpoint - var socket = new ws(node.path); - node.server = socket; // keep for closing - handleConnection(socket); + else { + node.closing = false; + startconn(); // start outbound connection } node.on("close", function() { // Workaround https://github.com/einaros/ws/pull/253 // Remove listeners from RED.server - var listener = null; - for(var event in node._serverListeners) { - if (node._serverListeners.hasOwnProperty(event)) { - listener = node._serverListeners[event]; - if(typeof listener === "function"){ - RED.server.removeListener(event,listener); + if (node.isServer) { + var listener = null; + for (var event in node._serverListeners) { + if (node._serverListeners.hasOwnProperty(event)) { + listener = node._serverListeners[event]; + if (typeof listener === "function") { + RED.server.removeListener(event,listener); + } } } + node._serverListeners = {}; + node.server.close(); + node._inputNodes = []; + } + else { + node.closing = true; + node.server.close(); + if (node.tout) { clearTimeout(tout); } } - node._serverListeners = {}; - node.server.close(); - node._inputNodes = []; }); } RED.nodes.registerType("websocket-listener",WebSocketListenerNode); RED.nodes.registerType("websocket-client",WebSocketListenerNode); - WebSocketListenerNode.prototype.registerInputNode = function(/*Node*/handler){ + WebSocketListenerNode.prototype.registerInputNode = function(/*Node*/handler) { this._inputNodes.push(handler); } @@ -122,13 +141,12 @@ module.exports = function(RED) { }; } msg._session = {type:"websocket",id:id}; - for (var i = 0; i < this._inputNodes.length; i++) { this._inputNodes[i].send(msg); } } - WebSocketListenerNode.prototype.broadcast = function(data){ + WebSocketListenerNode.prototype.broadcast = function(data) { try { if(this.isServer) { for (var i = 0; i < this.server.clients.length; i++) { @@ -165,6 +183,9 @@ module.exports = function(RED) { } else { this.error("Missing server configuration"); } + this.serverConfig.on('opened', function(n) { node.status({fill:"green",shape:"dot",text:"connected "+n}); }); + this.serverConfig.on('erro', function() { node.status({fill:"red",shape:"ring",text:"error"}); }); + this.serverConfig.on('closed', function() { node.status({fill:"red",shape:"ring",text:"disconnected"}); }); } RED.nodes.registerType("websocket in",WebSocketInNode); @@ -176,6 +197,9 @@ module.exports = function(RED) { if (!this.serverConfig) { this.error("Missing server configuration"); } + this.serverConfig.on('opened', function(n) { node.status({fill:"green",shape:"dot",text:"connected "+n}); }); + this.serverConfig.on('erro', function() { node.status({fill:"red",shape:"ring",text:"error"}); }); + this.serverConfig.on('closed', function() { node.status({fill:"red",shape:"ring",text:"disconnected"}); }); this.on("input", function(msg) { var payload; if (this.serverConfig.wholemsg) {