2017-12-05 15:54:03 +01:00
|
|
|
/**
|
|
|
|
* Copyright JS Foundation and other contributors, http://js.foundation
|
|
|
|
*
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
* you may not use this file except in compliance with the License.
|
|
|
|
* You may obtain a copy of the License at
|
|
|
|
*
|
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
*
|
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
* See the License for the specific language governing permissions and
|
|
|
|
* limitations under the License.
|
|
|
|
**/
|
|
|
|
|
|
|
|
module.exports = function(RED) {
|
|
|
|
"use strict";
|
|
|
|
|
2018-07-10 00:06:51 +02:00
|
|
|
var _max_kept_msgs_count;
|
2017-12-05 15:54:03 +01:00
|
|
|
|
|
|
|
function max_kept_msgs_count(node) {
|
|
|
|
if (_max_kept_msgs_count === undefined) {
|
2018-01-24 23:01:07 +01:00
|
|
|
var name = "nodeMessageBufferMaxLength";
|
2017-12-05 15:54:03 +01:00
|
|
|
if (RED.settings.hasOwnProperty(name)) {
|
|
|
|
_max_kept_msgs_count = RED.settings[name];
|
|
|
|
}
|
|
|
|
else {
|
|
|
|
_max_kept_msgs_count = 0;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return _max_kept_msgs_count;
|
|
|
|
}
|
|
|
|
|
2018-07-10 00:06:51 +02:00
|
|
|
// function get_context_val(node, name, dval) {
|
|
|
|
// var context = node.context();
|
|
|
|
// var val = context.get(name);
|
|
|
|
// if (val === undefined) {
|
|
|
|
// context.set(name, dval);
|
|
|
|
// return dval;
|
|
|
|
// }
|
|
|
|
// return val;
|
|
|
|
// }
|
2017-12-05 15:54:03 +01:00
|
|
|
|
|
|
|
function SortNode(n) {
|
|
|
|
RED.nodes.createNode(this, n);
|
|
|
|
var node = this;
|
2018-07-10 00:06:51 +02:00
|
|
|
var pending = {};//get_context_val(node, 'pending', {})
|
2017-12-05 15:54:03 +01:00
|
|
|
var pending_count = 0;
|
2017-12-06 20:44:46 +01:00
|
|
|
var pending_id = 0;
|
2017-12-05 15:54:03 +01:00
|
|
|
var order = n.order || "ascending";
|
|
|
|
var as_num = n.as_num || false;
|
2018-01-22 01:23:22 +01:00
|
|
|
var target_prop = n.target || "payload";
|
|
|
|
var target_is_prop = (n.targetType === 'msg');
|
|
|
|
var key_is_exp = target_is_prop ? (n.msgKeyType === "jsonata") : (n.seqKeyType === "jsonata");
|
|
|
|
var key_prop = n.seqKey || "payload";
|
|
|
|
var key_exp = target_is_prop ? n.msgKey : n.seqKey;
|
|
|
|
|
|
|
|
if (key_is_exp) {
|
2017-12-05 15:54:03 +01:00
|
|
|
try {
|
2018-01-22 01:23:22 +01:00
|
|
|
key_exp = RED.util.prepareJSONataExpression(key_exp, this);
|
2017-12-05 15:54:03 +01:00
|
|
|
}
|
|
|
|
catch (e) {
|
2018-07-10 18:26:54 +02:00
|
|
|
node.error(RED._("sort.invalid-exp",{message:e.toString()}));
|
2017-12-05 15:54:03 +01:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
var dir = (order === "ascending") ? 1 : -1;
|
2018-07-10 00:06:51 +02:00
|
|
|
var conv = as_num ? function(x) { return Number(x); }
|
|
|
|
: function(x) { return x; };
|
2017-12-05 15:54:03 +01:00
|
|
|
|
2018-07-10 00:06:51 +02:00
|
|
|
function generateComparisonFunction(key) {
|
2017-12-05 15:54:03 +01:00
|
|
|
return function(x, y) {
|
|
|
|
var xp = conv(key(x));
|
|
|
|
var yp = conv(key(y));
|
2017-12-05 16:01:58 +01:00
|
|
|
if (xp === yp) { return 0; }
|
|
|
|
if (xp > yp) { return dir; }
|
2017-12-05 15:54:03 +01:00
|
|
|
return -dir;
|
|
|
|
};
|
2017-12-05 16:01:58 +01:00
|
|
|
}
|
2017-12-05 15:54:03 +01:00
|
|
|
|
2018-07-10 00:06:51 +02:00
|
|
|
function sortMessageGroup(group) {
|
|
|
|
var promise;
|
2020-11-02 05:31:27 +01:00
|
|
|
var msgInfos = group.msgInfos;
|
2018-07-10 00:06:51 +02:00
|
|
|
if (key_is_exp) {
|
2020-11-02 05:31:27 +01:00
|
|
|
var evaluatedDataPromises = msgInfos.map(mInfo => {
|
2018-07-10 00:06:51 +02:00
|
|
|
return new Promise((resolve,reject) => {
|
2020-11-02 05:31:27 +01:00
|
|
|
RED.util.evaluateJSONataExpression(key_exp, mInfo.msg, (err, result) => {
|
2018-07-10 18:26:54 +02:00
|
|
|
if (err) {
|
|
|
|
reject(RED._("sort.invalid-exp",{message:err.toString()}));
|
|
|
|
} else {
|
|
|
|
resolve({
|
2020-11-02 05:31:27 +01:00
|
|
|
item: mInfo,
|
2018-07-10 18:26:54 +02:00
|
|
|
sortValue: result
|
|
|
|
})
|
|
|
|
}
|
2018-07-10 00:06:51 +02:00
|
|
|
});
|
|
|
|
})
|
|
|
|
});
|
|
|
|
promise = Promise.all(evaluatedDataPromises).then(evaluatedElements => {
|
|
|
|
// Once all of the sort keys are evaluated, sort by them
|
|
|
|
var comp = generateComparisonFunction(elem=>elem.sortValue);
|
|
|
|
return evaluatedElements.sort(comp).map(elem=>elem.item);
|
|
|
|
});
|
|
|
|
} else {
|
|
|
|
var key = function(msg) {
|
|
|
|
return ;
|
|
|
|
}
|
2020-11-02 05:31:27 +01:00
|
|
|
var comp = generateComparisonFunction(mInfo => RED.util.getMessageProperty(mInfo.msg, key_prop));
|
2017-12-05 15:54:03 +01:00
|
|
|
try {
|
2020-11-02 05:31:27 +01:00
|
|
|
msgInfos.sort(comp);
|
2017-12-05 15:54:03 +01:00
|
|
|
}
|
|
|
|
catch (e) {
|
2018-07-10 00:06:51 +02:00
|
|
|
return; // not send when error
|
2017-12-05 15:54:03 +01:00
|
|
|
}
|
2020-11-02 05:31:27 +01:00
|
|
|
promise = Promise.resolve(msgInfos);
|
2017-12-05 15:54:03 +01:00
|
|
|
}
|
2020-11-02 05:31:27 +01:00
|
|
|
return promise.then(msgInfos => {
|
|
|
|
for (let i = 0; i < msgInfos.length; i++) {
|
|
|
|
const msg = msgInfos[i].msg;
|
2018-07-10 00:06:51 +02:00
|
|
|
msg.parts.index = i;
|
2020-11-02 05:31:27 +01:00
|
|
|
msgInfos[i].send(msg);
|
|
|
|
msgInfos[i].done();
|
2018-07-10 00:06:51 +02:00
|
|
|
}
|
|
|
|
});
|
2017-12-05 15:54:03 +01:00
|
|
|
}
|
2017-12-05 16:01:58 +01:00
|
|
|
|
2018-07-10 00:06:51 +02:00
|
|
|
function sortMessageProperty(msg) {
|
|
|
|
var data = RED.util.getMessageProperty(msg, target_prop);
|
|
|
|
if (Array.isArray(data)) {
|
|
|
|
if (key_is_exp) {
|
|
|
|
// key is an expression. Evaluated the expression for each item
|
|
|
|
// to get its sort value. As this could be async, need to do
|
|
|
|
// it first.
|
|
|
|
var evaluatedDataPromises = data.map(elem => {
|
|
|
|
return new Promise((resolve,reject) => {
|
|
|
|
RED.util.evaluateJSONataExpression(key_exp, elem, (err, result) => {
|
2018-07-10 18:26:54 +02:00
|
|
|
if (err) {
|
|
|
|
reject(RED._("sort.invalid-exp",{message:err.toString()}));
|
|
|
|
} else {
|
|
|
|
resolve({
|
|
|
|
item: elem,
|
|
|
|
sortValue: result
|
|
|
|
})
|
|
|
|
}
|
2018-07-10 00:06:51 +02:00
|
|
|
});
|
|
|
|
})
|
|
|
|
})
|
|
|
|
return Promise.all(evaluatedDataPromises).then(evaluatedElements => {
|
|
|
|
// Once all of the sort keys are evaluated, sort by them
|
|
|
|
// and reconstruct the original message item with the newly
|
|
|
|
// sorted values.
|
|
|
|
var comp = generateComparisonFunction(elem=>elem.sortValue);
|
|
|
|
data = evaluatedElements.sort(comp).map(elem=>elem.item);
|
|
|
|
RED.util.setMessageProperty(msg, target_prop,data);
|
|
|
|
return true;
|
|
|
|
})
|
|
|
|
} else {
|
|
|
|
var comp = generateComparisonFunction(elem=>elem);
|
|
|
|
try {
|
|
|
|
data.sort(comp);
|
|
|
|
} catch (e) {
|
|
|
|
return Promise.resolve(false);
|
|
|
|
}
|
|
|
|
return Promise.resolve(true);
|
|
|
|
}
|
2017-12-06 20:44:46 +01:00
|
|
|
}
|
2018-07-10 00:06:51 +02:00
|
|
|
return Promise.resolve(false);
|
2017-12-06 20:44:46 +01:00
|
|
|
}
|
|
|
|
|
2018-07-10 00:06:51 +02:00
|
|
|
function removeOldestPending() {
|
|
|
|
var oldest;
|
|
|
|
var oldest_key;
|
2017-12-06 20:44:46 +01:00
|
|
|
for(var key in pending) {
|
2018-07-10 00:06:51 +02:00
|
|
|
if (pending.hasOwnProperty(key)) {
|
|
|
|
var item = pending[key];
|
|
|
|
if((oldest === undefined) ||
|
|
|
|
(oldest.seq_no > item.seq_no)) {
|
|
|
|
oldest = item;
|
|
|
|
oldest_key = key;
|
|
|
|
}
|
2017-12-06 20:44:46 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
if(oldest !== undefined) {
|
2020-11-02 05:31:27 +01:00
|
|
|
oldest.msgInfos[oldest.msgInfos.length - 1].done(RED._("sort.too-many"));
|
|
|
|
for (let i = 0; i < oldest.msgInfos.length - 1; i++) {
|
|
|
|
oldest.msgInfos[i].done();
|
|
|
|
}
|
2017-12-06 20:44:46 +01:00
|
|
|
delete pending[oldest_key];
|
2020-11-02 05:31:27 +01:00
|
|
|
return oldest.msgInfos.length;
|
2017-12-06 20:44:46 +01:00
|
|
|
}
|
|
|
|
return 0;
|
|
|
|
}
|
2018-07-10 00:06:51 +02:00
|
|
|
|
2020-11-02 05:31:27 +01:00
|
|
|
function processMessage(msgInfo) {
|
|
|
|
const msg = msgInfo.msg;
|
2018-01-22 01:23:22 +01:00
|
|
|
if (target_is_prop) {
|
2018-07-10 00:06:51 +02:00
|
|
|
sortMessageProperty(msg).then(send => {
|
|
|
|
if (send) {
|
2020-11-02 05:31:27 +01:00
|
|
|
msgInfo.send(msg);
|
2018-07-10 00:06:51 +02:00
|
|
|
}
|
2020-11-02 05:31:27 +01:00
|
|
|
msgInfo.done();
|
2018-07-10 00:06:51 +02:00
|
|
|
}).catch(err => {
|
2020-11-02 05:31:27 +01:00
|
|
|
msgInfo.done(err);
|
2018-07-10 00:06:51 +02:00
|
|
|
});
|
2018-09-05 15:29:20 +02:00
|
|
|
return;
|
2017-12-05 15:54:03 +01:00
|
|
|
}
|
|
|
|
var parts = msg.parts;
|
2018-09-05 15:29:20 +02:00
|
|
|
if (!parts || !parts.hasOwnProperty("id") || !parts.hasOwnProperty("index")) {
|
2020-11-02 05:31:27 +01:00
|
|
|
msgInfo.done();
|
2017-12-05 15:54:03 +01:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
var gid = parts.id;
|
|
|
|
if (!pending.hasOwnProperty(gid)) {
|
|
|
|
pending[gid] = {
|
|
|
|
count: undefined,
|
2020-11-02 05:31:27 +01:00
|
|
|
msgInfos: [],
|
2017-12-06 20:44:46 +01:00
|
|
|
seq_no: pending_id++
|
2017-12-05 15:54:03 +01:00
|
|
|
};
|
|
|
|
}
|
|
|
|
var group = pending[gid];
|
2020-11-02 05:31:27 +01:00
|
|
|
var msgInfos = group.msgInfos;
|
|
|
|
msgInfos.push(msgInfo);
|
2017-12-05 15:54:03 +01:00
|
|
|
if (parts.hasOwnProperty("count")) {
|
|
|
|
group.count = parts.count;
|
|
|
|
}
|
|
|
|
pending_count++;
|
2020-11-02 05:31:27 +01:00
|
|
|
if (group.count === msgInfos.length) {
|
2017-12-05 15:54:03 +01:00
|
|
|
delete pending[gid]
|
2018-07-10 18:26:54 +02:00
|
|
|
sortMessageGroup(group).catch(err => {
|
2020-11-02 05:31:27 +01:00
|
|
|
// throw an error for last message, and just call done() for remaining messages
|
|
|
|
msgInfos[msgInfos.length-1].done(err);
|
|
|
|
for (let i = 0; i < msgInfos.length - 1; i++) {
|
|
|
|
msgInfos[i].done()
|
|
|
|
};
|
2018-07-10 18:26:54 +02:00
|
|
|
});
|
2020-11-02 05:31:27 +01:00
|
|
|
pending_count -= msgInfos.length;
|
2018-07-10 00:06:51 +02:00
|
|
|
} else {
|
|
|
|
var max_msgs = max_kept_msgs_count(node);
|
|
|
|
if ((max_msgs > 0) && (pending_count > max_msgs)) {
|
|
|
|
pending_count -= removeOldestPending();
|
|
|
|
}
|
2017-12-05 15:54:03 +01:00
|
|
|
}
|
|
|
|
}
|
2018-07-10 00:06:51 +02:00
|
|
|
|
2020-11-02 05:31:27 +01:00
|
|
|
this.on("input", function(msg, send, done) {
|
|
|
|
processMessage({msg, send, done});
|
2017-12-05 15:54:03 +01:00
|
|
|
});
|
2017-12-06 20:44:46 +01:00
|
|
|
|
|
|
|
this.on("close", function() {
|
2018-07-10 00:06:51 +02:00
|
|
|
for(var key in pending) {
|
|
|
|
if (pending.hasOwnProperty(key)) {
|
2020-11-02 05:31:27 +01:00
|
|
|
node.log(RED._("sort.clear"), pending[key].msgInfos[0]);
|
|
|
|
const group = pending[key];
|
|
|
|
group.msgInfos.forEach(mInfo => {
|
|
|
|
mInfo.done();
|
|
|
|
});
|
2018-07-10 00:06:51 +02:00
|
|
|
delete pending[key];
|
|
|
|
}
|
|
|
|
}
|
2018-09-05 15:29:20 +02:00
|
|
|
pending_count = 0;
|
|
|
|
});
|
2017-12-05 15:54:03 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
RED.nodes.registerType("sort", SortNode);
|
|
|
|
}
|