Merge branch '0.18' into projects

This commit is contained in:
Nick O'Leary
2018-01-22 23:17:28 +00:00
34 changed files with 3191 additions and 346 deletions

View File

@@ -18,7 +18,7 @@
<div class="form-row">
<label data-i18n="trigger.send" for="node-input-op1"></label>
<input type="hidden" id="node-input-op1type">
<input style="width: 70%" type="text" id="node-input-op1">
<input style="width: 70%" type="text" id="node-input-op1" placeholder="1">
</div>
<div class="form-row">
<label data-i18n="trigger.then"></label>
@@ -45,7 +45,7 @@
<div class="form-row node-type-wait">
<label data-i18n="trigger.then-send"></label>
<input type="hidden" id="node-input-op2type">
<input style="width: 70%" type="text" id="node-input-op2">
<input style="width: 70%" type="text" id="node-input-op2" placeholder="0">
</div>
<div class="form-row">
<label data-i18n="trigger.label.reset" style="width:auto"></label>

View File

@@ -80,7 +80,7 @@ module.exports = function(RED) {
var topic = msg.topic || "_none";
if (node.bytopic === "all") { topic = "_none"; }
node.topics[topic] = node.topics[topic] || {};
if (msg.hasOwnProperty("reset") || ((node.reset !== '') && (msg.payload == node.reset)) ) {
if (msg.hasOwnProperty("reset") || ((node.reset !== '') && msg.hasOwnProperty("payload") && (msg.payload !== null) && msg.payload.toString && (msg.payload.toString() == node.reset)) ) {
if (node.loop === true) { clearInterval(node.topics[topic].tout); }
else { clearTimeout(node.topics[topic].tout); }
delete node.topics[topic];
@@ -104,7 +104,9 @@ module.exports = function(RED) {
if (node.duration === 0) { node.topics[topic].tout = 0; }
else if (node.loop === true) {
/* istanbul ignore else */
if (node.topics[topic].tout) { clearInterval(node.topics[topic].tout); }
/* istanbul ignore else */
if (node.op1type !== "nul") {
var msg2 = RED.util.cloneMessage(msg);
node.topics[topic].tout = setInterval(function() { node.send(RED.util.cloneMessage(msg2)); }, node.duration);
@@ -128,7 +130,9 @@ module.exports = function(RED) {
node.status({fill:"blue",shape:"dot",text:" "});
}
else if ((node.extend === "true" || node.extend === true) && (node.duration > 0)) {
/* istanbul ignore else */
if (node.op2type === "payl") { node.topics[topic].m2 = RED.util.cloneMessage(msg.payload); }
/* istanbul ignore else */
if (node.topics[topic].tout) { clearTimeout(node.topics[topic].tout); }
node.topics[topic].tout = setTimeout(function() {
var msg2 = null;
@@ -153,6 +157,7 @@ module.exports = function(RED) {
});
this.on("close", function() {
for (var t in node.topics) {
/* istanbul ignore else */
if (node.topics[t]) {
if (node.loop === true) { clearInterval(node.topics[t].tout); }
else { clearTimeout(node.topics[t].tout); }

View File

@@ -541,7 +541,8 @@
"switch": {
"label": {
"property": "Property",
"rule": "rule"
"rule": "rule",
"repair" : "repair sequence (reconstruct parts property of outgoing messages)"
},
"and": "and",
"checkall": "checking all rules",
@@ -555,10 +556,15 @@
"false":"is false",
"null":"is null",
"nnull":"is not null",
"head":"head",
"tail":"tail",
"index":"is between",
"exp":"JSONata exp",
"else":"otherwise"
},
"errors": {
"invalid-expr": "Invalid JSONata expression: __error__"
"invalid-expr": "Invalid JSONata expression: __error__",
"too-many" : "too many pending messages in switch node"
}
},
"change": {
@@ -840,6 +846,8 @@
"mode":{
"mode":"Mode",
"auto":"automatic",
"merge":"merge sequence",
"reduce":"reduce sequence",
"custom":"manual"
},
"combine":"Combine each",
@@ -861,19 +869,63 @@
"afterTimeout":"After a timeout following the first message",
"seconds":"seconds",
"complete":"After a message with the <code>msg.complete</code> property set",
"tip":"This mode assumes this node is either paired with a <i>split</i> node or the received messages will have a properly configured <code>msg.parts</code> property."
"tip":"This mode assumes this node is either paired with a <i>split</i> node or the received messages will have a properly configured <code>msg.parts</code> property.",
"too-many" : "too many pending messages in join node",
"merge": {
"topics-label":"Merged Topics",
"topics":"topics",
"topic" : "topic",
"on-change":"Send merged message on arrival of a new topic"
},
"reduce": {
"exp": "Reduce exp",
"exp-value": "exp",
"init": "Initial value",
"right": "Evaluate in reverse order (right to left)",
"fixup": "Fixup exp"
},
"errors": {
"invalid-expr": "Invalid JSONata expression: __error__"
}
},
"sort" : {
"key-type" : "Key type",
"payload" : "payload or element",
"exp" : "expression",
"key-exp" : "Key exp.",
"order" : "Order",
"ascending" : "ascending",
"descending" : "descending",
"as-number" : "as number",
"target" : "Sort",
"seq" : "message sequence",
"key" : "Key",
"elem" : "element value",
"order" : "Order",
"ascending" : "ascending",
"descending" : "descending",
"as-number" : "as number",
"invalid-exp" : "invalid JSONata expression in sort node",
"too-many" : "too many pending messages in sort node",
"clear" : "clear pending message in sort node"
},
"batch" : {
"mode": {
"label" : "Mode",
"num-msgs" : "number of messages",
"interval" : "interval in seconds",
"concat" : "concatenate sequences"
},
"count": {
"label" : "Number of msgs",
"overwrap" : "Overwrap",
"count" : "count",
"invalid" : "Invalid count and overwrap"
},
"interval": {
"label" : "Interval (sec)",
"seconds" : "seconds",
"sec" : "sec",
"empty" : "send empty message when no message arrives"
},
"concat": {
"topics-label": "Topics",
"topic" : "topic"
},
"too-many" : "too many pending messages in batch node",
"unexpected" : "unexpected mode",
"no-parts" : "no parts property in message"
}
}

View File

@@ -33,6 +33,10 @@
<option value="false" data-i18n="switch.stopfirst"></option>
</select>
</div>
<div class="form-row">
<input type="checkbox" id="node-input-repair" style="display: inline-block; width: auto; vertical-align: top;">
<label style="width: auto;" for="node-input-repair"><span data-i18n="switch.label.repair"></span></label></input>
</div>
</script>
<script type="text/x-red" data-help-name="switch">
@@ -44,25 +48,138 @@
that matches.</p>
<p>The rules can be evaluated against an individual message property, a flow or global
context property or the result of a JSONata expression.</p>
<h3>Rules</h3>
<p>Routing rules are categorized into three:</p>
<dl>
<dt>value rules</dt>
<dd>
<table>
<tr>
<th>operator</th>
<th>description</th>
</tr>
<tr>
<td><b>==</b></td>
<td>property value is equals to specified value</td>
</tr>
<tr>
<td><b>!=</b></td>
<td>property value is not equals to specified value</td>
</tr>
<tr>
<td><b>&lt;</b></td>
<td>property value is less than specified value</td>
</tr>
<tr>
<td><b>&lt;=</b></td>
<td>property value is less than or equals to specified value</td>
</tr>
<tr>
<td><b>&gt;</b></td>
<td>property value is greater than specified value</td>
</tr>
<tr>
<td><b>&gt;=</b></td>
<td>property value is greater than or equals to specified value</td>
</tr>
<tr>
<td><b>is between</b></td>
<td>property value is between specified values</td>
</tr>
<tr>
<td><b>contains</b></td>
<td>property string value contains specified string value</td>
</tr>
<tr>
<td><b>matches regex</b></td>
<td>property string value matches specified regex</td>
</tr>
<tr>
<td><b>is true</b></td>
<td>property value is true</td>
</tr>
<tr>
<td><b>is false</b></td>
<td>property value is false</td>
</tr>
<tr>
<td><b>is null</b></td>
<td>property value is null</td>
</tr>
<tr>
<td><b>is not null</b></td>
<td>property value is not null</td>
</tr>
</table>
</dd>
<dt>sequence rules</dt>
<dd>
<table>
<tr>
<th>operator</th>
<th>description</th>
</tr>
<tr>
<td><b>head</b></td>
<td>message is included in the first specified number of messages in a sequence</td>
</tr>
<tr>
<td><b>tail</b></td>
<td>message is included in the last specified number of messages in a sequence</td>
</tr>
<tr>
<td><b>pos. between</b></td>
<td>message is included between specified positions in a sequence</td>
</tr>
</table>
</dd>
<dt>other rules</dt>
<dd>
<table>
<tr>
<th>operator</th>
<th>description</th>
</tr>
<tr>
<td><b>JSONata exp</b></td>
<td>specified JSONata expression evaluate to true. In JSONata expression, $I represents <code>msg.parts.index</code> and $N represents <code>msg.parts.count</code> respectively if exists.</td>
</tr>
<tr>
<td><b>otherwise</b></td>
<td>no matching rule found</td>
</tr>
</table>
</dd>
</dl>
<h3>Repair Sequence</h3>
<p>If <b>repair sequence (reconstruct parts property of outgoing messages)</b> checkbox is selected, <code>msg.parts</code> property are reconstructed for each port to make forks of valid sequences. Otherwise, <code>msg.parts</code> property of incoming messages are passed through.</p>
<h3>Note:</h3>
<p>This node internally keeps messages for its operation if <b>repair sequence</b> checkbox is ON. In order to prevent unexpected memory usage, maximum number of messages kept can be specified by <code>switchMaxKeptMsgsCount</code> property in <b>settings.js</b>.</p>
</script>
<script type="text/javascript">
(function() {
var operators = [
{v:"eq",t:"=="},
{v:"neq",t:"!="},
{v:"lt",t:"<"},
{v:"lte",t:"<="},
{v:"gt",t:">"},
{v:"gte",t:">="},
{v:"btwn",t:"switch.rules.btwn"},
{v:"cont",t:"switch.rules.cont"},
{v:"regex",t:"switch.rules.regex"},
{v:"true",t:"switch.rules.true"},
{v:"false",t:"switch.rules.false"},
{v:"null",t:"switch.rules.null"},
{v:"nnull",t:"switch.rules.nnull"},
{v:"else",t:"switch.rules.else"}
{v:"eq",t:"==",kind:'V'},
{v:"neq",t:"!=",kind:'V'},
{v:"lt",t:"<",kind:'V'},
{v:"lte",t:"<=",kind:'V'},
{v:"gt",t:">",kind:'V'},
{v:"gte",t:">=",kind:'V'},
{v:"btwn",t:"switch.rules.btwn",kind:'V'},
{v:"cont",t:"switch.rules.cont",kind:'V'},
{v:"regex",t:"switch.rules.regex",kind:'V'},
{v:"true",t:"switch.rules.true",kind:'V'},
{v:"false",t:"switch.rules.false",kind:'V'},
{v:"null",t:"switch.rules.null",kind:'V'},
{v:"nnull",t:"switch.rules.nnull",kind:'V'},
{v:"head",t:"switch.rules.head",kind:'S'},
{v:"index",t:"switch.rules.index",kind:'S'},
{v:"tail",t:"switch.rules.tail",kind:'S'},
{v:"jsonata_exp",t:"switch.rules.exp",kind:'O'},
{v:"else",t:"switch.rules.else",kind:'O'}
];
function clipValueLength(v) {
@@ -88,6 +205,7 @@
propertyType: { value:"msg" },
rules: {value:[{t:"eq", v:"", vt:"str"}]},
checkall: {value:"true", required:true},
repair: {value:false},
outputs: {value:1}
},
inputs: 1,
@@ -102,7 +220,8 @@
break;
}
}
if (rule.t === 'btwn') {
if ((rule.t === 'btwn') || (rule.t === 'index')) {
console.log(`; ${label}/${JSON.stringify(rule)}`);
label += " "+getValueLabel(rule.vt,rule.v)+" & "+getValueLabel(rule.v2t,rule.v2);
} else if (rule.t !== 'true' && rule.t !== 'false' && rule.t !== 'null' && rule.t !== 'nnull' && rule.t !== 'else' ) {
label += " "+getValueLabel(rule.vt,rule.v);
@@ -132,6 +251,8 @@
var selectField = rule.find("select");
var type = selectField.val()||"";
var valueField = rule.find(".node-input-rule-value");
var numField = rule.find(".node-input-rule-num-value");
var expField = rule.find(".node-input-rule-exp-value");
var btwnField1 = rule.find(".node-input-rule-btwn-value");
var btwnField2 = rule.find(".node-input-rule-btwn-value2");
var selectWidth;
@@ -143,9 +264,13 @@
selectWidth = 120;
}
selectField.width(selectWidth);
if (type === "btwn") {
if ((type === "btwn") || (type === "index")) {
btwnField1.typedInput("width",(newWidth-selectWidth-70));
btwnField2.typedInput("width",(newWidth-selectWidth-70));
} else if ((type === "head") || (type === "tail")) {
numField.typedInput("width",(newWidth-selectWidth-70));
} else if (type === "jsonata_exp") {
expField.typedInput("width",(newWidth-selectWidth-70));
} else {
if (type === "true" || type === "false" || type === "null" || type === "nnull" || type === "else") {
// valueField.hide();
@@ -171,10 +296,26 @@
var row2 = $('<div/>',{style:"padding-top: 5px; padding-left: 175px;"}).appendTo(container);
var row3 = $('<div/>',{style:"padding-top: 5px; padding-left: 102px;"}).appendTo(container);
var selectField = $('<select/>',{style:"width:120px; margin-left: 5px; text-align: center;"}).appendTo(row);
var group0 = $('<optgroup/>', { label: "value rules" }).appendTo(selectField);
for (var d in operators) {
selectField.append($("<option></option>").val(operators[d].v).text(/^switch/.test(operators[d].t)?node._(operators[d].t):operators[d].t));
if(operators[d].kind === 'V') {
group0.append($("<option></option>").val(operators[d].v).text(/^switch/.test(operators[d].t)?node._(operators[d].t):operators[d].t));
}
}
var group1 = $('<optgroup/>', { label: "sequence rules" }).appendTo(selectField);
for (var d in operators) {
if(operators[d].kind === 'S') {
group1.append($("<option></option>").val(operators[d].v).text(/^switch/.test(operators[d].t)?node._(operators[d].t):operators[d].t));
}
}
for (var d in operators) {
if(operators[d].kind === 'O') {
selectField.append($("<option></option>").val(operators[d].v).text(/^switch/.test(operators[d].t)?node._(operators[d].t):operators[d].t));
}
}
var valueField = $('<input/>',{class:"node-input-rule-value",type:"text",style:"margin-left: 5px;"}).appendTo(row).typedInput({default:'str',types:['msg','flow','global','str','num','jsonata',previousValueType]});
var numValueField = $('<input/>',{class:"node-input-rule-num-value",type:"text",style:"margin-left: 5px;"}).appendTo(row).typedInput({default:'num',types:['flow','global','num','jsonata']});
var expValueField = $('<input/>',{class:"node-input-rule-exp-value",type:"text",style:"margin-left: 5px;"}).appendTo(row).typedInput({default:'jsonata',types:['jsonata']});
var btwnValueField = $('<input/>',{class:"node-input-rule-btwn-value",type:"text",style:"margin-left: 5px;"}).appendTo(row).typedInput({default:'num',types:['msg','flow','global','str','num','jsonata',previousValueType]});
var btwnAndLabel = $('<span/>',{class:"node-input-rule-btwn-label"}).text(" "+andLabel+" ").appendTo(row3);
var btwnValue2Field = $('<input/>',{class:"node-input-rule-btwn-value2",type:"text",style:"margin-left:2px;"}).appendTo(row3).typedInput({default:'num',types:['msg','flow','global','str','num','jsonata',previousValueType]});
@@ -185,11 +326,27 @@
selectField.change(function() {
resizeRule(container);
var type = selectField.val();
if (type === "btwn") {
if ((type === "btwn") || (type === "index")) {
valueField.typedInput('hide');
expValueField.typedInput('hide');
numValueField.typedInput('hide');
btwnValueField.typedInput('show');
} else if ((type === "head") || (type === "tail")) {
btwnValueField.typedInput('hide');
btwnValue2Field.typedInput('hide');
expValueField.typedInput('hide');
numValueField.typedInput('show');
valueField.typedInput('hide');
} else if (type === "jsonata_exp") {
btwnValueField.typedInput('hide');
btwnValue2Field.typedInput('hide');
expValueField.typedInput('show');
numValueField.typedInput('hide');
valueField.typedInput('hide');
} else {
btwnValueField.typedInput('hide');
expValueField.typedInput('hide');
numValueField.typedInput('hide');
if (type === "true" || type === "false" || type === "null" || type === "nnull" || type === "else") {
valueField.typedInput('hide');
} else {
@@ -199,7 +356,7 @@
if (type === "regex") {
row2.show();
row3.hide();
} else if (type === "btwn") {
} else if ((type === "btwn") || (type === "index")) {
row2.hide();
row3.show();
btwnValue2Field.typedInput('show');
@@ -209,11 +366,17 @@
}
});
selectField.val(rule.t);
if (rule.t == "btwn") {
if ((rule.t == "btwn") || (rule.t == "index")) {
btwnValueField.typedInput('value',rule.v);
btwnValueField.typedInput('type',rule.vt||'num');
btwnValue2Field.typedInput('value',rule.v2);
btwnValue2Field.typedInput('type',rule.v2t||'num');
} else if ((rule.t === "head") || (rule.t === "tail")) {
numValueField.typedInput('value',rule.v);
numValueField.typedInput('type',rule.vt||'num');
} else if (rule.t === "jsonata_exp") {
expValueField.typedInput('value',rule.v);
expValueField.typedInput('type',rule.vt||'jsonata');
} else if (typeof rule.v != "undefined") {
valueField.typedInput('value',rule.v);
valueField.typedInput('type',rule.vt||'str');
@@ -275,11 +438,17 @@
var type = rule.find("select").val();
var r = {t:type};
if (!(type === "true" || type === "false" || type === "null" || type === "nnull" || type === "else")) {
if (type === "btwn") {
if ((type === "btwn") || (type === "index")) {
r.v = rule.find(".node-input-rule-btwn-value").typedInput('value');
r.vt = rule.find(".node-input-rule-btwn-value").typedInput('type');
r.v2 = rule.find(".node-input-rule-btwn-value2").typedInput('value');
r.v2t = rule.find(".node-input-rule-btwn-value2").typedInput('type');
} else if ((type === "head") || (type === "tail")) {
r.v = rule.find(".node-input-rule-num-value").typedInput('value');
r.vt = rule.find(".node-input-rule-num-value").typedInput('type');
} else if (type === "jsonata_exp") {
r.v = rule.find(".node-input-rule-exp-value").typedInput('value');
r.vt = rule.find(".node-input-rule-exp-value").typedInput('type');
} else {
r.v = rule.find(".node-input-rule-value").typedInput('value');
r.vt = rule.find(".node-input-rule-value").typedInput('type');

View File

@@ -31,9 +31,39 @@ module.exports = function(RED) {
'false': function(a) { return a === false; },
'null': function(a) { return (typeof a == "undefined" || a === null); },
'nnull': function(a) { return (typeof a != "undefined" && a !== null); },
'head': function(a, b, c, d, parts) {
var count = Number(b);
return (parts.index < count);
},
'tail': function(a, b, c, d, parts) {
var count = Number(b);
return (parts.count -count <= parts.index);
},
'index': function(a, b, c, d, parts) {
var min = Number(b);
var max = Number(c);
var index = parts.index;
return ((min <= index) && (index <= max));
},
'jsonata_exp': function(a, b) { return (b === true); },
'else': function(a) { return a === true; }
};
var _max_kept_msgs_count = undefined;
function max_kept_msgs_count(node) {
if (_max_kept_msgs_count === undefined) {
var name = "switchMaxKeptMsgsCount";
if (RED.settings.hasOwnProperty(name)) {
_max_kept_msgs_count = RED.settings[name];
}
else {
_max_kept_msgs_count = 0;
}
}
return _max_kept_msgs_count;
}
function SwitchNode(n) {
RED.nodes.createNode(this, n);
this.rules = n.rules || [];
@@ -53,8 +83,11 @@ module.exports = function(RED) {
this.previousValue = null;
var node = this;
var valid = true;
var needs_count = false;
var repair = n.repair;
for (var i=0; i<this.rules.length; i+=1) {
var rule = this.rules[i];
needs_count = needs_count || ((rule.t === "tail") || (rule.t === "jsonata_exp"));
if (!rule.vt) {
if (!isNaN(Number(rule.v))) {
rule.vt = 'num';
@@ -99,7 +132,136 @@ module.exports = function(RED) {
return;
}
this.on('input', function (msg) {
var pending_count = 0;
var pending_id = 0;
var pending_in = {};
var pending_out = {};
var received = {};
function add2group_in(id, msg, parts) {
if (!(id in pending_in)) {
pending_in[id] = {
count: undefined,
msgs: [],
seq_no: pending_id++
};
}
var group = pending_in[id];
group.msgs.push(msg);
pending_count++;
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (pending_count > max_msgs)) {
clear_pending();
node.error(RED._("switch.errors.too-many"), msg);
}
if (parts.hasOwnProperty("count")) {
group.count = parts.count;
}
return group;
}
function del_group_in(id, group) {
pending_count -= group.msgs.length;
delete pending_in[id];
}
function add2pending_in(msg) {
var parts = msg.parts;
if (parts.hasOwnProperty("id") &&
parts.hasOwnProperty("index")) {
var group = add2group_in(parts.id, msg, parts);
var msgs = group.msgs;
var count = group.count;
if (count === msgs.length) {
for (var i = 0; i < msgs.length; i++) {
var msg = msgs[i];
msg.parts.count = count;
process_msg(msg, false);
}
del_group_in(parts.id, group);
}
return true;
}
return false;
}
function send_group(onwards, port_count) {
var counts = new Array(port_count).fill(0);
var ids = new Array(port_count);
for(var i = 0; i < onwards.length; i++) {
var onward = onwards[i];
for(var j = 0; j < port_count; j++) {
counts[j] += (onward[j] !== null) ? 1 : 0
}
ids[i] = RED.util.generateId();
}
var indexes = new Array(port_count).fill(0);
var ports = new Array(port_count);
for (var i = 0; i < onwards.length; i++) {
var onward = onwards[i];
for (var j = 0; j < port_count; j++) {
var msg = onward[j];
if (msg) {
var new_msg = RED.util.cloneMessage(msg);
var parts = new_msg.parts;
parts.id = ids[j];
parts.index = indexes[j];
parts.count = counts[j];
ports[j] = new_msg;
indexes[j]++;
}
else {
ports[j] = null;
}
}
node.send(ports);
}
}
function send2ports(onward, msg) {
var parts = msg.parts;
var gid = parts.id;
received[gid] = ((gid in received) ? received[gid] : 0) +1;
var send_ok = (received[gid] === parts.count);
if (!(gid in pending_out)) {
pending_out[gid] = {
onwards: []
};
}
var group = pending_out[gid];
var onwards = group.onwards;
onwards.push(onward);
pending_count++;
if (send_ok) {
send_group(onwards, onward.length, msg);
pending_count -= onward.length;
delete pending_out[gid];
delete received[gid];
}
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (pending_count > max_msgs)) {
clear_pending();
node.error(RED._("switch.errors.too-many"), msg);
}
}
function msg_has_parts(msg) {
if (msg.hasOwnProperty("parts")) {
var parts = msg.parts;
return (parts.hasOwnProperty("id") &&
parts.hasOwnProperty("index") &&
parts.hasOwnProperty("count"));
}
return false;
}
function process_msg(msg, check_parts) {
var has_parts = msg_has_parts(msg);
if (needs_count && check_parts && has_parts &&
add2pending_in(msg)) {
return;
}
var onward = [];
try {
var prop;
@@ -117,7 +279,14 @@ module.exports = function(RED) {
v1 = node.previousValue;
} else if (rule.vt === 'jsonata') {
try {
v1 = RED.util.evaluateJSONataExpression(rule.v,msg);
var exp = rule.v;
if (rule.t === 'jsonata_exp') {
if (has_parts) {
exp.assign("I", msg.parts.index);
exp.assign("N", msg.parts.count);
}
}
v1 = RED.util.evaluateJSONataExpression(exp,msg);
} catch(err) {
node.error(RED._("switch.errors.invalid-expr",{error:err.message}));
return;
@@ -147,7 +316,7 @@ module.exports = function(RED) {
}
}
if (rule.t == "else") { test = elseflag; elseflag = true; }
if (operators[rule.t](test,v1,v2,rule.case)) {
if (operators[rule.t](test,v1,v2,rule.case,msg.parts)) {
onward.push(msg);
elseflag = false;
if (node.checkall == "false") { break; }
@@ -156,11 +325,33 @@ module.exports = function(RED) {
}
}
node.previousValue = prop;
this.send(onward);
if (repair || !has_parts) {
node.send(onward);
}
else {
send2ports(onward, msg);
}
} catch(err) {
node.warn(err);
}
}
function clear_pending() {
pending_count = 0;
pending_id = 0;
pending_in = {};
pending_out = {};
received = {};
}
this.on('input', function(msg) {
process_msg(msg, true);
});
this.on('close', function() {
clear_pending();
});
}
RED.nodes.registerType("switch", SwitchNode);
}

View File

@@ -108,7 +108,7 @@
arraySplt: {value:1},
arraySpltType: {value:"len"},
stream: {value:false},
addname: {value:""},
addname: {value:""}
},
inputs:1,
outputs:1,
@@ -170,6 +170,8 @@
<select id="node-input-mode" style="width:200px;">
<option value="auto" data-i18n="join.mode.auto"></option>
<option value="custom" data-i18n="join.mode.custom"></option>
<option value="merge" data-i18n="join.mode.merge"></option>
<option value="reduce" data-i18n="join.mode.reduce"></option>
</select>
</div>
<div class="node-row-custom">
@@ -217,6 +219,38 @@
</ul>
</div>
</div>
<div class="node-row-merge">
<div class="form-row">
<label data-i18n="join.merge.topics-label"></label>
<div class="form-row node-input-topics-container-row">
<ol id="node-input-topics-container"></ol>
</div>
</div>
<div class="form-row">
<input type="checkbox" id="node-input-mergeOnChange" style="margin-left:10px; vertical-align:top; width:auto;">
<label for="node-input-mergeOnChange" style="width:auto;" data-i18n="join.merge.on-change"></label>
</div>
</div>
<div class="node-row-reduce">
<div class="form-row">
<label for="node-input-reduceExp" data-i18n="join.reduce.exp" style="margin-left:10px;"></label>
<input type="text" id="node-input-reduceExp" data-i18n="[placeholder]join.reduce.exp-value" style="width:65%">
</div>
<div class="form-row">
<label for="node-input-reduceInit" data-i18n="join.reduce.init" style="margin-left:10px;"></label>
<input type="text" id="node-input-reduceInit" data-i18n="[placeholder]join.reduce.init" style="width:65%">
<input type="hidden" id="node-input-reduceInitType">
</div>
<div class="form-row">
<label for="node-input-reduceFixup" data-i18n="join.reduce.fixup" style="margin-left:10px;"></label>
<input type="text" id="node-input-reduceFixup" data-i18n="[placeholder]join.reduce.exp-value" style="width:65%">
</div>
<div class="form-row">
<label>&nbsp;</label>
<input type="checkbox" id="node-input-reduceRight" style="display:inline-block; width:auto; vertical-align:top; margin-left:10px;">
<label for="node-input-reduceRight" style="width:70%;" data-i18n="join.reduce.right" style="margin-left:10px;"/>
</div>
</div>
<div class="form-row">
<label for="node-input-name"><i class="fa fa-tag"></i> <span data-i18n="common.label.name"></span></label>
<input type="text" id="node-input-name" data-i18n="[placeholder]common.label.name">
@@ -225,9 +259,17 @@
</script>
<script type="text/x-red" data-help-name="join">
<p>Joins sequences of messages into a single message.</p>
<p>When paired with the <b>split</b> node, it will automatically join the messages
to reverse the split that was performed.</p>
<p>Joins sequences of messages into a single message. This node provides four mode for message combination:</p>
<dl>
<dt>automatic</dt>
<dd>When paired with the <b>split</b> node, it will automatically join the messages to reverse the split that was performed.</dd>
<dt>manual</dt>
<dd>It will join sequences of messages in a variety of ways.</dd>
<dt>merge sequence</dt>
<dd>It will merge incoming messages into single message using <code>topic</code> property.</dd>
<dt>reduce sequence</dt>
<dd>When paired with the <b>split</b> node, it will reduce the message sequence into single message.</dd>
</dl>
<h3>Inputs</h3>
<dl class="message-properties">
<dt class="optional">parts<span class="property-type">object</span></dt>
@@ -248,6 +290,11 @@
<dd>If set, the node will send its output message in its current state.</dd>
</dl>
<h3>Details</h3>
<h4>Automatic mode</h4>
<p>When configured to join in manual mode, the node is able to join sequences of messages using <code>parts</code> property of incoming messages.</p>
<h4>Manual mode</h4>
<p>When configured to join in manual mode, the node is able to join sequences
of messages in a variety of ways.</p>
<ul>
@@ -263,6 +310,50 @@
received.</p>
<p>A <i>timeout</i> can be set to trigger sending the new message using whatever has been received so far.</p>
<p>If a message is received with the <b>msg.complete</b> property set, the output message is sent.</p>
<h4>Merge Sequence mode</h4>
<p>When configured to join in merge mode, the join node can create a message based on <code>topic</code> value.</p>
<p>Input messages are merged in order specified by <b>Topics</b> value.
<p>For example, if value of <b>Topics</b> is <b>x,x,y</b>, two input messages with topic <b>x</b> and one input message with topic <b>y</b> are merged into a new message in order of arrival.</p>
<p>If "Send merged message on arrival of a new topic" check box is selected, the last messages with each topic is kept internally and output message is sent when a message with new topics arrives.</p>
<p>The merged message contains <code>payload</code> property and properties for each topic. The <code>payload</code> property represents ordered array of payload value of input messages for each topic. The property for each topic represents a payload value for single occurrence of topic or array of payload values for multiple occurrences of the topic.</p>
<h4>Reduce Sequence mode</h4>
<p>When configured to join in reduce sequence mode, following values can be specified:</p>
<dl class="message-properties">
<dt>Reduce exp</dt>
<dd>JSONata expression for reducing message group. This expression represents the accumulated result. In the expression, following special variables can be used:
<ul>
<li><code>$A</code> accumulated value, </li>
<li><code>$I</code> index of the message in a group, </li>
<li><code>$N</code> number of messages of a group.</li>
</ul>
</dd>
<dt>Initial value</dt>
<dd>
initial value of reduction.
</dd>
<dt>Fixup exp</dt>
<dd>
JSONata expression applied after reduction of a message group completed. In the expression, following special variables can be used:
<ul>
<li><code>$A</code> accumulated value, </li>
<li><code>$N</code> number of messages of a group.</li>
</ul>
</dd>
<p>Order of reduction on a message group can be specified by checkbox (<b>Evaluate in reverse order (right to left)</b>).</p>
</dl>
<p><b>Example:</b> Join node outputs an average for each input message group with the following setting:
<ul>
<li><b>Reduce exp</b>: <code>$A+payload</code></li>
<li><b>Initial value</b>: <code>0</code></li>
<li><b>Fixup exp</b>: <code>$A/$N</code></li>
</ul>
</p>
<h4>Note:</h4>
<p>This node internally keeps messages for its operation. In order to prevent unexpected memory usage, maximum number of messages kept can be specified by <code>joinMaxKeptMsgsCount</code> property in <b>settings.js</b>.</p>
</script>
<script type="text/javascript">
@@ -280,7 +371,14 @@
joinerType: { value:"str"},
accumulate: { value:"false" },
timeout: {value:""},
count: {value:""}
count: {value:""},
topics: {value:[{topic:""}]},
mergeOnChange: {value:false},
reduceRight: {value:false},
reduceExp: {value:undefined},
reduceInit: {value:undefined},
reduceInitType: {value:undefined},
reduceFixup: {value:undefined}
},
inputs:1,
outputs:1,
@@ -292,13 +390,93 @@
return this.name?"node_label_italic":"";
},
oneditprepare: function() {
var node = this;
var topic_str = node._("join.merge.topic");
function resizeTopics(topic) {
var newWidth = topic.width();
topic.find('.red-ui-typedInput')
.typedInput("width",newWidth-15);
}
$("#node-input-topics-container")
.css('min-height','250px').css('min-width','420px')
.editableList({
addItem: function(container,i,opt) {
if (!opt.hasOwnProperty('topic')) {
opt.topic = "";
}
var row = $('<div/>').appendTo(container);
var valueField = $('<input/>',{
class:"node-input-topic-value",
type:"text",
style:"margin-left: 5px;"
}).appendTo(row)
.typedInput({default:'str', types:['str']});
valueField.typedInput('value', opt.topic);
valueField.typedInput('type', 'str');
valueField.attr('placeholder', topic_str);
resizeTopics(container);
},
resizeItem: resizeTopics,
sortable: true,
removable: true
});
$("#node-input-mode").change(function(e) {
var val = $(this).val();
$(".node-row-custom").toggle(val==='custom');
$(".form-tips-auto").toggle(val==='auto');
$(".node-row-merge").toggle(val==='merge');
$(".node-row-reduce").toggle(val==='reduce');
$(".form-tips-auto").toggle((val==='auto') || (val==='reduce'));
if (val === "auto") {
$("#node-input-accumulate").attr('checked', false);
}
else if (val === "custom") {
$("#node-input-build").change();
}
else if (val === "merge") {
var topics = node.topics;
var container = $("#node-input-topics-container");
container.editableList('empty');
for (var i=0;i<topics.length;i++) {
var topic = topics[i];
container.editableList('addItem', topic);
}
}
else if (val === "reduce") {
var jsonata_or_empty = {
value: "jsonata",
label: "expression",
icon: "red/images/typedInput/expr.png",
validate: function(v) {
try{
if(v !== "") {
jsonata(v);
}
return true;
}
catch(e){
return false;
}
},
expand:function() {
var that = this;
RED.editor.editExpression({
value: this.value().replace(/\t/g,"\n"),
complete: function(v) {
that.value(v.replace(/\n/g,"\t"));
}
})
}
};
$("#node-input-reduceExp").typedInput({types:[jsonata_or_empty]});
$("#node-input-reduceInit").typedInput({
default: 'num',
types:['flow','global','str','num','bool','json','bin','date','jsonata'],
typeField: $("#node-input-reduceInitType")
});
$("#node-input-reduceFixup").typedInput({types:[jsonata_or_empty]});
}
});
$("#node-input-build").change(function(e) {
@@ -309,6 +487,7 @@
$(".node-row-trigger").toggle(val!=='auto');
if (val === 'string' || val==='buffer') {
$("#node-input-property").typedInput('types',['msg']);
$("#node-input-joiner").typedInput("show");
} else {
$("#node-input-property").typedInput('types',['msg', {value:"full",label:"complete message",hasValue:false}]);
}
@@ -340,6 +519,27 @@
if (build !== 'object' && build !== 'merged') {
$("#node-input-accumulate").prop("checked",false);
}
var topics = $("#node-input-topics-container").editableList('items');
var node = this;
node.topics = [];
topics.each(function(i) {
var topicData = $(this).data('data');
var topic = $(this);
var vf = topic.find(".node-input-topic-value");
var value = vf.typedInput('value');
var r = {topic:value};
node.topics.push(r);
});
},
oneditresize: function(size) {
var rows = $("#dialog-form>div:not(.node-input-topics-container-row)");
var height = size.height;
for (var i=0;i<rows.size();i++) {
height -= $(rows[i]).outerHeight(true);
}
var editorRow = $("#dialog-form>div.node-input-topics-container-row");
height -= (parseInt(editorRow.css("marginTop"))+parseInt(editorRow.css("marginBottom")));
$("#node-input-topics-container").editableList('height',height);
}
});
</script>

View File

@@ -232,6 +232,229 @@ module.exports = function(RED) {
RED.nodes.registerType("split",SplitNode);
var _max_kept_msgs_count = undefined;
function max_kept_msgs_count(node) {
if (_max_kept_msgs_count === undefined) {
var name = "joinMaxKeptMsgsCount";
if (RED.settings.hasOwnProperty(name)) {
_max_kept_msgs_count = RED.settings[name];
}
else {
_max_kept_msgs_count = 0;
}
}
return _max_kept_msgs_count;
}
function add_to_topic(node, pending, topic, msg) {
var merge_on_change = node.merge_on_change;
if (!pending.hasOwnProperty(topic)) {
pending[topic] = [];
}
var topics = pending[topic];
topics.push(msg);
if (merge_on_change) {
var counts = node.topic_counts;
if (topics.length > counts[topic]) {
topics.shift();
node.pending_count--;
}
}
}
function compute_topic_counts(topics) {
var counts = {};
for (var topic of topics) {
counts[topic] = (counts.hasOwnProperty(topic) ? counts[topic] : 0) +1;
}
return counts;
}
function try_merge(node, pending, merge_on_change) {
var topics = node.topics;
var counts = node.topic_counts;
for(var topic of topics) {
if(!pending.hasOwnProperty(topic) ||
(pending[topic].length < counts[topic])) {
return;
}
}
var merge_on_change = node.merge_on_change;
var msgs = [];
var vals = [];
var new_msg = {payload: vals};
for (var topic of topics) {
var pmsgs = pending[topic];
var msg = pmsgs.shift();
if (merge_on_change) {
pmsgs.push(msg);
}
var pval = msg.payload;
var val = new_msg[topic];
msgs.push(msg);
vals.push(pval);
if (val instanceof Array) {
new_msg[topic].push(pval);
}
else if (val === undefined) {
new_msg[topic] = pval;
}
else {
new_msg[topic] = [val, pval]
}
}
node.send(new_msg);
if (!merge_on_change) {
node.pending_count -= topics.length;
}
}
function merge_msg(node, msg) {
var topics = node.topics;
var topic = msg.topic;
if(node.topics.indexOf(topic) >= 0) {
var pending = node.pending;
if(node.topic_counts == undefined) {
node.topic_counts = compute_topic_counts(topics)
}
add_to_topic(node, pending, topic, msg);
node.pending_count++;
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (node.pending_count > max_msgs)) {
node.pending = {};
node.pending_count = 0;
node.error(RED._("join.too-many"), msg);
}
try_merge(node, pending);
}
}
function apply_r(exp, accum, msg, index, count) {
exp.assign("I", index);
exp.assign("N", count);
exp.assign("A", accum);
return RED.util.evaluateJSONataExpression(exp, msg);
}
function apply_f(exp, accum, count) {
exp.assign("N", count);
exp.assign("A", accum);
return RED.util.evaluateJSONataExpression(exp, {});
}
function exp_or_undefined(exp) {
if((exp === "") ||
(exp === null)) {
return undefined;
}
return exp
}
function reduce_and_send_group(node, group) {
var is_right = node.reduce_right;
var flag = is_right ? -1 : 1;
var msgs = group.msgs;
var accum = node.reduce_init;
var reduce_exp = node.reduce_exp;
var reduce_fixup = node.reduce_fixup;
var count = group.count;
msgs.sort(function(x,y) {
var ix = x.parts.index;
var iy = y.parts.index;
if (ix < iy) return -flag;
if (ix > iy) return flag;
return 0;
});
for(var msg of msgs) {
accum = apply_r(reduce_exp, accum, msg, msg.parts.index, count);
}
if(reduce_fixup !== undefined) {
accum = apply_f(reduce_fixup, accum, count);
}
node.send({payload: accum});
}
function reduce_msg(node, msg) {
if(msg.hasOwnProperty('parts')) {
var parts = msg.parts;
var pending = node.pending;
var pending_count = node.pending_count;
var gid = msg.parts.id;
if(!pending.hasOwnProperty(gid)) {
var count = undefined;
if(parts.hasOwnProperty('count')) {
count = msg.parts.count;
}
pending[gid] = {
count: count,
msgs: []
};
}
var group = pending[gid];
var msgs = group.msgs;
if(parts.hasOwnProperty('count') &&
(group.count === undefined)) {
group.count = count;
}
msgs.push(msg);
pending_count++;
if(msgs.length === group.count) {
delete pending[gid];
try {
pending_count -= msgs.length;
reduce_and_send_group(node, group);
} catch(e) {
node.error(RED._("join.errors.invalid-expr",{error:e.message})); }
}
node.pending_count = pending_count;
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (pending_count > max_msgs)) {
node.pending = {};
node.pending_count = 0;
node.error(RED._("join.too-many"), msg);
}
}
else {
node.send(msg);
}
}
function eval_exp(node, exp, exp_type) {
if(exp_type === "flow") {
return node.context().flow.get(exp);
}
else if(exp_type === "global") {
return node.context().global.get(exp);
}
else if(exp_type === "str") {
return exp;
}
else if(exp_type === "num") {
return Number(exp);
}
else if(exp_type === "bool") {
if (exp === 'true') {
return true;
}
else if (exp === 'false') {
return false;
}
}
else if ((exp_type === "bin") ||
(exp_type === "json")) {
return JSON.parse(exp);
}
else if(exp_type === "date") {
return Date.now();
}
else if(exp_type === "jsonata") {
var jexp = RED.util.prepareJSONataExpression(exp, node);
return RED.util.evaluateJSONataExpression(jexp, {});
}
throw new Error("unexpected initial value type");
}
function JoinNode(n) {
RED.nodes.createNode(this,n);
this.mode = n.mode||"auto";
@@ -246,6 +469,22 @@ module.exports = function(RED) {
this.joiner = n.joiner||"";
this.joinerType = n.joinerType||"str";
this.reduce = (this.mode === "reduce");
if (this.reduce) {
var exp_init = n.reduceInit;
var exp_init_type = n.reduceInitType;
var exp_reduce = n.reduceExp;
var exp_fixup = exp_or_undefined(n.reduceFixup);
this.reduce_right = n.reduceRight;
try {
this.reduce_init = eval_exp(this, exp_init, exp_init_type);
this.reduce_exp = RED.util.prepareJSONataExpression(exp_reduce, this);
this.reduce_fixup = (exp_fixup !== undefined) ? RED.util.prepareJSONataExpression(exp_fixup, this) : undefined;
} catch(e) {
this.error(RED._("join.errors.invalid-expr",{error:e.message}));
}
}
if (this.joinerType === "str") {
this.joiner = this.joiner.replace(/\\n/g,"\n").replace(/\\r/g,"\r").replace(/\\t/g,"\t").replace(/\\e/g,"\e").replace(/\\f/g,"\f").replace(/\\0/g,"\0");
} else if (this.joinerType === "bin") {
@@ -259,6 +498,14 @@ module.exports = function(RED) {
this.build = n.build || "array";
this.accumulate = n.accumulate || "false";
this.topics = (n.topics || []).map(function(x) { return x.topic; });
this.merge_on_change = n.mergeOnChange || false;
this.topic_counts = undefined;
this.output = n.output || "stream";
this.pending = {};
this.pending_count = 0;
//this.topic = n.topic;
var node = this;
var inflight = {};
@@ -321,6 +568,10 @@ module.exports = function(RED) {
node.warn("Message missing msg.parts property - cannot join in 'auto' mode")
return;
}
if (node.mode === 'merge' && !msg.hasOwnProperty("topic")) {
node.warn("Message missing msg.topic property - cannot join in 'merge' mode");
return;
}
if (node.propertyType == "full") {
property = msg;
@@ -351,6 +602,14 @@ module.exports = function(RED) {
arrayLen = msg.parts.len;
propertyIndex = msg.parts.index;
}
else if (node.mode === 'merge') {
merge_msg(node, msg);
return;
}
else if (node.mode === 'reduce') {
reduce_msg(node, msg);
return;
}
else {
// Use the node configuration to identify all of the group information
partId = "_";

View File

@@ -19,17 +19,24 @@
<script type="text/x-red" data-template-name="sort">
<div class="form-row">
<label><i class="fa fa-dot-circle-o"></i> <span data-i18n="sort.key-type"></span></label>
<select id="node-input-keyType" style="width:200px;">
<option value="payload" data-i18n="sort.payload"></option>
<option value="exp" data-i18n="sort.exp"></option>
</select>
<label for="node-input-target"><i class="fa fa-dot-circle-o"></i> <span data-i18n="sort.target"></span></label>
<input type="text" id="node-input-target" style="width:70%;">
<input type="hidden" id="node-input-targetType">
</div>
<div class="node-row-sort-key">
<div class="node-row-sort-msg-key">
<div class="form-row">
<label><i class="fa fa-filter"></i> <span data-i18n="sort.key-exp"></span></label>
<input type="text" id="node-input-key" style="width:70%;">
<label for="node-input-msgKey"><i class="fa fa-filter"></i> <span data-i18n="sort.key"></span></label>
<input type="text" id="node-input-msgKey" style="width:70%;">
<input type="hidden" id="node-input-msgKeyType">
</div>
</div>
<div class="node-row-sort-seq-key">
<div class="form-row">
<label for="node-input-seqKey"><i class="fa fa-filter"></i> <span data-i18n="sort.key"></span></label>
<input type="text" id="node-input-seqKey" style="width:70%;">
<input type="hidden" id="node-input-seqKeyType">
</div>
</div>
@@ -54,17 +61,17 @@
</script>
<script type="text/x-red" data-help-name="sort">
<p>A function that sorts a sequence of messages or payload of array type.</p>
<p>When paired with the <b>split</b> node, it will reorder the
messages.</p>
<p>A function that sorts message property or a sequence of messages.</p>
<p>When configured to sort message property, the node sorts array data pointed to by specified message property.</p>
<p>When configured to sort a sequence of messages, it will reorder the messages.</p>
<p>The sorting order can be:</p>
<ul>
<li><b>ascending</b>,</li>
<li><b>descending</b>.</li>
</ul>
<p>For numbers, numerical ordering can be specified by a checkbox.</p>
<p>Sort key can be <code>payload</code> or any JSONata expression for sorting messages, element value or any JSONata expression for sorting payload of array type.</p>
<p>The sort node relies on the received messages to have <code>msg.parts</code> set for sorting messages. The split node generates this property, but can be manually created. It has the following properties:</p>
<p>Sort key can be element value or JSONata expression for sorting property value, or message property or JSONata expression for sorting a message sequence.<p>
<p>When sorting a message sequence, the sort node relies on the received messages to have <code>msg.parts</code> set. The split node generates this property, but can be manually created. It has the following properties:</p>
<p>
<ul>
<li><code>id</code> - an identifier for the group of messages</li>
@@ -74,7 +81,7 @@
</p>
<p><b>Note:</b> This node internally keeps messages for its operation. In order to prevent unexpected memory usage, maximum number of messages kept can be specified. Default is no limit on number of messages.
<ul>
<li><code>sortMaxKeptMsgsCount</code> property set in <b>settings.js</b>.</li>
<li><code>maxKeptMsgsCount</code> property set in <b>settings.js</b>.</li>
</ul>
</p>
</script>
@@ -86,9 +93,13 @@
defaults: {
name: { value:"" },
order: { value:"ascending" },
as_num : { value:false },
keyType : { value:"payload" },
key : { value:"" }
as_num: { value:false },
target: { value:"payload" },
targetType: { value:"msg" },
msgKey: { value:"payload" },
msgKeyType: { value:"elem" },
seqKey: { value:"payload" },
seqKeyType: { value:"msg" }
},
inputs:1,
outputs:1,
@@ -100,13 +111,37 @@
return this.name ? "node_label_italic" : "";
},
oneditprepare: function() {
$("#node-input-key").typedInput({default:'jsonata', types:['jsonata']});
$("#node-input-keyType").change(function(e) {
var val = $(this).val();
$(".node-row-sort-key").toggle(val === 'exp');
var seq = {
value: "seq",
label: RED._("node-red:sort.seq"),
hasValue: false
};
var elem = {
value: "elem",
label: RED._("node-red:sort.elem"),
hasValue: false
};
$("#node-input-target").typedInput({
default:'msg',
typeField: $("#node-input-targetType"),
types:['msg', seq]
});
$("#node-input-keyType").change();
$("#node-input-order").change();
$("#node-input-msgKey").typedInput({
default:'elem',
typeField: $("#node-input-msgKeyType"),
types:[elem, 'jsonata']
});
$("#node-input-seqKey").typedInput({
default:'msg',
typeField: $("#node-input-seqKeyType"),
types:['msg', 'jsonata']
});
$("#node-input-target").change(function(e) {
var val = $("#node-input-target").typedInput('type');
$(".node-row-sort-msg-key").toggle(val === "msg");
$(".node-row-sort-seq-key").toggle(val === "seq");
});
$("#node-input-target").change();
}
});
</script>

View File

@@ -21,7 +21,7 @@ module.exports = function(RED) {
function max_kept_msgs_count(node) {
if (_max_kept_msgs_count === undefined) {
var name = "sortMaxKeptMsgsCount";
var name = "maxKeptMsgsCount";
if (RED.settings.hasOwnProperty(name)) {
_max_kept_msgs_count = RED.settings[name];
}
@@ -60,11 +60,15 @@ module.exports = function(RED) {
var pending_id = 0;
var order = n.order || "ascending";
var as_num = n.as_num || false;
var key_is_payload = (n.keyType === 'payload');
var key_exp = undefined;
if (!key_is_payload) {
var target_prop = n.target || "payload";
var target_is_prop = (n.targetType === 'msg');
var key_is_exp = target_is_prop ? (n.msgKeyType === "jsonata") : (n.seqKeyType === "jsonata");
var key_prop = n.seqKey || "payload";
var key_exp = target_is_prop ? n.msgKey : n.seqKey;
if (key_is_exp) {
try {
key_exp = RED.util.prepareJSONataExpression(n.key, this);
key_exp = RED.util.prepareJSONataExpression(key_exp, this);
}
catch (e) {
node.error(RED._("sort.invalid-exp"));
@@ -87,10 +91,12 @@ module.exports = function(RED) {
}
function send_group(group) {
var key = key_is_payload
? function(msg) { return msg.payload; }
: function(msg) {
var key = key_is_exp
? function(msg) {
return eval_jsonata(node, key_exp, msg);
}
: function(msg) {
return RED.util.getMessageProperty(msg, key_prop);
};
var comp = gen_comp(key);
var msgs = group.msgs;
@@ -108,16 +114,16 @@ module.exports = function(RED) {
}
function sort_payload(msg) {
var payload = msg.payload;
if (Array.isArray(payload)) {
var key = key_is_payload
? function(elem) { return elem; }
: function(elem) {
var data = RED.util.getMessageProperty(msg, target_prop);
if (Array.isArray(data)) {
var key = key_is_exp
? function(elem) {
return eval_jsonata(node, key_exp, elem);
};
}
: function(elem) { return elem; };
var comp = gen_comp(key);
try {
payload.sort(comp);
data.sort(comp);
}
catch (e) {
return false;
@@ -162,7 +168,7 @@ module.exports = function(RED) {
}
function process_msg(msg) {
if (!msg.hasOwnProperty("parts")) {
if (target_is_prop) {
if (sort_payload(msg)) {
node.send(msg);
}

View File

@@ -0,0 +1,192 @@
<!--
Copyright JS Foundation and other contributors, http://js.foundation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!DOCTYPE html>
<script type="text/x-red" data-template-name="batch">
<div class="form-row">
<label for="node-input-mode"><i class="fa fa-dot-circle-o"></i> <span data-i18n="batch.mode.label"></span></label>
<select type="text" id="node-input-mode" style="width: 300px;">
<option value="count" data-i18n="batch.mode.num-msgs"></option>
<option value="interval" data-i18n="batch.mode.interval"></option>
<option value="concat" data-i18n="batch.mode.concat"></option>
</select>
</div>
<div class="node-row-msg-count">
<div class="form-row node-row-count">
<label for="node-input-count" data-i18n="batch.count.label"></label>
<input type="text" id="node-input-count" data-i18n="[placeholder]batch.count.count" style="width: 50px;">
</div>
</div>
<div class="node-row-msg-overwrap">
<div class="form-row node-row-overwrap">
<label for="node-input-count" data-i18n="batch.count.overwrap"></label>
<input type="text" id="node-input-overwrap" data-i18n="[placeholder]batch.count.count" style="width: 50px;">
</div>
</div>
<div class="node-row-msg-interval">
<div class="form-row node-row-interval">
<label for="node-input-interval"> <span data-i18n="batch.interval.label"></span></label>
<input type="text" id="node-input-interval" data-i18n="[placeholder]batch.interval.seconds" style="width: 50px;">
<span data-i18n="batch.interval.sec"></span>
</div>
<div class="form-row">
<input type="checkbox" id="node-input-allowEmptySequence" style="margin-left:10px; vertical-align:top; width:auto;">
<label for="node-input-allowEmptySequence" style="width:auto;" data-i18n="batch.interval.empty"></label>
</div>
</div>
<div class="node-row-msg-concat">
<div class="form-row">
<label data-i18n="batch.concat.topics-label"></label>
<div class="form-row node-input-topics-container-row">
<ol id="node-input-topics-container"></ol>
</div>
</div>
</div>
<div class="form-row">
<label for="node-input-name"><i class="fa fa-tag"></i> <span data-i18n="node-red:common.label.name"></span></label>
<input type="text" id="node-input-name" data-i18n="[placeholder]node-red:common.label.name">
</div>
</script>
<script type="text/x-red" data-help-name="batch">
<p>A function that divides input messages into multiple sequences of messages or concatenates multiple sequences of messages into a single messages sequence.</p>
<h3>Details</h3>
<h4>group by number of messages</h4>
<p>groups incoming messages into sequences of messages with specified counts. The output message group can be overwrapped.</p>
<h4>group by interval in seconds</h4>
<p>groups incoming messages received withn specified interval into sequences of messages. </p>
<h4>concatenate message groups</h4>
<p>creates a message sequence based on <code>topic</code> value of incoming message sequences.</p>
<p>Target and order of concatenated sequences are specified by <code>Topics</code> value. Selection of concatenated message groups is based on arrival of first message of the group.</p>
<p><b>Note:</b> This node internally keeps messages for its operation. In order to prevent unexpected memory usage, maximum number of messages kept can be specified. Default is no limit on number of messages.
<ul>
<li><code>batchMaxKeptMsgsCount</code> property set in <b>settings.js</b>.</li>
</ul>
</p>
</script>
<script type="text/javascript">
RED.nodes.registerType("batch",{
category: "function",
color:"#E2D96E",
defaults: {
name: {value:""},
mode: {value:"count"},
count: {value:10},
overwrap: {value:0},
interval: {value:10},
allowEmptySequence: {value:false},
topics: {value:[{topic:""}]}
},
inputs:1,
outputs:1,
icon: "batch.png",
label: function() {
return this.name || "batch";
},
labelStyle: function() {
return this.name ? "node_label_italic" : "";
},
oneditprepare: function() {
var node = this;
var topic_str = node._("batch.concat.topic");
function resizeTopics(topic) {
var newWidth = topic.width();
topic.find('.red-ui-typedInput')
.typedInput("width",newWidth-15);
}
$("#node-input-topics-container")
.css('min-height','200px').css('min-width','430px')
.editableList({
addItem: function(container, i, opt) {
if (!opt.hasOwnProperty('topic')) {
opt.topic = "";
}
var row = $('<div/>').appendTo(container);
var valueField = $('<input/>',{
class:"node-input-topic-value",
type:"text",
style:"margin-left: 5px;"
}).appendTo(row)
.typedInput({default:'str', types:['str']});
valueField.typedInput('value', opt.topic);
valueField.typedInput('type', 'str');
valueField.attr('placeholder', topic_str);
resizeTopics(container);
},
resizeItem: resizeTopics,
sortable: true,
removable: true
});
$("#node-input-count").spinner({
});
$("#node-input-overwrap").spinner({
});
$("#node-input-interval").spinner({
});
$("#node-input-mode").change(function(e) {
var val = $(this).val();
$(".node-row-msg-count").toggle(val==="count");
$(".node-row-msg-overwrap").toggle(val==="count");
$(".node-row-msg-interval").toggle(val==="interval");
$(".node-row-msg-concat").toggle(val==="concat");
if (val==="concat") {
var topics = node.topics;
var container = $("#node-input-topics-container");
container.editableList('empty');
for (var i = 0; i < topics.length; i++) {
var topic = topics[i];
container.editableList('addItem', topic);
}
}
});
},
oneditsave: function() {
var topics = $("#node-input-topics-container").editableList('items');
var node = this;
node.topics = [];
topics.each(function(i) {
var topicData = $(this).data('data');
var topic = $(this);
var vf = topic.find(".node-input-topic-value");
var value = vf.typedInput('value');
var type = vf.typedInput('type');
var r = {topic:value};
node.topics.push(r);
});
},
oneditresize: function(size) {
var rows = $("#dialog-form>div:not(.node-input-topics-container-row)");
var height = size.height;
for (var i = 0; i < rows.size(); i++) {
height -= $(rows[i]).outerHeight(true);
}
var editorRow = $("#dialog-form>div.node-input-topics-container-row");
height -= (parseInt(editorRow.css("marginTop"))+parseInt(editorRow.css("marginBottom")));
$("#node-input-topics-container").editableList('height',height);
}
});
</script>

View File

@@ -0,0 +1,246 @@
/**
* Copyright JS Foundation and other contributors, http://js.foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
module.exports = function(RED) {
"use strict";
var _max_kept_msgs_count = undefined;
function max_kept_msgs_count(node) {
if (_max_kept_msgs_count === undefined) {
var name = "batchMaxKeptMsgsCount";
if (RED.settings.hasOwnProperty(name)) {
_max_kept_msgs_count = RED.settings[name];
}
else {
_max_kept_msgs_count = 0;
}
}
return _max_kept_msgs_count;
}
function send_msgs(node, msgs, clone_msg) {
var count = msgs.length;
var msg_id = msgs[0]._msgid;
for (var i = 0; i < count; i++) {
var msg = clone_msg ? RED.util.cloneMessage(msgs[i]) : msgs[i];
if (!msg.hasOwnProperty("parts")) {
msg.parts = {};
}
var parts = msg.parts;
parts.id = msg_id;
parts.index = i;
parts.count = count;
node.send(msg);
}
}
function send_interval(node, allow_empty_seq) {
let msgs = node.pending;
if (msgs.length > 0) {
send_msgs(node, msgs, false);
node.pending = [];
}
else {
if (allow_empty_seq) {
let mid = RED.util.generateId();
let msg = {
payload: null,
parts: {
id: mid,
index: 0,
count: 1
}
};
node.send(msg);
}
}
}
function is_complete(pending, topic) {
if (pending.hasOwnProperty(topic)) {
var p_topic = pending[topic];
var gids = p_topic.gids;
if (gids.length > 0) {
var gid = gids[0];
var groups = p_topic.groups;
var group = groups[gid];
return (group.count === group.msgs.length);
}
}
return false;
}
function get_msgs_of_topic(pending, topic) {
var p_topic = pending[topic];
var groups = p_topic.groups;
var gids = p_topic.gids;
var gid = gids[0];
var group = groups[gid];
return group.msgs;
}
function remove_topic(pending, topic) {
var p_topic = pending[topic];
var groups = p_topic.groups;
var gids = p_topic.gids;
var gid = gids.shift();
delete groups[gid];
}
function try_concat(node, pending) {
var topics = node.topics;
for (var topic of topics) {
if (!is_complete(pending, topic)) {
return;
}
}
var msgs = [];
for (var topic of topics) {
var t_msgs = get_msgs_of_topic(pending, topic);
msgs = msgs.concat(t_msgs);
}
for (var topic of topics) {
remove_topic(pending, topic);
}
send_msgs(node, msgs, false);
node.pending_count -= msgs.length;
}
function add_to_topic_group(pending, topic, gid, msg) {
if (!pending.hasOwnProperty(topic)) {
pending[topic] = { groups: {}, gids: [] };
}
var p_topic = pending[topic];
var groups = p_topic.groups;
var gids = p_topic.gids;
if (!groups.hasOwnProperty(gid)) {
groups[gid] = { msgs: [], count: undefined };
gids.push(gid);
}
var group = groups[gid];
group.msgs.push(msg);
if ((group.count === undefined) &&
msg.parts.hasOwnProperty('count')) {
group.count = msg.parts.count;
}
}
function concat_msg(node, msg) {
var topic = msg.topic;
if(node.topics.indexOf(topic) >= 0) {
if (!msg.hasOwnProperty("parts") ||
!msg.parts.hasOwnProperty("id") ||
!msg.parts.hasOwnProperty("index") ||
!msg.parts.hasOwnProperty("count")) {
node.error(RED._("batch.no-parts"), msg);
return;
}
var gid = msg.parts.id;
var pending = node.pending;
add_to_topic_group(pending, topic, gid, msg);
node.pending_count++;
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (node.pending_count > max_msgs)) {
node.pending = {};
node.pending_count = 0;
node.error(RED._("batch.too-many"), msg);
}
try_concat(node, pending);
}
}
function BatchNode(n) {
RED.nodes.createNode(this,n);
var node = this;
var mode = n.mode || "count";
node.pending_count = 0;
if (mode === "count") {
var count = Number(n.count || 1);
var overwrap = Number(n.overwrap || 0);
var is_overwrap = (overwrap > 0);
if (count <= overwrap) {
node.error(RED._("batch.count.invalid"));
return;
}
node.pending = [];
this.on("input", function(msg) {
var queue = node.pending;
queue.push(msg);
node.pending_count++;
if (queue.length === count) {
send_msgs(node, queue, is_overwrap);
node.pending =
(overwrap === 0) ? [] : queue.slice(-overwrap);
node.pending_count = 0;
}
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (node.pending_count > max_msgs)) {
node.pending = [];
node.pending_count = 0;
node.error(RED._("batch.too-many"), msg);
}
});
this.on("close", function() {
node.pending_count = 0;
node.pending = [];
});
}
else if (mode === "interval") {
var interval = Number(n.interval || "0") *1000;
var allow_empty_seq = n.allowEmptySequence;
node.pending = []
var timer = setInterval(function() {
send_interval(node, allow_empty_seq);
node.pending_count = 0;
}, interval);
this.on("input", function(msg) {
node.pending.push(msg);
node.pending_count++;
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (node.pending_count > max_msgs)) {
node.pending = [];
node.pending_count = 0;
node.error(RED._("batch.too-many"), msg);
}
});
this.on("close", function() {
clearInterval(timer);
node.pending = [];
node.pending_count = 0;
});
}
else if(mode === "concat") {
node.topics = (n.topics || []).map(function(x) {
return x.topic;
});
node.pending = {};
this.on("input", function(msg) {
concat_msg(node, msg);
});
this.on("close", function() {
node.pending = {};
node.pending_count = 0;
});
}
else {
node.error(RED._("batch.unexpected"));
}
}
RED.nodes.registerType("batch", BatchNode);
}