Flow debugger initial pass

This commit is contained in:
Nick O'Leary
2016-11-01 13:10:51 +00:00
parent cebddc0237
commit 416d5190bc
6 changed files with 335 additions and 14 deletions

View File

@@ -18,25 +18,55 @@ var util = require("util");
var flows = require("../flows");
var redUtil = require("../../util");
var redDebugger = require("../debugger");
var runtime;
var routes = {};
var sendQueue = [];
var paused = false;
function init(_runtime) {
runtime = _runtime;
wires = {};
}
function pause() {
paused = true;
}
function resume() {
paused = false;
setImmediate(processSendEvent);
}
function add(sourceNode, wires) {
routes[sourceNode.id] = wires;
}
function remove(sourceNode) {
delete routes[sourceNode.id];
}
function processSendEvent() {
if (!paused) {
if (sendQueue.length > 0) {
var sendEvent = sendQueue.shift();
//console.log(ev.sourceNode.id+"["+ev.sourcePort+"] -> "+ev.destinationNode.id+"["+ev.destinationPort+"] : "+redDebugger.checkSendEvent(ev));
if (!sendEvent.triggered && redDebugger.checkSendEvent(sendEvent)) {
sendEvent.triggered = true;
sendQueue.unshift(sendEvent);
pause();
} else {
sendEvent.destinationNode.receive(sendEvent.msg);
}
}
if (!paused && sendQueue.length > 0) {
setImmediate(processSendEvent);
}
}
}
function send(sourceNode, msg) {
if (msg === null || typeof msg === "undefined") {
return;
@@ -73,13 +103,20 @@ function send(sourceNode, msg) {
if (!sentMessageId) {
sentMessageId = m._msgid;
}
var sendEvent = {
sourceNode: sourceNode,
sourcePort:i,
destinationNode:node,
destinationPort:0
}
if (msgSent) {
var clonedmsg = redUtil.cloneMessage(m);
sendEvents.push({n:node,m:clonedmsg});
sendEvent.msg = redUtil.cloneMessage(m);
} else {
sendEvents.push({n:node,m:m});
sendEvent.msg = m;
msgSent = true;
}
sendEvents.push(sendEvent);
}
}
}
@@ -91,16 +128,16 @@ function send(sourceNode, msg) {
if (!sentMessageId) {
sentMessageId = redUtil.generateId();
}
sourceNode.metric("send",{_msgid:sentMessageId});
for (i=0;i<sendEvents.length;i++) {
var ev = sendEvents[i];
/* istanbul ignore else */
if (!ev.m._msgid) {
ev.m._msgid = sentMessageId;
if (!ev.msg._msgid) {
ev.msg._msgid = sentMessageId;
}
ev.n.receive(ev.m);
sendQueue.push(ev);
}
sourceNode.metric("send",{_msgid:sentMessageId});
processSendEvent();
}
}
@@ -108,5 +145,7 @@ module.exports = {
init:init,
add:add,
remove: remove,
send:send
send:send,
pause:pause,
resume:resume
}