Multipart streaming

This commit is contained in:
bartbutenaers 2017-04-08 22:18:07 +02:00 committed by Nick O'Leary
parent d7f5b0c9d7
commit 9cc04da7b2
No known key found for this signature in database
GPG Key ID: 4F2157149161A6C9
3 changed files with 295 additions and 24 deletions

View File

@ -62,6 +62,18 @@
<option value="obj" data-i18n="httpin.json"></option>
</select>
</div>
<div class="form-row">
<input type="checkbox" id="node-input-throttling" style="display: inline-block; width: auto; vertical-align: top;">
<label for="node-input-throttling" style="width: 70%;"><span data-i18n="httpin.throttling"></span></label>
<div style="margin-left: 20px" class="node-input-throttling-row hide">
<div class="form-row">
<label for="node-input-interval"><i class="fa fa-clock-o"></i> <span data-i18n="httpin.interval"></span></label>
<input type="text" id="node-input-interval">
</div>
</div>
</div>
<div class="form-row">
<label for="node-input-name"><i class="fa fa-tag"></i> <span data-i18n="common.label.name"></span></label>
<input type="text" id="node-input-name" data-i18n="[placeholder]common.label.name">
@ -92,6 +104,7 @@
<li><code>headers</code> is an object containing the response headers</li>
<li><code>responseUrl</code> is the url of the server that responds</li>
</ul>
<p>The <b>interval</b> can be specified for (multipart) streams to limit the number of parts received: it is the number of milliseconds to wait between parts in the stream.</p>
<p><b>Note</b>: If you need to configure a proxy please add <b>http_proxy=...</b> to your environment variables and restart Node-RED.</p>
</script>
@ -105,6 +118,7 @@
ret: {value:"txt"},
url:{value:"",validate:function(v) { return (v.trim().length === 0) || (v.indexOf("://") === -1) || (v.trim().indexOf("http") === 0)} },
tls: {type:"tls-config",required: false}
throttling: {value:0}
},
credentials: {
user: {type:"text"},
@ -138,6 +152,21 @@
$('#node-input-useAuth').prop('checked', false);
}
$("#node-input-useAuth").change();
// Keep the interval row in sync with the throttling checkbox: the row is only
// visible while the box is ticked, and un-ticking it resets the interval to 0.
$("#node-input-throttling").change(function() {
    var throttlingEnabled = $(this).is(":checked");
    $(".node-input-throttling-row").toggle(throttlingEnabled);
    if (!throttlingEnabled) {
        $('#node-input-interval').val('0');
    }
});
// Tick the checkbox when the node has a truthy interval, then fire the
// handler once so the row visibility matches the initial state.
$('#node-input-throttling').prop('checked', this.interval ? true : false);
$("#node-input-throttling").change();
function updateTLSOptions() {
if ($("#node-input-usetls").is(':checked')) {

View File

@ -32,6 +32,9 @@ module.exports = function(RED) {
var tlsNode = RED.nodes.getNode(n.tls);
}
this.ret = n.ret || "txt";
this.prevReq = null;
this.throttling = n.throttling;
this.throttleInterval = n.interval;
if (RED.settings.httpRequestTimeout) { this.reqTimeout = parseInt(RED.settings.httpRequestTimeout) || 120000; }
else { this.reqTimeout = 120000; }
@ -40,11 +43,87 @@ module.exports = function(RED) {
if (process.env.HTTP_PROXY != null) { prox = process.env.HTTP_PROXY; }
if (process.env.no_proxy != null) { noprox = process.env.no_proxy.split(","); }
if (process.env.NO_PROXY != null) { noprox = process.env.NO_PROXY.split(","); }
/**
 * Finalise a received body (a whole response, or one part of a multipart
 * stream), send it to the node output and update the node status.
 *
 * @param {Object} msg - message whose `payload` is an array of Buffers
 *     (the collected chunks); it is replaced by the converted payload.
 * @param {string} boundary - multipart boundary; falsy for a plain
 *     (single part) response.
 * @param {Array} preRequestTimestamp - `process.hrtime()` value taken
 *     before the request was issued, used for the duration metric.
 * @param {Object} currentStatus - `{timestamp, value}` state used to blink
 *     the node status while streaming; `value` is the string "{}" while the
 *     status is cleared, otherwise the status object last shown.
 * @param {Object} [res] - optional response object; when supplied and it
 *     exposes `client.bytesRead`, the size metric is reported. When omitted
 *     the size metric is skipped.
 */
function handleMsg(msg, boundary, preRequestTimestamp, currentStatus, res) {
    if (node.metric()) {
        // Calculate request time
        var diff = process.hrtime(preRequestTimestamp);
        var ms = diff[0] * 1e3 + diff[1] * 1e-6;
        var metricRequestDurationMillis = ms.toFixed(3);
        node.metric("duration.millis", msg, metricRequestDurationMillis);
        // `res` is not a closure variable of this function, so it is only
        // usable when the caller hands it over explicitly.
        if (res && res.client && res.client.bytesRead) {
            node.metric("size.bytes", msg, res.client.bytesRead);
        }
    }

    var payload = Buffer.concat(msg.payload); // bin

    // In case of multipart streaming, all end-of-line characters are removed
    // from both ends of the part: they are leftovers of the boundaries and
    // part headers. The trimming is done on the raw bytes (CR = 0x0d,
    // LF = 0x0a) before any conversion, so it works for every return type.
    if (boundary) {
        var begin = 0;
        var end = payload.length; // exclusive
        while (end > begin && (payload[end - 1] === 0x0a || payload[end - 1] === 0x0d)) {
            end--;
        }
        while (begin < end && (payload[begin] === 0x0a || payload[begin] === 0x0d)) {
            begin++;
        }
        payload = payload.slice(begin, end);
        if (payload.length === 0) {
            // Nothing but line endings: do not emit an empty part
            return;
        }
    }

    // Convert the payload to the required return type
    msg.payload = payload;
    if (node.ret !== "bin") {
        msg.payload = payload.toString('utf8'); // txt
        if (node.ret === "obj") {
            try { msg.payload = JSON.parse(msg.payload); } // obj
            catch(e) { node.warn(RED._("httpin.errors.json-error")); }
        }
    }

    node.send(msg);

    if (!boundary) {
        node.status({});
        return;
    }

    if ((Date.now() - currentStatus.timestamp) > 1000) {
        // For multipart streaming, the node status is inverted every second
        // (to let the user know it is still busy processing)
        if (currentStatus.value === "{}") {
            currentStatus.value = {fill:"blue",shape:"dot",text:"httpin.status.streaming"};
            node.status(currentStatus.value);
        }
        else {
            // "{}" is only the bookkeeping marker; the status itself is
            // cleared with an empty object.
            currentStatus.value = "{}";
            node.status({});
        }
        currentStatus.timestamp = Date.now();
    }
}
this.on("input",function(msg) {
var boundary = "";
var headerBodySeparator = "";
var headerSeparator = "";
var searchString = "";
var currentStatus = {timestamp:0, value:'{}'};
var preRequestTimestamp = process.hrtime();
node.status({fill:"blue",shape:"dot",text:"httpin.status.requesting"});
var url = nodeUrl || msg.url;
var url = msg.url || nodeUrl;
if (msg.url && nodeUrl && (nodeUrl !== msg.url)) { // revert change below when warning is finally removed
node.warn(RED._("common.errors.nooverride"));
}
@ -160,7 +239,16 @@ module.exports = function(RED) {
if (tlsNode) {
tlsNode.addTLSOptions(opts);
}
var req = ((/^https/.test(urltotest))?https:http).request(opts,function(res) {
//var abortStream = msg.hasOwnProperty("abort_stream") && msg.abort_stream == true;
if (node.prevReq /*|| abortStream*/) {
// If a previous request is still busy (endless) streaming, then stop it.
node.prevReq.abort();
}
var req = ((/^https/.test(urltotest))?https:http).request(opts,function(res) {
var partCurrent = [];
var partHeader = [];
var partBody = [];
// Force NodeJs to return a Buffer (instead of a string)
// See https://github.com/nodejs/node/issues/6038
res.setEncoding(null);
@ -173,33 +261,179 @@ module.exports = function(RED) {
// msg.url = url; // revert when warning above finally removed
res.on('data',function(chunk) {
msg.payload.push(chunk);
});
res.on('end',function() {
if (node.metric()) {
// Calculate request time
var diff = process.hrtime(preRequestTimestamp);
var ms = diff[0] * 1e3 + diff[1] * 1e-6;
var metricRequestDurationMillis = ms.toFixed(3);
node.metric("duration.millis", msg, metricRequestDurationMillis);
if (res.client && res.client.bytesRead) {
node.metric("size.bytes", msg, res.client.bytesRead);
var searchIndex = -1;
if (!boundary) {
// -----------------------------------------------------------------------------------------
// Automatically check whether multipart streaming is required (at the start of the stream)
// -----------------------------------------------------------------------------------------
var contentType = this.headers['content-type'];
if (!/multipart/.test(contentType)) {
node.error(RED._("httpin.errors.no-multipart"),msg);
return;
}
// Automatically detect the required boundary (that will be used between parts of the stream)
boundary = (contentType.match(/.*;\sboundary=(.*)/) || [null, null])[1];
if(!boundary) {
node.error(RED._("httpin.errors.no-boundary"),msg);
return;
}
// A boundary needs to start with -- (even if -- is absent in the http header)
if (!boundary.startsWith('--')) {
boundary = '--' + boundary;
}
// Every part contains one or more headers and one body (content).
// Headers and body are separated by two EOL (end of line) symbols.
// Those EOL symbols can be LF (linefeed \n) or CR (carriage return \r) or CRLF (carriage return linefeed \r\n).
// Determine the EOL symbols at the start of the stream.
var eolSymbols = (chunk.toString().match(/(?:\r\r|\n\n|\r\n\r\n)/g) || []);
if (eolSymbols.indexOf('\r\n\r\n') >= 0) {
headerBodySeparator = '\r\n\r\n';
}
else if (eolSymbols.indexOf('\r\r') >= 0) {
headerBodySeparator = '\r\r';
}
else if (eolSymbols.indexOf('\n\n') >= 0) {
headerBodySeparator = '\n\n';
}
if(!headerBodySeparator) {
node.error(RED._("httpin.errors.no-separator"),msg);
return;
}
// The header separator is only one half of the header body separator;
headerSeparator = headerBodySeparator.slice(0, headerBodySeparator.length/2);
// Store the current request only in case streaming is detected, so it could be aborted afterwards
node.prevReq = req;
// The boundary should arrive at the start of the stream, so let's start searching for it
searchString = boundary;
}
// Convert the payload to the required return type
msg.payload = Buffer.concat(msg.payload); // bin
if (node.ret !== "bin") {
msg.payload = msg.payload.toString('utf8'); // txt
if (!boundary) {
// When no multipart streaming (i.e. request with single response part)
msg.payload.push(chunk);
}
else {
// -----------------------------------------------------------------------------------------
// Stream the data in the new chunk
// -----------------------------------------------------------------------------------------
var checkOverlap = partBody.length > 0;
if (node.ret === "obj") {
try { msg.payload = JSON.parse(msg.payload); } // obj
catch(e) { node.warn(RED._("httpin.errors.json-error")); }
while (true) {
if (searchString == boundary) {
partCurrent = partBody;
}
else {
partCurrent = partHeader;
}
// When starting with a new chunk and a previous chunk is available, check whether the search string is
// split across two chunks. Data is split into chunks by the transport layer, which has
// no knowledge of the protocol being used (so vital data might be split).
if (checkOverlap == true) {
checkOverlap = false;
// For a searchString of N characters, create a new buffer containing the last N-1 characters
// of the previous chunk and N-1 characters of the current chunk.
var previousChunk = partCurrent[partCurrent.length - 1];
var previousTrail = previousChunk.slice(previousChunk.length - searchString.length + 1);
var currentLead = chunk.slice(0, searchString.length-1);
var chunkOverlap = Buffer.concat([previousTrail, currentLead]);
searchIndex = chunkOverlap.indexOf(searchString);
if (searchIndex >= 0) {
// Cut off the previous body chunk at the position where the search string starts
partCurrent[partCurrent.length - 1] = previousChunk.slice(0, previousChunk.length - searchString.length + searchIndex + 1);
// Adjust the start of the current chunk
chunk = chunk.slice(searchIndex + 1);
}
}
else {
// Try to find the search string in the current chunk
searchIndex = chunk.indexOf(searchString);
if (searchIndex >= 0) {
// Store the part of the chunk data preceding the position where the search string starts
partCurrent.push(chunk.slice(0, searchIndex));
// Adjust the start of the current chunk
chunk = chunk.slice(searchIndex + searchString.length);
}
else {
// Search string not found in this chunk, so store the chunk and proceed to the next chunk
partCurrent.push(chunk);
break;
}
}
if (searchIndex >= 0) {
if (searchString == boundary) {
// Clone the msg (without payload for speed)
var newMsg = RED.util.cloneMessage(msg);
// The part headers will be put in msg.content, with a JSON format like {Content-Type:..., Content-Length:... etc}.
// Note that part headers are optional, even the header "Content-Type" (which defaults to text/plain).
var comma = '';
var quote = '';
var content = '{';
Buffer.concat(partHeader).toString('utf8').trim().split(headerSeparator).forEach(function(entry) {
var entryArray = entry.split(":");
if (entryArray.length == 2) {
isNaN(entryArray[1]) ? quote = '"' : quote = '';
content += comma + '"' + entryArray[0].trim() + '": ' + quote + entryArray[1].trim() + quote;
comma = ', ';
}
});
content += '}';
newMsg.content = JSON.parse(content);
// If a part body has been found, let's put a message on the output port
newMsg.payload = partBody;
handleMsg(newMsg, boundary, preRequestTimestamp,currentStatus);
// Everything has been sent, so start collecting data all over again ...
partHeader = [];
partBody = [];
// If a (non-zero) throttling interval is specified, reading of the response is paused during that interval.
// If the message contains a throttling interval, it will override the node's throttling interval.
var throttleInterval = msg.throttle || node.throttleInterval;
if (throttleInterval && throttleInterval !== 0) {
res.pause();
setTimeout(function () {
res.resume();
}, throttleInterval);
}
// Boundary found, so from here on we will try to find a headerbodyseparator
searchString = headerBodySeparator;
}
else {
// HeaderBodySeparator found, so from here on we will try to find a boundary
searchString = boundary;
}
}
}
}
node.send(msg);
node.status({});
});
res.on('end',function() {
if(boundary) {
// If streaming is interrupted, the last part might not be complete (so skip it)
node.status({});
}
else {
handleMsg(msg, boundary, preRequestTimestamp, currentStatus);
}
});
});
req.setTimeout(node.reqTimeout, function() {
@ -223,6 +457,11 @@ module.exports = function(RED) {
});
// When the node is closed, stop any request that is still streaming and
// clear the node status.
this.on("close",function() {
    if (node.prevReq) {
        // At (re)deploy make sure the streaming is closed, otherwise e.g. it keeps sending data across already removed wires
        node.prevReq.abort();
        // Drop the reference so the aborted request is neither retained nor aborted a second time
        node.prevReq = null;
    }
    node.status({});
});
}

View File

@ -341,6 +341,8 @@
},
"setby": "- set by msg.method -",
"basicauth": "Use basic authentication",
"throttling": "Limit (multipart) stream rate",
"interval": "Interval",
"use-tls": "Enable secure (SSL/TLS) connection",
"tls-config":"TLS Configuration",
"utf8": "a UTF-8 string",
@ -362,7 +364,8 @@
"invalid-transport":"non-http transport requested"
},
"status": {
"requesting": "requesting"
"requesting": "requesting",
"streaming": "streaming"
}
},
"websocket": {