1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00

Websocket - add reconnect capability when running as a client.

to close #643

Also adds node.status to nodes.
This commit is contained in:
dceejay 2015-05-10 00:12:52 +01:00
parent 5ffde21d83
commit 518358d9dc

View File

@ -16,14 +16,13 @@
module.exports = function(RED) { module.exports = function(RED) {
"use strict"; "use strict";
var ws = require("ws"), var ws = require("ws");
inspect = require("sys").inspect; var inspect = require("sys").inspect;
// A node red node that sets up a local websocket server // A node red node that sets up a local websocket server
function WebSocketListenerNode(n) { function WebSocketListenerNode(n) {
// Create a RED node // Create a RED node
RED.nodes.createNode(this,n); RED.nodes.createNode(this,n);
var node = this; var node = this;
// Store local copies of the node configuration (as defined in the .html) // Store local copies of the node configuration (as defined in the .html)
@ -32,25 +31,41 @@ module.exports = function(RED) {
node._inputNodes = []; // collection of nodes that want to receive events node._inputNodes = []; // collection of nodes that want to receive events
node._clients = {}; node._clients = {};
// match absolute url
node.isServer = !/^ws{1,2}:\/\//i.test(node.path);
node.closing = false;
function startconn() { // Connect to remote endpoint
var socket = new ws(node.path);
node.server = socket; // keep for closing
handleConnection(socket);
}
function handleConnection(/*socket*/socket) { function handleConnection(/*socket*/socket) {
var id = (1+Math.random()*4294967295).toString(16); var id = (1+Math.random()*4294967295).toString(16);
node._clients[id] = socket; if (node.isServer) { node._clients[id] = socket; node.emit('opened',Object.keys(node._clients).length); }
socket.on('open',function() {
if (!node.isServer) { node.emit('opened',''); }
});
socket.on('close',function() { socket.on('close',function() {
delete node._clients[id]; if (node.isServer) { delete node._clients[id]; node.emit('opened',Object.keys(node._clients).length); }
else { node.emit('closed'); }
if (!node.closing && !node.isServer) {
node.tout = setTimeout(function(){ startconn(); }, 3000); // try to reconnect every 3 secs... bit fast ?
}
}); });
socket.on('message',function(data,flags){ socket.on('message',function(data,flags){
node.handleEvent(id,socket,'message',data,flags); node.handleEvent(id,socket,'message',data,flags);
}); });
socket.on('error', function(err) { socket.on('error', function(err) {
node.warn("An error occured on the ws connection: "+inspect(err)); node.emit('erro');
if (!node.closing && !node.isServer) {
node.tout = setTimeout(function(){ startconn(); }, 3000); // try to reconnect every 3 secs... bit fast ?
}
}); });
} }
// match absolute url if (node.isServer) {
node.isServer = !/^ws{1,2}:\/\//i.test(node.path);
if(node.isServer)
{
var path = RED.settings.httpNodeRoot || "/"; var path = RED.settings.httpNodeRoot || "/";
path = path + (path.slice(-1) == "/" ? "":"/") + (node.path.charAt(0) == "/" ? node.path.substring(1) : node.path); path = path + (path.slice(-1) == "/" ? "":"/") + (node.path.charAt(0) == "/" ? node.path.substring(1) : node.path);
@ -75,35 +90,39 @@ module.exports = function(RED) {
node.server.on('connection', handleConnection); node.server.on('connection', handleConnection);
} }
else else {
{ node.closing = false;
// Connect to remote endpoint startconn(); // start outbound connection
var socket = new ws(node.path);
node.server = socket; // keep for closing
handleConnection(socket);
} }
node.on("close", function() { node.on("close", function() {
// Workaround https://github.com/einaros/ws/pull/253 // Workaround https://github.com/einaros/ws/pull/253
// Remove listeners from RED.server // Remove listeners from RED.server
var listener = null; if (node.isServer) {
for(var event in node._serverListeners) { var listener = null;
if (node._serverListeners.hasOwnProperty(event)) { for (var event in node._serverListeners) {
listener = node._serverListeners[event]; if (node._serverListeners.hasOwnProperty(event)) {
if(typeof listener === "function"){ listener = node._serverListeners[event];
RED.server.removeListener(event,listener); if (typeof listener === "function") {
RED.server.removeListener(event,listener);
}
} }
} }
node._serverListeners = {};
node.server.close();
node._inputNodes = [];
}
else {
node.closing = true;
node.server.close();
if (node.tout) { clearTimeout(tout); }
} }
node._serverListeners = {};
node.server.close();
node._inputNodes = [];
}); });
} }
RED.nodes.registerType("websocket-listener",WebSocketListenerNode); RED.nodes.registerType("websocket-listener",WebSocketListenerNode);
RED.nodes.registerType("websocket-client",WebSocketListenerNode); RED.nodes.registerType("websocket-client",WebSocketListenerNode);
WebSocketListenerNode.prototype.registerInputNode = function(/*Node*/handler){ WebSocketListenerNode.prototype.registerInputNode = function(/*Node*/handler) {
this._inputNodes.push(handler); this._inputNodes.push(handler);
} }
@ -122,13 +141,12 @@ module.exports = function(RED) {
}; };
} }
msg._session = {type:"websocket",id:id}; msg._session = {type:"websocket",id:id};
for (var i = 0; i < this._inputNodes.length; i++) { for (var i = 0; i < this._inputNodes.length; i++) {
this._inputNodes[i].send(msg); this._inputNodes[i].send(msg);
} }
} }
WebSocketListenerNode.prototype.broadcast = function(data){ WebSocketListenerNode.prototype.broadcast = function(data) {
try { try {
if(this.isServer) { if(this.isServer) {
for (var i = 0; i < this.server.clients.length; i++) { for (var i = 0; i < this.server.clients.length; i++) {
@ -165,6 +183,9 @@ module.exports = function(RED) {
} else { } else {
this.error("Missing server configuration"); this.error("Missing server configuration");
} }
this.serverConfig.on('opened', function(n) { node.status({fill:"green",shape:"dot",text:"connected "+n}); });
this.serverConfig.on('erro', function() { node.status({fill:"red",shape:"ring",text:"error"}); });
this.serverConfig.on('closed', function() { node.status({fill:"red",shape:"ring",text:"disconnected"}); });
} }
RED.nodes.registerType("websocket in",WebSocketInNode); RED.nodes.registerType("websocket in",WebSocketInNode);
@ -176,6 +197,9 @@ module.exports = function(RED) {
if (!this.serverConfig) { if (!this.serverConfig) {
this.error("Missing server configuration"); this.error("Missing server configuration");
} }
this.serverConfig.on('opened', function(n) { node.status({fill:"green",shape:"dot",text:"connected "+n}); });
this.serverConfig.on('erro', function() { node.status({fill:"red",shape:"ring",text:"error"}); });
this.serverConfig.on('closed', function() { node.status({fill:"red",shape:"ring",text:"disconnected"}); });
this.on("input", function(msg) { this.on("input", function(msg) {
var payload; var payload;
if (this.serverConfig.wholemsg) { if (this.serverConfig.wholemsg) {