1
0
mirror of https://github.com/node-red/node-red.git synced 2023-10-10 13:36:53 +02:00

Merge pull request #1813 from node-red-hitachi/0.19-jsonata-persistablecontext

Add context store support to JSONata functions
This commit is contained in:
Nick O'Leary 2018-07-17 20:34:53 +01:00 committed by GitHub
commit b0a01fa4b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 512 additions and 34 deletions

View File

@ -408,9 +408,9 @@ function evaluateJSONataExpression(expr,msg,callback) {
if (callback) {
// If callback provided, need to override the pre-assigned sync
// context functions to be their async variants
bindings.flowContext = function(val) {
bindings.flowContext = function(val, store) {
return new Promise((resolve,reject) => {
expr._node.context().flow.get(val, function(err,value) {
expr._node.context().flow.get(val, store, function(err,value) {
if (err) {
reject(err);
} else {
@ -419,9 +419,9 @@ function evaluateJSONataExpression(expr,msg,callback) {
})
});
}
bindings.globalContext = function(val) {
bindings.globalContext = function(val, store) {
return new Promise((resolve,reject) => {
expr._node.context().global.get(val, function(err,value) {
expr._node.context().global.get(val, store, function(err,value) {
if (err) {
reject(err);
} else {

View File

@ -19,6 +19,7 @@ var should = require("should");
var switchNode = require("../../../../nodes/core/logic/10-switch.js");
var helper = require("node-red-node-test-helper");
var RED = require("../../../../red/red.js");
var Context = require("../../../../red/runtime/nodes/context");
describe('switch Node', function() {
@ -26,10 +27,24 @@ describe('switch Node', function() {
helper.startServer(done);
});
function initContext(done) {
Context.init({
contextStorage: {
memory: {
module: "memory"
}
}
});
Context.load().then(function () {
done();
});
}
afterEach(function(done) {
helper.unload();
RED.settings.nodeMessageBufferMaxLength = 0;
helper.stopServer(done);
helper.unload().then(function(){
RED.settings.nodeMessageBufferMaxLength = 0;
helper.stopServer(done);
});
});
it('should be loaded with some defaults', function(done) {
@ -653,6 +668,53 @@ describe('switch Node', function() {
customFlowSwitchTest(flow, true, -5, done);
});
it('should handle flow and global contexts with JSONata expression', function(done) {
var flow = [{id:"switchNode1",type:"switch",name:"switchNode",property:"$abs($flowContext(\"payload\"))",propertyType:"jsonata",rules:[{"t":"btwn","v":"$flowContext(\"vt\")","vt":"jsonata","v2":"$globalContext(\"v2t\")","v2t":"jsonata"}],checkall:true,outputs:1,wires:[["helperNode1"]],z:"flow"},
{id:"helperNode1", type:"helper", wires:[],z:"flow"},
{id:"flow",type:"tab"}];
helper.load(switchNode, flow, function() {
var switchNode1 = helper.getNode("switchNode1");
var helperNode1 = helper.getNode("helperNode1");
switchNode1.context().flow.set("payload",-5);
switchNode1.context().flow.set("vt",4);
switchNode1.context().global.set("v2t",6);
helperNode1.on("input", function(msg) {
try {
should.equal(msg.payload,"pass");
done();
} catch(err) {
done(err);
}
});
switchNode1.receive({payload:"pass"});
});
});
it('should handle persistable flow and global contexts with JSONata expression', function(done) {
var flow = [{id:"switchNode1",type:"switch",name:"switchNode",property:"$abs($flowContext(\"payload\",\"memory\"))",propertyType:"jsonata",rules:[{"t":"btwn","v":"$flowContext(\"vt\",\"memory\")","vt":"jsonata","v2":"$globalContext(\"v2t\",\"memory\")","v2t":"jsonata"}],checkall:true,outputs:1,wires:[["helperNode1"]],z:"flow"},
{id:"helperNode1", type:"helper", wires:[],z:"flow"},
{id:"flow",type:"tab"}];
helper.load(switchNode, flow, function() {
initContext(function () {
var switchNode1 = helper.getNode("switchNode1");
var helperNode1 = helper.getNode("helperNode1");
switchNode1.context().flow.set(["payload","vt"],[-7,6],"memory",function(){
switchNode1.context().global.set("v2t",8,"memory",function(){
helperNode1.on("input", function(msg) {
try {
should.equal(msg.payload,"pass");
done();
} catch(err) {
done(err);
}
});
switchNode1.receive({payload:"pass"});
});
});
});
});
});
it('should take head of message sequence (no repair)', function(done) {
var flow = [{id:"switchNode1",type:"switch",name:"switchNode",property:"payload",rules:[{"t":"head","v":3}],checkall:false,repair:false,outputs:1,wires:[["helperNode1"]]},
{id:"helperNode1", type:"helper", wires:[]}];

View File

@ -560,6 +560,102 @@ describe('change Node', function() {
});
});
it('changes the value using flow context with jsonata', function(done) {
var flow = [{"id":"changeNode1","type":"change",rules:[{"t":"set","p":"payload","to":"$flowContext(\"foo\")","tot":"jsonata"}],"name":"changeNode","wires":[["helperNode1"]],"z":"flow"},
{id:"helperNode1", type:"helper", wires:[],"z":"flow"},{"id":"flow","type":"tab"}];
helper.load(changeNode, flow, function() {
initContext(function () {
var changeNode1 = helper.getNode("changeNode1");
var helperNode1 = helper.getNode("helperNode1");
changeNode1.context().flow.set("foo","bar");
helperNode1.on("input", function(msg) {
try {
msg.payload.should.eql("bar");
done();
} catch(err) {
done(err);
}
});
changeNode1.receive({payload:"Hello World!"});
});
});
});
it('changes the value using global context with jsonata', function(done) {
var flow = [{"id":"changeNode1","type":"change",rules:[{"t":"set","p":"payload","to":"$globalContext(\"foo\")","tot":"jsonata"}],"name":"changeNode","wires":[["helperNode1"]],"z":"flow"},
{id:"helperNode1", type:"helper", wires:[],"z":"flow"},{"id":"flow","type":"tab"}];
helper.load(changeNode, flow, function() {
initContext(function () {
var changeNode1 = helper.getNode("changeNode1");
var helperNode1 = helper.getNode("helperNode1");
changeNode1.context().global.set("foo","bar");
helperNode1.on("input", function(msg) {
try {
msg.payload.should.eql("bar");
done();
} catch(err) {
done(err);
}
});
changeNode1.receive({payload:"Hello World!"});
});
});
});
it('changes the value using persistable flow context with jsonata', function(done) {
var flow = [{"id":"changeNode1","type":"change",rules:[{"t":"set","p":"payload","to":"$flowContext(\"foo\",\"memory1\")","tot":"jsonata"}],"name":"changeNode","wires":[["helperNode1"]],"z":"flow"},
{id:"helperNode1", type:"helper", wires:[],"z":"flow"},{"id":"flow","type":"tab"}];
helper.load(changeNode, flow, function() {
initContext(function () {
var changeNode1 = helper.getNode("changeNode1");
var helperNode1 = helper.getNode("helperNode1");
helperNode1.on("input", function(msg) {
try {
msg.payload.should.eql("bar");
done();
} catch(err) {
done(err);
}
});
changeNode1.context().flow.set("foo","bar","memory1",function(err){
if(err){
done(err);
}else{
changeNode1.context().flow.set("foo","error!");
changeNode1.receive({payload:"Hello World!"});
}
});
});
});
});
it('changes the value using persistable global context with jsonata', function(done) {
var flow = [{"id":"changeNode1","type":"change",rules:[{"t":"set","p":"payload","to":"$globalContext(\"foo\",\"memory1\")","tot":"jsonata"}],"name":"changeNode","wires":[["helperNode1"]],"z":"flow"},
{id:"helperNode1", type:"helper", wires:[],"z":"flow"},{"id":"flow","type":"tab"}];
helper.load(changeNode, flow, function() {
initContext(function () {
var changeNode1 = helper.getNode("changeNode1");
var helperNode1 = helper.getNode("helperNode1");
helperNode1.on("input", function(msg) {
try {
msg.payload.should.eql("bar");
done();
} catch(err) {
done(err);
}
});
changeNode1.context().global.set("foo","bar","memory1",function(err){
if(err){
done(err);
}else{
changeNode1.context().global.set("foo","error!");
changeNode1.receive({payload:"Hello World!"});
}
});
});
});
});
});
describe('#change', function() {
it('changes the value of the message property', function(done) {

View File

@ -19,6 +19,7 @@ var splitNode = require("../../../../nodes/core/logic/17-split.js");
var joinNode = require("../../../../nodes/core/logic/17-split.js");
var helper = require("node-red-node-test-helper");
var RED = require("../../../../red/red.js");
var Context = require("../../../../red/runtime/nodes/context");
var TimeoutForErrorCase = 20;
@ -390,17 +391,32 @@ describe('SPLIT node', function() {
describe('JOIN node', function() {
before(function(done) {
beforeEach(function(done) {
helper.startServer(done);
});
after(function(done) {
helper.stopServer(done);
});
function initContext(done) {
Context.init({
contextStorage: {
memory: {
module: "memory"
}
}
});
Context.load().then(function () {
done();
});
}
afterEach(function() {
helper.unload();
RED.settings.nodeMessageBufferMaxLength = 0;
afterEach(function(done) {
helper.unload().then(function(){
return Context.clean({allNodes:{}});
}).then(function(){
return Context.close();
}).then(function(){
RED.settings.nodeMessageBufferMaxLength = 0;
helper.stopServer(done);
});
});
it('should be loaded', function(done) {
@ -1225,6 +1241,142 @@ describe('JOIN node', function() {
});
});
it('should reduce messages with flow context', function(done) {
var flow = [{id:"n1", type:"join", mode:"reduce",
reduceRight:false,
reduceExp:"$A+(payload*$flowContext(\"two\"))",
reduceInit:"$flowContext(\"one\")",
reduceInitType:"jsonata",
reduceFixup:"$A*$flowContext(\"three\")",
wires:[["n2"]],z:"flow"},
{id:"n2", type:"helper",z:"flow"},
{id:"flow", type:"tab"}];
helper.load(joinNode, flow, function() {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
var count = 0;
n2.on("input", function(msg) {
try {
msg.should.have.property("payload");
msg.payload.should.equal(((((1+1*2)+2*2)+3*2)+4*2)*3);
done();
}
catch(e) { done(e); }
});
n1.context().flow.set("one",1);
n1.context().flow.set("two",2);
n1.context().flow.set("three",3);
n1.receive({payload:3, parts:{index:2, count:4, id:222}});
n1.receive({payload:2, parts:{index:1, count:4, id:222}});
n1.receive({payload:4, parts:{index:3, count:4, id:222}});
n1.receive({payload:1, parts:{index:0, count:4, id:222}});
});
});
it('should reduce messages with global context', function(done) {
var flow = [{id:"n1", type:"join", mode:"reduce",
reduceRight:false,
reduceExp:"$A+(payload/$globalContext(\"two\"))",
reduceInit:"$globalContext(\"one\")",
reduceInitType:"jsonata",
reduceFixup:"$A*$globalContext(\"three\")",
wires:[["n2"]],z:"flow"},
{id:"n2", type:"helper",z:"flow"},
{id:"flow", type:"tab"}];
helper.load(joinNode, flow, function() {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
var count = 0;
n2.on("input", function(msg) {
try {
msg.should.have.property("payload");
msg.payload.should.equal(((((1+1/2)+2/2)+3/2)+4/2)*3);
done();
}
catch(e) { done(e); }
});
n1.context().global.set("one",1);
n1.context().global.set("two",2);
n1.context().global.set("three",3);
n1.receive({payload:3, parts:{index:2, count:4, id:222}});
n1.receive({payload:2, parts:{index:1, count:4, id:222}});
n1.receive({payload:4, parts:{index:3, count:4, id:222}});
n1.receive({payload:1, parts:{index:0, count:4, id:222}});
});
});
it('should reduce messages with persistable flow context', function(done) {
var flow = [{id:"n1", type:"join", mode:"reduce",
reduceRight:false,
reduceExp:"$A+(payload*$flowContext(\"two\",\"memory\"))",
reduceInit:"$flowContext(\"one\",\"memory\")",
reduceInitType:"jsonata",
reduceFixup:"$A*$flowContext(\"three\",\"memory\")",
wires:[["n2"]],z:"flow"},
{id:"n2", type:"helper",z:"flow"},
{id:"flow", type:"tab"}];
helper.load(joinNode, flow, function() {
initContext(function () {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
n2.on("input", function(msg) {
try {
msg.should.have.property("payload");
msg.payload.should.equal(((((1+1*2)+2*2)+3*2)+4*2)*3);
done();
}
catch(e) { done(e); }
});
n1.context().flow.set(["one","two","three"],[1,2,3],"memory",function(err){
if(err){
done(err);
} else{
n1.receive({payload:3, parts:{index:2, count:4, id:222}});
n1.receive({payload:2, parts:{index:1, count:4, id:222}});
n1.receive({payload:4, parts:{index:3, count:4, id:222}});
n1.receive({payload:1, parts:{index:0, count:4, id:222}});
}
});
});
});
});
it('should reduce messages with persistable global context', function(done) {
var flow = [{id:"n1", type:"join", mode:"reduce",
reduceRight:false,
reduceExp:"$A+(payload/$globalContext(\"two\",\"memory\"))",
reduceInit:"$globalContext(\"one\",\"memory\")",
reduceInitType:"jsonata",
reduceFixup:"$A*$globalContext(\"three\",\"memory\")",
wires:[["n2"]],z:"flow"},
{id:"n2", type:"helper",z:"flow"},
{id:"flow", type:"tab"}];
helper.load(joinNode, flow, function() {
initContext(function () {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
n2.on("input", function(msg) {
try {
msg.should.have.property("payload");
msg.payload.should.equal(((((1+1/2)+2/2)+3/2)+4/2)*3);
done();
}
catch(e) { done(e); }
});
n1.context().global.set(["one","two","three"],[1,2,3],"memory",function(err){
if(err){
done(err);
} else{
n1.receive({payload:3, parts:{index:2, count:4, id:222}});
n1.receive({payload:2, parts:{index:1, count:4, id:222}});
n1.receive({payload:4, parts:{index:3, count:4, id:222}});
n1.receive({payload:1, parts:{index:0, count:4, id:222}});
}
});
});
});
});
it('should handle invalid JSON expression"', function (done) {
var flow = [{
id: "n1", type: "join", mode: "reduce",

View File

@ -18,20 +18,32 @@ var should = require("should");
var sortNode = require("../../../../nodes/core/logic/18-sort.js");
var helper = require("node-red-node-test-helper");
var RED = require("../../../../red/red.js");
var Context = require("../../../../red/runtime/nodes/context");
describe('SORT node', function() {
before(function(done) {
beforeEach(function(done) {
helper.startServer(done);
});
after(function(done) {
helper.stopServer(done);
});
function initContext(done) {
Context.init({
contextStorage: {
memory: {
module: "memory"
}
}
});
Context.load().then(function () {
done();
});
}
afterEach(function() {
helper.unload();
RED.settings.nodeMessageBufferMaxLength = 0;
afterEach(function(done) {
helper.unload().then(function(){
RED.settings.nodeMessageBufferMaxLength = 0;
helper.stopServer(done);
});
});
it('should be loaded', function(done) {
@ -227,6 +239,134 @@ describe('SORT node', function() {
});
})();
it('should sort payload by context (exp, not number, ascending)', function(done) {
var flow = [{id:"n1", type:"sort", target:"data", targetType:"msg", msgKey:"$flowContext($)", msgKeyType:"jsonata", order:"ascending", as_num:false, wires:[["n2"]],z:"flow"},
{id:"n2", type:"helper",z:"flow"},
{id:"flow", type:"tab"}];
var data_in = [ "first", "second", "third", "fourth" ];
var data_out = [ "second", "third", "first", "fourth" ];
helper.load(sortNode, flow, function() {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
n1.context()["flow"].set("first","3");
n1.context()["flow"].set("second","1");
n1.context()["flow"].set("third","2");
n1.context()["flow"].set("fourth","4");
n2.on("input", function(msg) {
msg.should.have.property("data");
var data = msg["data"];
data.length.should.equal(data_out.length);
for(var i = 0; i < data_out.length; i++) {
data[i].should.equal(data_out[i]);
}
done();
});
var msg = {};
msg["data"] = data_in;
n1.receive(msg);
});
});
it('should sort message group by context (exp, not number, ascending)', function(done) {
var flow = [{id:"n1", type:"sort", target:"data", targetType:"msg", msgKey:"$globalContext(payload)", msgKeyType:"jsonata", order:"ascending", as_num:false, wires:[["n2"]],z:"flow"},
{id:"n2", type:"helper",z:"flow"},
{id:"flow", type:"tab"}];
var data_in = [ "first", "second", "third", "fourth" ];
var data_out = [ "second", "fourth", "third", "first" ];
helper.load(sortNode, flow, function() {
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
var count = 0;
n1.context()["global"].set("first","4");
n1.context()["global"].set("second","1");
n1.context()["global"].set("third","3");
n1.context()["global"].set("fourth","2");
n2.on("input", function(msg) {
msg.should.have.property("payload");
msg.should.have.property("parts");
msg.parts.should.have.property("count", data_out.length);
var data = msg["payload"];
var index = data_out.indexOf(data);
msg.parts.should.have.property("index", index);
count++;
if (count === data_out.length) {
done();
}
});
var len = data_in.length;
for(var i = 0; i < len; i++) {
var parts = { id: "X", index: i, count: len };
var msg = {parts: parts};
msg["payload"] = data_in[i];
n1.receive(msg);
}
});
});
it('should sort payload by persistable context (exp, not number, descending)', function(done) {
var flow = [{id:"n1", type:"sort", target:"data", targetType:"msg", msgKey:"$globalContext($,\"memory\")", msgKeyType:"jsonata", order:"descending", as_num:false, wires:[["n2"]],z:"flow"},
{id:"n2", type:"helper",z:"flow"},
{id:"flow", type:"tab"}];
var data_in = [ "first", "second", "third", "fourth" ];
var data_out = [ "fourth", "first", "third", "second" ];
helper.load(sortNode, flow, function() {
initContext(function(){
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
n1.context()["global"].set(["first","second","third","fourth"],["3","1","2","4"],"memory",function(){
n2.on("input", function(msg) {
msg.should.have.property("data");
var data = msg["data"];
data.length.should.equal(data_out.length);
for(var i = 0; i < data_out.length; i++) {
data[i].should.equal(data_out[i]);
}
done();
});
var msg = {};
msg["data"] = data_in;
n1.receive(msg);
});
});
});
});
it('should sort message group by persistable context (exp, not number, descending)', function(done) {
var flow = [{id:"n1", type:"sort", target:"data", targetType:"msg", msgKey:"$flowContext(payload,\"memory\")", msgKeyType:"jsonata", order:"descending", as_num:false, wires:[["n2"]],z:"flow"},
{id:"n2", type:"helper",z:"flow"},
{id:"flow", type:"tab"}];
var data_in = [ "first", "second", "third", "fourth" ];
var data_out = [ "first", "third", "fourth", "second" ];
helper.load(sortNode, flow, function() {
initContext(function(){
var n1 = helper.getNode("n1");
var n2 = helper.getNode("n2");
var count = 0;
n1.context()["flow"].set(["first","second","third","fourth"],["4","1","3","2"],"memory",function(){
n2.on("input", function(msg) {
msg.should.have.property("payload");
msg.should.have.property("parts");
msg.parts.should.have.property("count", data_out.length);
var data = msg["payload"];
var index = data_out.indexOf(data);
msg.parts.should.have.property("index", index);
count++;
if (count === data_out.length) {
done();
}
});
var len = data_in.length;
for(var i = 0; i < len; i++) {
var parts = { id: "X", index: i, count: len };
var msg = {parts: parts};
msg["payload"] = data_in[i];
n1.receive(msg);
}
});
});
});
});
it('should handle JSONata script error', function(done) {
var flow = [{id:"n1", type:"sort", order:"ascending", as_num:false, target:"payload", targetType:"seq", seqKey:"$unknown()", seqKeyType:"jsonata", wires:[["n2"]]},
{id:"n2", type:"helper"}];

View File

@ -459,7 +459,7 @@ describe("red/util", function() {
should.not.exist(result);
});
it('handles async flow context access', function(done) {
var expr = util.prepareJSONataExpression('$flowContext("foo")',{context:function() { return {flow:{get: function(key,callback) { setTimeout(()=>{callback(null,{'foo':'bar'}[key])},10)}}}}});
var expr = util.prepareJSONataExpression('$flowContext("foo")',{context:function() { return {flow:{get: function(key,store,callback) { setTimeout(()=>{callback(null,{'foo':'bar'}[key])},10)}}}}});
util.evaluateJSONataExpression(expr,{payload:"hello"},function(err,value) {
try {
should.not.exist(err);
@ -471,7 +471,7 @@ describe("red/util", function() {
});
})
it('handles async global context access', function(done) {
var expr = util.prepareJSONataExpression('$globalContext("foo")',{context:function() { return {global:{get: function(key,callback) { setTimeout(()=>{callback(null,{'foo':'bar'}[key])},10)}}}}});
var expr = util.prepareJSONataExpression('$globalContext("foo")',{context:function() { return {global:{get: function(key,store,callback) { setTimeout(()=>{callback(null,{'foo':'bar'}[key])},10)}}}}});
util.evaluateJSONataExpression(expr,{payload:"hello"},function(err,value) {
try {
should.not.exist(err);
@ -482,6 +482,34 @@ describe("red/util", function() {
}
});
})
it('handles persistable store in flow context access', function(done) {
var storeName;
var expr = util.prepareJSONataExpression('$flowContext("foo", "flowStoreName")',{context:function() { return {flow:{get: function(key,store,callback) { storeName = store;setTimeout(()=>{callback(null,{'foo':'bar'}[key])},10)}}}}});
util.evaluateJSONataExpression(expr,{payload:"hello"},function(err,value) {
try {
should.not.exist(err);
value.should.eql("bar");
storeName.should.equal("flowStoreName");
done();
} catch(err2) {
done(err2);
}
});
})
it('handles persistable store in global context access', function(done) {
var storeName;
var expr = util.prepareJSONataExpression('$globalContext("foo", "globalStoreName")',{context:function() { return {global:{get: function(key,store,callback) { storeName = store;setTimeout(()=>{callback(null,{'foo':'bar'}[key])},10)}}}}});
util.evaluateJSONataExpression(expr,{payload:"hello"},function(err,value) {
try {
should.not.exist(err);
value.should.eql("bar");
storeName.should.equal("globalStoreName");
done();
} catch(err2) {
done(err2);
}
});
})
});