0 Design: msg.parts description of usage
Hiroyasu Nishiyama edited this page 2017-09-20 22:50:50 +09:00

The split and join nodes currently use msg.parts to pass metadata about the individual pieces of a message so they may be joined again successfully later.

It has been pointed out that many clever things could be done between splitting and re-joining messages, such as filtering unwanted parts, inserting extra parts, and letting other nodes produce pre-split data (such as file in, tcp in, and potentially several more). As such it makes sense to document the existing parts object, and its current expected usage, so that enhancements can be discussed and incorporated as required.

Existing msg.parts object

The following sub-properties exist:

| name  | type         | mandatory | description |
|-------|--------------|-----------|-------------|
| type  | string       | yes       | Either string, buffer, array, or object. |
| id    | string       | yes       | Unique id for this set of parts (for example based on the original _msgid), to allow interleaved messages to be rejoined. |
| index | number       | yes       | The index of this part, starting from 0. |
| count | number       | no        | Total number of parts in the set. Typically known for simple splits, unknown for most streams, though a count may be added to files once reading is complete. |
| ch    | string/array | no        | The data used to split/join the message. Either a string, for example "\n", or an array of bytes: [1,2,3], ["0x0a","0x0d"]. (Only present if the message was split using this value; not present for fixed-length splits.) |
| key   | string       | no        | The key used as the property name within the joined object. (Only present if an object was split.) |
| len   | number       | no        | Split length of the array, string or buffer. (Only present if an array, string or buffer was split into fixed lengths.) |
| parts | object       | no        | Push-down stack of previous parts objects, allowing nested splits to be handled. |
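As a concrete illustration, here is a minimal sketch (assumed helper names; not the actual split node source) of the messages a simple string split would produce, carrying the parts properties described above:

```javascript
// Minimal sketch (not the actual split node source): build the
// messages that splitting "a\nb\nc" on "\n" would carry.
function splitString(payload, ch, msgid) {
  return payload.split(ch).map(function (part, index, all) {
    return {
      payload: part,
      parts: {
        type: "string",    // the original payload was a string
        id: msgid,         // shared id so the set can be rejoined later
        index: index,      // position of this part, starting from 0
        count: all.length, // known up front for a simple split
        ch: ch             // the data the message was split on
      }
    };
  });
}

var msgs = splitString("a\nb\nc", "\n", "msg-1");
// msgs[0].payload === "a", msgs[0].parts.index === 0, msgs[0].parts.count === 3
```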

Current Join behaviour

The msg is currently considered complete if:

  • msg.parts.index >= msg.parts.count - 1 (i.e. the last expected part has arrived) - however, if msg.parts.count is 0 or undefined then no output is sent,
  • msg.complete exists (set to any value), or
  • an optional timeout since the arrival of the first part has elapsed.

Having popped any pushed msg.parts.parts back up to msg.parts, the completed msg is then output.
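The completion rules can be sketched as the following predicate (an illustration, not the join node's actual code; the timeout case would be handled by a separate timer and is omitted):

```javascript
// Illustrative sketch of the join completion rules described above.
// The optional timeout is handled separately by a timer, not shown.
function isComplete(msg) {
  if (msg.hasOwnProperty("complete")) return true;  // msg.complete forces completion
  var parts = msg.parts || {};
  if (parts.count === undefined || parts.count === 0) return false; // no output sent
  return parts.index >= parts.count - 1;            // last expected part has arrived
}
```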


Discussion

What the current behaviour allows is that the join can be manipulated by changing msg.parts.count (or msg.complete) on any of the intermediate parts. For example, the join can be ended early by saying it is now complete - or extra items inserted by just adding to the count so the existing join doesn't complete.

However - while this makes sense for strings and objects - it may need some thought about the consequences for arrays, as we currently try to preserve order in the array. So what does removing or inserting mean in this context - or rather, what should the behaviour actually be?
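For instance, inserting an extra item into a sequence only requires shifting the indices at and after the insertion point and growing every count - a minimal sketch (hypothetical helper working on a buffered array of messages):

```javascript
// Hypothetical helper: insert one extra part into a buffered sequence
// by shifting later indices and incrementing every count by one.
function insertPart(msgs, at, payload) {
  var out = msgs.map(function (m) {
    return {
      payload: m.payload,
      parts: {
        id: m.parts.id,
        type: m.parts.type,
        index: m.parts.index >= at ? m.parts.index + 1 : m.parts.index,
        count: m.parts.count + 1
      }
    };
  });
  out.splice(at, 0, {
    payload: payload,
    parts: { id: msgs[0].parts.id, type: msgs[0].parts.type,
             index: at, count: msgs[0].parts.count + 1 }
  });
  return out;
}
```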

Proposal: optional list-like representation

The msg.parts object is an adequate tool for doing list-like operations within a Node-RED flow. For example, filtering messages in a message stream or inserting messages into a message stream is useful for handling related messages. However, changing the message count of a stream requires storing the whole stream and then updating msg.parts.count and msg.parts.index. For short streams this should never be a problem. However, for long streams keeping and rewriting every message in a stream may raise problems such as increased memory usage or delayed messages.

So, as an optional representation of msg.parts, we propose the following linked-list representation:

| name | type   | mandatory | description |
|------|--------|-----------|-------------|
| next | string | yes       | msg.id of the next message |

Either parts.count and parts.index, or parts.next, can be used - but not both at the same time. Insertion or deletion of messages with the list-like representation can be implemented while keeping only a small number of messages in memory.

Whether to use the old array-like representation or the new list-like representation should be decided by producer nodes. Conversion nodes between the two representations would also be useful.
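A minimal sketch of the list-like representation (field names as proposed above; using null to terminate the last message is an assumption for illustration):

```javascript
// Sketch: link a sequence so each message's parts.next holds the
// _msgid of the following message (null marks the end - an assumption).
function link(msgs, seqId) {
  msgs.forEach(function (m, i) {
    m.parts = { id: seqId,
                next: i + 1 < msgs.length ? msgs[i + 1]._msgid : null };
  });
  return msgs;
}

// Deleting a message: re-link the survivors. In a real stream only
// the predecessor needs patching, so a node can work while holding
// just a couple of messages rather than the whole sequence.
function unlink(msgs, victimId) {
  var kept = msgs.filter(function (m) { return m._msgid !== victimId; });
  kept.forEach(function (m, i) {
    m.parts.next = i + 1 < kept.length ? kept[i + 1]._msgid : null;
  });
  return kept;
}
```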

DCJ comment - looking at the list of proposed nodes - none currently would need this idea in order to be implemented successfully. Typically nodes will have to work in one of two ways.

1. message sequence - one at a time. As each message arrives it is processed - the parts can then be manipulated - eg insert extra by incrementing index and count, etc. or
2. message group - chunk at a time. The whole group needs to have arrived before being sorted / sliced etc - at which point the parts can be re-generated before sending on the burst.

Proposed New Nodes

Chunk node

A function that divides input messages into multiple sequences of messages. It can be used to group together a series of inputs into a "chunk" for further processing. An overlap can be specified so (for example) the last reading in one chunk will also be the first reading in the next chunk.

Messages grouping mode can be selected from:

  • number of messages - group received messages into chunks of the same number of messages,
  • interval in seconds - group received messages within specified interval.
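The "number of messages" mode with an overlap can be sketched like this (illustrative only; in this simplified version, trailing readings that do not fill a whole chunk are dropped):

```javascript
// Sketch of count-based chunking with overlap: the last `overlap`
// readings of one chunk reappear at the start of the next chunk.
// A trailing partial chunk is dropped in this simplified version.
function chunk(values, size, overlap) {
  var chunks = [];
  var step = size - overlap;
  for (var i = 0; i + size <= values.length; i += step) {
    chunks.push(values.slice(i, i + size));
  }
  return chunks;
}
```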

Behaviour/Implementation thoughts

What happens when no messages arrive in "interval" mode ? Options:

1. send a blank msg, with the `parts` specified correctly as count of zero.
2. don't send anything.
  • (HN) (1) seems better because it is convenient to have a triggering message when processing messages over a certain period. As you pointed out offline, the functionality of the chunk node is similar to the split node, so integrating the chunk functionality into the split node seems preferable.

Filter node

A function that filters a sequence of messages. The type of the filtering condition can be:

  • payload value - compare msg.payload with a constant value by an operator (=, !=, <, <=, >, >=).
  • regular expression - match with regular expression.
  • filtering expression - JavaScript expression. On evaluation, variable msg is bound to the message, and $ is bound to msg.payload.
  • head - only first N messages of a group are sent - just count messages as they arrive.
  • tail - last N messages - would have to wait for "all" messages in a group to arrive before sending a burst of messages onwards...

If the condition evaluates to true for an input message, the message is sent to the output wire. Otherwise, the message is discarded.
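The "payload value" condition type can be sketched as a table of comparison operators applied per message (illustrative; the prototype node's actual code may differ):

```javascript
// Sketch of payload-value filtering with a comparison operator.
var ops = {
  "=":  function (a, b) { return a === b; },
  "!=": function (a, b) { return a !== b; },
  "<":  function (a, b) { return a < b; },
  "<=": function (a, b) { return a <= b; },
  ">":  function (a, b) { return a > b; },
  ">=": function (a, b) { return a >= b; }
};

// Keep only the messages whose payload satisfies the condition.
function filterMsgs(msgs, op, value) {
  return msgs.filter(function (m) { return ops[op](m.payload, value); });
}
```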

Behaviour/Implementation thoughts

  • Should it have two outputs ? one for things that match and one for things that don't ? Would save having two nodes if both types of message are required. (DCJ - I think yes)
    • (HN) Having more than one output is a good idea. It may be better to be able to specify multiple filter rules and corresponding outputs, similar to the switch node.
  • Is javascript the right expression language to use (given we already have jsonata etc - is another one a good idea ?)
    • (HN) Considering the use of JSONata in core nodes, I think using JSONata as the expression language is better.
  • Tail mode - is end only triggered by comparison to count ? (DCJ - I think yes.)
    • (HN) In current prototype implementation, yes.
  • Head mode - if N messages don't arrive for that group - drop any currently accumulated ? send a partial msg ? eg if we ask for 10 - and a group of 5 arrives... does it get sent ? or dropped ? (DCJ - I think send)
    • (HN) The current implementation waits forever for the arrival of messages. But sending the message is better, as you pointed out. Whether to set the number of messages to be sent to 5 or 10 (5 with empty payload?) seems to have room for discussion. 10 seems better to me because we can then expect a fixed-size sequence from the output of the filter node.
  • Apart from head and tail modes - this is similar to the switch node - should that be enhanced to handle parts instead ? (DCJ - I think no - but ?)
    • (HN) If we allow multiple outputs as pointed by 1st bullet, this node becomes very similar to the switch node. So, I think enhancing the switch to handle parts property is better.
  • In non head and tail mode this node could work on messages without parts also. Should that be supported ? (DCJ - I think yes)
    • (HN) I also think yes.
  • If any filter removes the "last" message in an incoming group - then no final msg with "correct" parts will be left to be sent (ie count won't be set) - so we will need to send a blank msg with parts.count set correctly in this case (as well as maybe sending to the second output if that port exists).
    • (HN) A similar situation also happens on addition/removal of any message in the incoming group. But the filter node cannot distinguish between message delay and deletion without correct parts. (Am I misunderstanding the comment?)

    • (DCJ) No - this is only the same as "missing the last part" - as long as, on the last message, the index is the same as count-1 (or greater), then the message is (currently) considered complete and gets sent. So it's only an issue at the end. But it is a real issue if the last message doesn't arrive.

      • (HN) I understood. The issue of the arrival of the last message is discussed in the following item on the sort node.

Sort node

A function that sorts a sequence of messages or payload of array type. When paired with the split node, it will reorder the messages. The sorting order can be:

  • ascending,
  • descending.

For numbers, numerical ordering can be specified by a checkbox.

The sort key can be payload or any JavaScript expression when sorting messages, or the element value or any JavaScript expression when sorting a payload of array type.
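The buffer-sort-resend behaviour can be sketched as follows (illustrative; keyFn stands in for the configured key expression):

```javascript
// Sketch: buffer a whole sequence, sort it by a key expression,
// then regenerate parts.index before re-sending the burst.
function sortSequence(msgs, keyFn, descending) {
  var sorted = msgs.slice().sort(function (a, b) {
    var ka = keyFn(a), kb = keyFn(b);
    var cmp = ka < kb ? -1 : ka > kb ? 1 : 0;
    return descending ? -cmp : cmp;
  });
  sorted.forEach(function (m, i) { m.parts.index = i; }); // regenerate indices
  return sorted;
}
```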

Behaviour/Implementation

  • If the "last" message doesn't arrive - just drop the group (when the next group starts to arrive) ?
    • (HN) Depending on the flow, the 1st message of the next group may arrive before the last message of the previous group. So we have to wait for the "last" message.
  • In order to sort correctly the whole sequence of messages needs to have arrived - so could (in theory) be a very large memory overhead, plus then once sorted all messages will be resent in correct order as a large burst. Is this acceptable ? The new parts properties can be regenerated just prior to sending.
    • (HN) Yes. Memory overhead and burst message sending may be problems for the sort node (and also any other node that needs to fix the parts property). I have no good idea on this except to mention it in the info tab of the nodes.
  • should the expression be javascript ? or jsonata ? or ...
    • (HN) Similar to filter node, I think using JSONata is better.

(DCJ) - what happens if the last message never arrives ???? can't leave in memory forever...

  • (HN) I think there are two options:

    1. Leave it to the user (i.e. consume memory forever), or

    2. Set an upper limit on the length of a message sequence to wait. If the limit is exceeded, an exception is thrown.

      The upper limit is:

      a. set globally by settings.js, or

      b. set by node setting panel, flow context, global context, or settings.js (referenced in this order).

    I prefer option 2-b.

Merge node

A function that merges a set of input messages into a single message.

Input messages are merged in the order specified by a field (called Topics).

For example, if the value of Topics is x,x,y, two input messages with topic x and one input message with topic y are merged into a new message.

The merged message contains a payload property and a property for each topic. The payload property is the ordered array of the payload values of the input messages for each topic. The property for each topic holds the payload value for a single occurrence of the topic, or an array of payload values for multiple occurrences of the topic.

This merges messages without using parts - and creates a msg without parts.
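The Topics behaviour described above can be sketched as follows (illustrative; a real node would buffer input until every required topic occurrence has arrived):

```javascript
// Sketch of merging by a Topics list such as ["x", "x", "y"]:
// assumes all required messages have already arrived.
function merge(topics, msgs) {
  var queues = {};
  msgs.forEach(function (m) {
    (queues[m.topic] = queues[m.topic] || []).push(m.payload);
  });
  var out = { payload: [] };
  var seen = {};
  topics.forEach(function (t) {
    var v = queues[t].shift();        // next payload for this topic
    out.payload.push(v);              // ordered array of all payloads
    if (seen[t]) {                    // repeated topic -> array property
      if (!Array.isArray(out[t])) out[t] = [out[t]];
      out[t].push(v);
    } else {
      out[t] = v;                     // single occurrence -> plain value
      seen[t] = true;
    }
  });
  return out;
}
```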

Implementation

  • What happens if msg arrive out of order ?
    • (HN) The current prototype merges messages in the order of their arrival. Considering the parts property and creating the output message from corresponding messages seems better.
  • or too many of one topic arrive before the correct number of another ? drop first ? drop last ?
    • (HN) Waits for arrival of required messages. Other messages are kept internally until ready.
  • Should this be a mode of operation of the current join node ?
    • (HN) I agree with this idea.

Map/Reduce node

A function that maps or reduces message payload values. The map or reduce operation can be selected by a checkbox.

If the map option is selected, the expression specified by Map expression is applied to msg.payload.

If the reduce option is selected, the expression specified in the Reduce expression field is applied to the message sequence, with the initial value specified by the Initial value field. The associative order of application of the reduce expression is selected by a right reduce checkbox.

After applying the reduce expression to the message sequence, the expression specified in Fixup exp is applied to the result.
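The reduce-then-fixup pipeline can be sketched as follows (illustrative; plain functions stand in for the configured expressions):

```javascript
// Sketch of reduce mode: fold the payloads of a buffered sequence
// with an initial value, then apply an optional fixup to the result.
function reduceSequence(msgs, reduceFn, initial, fixupFn, rightReduce) {
  var seq = rightReduce ? msgs.slice().reverse() : msgs;
  var acc = seq.reduce(function (a, m) { return reduceFn(a, m.payload); }, initial);
  return fixupFn ? fixupFn(acc, msgs.length) : acc;
}

// e.g. computing a mean: reduce sums, fixup divides by the count.
var mean = reduceSequence(
  [{ payload: 1 }, { payload: 2 }, { payload: 3 }],
  function (a, p) { return a + p; }, 0,
  function (a, n) { return a / n; });
```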

Behaviour/Implementation

  • should the expressions be javascript or jsonata ?
    • (HN) As with the filter node and sort node, I think using JSONata is better. For the Map/Reduce node, we need to add special variables to access msg, msg.payload, the number of msgs in a sequence, the index of a msg in a sequence, and the accumulated value (represented by _, , $N, $I, and $, respectively, in the prototype implementation).
  • is the Map only mode - similar to change node ?
    • (HN) Yes. Its functionality is the same. So I think integrating the reduce mode into the change node is a better idea.

Summary of Updated Proposal (HN)

Switch node

  • enhance to consider the parts property and to produce message sequences, as the filter node does.

<removed>

  • allow head and tail as routing rule for message sequences.
  • make head applicable to Streaming mode of split node (i.e. without parts.count).
  • make whether or not to consider parts property selectable by option of switch node. (add third rule checking all rules with message sequence)

</removed>

  • (HN) These items are unnecessary because the time-sequence-based modes are implemented in the filter node.

(DCJ) I'm starting to think the time sequence based modes (head tail) are actually different/confusing enough that they should still be a separate node - but that the generic expression filtering should be within the switch node. That way further sequence type modes could be considered more easily... for example - every odd numbered message, the 3rd to the 7th messages (sort of like a time based substring...). Both should be aware of msg.parts.

  • (HN) I agree with your proposal. So we enhance the switch node to support the parts property and add a new filter node with time-sequence-based filtering.

Filter node [New]

  • Allow filtering based on time-based sequence,
  • Use rule specification interface similar to switch node,
  • Support rules:
    • head N - filter the first N msgs in a sequence,
    • tail N - filter the last N msgs in a sequence,
    • range N, M - filter from N-th to M-th msgs in a sequence,
    • JSONata expression on msg with shorthand for parts.index ($I) and parts.count ($N),
    • otherwise - matches when no other rule matches.

Split node

  • enhance to allow a new mode for creating sequences from input messages, as the chunk node does.
  • as an extension to the current chunk node implementation, add functionality to create sequences from a String, Buffer, Array or Object, as these are supported by the split node.

Join node

  • enhance to allow a new mode for merging messages by topics, as the merge node does.
  • add new functionality to concatenate input sequences using topic value.
    example: Topic1:[1, 2] + Topic2:[A, B, C] => Topic1+Topic2:[1, 2, A, B, C]
    
  • messages are processed in order if they have a parts property.

(DCJ) - Is this just to create an array ? If so a) what value should the resulting topic be ? (I don't think Topic1+Topic2 is the answer). and b) should the "template" for which topics to include in the new array be an extra line in the "Send message after..." section of the existing options dialog ?

  • (HN) In the above example, I intended to represent message sequences by the […] notation. Sorry for the confusing representation. The purpose of message sequence concatenation is to generate a message sequence from more than one sequence. I think sequence concatenation is useful for creating multiple message sequences from one sequence and then reconstructing a new sequence (e.g. filtering CSV records by multiple criteria, then joining into one CSV file).
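Sequence concatenation as described above can be sketched like this (hypothetical helper; each inner array stands for one buffered message sequence):

```javascript
// Sketch: concatenate several message sequences into one new
// sequence, regenerating parts so it can be joined downstream.
function concatSequences(seqs, newId) {
  var total = seqs.reduce(function (n, s) { return n + s.length; }, 0);
  var out = [];
  var index = 0;
  seqs.forEach(function (seq) {
    seq.forEach(function (m) {
      out.push({ payload: m.payload,
                 parts: { id: newId, index: index++, count: total } });
    });
  });
  return out;
}
```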

Change node

  • enhance to support reduce functionality of Map/Reduce node.
  • use JSONata as expression language for reduce and fixup.
  • reduce and fixup expressions are only applied to message sequences (msgs having a parts property). Other messages are processed the same as by the current change node.
  • for the JSONata expression of reduce mode, add the special variables $I (index of message), $N (number of messages in a sequence), and $A (accumulated value).

(DCJ) - are these extra variables needed as you can already get to all parts of the msg ? or are they in node context variables ? or are they just for convenience ?

  • (HN) $I is shorthand for parts.index, so this variable is not strictly required. $N is needed because parts.count may not exist for any message except the last one in a sequence. $A is also needed because the accumulated value of a reduce operation is not included in a message.

Sort node

  • use JSONata for key expression.

csv node and html node

  • enhance to support parts property.

(DCJ - yes - see also Issue #1394, which is about support on the input side of CSV, but they will need to be merged.)

  • (HN) Thank you. I think this enhancement is useful.