Update logging/metric system

This commit is contained in:
Nick O'Leary
2015-02-03 22:02:26 +00:00
parent 7d6ce1ec12
commit 0aaea1ec40
24 changed files with 245 additions and 209 deletions

View File

@@ -14,7 +14,6 @@
* limitations under the License.
**/
var util = require("util");
var when = require("when");
var clone = require("clone");
@@ -22,6 +21,7 @@ var typeRegistry = require("./registry");
var credentials = require("./credentials");
var redUtil = require("../util");
var events = require("../events");
var log = require("../log");
function getID() {
return (1+Math.random()*4294967295).toString(16);
@@ -35,10 +35,10 @@ function createNode(type,config) {
nn = new nt(clone(config));
}
catch (err) {
util.log("[red] "+type+" : "+err);
log.warn(type+" : "+err);
}
} else {
util.log("[red] unknown type: "+type);
log.warn("unknown type: "+type);
}
return nn;
}

View File

@@ -1,5 +1,5 @@
/**
* Copyright 2014 IBM Corp.
* Copyright 2014, 2015 IBM Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -96,10 +96,10 @@ Node.prototype.send = function(msg) {
// A single message and a single wire on output 0
// TODO: pre-load flows.get calls - cannot do in constructor
// as not all nodes are defined at that point
if (!msg._messageUuid) {
msg._messageUuid = constructUniqueIdentifier();
if (!msg._id) {
msg._id = constructUniqueIdentifier();
}
this.metric("Node.prototype.send", msg);
this.metric("send",msg);
node = flows.get(this._wire);
if (node) {
node.receive(msg);
@@ -116,6 +116,8 @@ Node.prototype.send = function(msg) {
// any calls to node.receive
var sendEvents = [];
var sentMessageId = null;
// for each output of node eg. [msgs to output 0, msgs to output 1, ...]
for (var i = 0; i < numOutputs; i++) {
var wires = this.wires[i]; // wires leaving output i
@@ -132,20 +134,15 @@ Node.prototype.send = function(msg) {
if (node) {
// for each msg to send eg. [[m1, m2, ...], ...]
for (k = 0; k < msgs.length; k++) {
var m = msgs[k];
if (!sentMessageId) {
sentMessageId = m._id;
}
if (msgSent) {
var clonedmsg = redUtil.cloneMessage(msgs[k]);
// overwriting any previously written uuid because a cloned
// message is a different one
clonedmsg._messageUuid = constructUniqueIdentifier();
this.metric("Node.prototype.send",clonedmsg,msgs[k]._messageUuid);
var clonedmsg = redUtil.cloneMessage(m);
sendEvents.push({n:node,m:clonedmsg});
} else {
// first msg sent so don't clone
if (!msgs[k]._messageUuid) {
msgs[k]._messageUuid = constructUniqueIdentifier();
}
this.metric("Node.prototype.send", msgs[k]);
sendEvents.push({n:node,m:msgs[k]});
sendEvents.push({n:node,m:m});
msgSent = true;
}
}
@@ -154,9 +151,16 @@ Node.prototype.send = function(msg) {
}
}
}
if (!sentMessageId) {
sentMessageId = constructUniqueIdentifier();
}
this.metric("send",{_id:sentMessageId});
for (i=0;i<sendEvents.length;i++) {
var ev = sendEvents[i];
if (!ev.m._id) {
ev.m._id = sentMessageId;
}
ev.n.receive(ev.m);
}
};
@@ -165,10 +169,10 @@ Node.prototype.receive = function(msg) {
if (!msg) {
msg = {};
}
if (!msg._messageUuid) {
msg._messageUuid = constructUniqueIdentifier();
if (!msg._id) {
msg._id = constructUniqueIdentifier();
}
this.metric("Node.prototype.receive",msg);
this.metric("receive",msg);
this.emit("input", msg);
};
@@ -186,25 +190,24 @@ function log_helper(self, level, msg) {
}
Node.prototype.log = function(msg) {
log_helper(this, 'info', msg);
log_helper(this, Log.INFO, msg);
};
Node.prototype.warn = function(msg) {
log_helper(this, 'warn', msg);
log_helper(this, Log.WARN, msg);
};
Node.prototype.error = function(msg) {
log_helper(this, 'error', msg);
log_helper(this, Log.ERROR, msg);
};
Node.prototype.metric = function(eventname, msg, metricValue) {
var metrics = {};
metrics.level = "metric";
metrics.level = Log.METRIC;
metrics.nodeid = this.id;
metrics.event = eventname;
metrics.msguuid = msg._messageUuid;
metrics.event = "node."+this.type+"."+eventname;
metrics.msgid = msg._id;
metrics.metric = metricValue;
Log.log(metrics);
}

View File

@@ -1,5 +1,5 @@
/**
* Copyright 2014 IBM Corp.
* Copyright 2014, 2015 IBM Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,9 +14,10 @@
* limitations under the License.
**/
var util = require("util");
var when = require("when");
var log = require("../log");
var credentialCache = {};
var storage = null;
var credentialsDef = {};
@@ -75,7 +76,7 @@ module.exports = {
return storage.getCredentials().then(function (creds) {
credentialCache = creds;
}).otherwise(function (err) {
util.log("[red] Error loading credentials : " + err);
log.warn("Error loading credentials : " + err);
});
},
@@ -168,7 +169,7 @@ module.exports = {
var dashedType = nodeType.replace(/\s+/g, '-');
var definition = credentialsDef[dashedType];
if (!definition) {
util.log('Credential Type ' + nodeType + ' is not registered.');
log.warn('Credential Type ' + nodeType + ' is not registered.');
return;
}

View File

@@ -14,7 +14,6 @@
* limitations under the License.
**/
var util = require("util");
var clone = require("clone");
var when = require("when");
@@ -37,7 +36,7 @@ var activeConfigNodes = {};
events.on('type-registered',function(type) {
if (activeFlow) {
if (activeFlow.typeRegistered(type)) {
util.log("[red] Missing type registered: "+type);
log.info("Missing type registered: "+type);
}
}
});
@@ -58,7 +57,7 @@ var flowNodes = module.exports = {
flowNodes.startFlows();
});
}).otherwise(function(err) {
util.log("[red] Error loading flows : "+err);
log.warn("Error loading flows : "+err);
console.log(err.stack);
});
},
@@ -162,22 +161,22 @@ var flowNodes = module.exports = {
}
},
startFlows: function() {
util.log("[red] Starting flows");
log.info("Starting flows");
try {
activeFlow.start();
} catch(err) {
var missingTypes = activeFlow.getMissingTypes();
if (missingTypes.length > 0) {
util.log("[red] Waiting for missing types to be registered:");
log.info("Waiting for missing types to be registered:");
for (var i=0;i<missingTypes.length;i++) {
util.log("[red] - "+missingTypes[i]);
log.info(" - "+missingTypes[i]);
}
}
}
},
stopFlows: function() {
util.log("[red] Stopping flows");
log.info("Stopping flows");
return activeFlow.stop();
}
};

View File

@@ -1,5 +1,5 @@
/**
* Copyright 2013, 2014 IBM Corp.
* Copyright 2013, 2015 IBM Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -109,6 +109,7 @@ module.exports = {
// Node registry
createNode: createNode,
getNode: flows.get,
eachNode: flows.eachNode,
addNode: registry.addNode,
removeNode: removeNode,

View File

@@ -1,5 +1,5 @@
/**
* Copyright 2014 IBM Corp.
* Copyright 2014, 2015 IBM Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.