From bd671e75e6174350e386d9b2c259e7ba1769bbd2 Mon Sep 17 00:00:00 2001 From: bartbutenaers Date: Sat, 8 Apr 2017 22:18:07 +0200 Subject: [PATCH] Multipart streaming --- nodes/core/io/21-httprequest.html | 31 ++- nodes/core/io/21-httprequest.js | 285 +++++++++++++++++++++++-- nodes/core/locales/en-US/messages.json | 10 +- 3 files changed, 300 insertions(+), 26 deletions(-) diff --git a/nodes/core/io/21-httprequest.html b/nodes/core/io/21-httprequest.html index 0c23d0f11..c50f8751d 100644 --- a/nodes/core/io/21-httprequest.html +++ b/nodes/core/io/21-httprequest.html @@ -62,6 +62,18 @@ + +
+ + +
+
+ + +
+
+
+
@@ -92,6 +104,7 @@
  • headers is an object containing the response headers
  • responseUrl is the url of the server that responds
  • +

    The interval can be specified for (multipart) streams, to limit the number of parts received: this is the number of milliseconds between parts in the stream.

    Note: If you need to configure a proxy please add http_proxy=... to your environment variables and restart Node-RED.

    @@ -104,7 +117,8 @@ method:{value:"GET"}, ret: {value:"txt"}, url:{value:""}, - tls: {type:"tls-config",required: false} + tls: {type:"tls-config",required: false}, + throttling: {value:0} }, credentials: { user: {type:"text"}, @@ -135,6 +149,21 @@ $('#node-input-useAuth').prop('checked', false); } $("#node-input-useAuth").change(); + + $("#node-input-throttling").change(function() { + if ($(this).is(":checked")) { + $(".node-input-throttling-row").show(); + } else { + $(".node-input-throttling-row").hide(); + $('#node-input-interval').val('0'); + } + }); + if (this.interval) { + $('#node-input-throttling').prop('checked', true); + } else { + $('#node-input-throttling').prop('checked', false); + } + $("#node-input-throttling").change(); function updateTLSOptions() { if ($("#node-input-usetls").is(':checked')) { diff --git a/nodes/core/io/21-httprequest.js b/nodes/core/io/21-httprequest.js index 1868089ae..1cfa86515 100644 --- a/nodes/core/io/21-httprequest.js +++ b/nodes/core/io/21-httprequest.js @@ -32,6 +32,9 @@ module.exports = function(RED) { var tlsNode = RED.nodes.getNode(n.tls); } this.ret = n.ret || "txt"; + this.prevReq = null; + this.throttling = n.throttling; + this.throttleInterval = n.interval; if (RED.settings.httpRequestTimeout) { this.reqTimeout = parseInt(RED.settings.httpRequestTimeout) || 120000; } else { this.reqTimeout = 120000; } @@ -40,11 +43,87 @@ module.exports = function(RED) { if (process.env.HTTP_PROXY != null) { prox = process.env.HTTP_PROXY; } if (process.env.no_proxy != null) { noprox = process.env.no_proxy.split(","); } if (process.env.NO_PROXY != null) { noprox = process.env.NO_PROXY.split(","); } + + function handleMsg(msg, boundary, preRequestTimestamp, currentStatus) { + if (node.metric()) { + // Calculate request time + var diff = process.hrtime(preRequestTimestamp); + var ms = diff[0] * 1e3 + diff[1] * 1e-6; + var metricRequestDurationMillis = ms.toFixed(3); + node.metric("duration.millis", msg, 
metricRequestDurationMillis); + if (res.client && res.client.bytesRead) { + node.metric("size.bytes", msg, res.client.bytesRead); + } + } + + // Convert the payload to the required return type + msg.payload = Buffer.concat(msg.payload); // bin + if (node.ret !== "bin") { + msg.payload = msg.payload.toString('utf8'); // txt + + if (node.ret === "obj") { + try { msg.payload = JSON.parse(msg.payload); } // obj + catch(e) { node.warn(RED._("httpin.errors.json-error")); } + } + } + + // In case of multipart streaming, all End-of-line characters should be removed (both from the + // end and the start). Otherwise the data will be considered corrupt. These characters are + // remainings from the boundaries and part headers ... + if (boundary) { + var begin = 0; + var end = msg.payload.length - 1; + + // Trim CR or LF characters at the end of the payload + for (var i = end; i >= begin; i--) { + if (msg.payload[i] !== '\n' && msg.payload[i] !== '\r') { + break; + } + end--; + } + + // Trim optional CR or LF characters at the start of the current body + for (var i = begin; i <= end; i++) { + if (msg.payload[i] !== '\n' && msg.payload[i] !== '\r') { + break; + } + begin++; + } + + msg.payload = msg.payload.slice(begin, end); + + if (msg.payload.length == 0) { + return; + } + } + + node.send(msg); + + if (!boundary) { + node.status({}); + } + else if ((Date.now() - currentStatus.timestamp) > 1000) { + // For multipart streaming, the node status is inverted every second (to let user know it is still busy processing) + if (currentStatus.value === "{}") { + currentStatus.value = {fill:"blue",shape:"dot",text:"httpin.status.streaming"}; + } + else { + currentStatus.value = "{}"; + } + node.status(currentStatus.value); + currentStatus.timestamp = Date.now(); + } + } this.on("input",function(msg) { + var boundary = ""; + var headerBodySeparator = ""; + var headerSeparator = ""; + var searchString = ""; + var currentStatus = {timestamp:0, value:'{}'}; var preRequestTimestamp = 
process.hrtime(); node.status({fill:"blue",shape:"dot",text:"httpin.status.requesting"}); - var url = nodeUrl || msg.url; + var url = msg.url || nodeUrl; if (msg.url && nodeUrl && (nodeUrl !== msg.url)) { // revert change below when warning is finally removed node.warn(RED._("common.errors.nooverride")); } @@ -155,7 +234,16 @@ module.exports = function(RED) { if (tlsNode) { tlsNode.addTLSOptions(opts); } - var req = ((/^https/.test(urltotest))?https:http).request(opts,function(res) { + //var abortStream = msg.hasOwnProperty("abort_stream") && msg.abort_stream == true; + if (node.prevReq /*|| abortStream*/) { + // If a previous request is still busy (endless) streaming, then stop it. + node.prevReq.abort(); + } + var req = ((/^https/.test(urltotest))?https:http).request(opts,function(res) { + var partCurrent = []; + var partHeader = []; + var partBody = []; + // Force NodeJs to return a Buffer (instead of a string) // See https://github.com/nodejs/node/issues/6038 res.setEncoding(null); @@ -168,33 +256,179 @@ module.exports = function(RED) { // msg.url = url; // revert when warning above finally removed res.on('data',function(chunk) { - msg.payload.push(chunk); - }); - res.on('end',function() { - if (node.metric()) { - // Calculate request time - var diff = process.hrtime(preRequestTimestamp); - var ms = diff[0] * 1e3 + diff[1] * 1e-6; - var metricRequestDurationMillis = ms.toFixed(3); - node.metric("duration.millis", msg, metricRequestDurationMillis); - if (res.client && res.client.bytesRead) { - node.metric("size.bytes", msg, res.client.bytesRead); + var searchIndex = -1; + + if (!boundary) { + // ----------------------------------------------------------------------------------------- + // Automatically check whether multipart streaming is required (at the start of the stream) + // ----------------------------------------------------------------------------------------- + var contentType = this.headers['content-type']; + + if (!/multipart/.test(contentType)) { + 
node.error(RED._("httpin.errors.no-multipart"),msg); + return; } + + // Automatically detect the required boundary (that will be used between parts of the stream) + boundary = (contentType.match(/.*;\sboundary=(.*)/) || [null, null])[1]; + + if(!boundary) { + node.error(RED._("httpin.errors.no-boundary"),msg); + return; + } + + // A boundary needs to start with -- (even if -- is absent in the http header) + if (!boundary.startsWith('--')) { + boundary = '--' + boundary; + } + + // Every part contains one or more headers and one body (content). + // Headers and body are separated by two EOL (end of line) symbols. + // Those EOL symbols can be LF (linefeed \n) or CR (carriage return \r) or CRLF (carriage return linefeed \r\n). + // Determine the EOL symbols at the start of the stream. + var eolSymbols = (chunk.toString().match(/(?:\r\r|\n\n|\r\n\r\n)/g) || []); + + if (eolSymbols.indexOf('\r\n\r\n') >= 0) { + headerBodySeparator = '\r\n\r\n'; + } + else if (eolSymbols.indexOf('\r\r') >= 0) { + headerBodySeparator = '\r\r'; + } + else if (eolSymbols.indexOf('\n\n') >= 0) { + headerBodySeparator = '\n\n'; + } + + if(!headerBodySeparator) { + node.error(RED._("httpin.errors.no-separator"),msg); + return; + } + + // The header separator is only one half of the header body separator; + headerSeparator = headerBodySeparator.slice(0, headerBodySeparator.length/2); + + // Store the current request only in case streaming is detected, so it could be aborted afterwards + node.prevReq = req; + + // The boundary should arrive at the start of the stream, so let's start searching for it + searchString = boundary; } - // Convert the payload to the required return type - msg.payload = Buffer.concat(msg.payload); // bin - if (node.ret !== "bin") { - msg.payload = msg.payload.toString('utf8'); // txt + if (!boundary) { + // When no multipart streaming (i.e. 
request with single response part) + msg.payload.push(chunk); + } + else { + // ----------------------------------------------------------------------------------------- + // Stream the data in the new chunk + // ----------------------------------------------------------------------------------------- + var checkOverlap = partBody.length > 0; - if (node.ret === "obj") { - try { msg.payload = JSON.parse(msg.payload); } // obj - catch(e) { node.warn(RED._("httpin.errors.json-error")); } + while (true) { + if (searchString == boundary) { + partCurrent = partBody; + } + else { + partCurrent = partHeader; + } + + // When starting with a new chunk and a previous chunk is available, check whether the search string is + // splitted across two chunks. Indeed data is splitted into chunks by the transport layer, which has + // no knowledge of the protocol being used (so vital data might be splitted). + if (checkOverlap == true) { + checkOverlap = false; + + // For a searchString of N characters, create a new buffer containing the last N-1 characters + // of the previous chunk and N-1 characters of the current chunk. 
+ var previousChunk = partCurrent[partCurrent.length - 1]; + var previousTrail = previousChunk.slice(previousChunk.length - searchString.length + 1); + var currentLead = chunk.slice(0, searchString.length-1); + var chunkOverlap = Buffer.concat([previousTrail, currentLead]); + + searchIndex = chunkOverlap.indexOf(searchString); + if (searchIndex >= 0) { + // Cut off the previous body chunk at the position where the search string starts + partCurrent[partCurrent.length - 1] = previousChunk.slice(0, previousChunk.length - searchString.length + searchIndex + 1); + + // Adjust the start of the current chunk + chunk = chunk.slice(searchIndex + 1); + } + } + else { + // Try to find the search string in the current chunk + searchIndex = chunk.indexOf(searchString); + + if (searchIndex >= 0) { + // Store the part of the chunk data preceding the position where the search string starts + partCurrent.push(chunk.slice(0, searchIndex)); + + // Adjust the start of the current chunk + chunk = chunk.slice(searchIndex + searchString.length); + } + else { + // Search string not found in this chunk, so store the chunk and proceed to the next chunk + partCurrent.push(chunk); + break; + } + } + + if (searchIndex >= 0) { + if (searchString == boundary) { + // Clone the msg (without payload for speed) + var newMsg = RED.util.cloneMessage(msg); + + // The part headers will be put in msg.content, with a JSON format like {Content-Type:..., Content-Length:... etc}. + // Note that part headers are optional, even the header "Content-Type" (which defaults to text/plain). + var comma = ''; + var quote = ''; + var content = '{'; + Buffer.concat(partHeader).toString('utf8').trim().split(headerSeparator).forEach(function(entry) { + var entryArray = entry.split(":"); + if (entryArray.length == 2) { + isNaN(entryArray[1]) ? 
quote = '"' : quote = ''; + content += comma + '"' + entryArray[0].trim() + '": ' + quote + entryArray[1].trim() + quote; + comma = ', '; + } + }); + content += '}'; + newMsg.content = JSON.parse(content); + + // If a part body has been found, let's put a message on the output port + newMsg.payload = partBody; + handleMsg(newMsg, boundary, preRequestTimestamp,currentStatus); + + // Everything has been send, so start collecting data all over again ... + partHeader = []; + partBody = []; + + // If a (non-zero) throttling interval is specified, the upload should be pauzed during that interval. + // If the message contains a throttling interval, it will override the node's throttling interval. + var throttleInterval = msg.throttle || node.throttleInterval; + if (throttleInterval && throttleInterval !== 0) { + res.pause(); + setTimeout(function () { + res.resume(); + }, throttleInterval); + } + + // Boundary found, so from here on we will try to find a headerbodyseparator + searchString = headerBodySeparator; + } + else { + // HeaderBodySeparator found, so from here on we will try to find a boundary + searchString = boundary; + } + } } } - - node.send(msg); - node.status({}); + }); + res.on('end',function() { + if(boundary) { + // If streaming is interrupted, the last part might not be complete (so skip it) + node.status({}); + } + else { + handleMsg(msg, boundary, preRequestTimestamp, currentStatus); + } }); }); req.setTimeout(node.reqTimeout, function() { @@ -218,6 +452,11 @@ module.exports = function(RED) { }); this.on("close",function() { + if (node.prevReq) { + // At (re)deploy make sure the streaming is closed, otherwise e.g. 
it keeps sending data across already removed wires + node.prevReq.abort(); + } + node.status({}); }); } diff --git a/nodes/core/locales/en-US/messages.json b/nodes/core/locales/en-US/messages.json index 2a03598dd..e2e7c0a82 100644 --- a/nodes/core/locales/en-US/messages.json +++ b/nodes/core/locales/en-US/messages.json @@ -317,6 +317,8 @@ }, "setby": "- set by msg.method -", "basicauth": "Use basic authentication", + "throttling": "Limit (multipart) stream rate", + "interval": "Interval", "use-tls": "Enable secure (SSL/TLS) connection", "tls-config":"TLS Configuration", "utf8": "a UTF-8 string", @@ -334,10 +336,14 @@ "no-response": "No response object", "json-error": "JSON parse error", "no-url": "No url specified", - "deprecated-call":"Deprecated call to __method__" + "deprecated-call":"Deprecated call to __method__", + "no-multipart": "No multipart content-type header found", + "no-boundary": "No multipart boundary found", + "no-separator": "No multipart EOL separator could be determined" }, "status": { - "requesting": "requesting" + "requesting": "requesting", + "streaming": "streaming" } }, "websocket": {