1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00

Add comms module

This commit is contained in:
Nick O'Leary 2014-05-07 20:47:25 +01:00
parent e6794a0c75
commit 16f8673ec0
7 changed files with 204 additions and 133 deletions

View File

@ -91,7 +91,7 @@
} }
}); });
var a = function() { (function() {
var content = document.createElement("div"); var content = document.createElement("div");
content.id = "tab-debug"; content.id = "tab-debug";
@ -114,84 +114,56 @@
var sbc = document.getElementById("debug-content"); var sbc = document.getElementById("debug-content");
var errornotification = null;
var messageCount = 0; var messageCount = 0;
var handleDebugMessage = function(t,o) {
function debugConnect() { var msg = document.createElement("div");
//console.log("debug ws connecting"); msg.onmouseover = function() {
var path = location.hostname+":"+location.port+document.location.pathname; msg.style.borderRightColor = "#999";
path = path+(path.slice(-1) == "/"?"":"/")+"debug/ws"; var n = RED.nodes.node(o.id);
path = "ws"+(document.location.protocol=="https:"?"s":"")+"://"+path; if (n) {
var ws = new WebSocket(path); n.highlighted = true;
ws.onopen = function() { n.dirty = true;
if (errornotification) {
errornotification.close();
errornotification = null;
}
//console.log("debug ws connected");
}
ws.onmessage = function(event) {
var o = JSON.parse(event.data);
if (o.heartbeat) {
return;
}
//console.log(msg);
var msg = document.createElement("div");
msg.onmouseover = function() {
msg.style.borderRightColor = "#999";
var n = RED.nodes.node(o.id);
if (n) {
n.highlighted = true;
n.dirty = true;
}
RED.view.redraw();
};
msg.onmouseout = function() {
msg.style.borderRightColor = "";
var n = RED.nodes.node(o.id);
if (n) {
n.highlighted = false;
n.dirty = true;
}
RED.view.redraw();
};
msg.onclick = function() {
var node = RED.nodes.node(o.id);
if (node) {
RED.view.showWorkspace(node.z);
}
};
var name = (o.name?o.name:o.id).toString().replace(/&/g,"&amp;").replace(/</g,"&lt;").replace(/>/g,"&gt;");
var topic = (o.topic||"").toString().replace(/&/g,"&amp;").replace(/</g,"&lt;").replace(/>/g,"&gt;");
var payload = (o.msg||"").toString().replace(/&/g,"&amp;").replace(/</g,"&lt;").replace(/>/g,"&gt;");
msg.className = 'debug-message'+(o.level?(' debug-message-level-'+o.level):'')
msg.innerHTML = '<span class="debug-message-date">'+getTimestamp()+'</span>'+
'<span class="debug-message-name">['+name+']</span>'+
(o.topic?'<span class="debug-message-topic">'+topic+'</span>':'')+
'<span class="debug-message-payload">'+payload+'</span>';
var atBottom = (sbc.scrollHeight-messages.offsetHeight-sbc.scrollTop) < 5;
messageCount++;
$(messages).append(msg);
if (messageCount > 200) {
$("#debug-content .debug-message:first").remove();
messageCount--;
}
if (atBottom) {
$(sbc).scrollTop(sbc.scrollHeight);
} }
RED.view.redraw();
}; };
ws.onclose = function() { msg.onmouseout = function() {
if (errornotification == null) { msg.style.borderRightColor = "";
errornotification = RED.notify("<b>Error</b>: Lost connection to server","error",true); var n = RED.nodes.node(o.id);
if (n) {
n.highlighted = false;
n.dirty = true;
} }
setTimeout(debugConnect,1000); RED.view.redraw();
};
msg.onclick = function() {
var node = RED.nodes.node(o.id);
if (node) {
RED.view.showWorkspace(node.z);
}
};
var name = (o.name?o.name:o.id).toString().replace(/&/g,"&amp;").replace(/</g,"&lt;").replace(/>/g,"&gt;");
var topic = (o.topic||"").toString().replace(/&/g,"&amp;").replace(/</g,"&lt;").replace(/>/g,"&gt;");
var payload = (o.msg||"").toString().replace(/&/g,"&amp;").replace(/</g,"&lt;").replace(/>/g,"&gt;");
msg.className = 'debug-message'+(o.level?(' debug-message-level-'+o.level):'')
msg.innerHTML = '<span class="debug-message-date">'+getTimestamp()+'</span>'+
'<span class="debug-message-name">['+name+']</span>'+
(o.topic?'<span class="debug-message-topic">'+topic+'</span>':'')+
'<span class="debug-message-payload">'+payload+'</span>';
var atBottom = (sbc.scrollHeight-messages.offsetHeight-sbc.scrollTop) < 5;
messageCount++;
$(messages).append(msg);
if (messageCount > 200) {
$("#debug-content .debug-message:first").remove();
messageCount--;
} }
} if (atBottom) {
debugConnect(); $(sbc).scrollTop(sbc.scrollHeight);
}
};
RED.comms.subscribe("debug",handleDebugMessage);
$("#debug-tab-clear").click(function() { $("#debug-tab-clear").click(function() {
$(".debug-message").remove(); $(".debug-message").remove();
@ -203,7 +175,7 @@
RED.view.redraw(); RED.view.redraw();
}); });
}(); })();
</script> </script>
<style> <style>

View File

@ -51,35 +51,15 @@ module.exports = function(RED) {
if (typeof msg.payload == "undefined") { msg.payload = "(undefined)"; } if (typeof msg.payload == "undefined") { msg.payload = "(undefined)"; }
if (msg.payload instanceof Buffer) { msg.payload = "(Buffer) "+msg.payload.toString('hex'); } if (msg.payload instanceof Buffer) { msg.payload = "(Buffer) "+msg.payload.toString('hex'); }
if (this.active) { if (this.active) {
DebugNode.send({id:this.id,name:this.name,topic:msg.topic,msg:msg.payload,_path:msg._path}); sendDebug({id:this.id,name:this.name,topic:msg.topic,msg:msg.payload,_path:msg._path});
} }
} }
}); });
} }
var lastSentTime = (new Date()).getTime();
setInterval(function() {
var now = (new Date()).getTime();
if (now-lastSentTime > 15000) {
lastSentTime = now;
for (var i in DebugNode.activeConnections) {
var ws = DebugNode.activeConnections[i];
try {
var p = JSON.stringify({heartbeat:lastSentTime});
ws.send(p);
} catch(err) {
util.log("[debug] ws heartbeat error : "+err);
}
}
}
}, 15000);
RED.nodes.registerType("debug",DebugNode); RED.nodes.registerType("debug",DebugNode);
DebugNode.send = function(msg) { function sendDebug(msg) {
if (msg.msg instanceof Error) { if (msg.msg instanceof Error) {
msg.msg = msg.msg.toString(); msg.msg = msg.msg.toString();
} else if (typeof msg.msg === 'object') { } else if (typeof msg.msg === 'object') {
@ -106,47 +86,13 @@ module.exports = function(RED) {
msg.msg = msg.msg.substr(0,debuglength) +" ...."; msg.msg = msg.msg.substr(0,debuglength) +" ....";
} }
for (var i in DebugNode.activeConnections) { RED.comms.publish("debug",msg);
var ws = DebugNode.activeConnections[i];
try {
var p = JSON.stringify(msg);
ws.send(p);
} catch(err) {
util.log("[debug] ws error : "+err);
}
}
lastSentTime = (new Date()).getTime();
} }
DebugNode.activeConnections = [];
var path = RED.settings.httpAdminRoot || "/";
path = path + (path.slice(-1) == "/" ? "":"/") + "debug/ws";
DebugNode.wsServer = new ws.Server({server:RED.server,path:path});
DebugNode.wsServer.on('connection',function(ws) {
DebugNode.activeConnections.push(ws);
ws.on('close',function() {
for (var i in DebugNode.activeConnections) {
if (DebugNode.activeConnections[i] === ws) {
DebugNode.activeConnections.splice(i,1);
break;
}
}
});
ws.on('error', function(err) {
util.log("[debug] ws error : "+err);
});
});
DebugNode.wsServer.on('error', function(err) {
util.log("[debug] ws server error : "+err);
});
DebugNode.logHandler = new events.EventEmitter(); DebugNode.logHandler = new events.EventEmitter();
DebugNode.logHandler.on("log",function(msg) { DebugNode.logHandler.on("log",function(msg) {
if (msg.level == "warn" || msg.level == "error") { if (msg.level == "warn" || msg.level == "error") {
DebugNode.send(msg); sendDebug(msg);
} }
}); });
RED.log.addHandler(DebugNode.logHandler); RED.log.addHandler(DebugNode.logHandler);

View File

@ -317,5 +317,6 @@
<script src="red/ui/editor.js"></script> <script src="red/ui/editor.js"></script>
<script src="red/ui/library.js"></script> <script src="red/ui/library.js"></script>
<script src="red/ui/notifications.js"></script> <script src="red/ui/notifications.js"></script>
<script src="red/comms.js"></script>
</body> </body>
</html> </html>

61
public/red/comms.js Normal file
View File

@ -0,0 +1,61 @@
/**
* Copyright 2014 IBM Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
RED.comms = function() {
var errornotification = null;
var subscriptions = {};
function connectWS() {
var path = location.hostname+":"+location.port+document.location.pathname;
path = path+(path.slice(-1) == "/"?"":"/")+"comms";
path = "ws"+(document.location.protocol=="https:"?"s":"")+"://"+path;
var ws = new WebSocket(path);
ws.onopen = function() {
if (errornotification) {
errornotification.close();
errornotification = null;
}
}
ws.onmessage = function(event) {
var msg = JSON.parse(event.data);
var subscribers = subscriptions[msg.topic];
if (subscribers) {
for (var i=0;i<subscribers.length;i++) {
subscribers[i](msg.topic,msg.data);
}
}
};
ws.onclose = function() {
if (errornotification == null) {
errornotification = RED.notify("<b>Error</b>: Lost connection to server","error",true);
}
setTimeout(connectWS,1000);
}
}
connectWS();
function subscribe(topic,callback) {
if (subscriptions[topic] == null) {
subscriptions[topic] = [];
}
subscriptions[topic].push(callback);
}
return {
subscribe: subscribe
}
}();

86
red/comms.js Normal file
View File

@ -0,0 +1,86 @@
/**
* Copyright 2014 IBM Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
var ws = require("ws");
var util = require("util");
var server;
var settings;
var wsServer;
var activeConnections = [];
var heartbeatTimer;
var lastSentTime;
function init(_server,_settings) {
server = _server;
settings = _settings;
}
function start() {
var path = settings.httpAdminRoot || "/";
path = path + (path.slice(-1) == "/" ? "":"/") + "comms";
wsServer = new ws.Server({server:server,path:path});
wsServer.on('connection',function(ws) {
activeConnections.push(ws);
ws.on('close',function() {
for (var i=0;i<activeConnections.length;i++) {
if (activeConnections[i] === ws) {
activeConnections.splice(i,1);
break;
}
}
});
ws.on('error', function(err) {
util.log("[red:comms] error : "+err.toString());
});
});
wsServer.on('error', function(err) {
util.log("[red:comms] server error : "+err.toString());
});
lastSentTime = Date.now();
heartbeatTimer = setInterval(function() {
var now = Date.now();
if (now-lastSentTime > 15000) {
lastSentTime = now;
publish("hb",lastSentTime);
}
}, 15000);
}
function publish(topic,data) {
var msg = JSON.stringify({topic:topic,data:data});
activeConnections.forEach(function(conn) {
try {
conn.send(msg);
} catch(err) {
util.log("[red:comms] send error : "+err.toString());
}
});
}
module.exports = {
init:init,
start:start,
publish:publish,
}

View File

@ -18,6 +18,7 @@ var events = require("./events");
var server = require("./server"); var server = require("./server");
var nodes = require("./nodes"); var nodes = require("./nodes");
var library = require("./library"); var library = require("./library");
var comms = require("./comms");
var log = require("./log"); var log = require("./log");
var fs = require("fs"); var fs = require("fs");
var settings = null; var settings = null;
@ -50,6 +51,7 @@ var RED = {
stop: server.stop, stop: server.stop,
nodes: nodes, nodes: nodes,
library: library, library: library,
comms: comms,
events: events, events: events,
log: log log: log
}; };

View File

@ -20,6 +20,7 @@ var when = require('when');
var createUI = require("./ui"); var createUI = require("./ui");
var redNodes = require("./nodes"); var redNodes = require("./nodes");
var comms = require("./comms");
var app = null; var app = null;
var nodeApp = null; var nodeApp = null;
@ -30,6 +31,7 @@ var storage = null;
function createServer(_server,_settings) { function createServer(_server,_settings) {
server = _server; server = _server;
settings = _settings; settings = _settings;
comms.init(_server,_settings);
storage = require("./storage"); storage = require("./storage");
app = createUI(settings); app = createUI(settings);
nodeApp = express(); nodeApp = express();
@ -87,6 +89,7 @@ function start() {
redNodes.loadFlows(); redNodes.loadFlows();
}); });
comms.start();
}); });
return defer.promise; return defer.promise;