mirror of
https://github.com/node-red/node-red.git
synced 2023-10-10 13:36:53 +02:00
Merge pull request #2553 from node-red-hitachi/add-reset-to-batch-node
Add reset feature to batch node
This commit is contained in:
commit
13932b2cfb
@ -179,6 +179,11 @@ module.exports = function(RED) {
|
|||||||
}
|
}
|
||||||
node.pending = [];
|
node.pending = [];
|
||||||
this.on("input", function(msg) {
|
this.on("input", function(msg) {
|
||||||
|
if (msg.hasOwnProperty("reset")) {
|
||||||
|
node.pending = [];
|
||||||
|
node.pending_count = 0;
|
||||||
|
return;
|
||||||
|
}
|
||||||
var queue = node.pending;
|
var queue = node.pending;
|
||||||
queue.push(msg);
|
queue.push(msg);
|
||||||
node.pending_count++;
|
node.pending_count++;
|
||||||
@ -204,11 +209,26 @@ module.exports = function(RED) {
|
|||||||
var interval = Number(n.interval || "0") *1000;
|
var interval = Number(n.interval || "0") *1000;
|
||||||
var allow_empty_seq = n.allowEmptySequence;
|
var allow_empty_seq = n.allowEmptySequence;
|
||||||
node.pending = []
|
node.pending = []
|
||||||
var timer = setInterval(function() {
|
function msgHandler() {
|
||||||
send_interval(node, allow_empty_seq);
|
send_interval(node, allow_empty_seq);
|
||||||
node.pending_count = 0;
|
node.pending_count = 0;
|
||||||
}, interval);
|
}
|
||||||
|
var timer = undefined;
|
||||||
|
if (interval > 0) {
|
||||||
|
timer = setInterval(msgHandler, interval);
|
||||||
|
}
|
||||||
this.on("input", function(msg) {
|
this.on("input", function(msg) {
|
||||||
|
if (msg.hasOwnProperty("reset")) {
|
||||||
|
if (timer !== undefined) {
|
||||||
|
clearInterval(timer);
|
||||||
|
}
|
||||||
|
node.pending = [];
|
||||||
|
node.pending_count = 0;
|
||||||
|
if (interval > 0) {
|
||||||
|
timer = setInterval(msgHandler, interval);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
node.pending.push(msg);
|
node.pending.push(msg);
|
||||||
node.pending_count++;
|
node.pending_count++;
|
||||||
var max_msgs = max_kept_msgs_count(node);
|
var max_msgs = max_kept_msgs_count(node);
|
||||||
@ -219,7 +239,9 @@ module.exports = function(RED) {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
this.on("close", function() {
|
this.on("close", function() {
|
||||||
clearInterval(timer);
|
if (timer !== undefined) {
|
||||||
|
clearInterval(timer);
|
||||||
|
}
|
||||||
node.pending = [];
|
node.pending = [];
|
||||||
node.pending_count = 0;
|
node.pending_count = 0;
|
||||||
});
|
});
|
||||||
@ -230,6 +252,11 @@ module.exports = function(RED) {
|
|||||||
});
|
});
|
||||||
node.pending = {};
|
node.pending = {};
|
||||||
this.on("input", function(msg) {
|
this.on("input", function(msg) {
|
||||||
|
if (msg.hasOwnProperty("reset")) {
|
||||||
|
node.pending = {};
|
||||||
|
node.pending_count = 0;
|
||||||
|
return;
|
||||||
|
}
|
||||||
concat_msg(node, msg);
|
concat_msg(node, msg);
|
||||||
});
|
});
|
||||||
this.on("close", function() {
|
this.on("close", function() {
|
||||||
|
@ -39,4 +39,5 @@
|
|||||||
<p>This node will buffer messages internally in order to work across sequences. The
|
<p>This node will buffer messages internally in order to work across sequences. The
|
||||||
runtime setting <code>nodeMessageBufferMaxLength</code> can be used to limit how many messages nodes
|
runtime setting <code>nodeMessageBufferMaxLength</code> can be used to limit how many messages nodes
|
||||||
will buffer.</p>
|
will buffer.</p>
|
||||||
|
<p>If a message is received with the <b>msg.reset</b> property set, the buffered messages are deleted and not sent.</p>
|
||||||
</script>
|
</script>
|
||||||
|
@ -31,4 +31,6 @@
|
|||||||
</dl>
|
</dl>
|
||||||
<h4>メッセージの蓄積</h4>
|
<h4>メッセージの蓄積</h4>
|
||||||
<p>このノードの処理ではメッセージ列の処理のためメッセージを内部に蓄積します。<b>settings.js</b>の<code>nodeMessageBufferMaxLength</code>を指定することで蓄積するメッセージの最大値を制限することができます。</p>
|
<p>このノードの処理ではメッセージ列の処理のためメッセージを内部に蓄積します。<b>settings.js</b>の<code>nodeMessageBufferMaxLength</code>を指定することで蓄積するメッセージの最大値を制限することができます。</p>
|
||||||
|
<p>メッセージが<b>msg.reset</b>プロパティを持つ場合、蓄積したメッセージを削除し送信を行いません。</p>
|
||||||
|
|
||||||
</script>
|
</script>
|
||||||
|
@ -107,13 +107,18 @@ describe('BATCH node', function() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function delayed_send(receiver, index, count, delay) {
|
function delayed_send(receiver, index, count, delay, done) {
|
||||||
if (index < count) {
|
if (index < count) {
|
||||||
setTimeout(function() {
|
setTimeout(function() {
|
||||||
receiver.receive({payload: index});
|
receiver.receive({payload: index});
|
||||||
delayed_send(receiver, index+1, count, delay);
|
delayed_send(receiver, index+1, count, delay, done);
|
||||||
}, delay);
|
}, delay);
|
||||||
}
|
}
|
||||||
|
else if(index === count) {
|
||||||
|
if (done) {
|
||||||
|
done();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function check_interval(flow, results, delay, done) {
|
function check_interval(flow, results, delay, done) {
|
||||||
@ -198,10 +203,28 @@ describe('BATCH node', function() {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should handle reset', function(done) {
|
||||||
|
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "count", count: 2, overlap: 0, interval: 0, allowEmptySequence: false, topics: [], wires:[["n2"]]},
|
||||||
|
{id:"n2", type:"helper"}];
|
||||||
|
helper.load(batchNode, flow, function() {
|
||||||
|
var n1 = helper.getNode("n1");
|
||||||
|
var n2 = helper.getNode("n2");
|
||||||
|
var results = [
|
||||||
|
[0, 1],
|
||||||
|
[4, 5]
|
||||||
|
];
|
||||||
|
check_data(n1, n2, results, done);
|
||||||
|
n1.receive({payload:0});
|
||||||
|
n1.receive({payload:1});
|
||||||
|
n1.receive({payload:2});
|
||||||
|
n1.receive({payload:3, reset: true});
|
||||||
|
n1.receive({payload:4});
|
||||||
|
n1.receive({payload:5});
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('mode: interval', function() {
|
describe('mode: interval', function() {
|
||||||
|
|
||||||
it('should create seq. with interval', function(done) {
|
it('should create seq. with interval', function(done) {
|
||||||
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overlap: 0, interval: 1, allowEmptySequence: false, topics: [], wires:[["n2"]]},
|
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overlap: 0, interval: 1, allowEmptySequence: false, topics: [], wires:[["n2"]]},
|
||||||
{id:"n2", type:"helper"}];
|
{id:"n2", type:"helper"}];
|
||||||
@ -265,10 +288,29 @@ describe('BATCH node', function() {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should handle reset', function(done) {
|
||||||
|
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overlap: 0, interval: 1, allowEmptySequence: false, topics: [], wires:[["n2"]]},
|
||||||
|
{id:"n2", type:"helper"}];
|
||||||
|
helper.load(batchNode, flow, function() {
|
||||||
|
var n1 = helper.getNode("n1");
|
||||||
|
var n2 = helper.getNode("n2");
|
||||||
|
var results = [
|
||||||
|
[0, 1],
|
||||||
|
[4, 5]
|
||||||
|
];
|
||||||
|
check_data(n1, n2, results, done);
|
||||||
|
delayed_send(n1, 0, 3, 400, function () {
|
||||||
|
setTimeout(function () {
|
||||||
|
n1.receive({payload: "3", reset: true});
|
||||||
|
delayed_send(n1, 4, 7, 400);
|
||||||
|
}, 10);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('mode: concat', function() {
|
describe('mode: concat', function() {
|
||||||
|
|
||||||
it('should concat two seq. (series)', function(done) {
|
it('should concat two seq. (series)', function(done) {
|
||||||
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "concat", count: 0, overlap: 0, interval: 1, allowEmptySequence: false, topics: [{topic: "TA"}, {topic: "TB"}], wires:[["n2"]]},
|
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "concat", count: 0, overlap: 0, interval: 1, allowEmptySequence: false, topics: [{topic: "TA"}, {topic: "TB"}], wires:[["n2"]]},
|
||||||
{id:"n2", type:"helper"}];
|
{id:"n2", type:"helper"}];
|
||||||
@ -355,6 +397,58 @@ describe('BATCH node', function() {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should handle reset', function(done) {
|
||||||
|
var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "concat", count: 0, overlap: 0, interval: 1, allowEmptySequence: false, topics: [{topic: "TA"}, {topic: "TB"}], wires:[["n2"]]},
|
||||||
|
{id:"n2", type:"helper"}];
|
||||||
|
try {
|
||||||
|
helper.load(batchNode, flow, function() {
|
||||||
|
var n1 = helper.getNode("n1");
|
||||||
|
var n2 = helper.getNode("n2");
|
||||||
|
var results = [
|
||||||
|
[2, 3, 0, 1]
|
||||||
|
];
|
||||||
|
check_data(n1, n2, results, done);
|
||||||
|
var inputs0 = [
|
||||||
|
["TB", 0, 0, 2],
|
||||||
|
["TA", 1, 0, 2],
|
||||||
|
];
|
||||||
|
for(var data of inputs0) {
|
||||||
|
var msg = {
|
||||||
|
topic: data[0],
|
||||||
|
payload: data[1],
|
||||||
|
parts: {
|
||||||
|
id: data[0],
|
||||||
|
index: data[2],
|
||||||
|
count: data[3]
|
||||||
|
}
|
||||||
|
};
|
||||||
|
n1.receive(msg);
|
||||||
|
}
|
||||||
|
n1.receive({payload: undefined, reset: true});
|
||||||
|
var inputs1 = [
|
||||||
|
["TB", 0, 0, 2],
|
||||||
|
["TB", 1, 1, 2],
|
||||||
|
["TA", 2, 0, 2],
|
||||||
|
["TA", 3, 1, 2]
|
||||||
|
];
|
||||||
|
for(var data of inputs1) {
|
||||||
|
var msg = {
|
||||||
|
topic: data[0],
|
||||||
|
payload: data[1],
|
||||||
|
parts: {
|
||||||
|
id: data[0],
|
||||||
|
index: data[2],
|
||||||
|
count: data[3]
|
||||||
|
}
|
||||||
|
};
|
||||||
|
n1.receive(msg);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
catch (e) {
|
||||||
|
done(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
});
|
});
|
||||||
|
Loading…
Reference in New Issue
Block a user