From e3b052bc38b6df90c08ece2e80906e3f1835f1d7 Mon Sep 17 00:00:00 2001 From: bartbutenaers Date: Sat, 4 Mar 2017 10:11:58 +0100 Subject: [PATCH] initial commit --- nodes/core/io/21-httprequest.js | 147 +++++++++++++++++++++++++++----- 1 file changed, 125 insertions(+), 22 deletions(-) diff --git a/nodes/core/io/21-httprequest.js b/nodes/core/io/21-httprequest.js index 9ac8ffcaa..f793cd13b 100644 --- a/nodes/core/io/21-httprequest.js +++ b/nodes/core/io/21-httprequest.js @@ -25,6 +25,7 @@ module.exports = function(RED) { function HTTPRequest(n) { RED.nodes.createNode(this,n); var node = this; + var req; var nodeUrl = n.url; var isTemplatedUrl = (nodeUrl||"").indexOf("{{") != -1; var nodeMethod = n.method || "GET"; @@ -41,7 +42,32 @@ module.exports = function(RED) { if (process.env.no_proxy != null) { noprox = process.env.no_proxy.split(","); } if (process.env.NO_PROXY != null) { noprox = process.env.NO_PROXY.split(","); } + function handleMsg(msg) { + if (node.metric()) { + // Calculate request time + var diff = process.hrtime(preRequestTimestamp); + var ms = diff[0] * 1e3 + diff[1] * 1e-6; + var metricRequestDurationMillis = ms.toFixed(3); + node.metric("duration.millis", msg, metricRequestDurationMillis); + if (res.client && res.client.bytesRead) { + node.metric("size.bytes", msg, res.client.bytesRead); + } + } + if (node.ret === "txt") { + msg.payload = msg.payload.toString(); + } + else if (node.ret === "obj") { + try { msg.payload = JSON.parse(msg.payload); } + catch(e) { node.warn(RED._("httpin.errors.json-error")); } + } + node.send(msg); + node.status({}); + } + this.on("input",function(msg) { + var boundary = ""; + var chunkBuffer = Buffer.from(''); + var headerBodySeparator = ""; var preRequestTimestamp = process.hrtime(); node.status({fill:"blue",shape:"dot",text:"httpin.status.requesting"}); var url = nodeUrl || msg.url; @@ -79,6 +105,7 @@ module.exports = function(RED) { var opts = urllib.parse(url); opts.method = method; opts.headers = {}; + opts.encoding = null; // response body should be buffer, not string var ctSet = "Content-Type"; // set default camel case var clSet = "Content-Length"; if (msg.headers) { @@ -160,36 +187,111 @@ module.exports = function(RED) { if (tlsNode) { tlsNode.addTLSOptions(opts); } - var req = ((/^https/.test(urltotest))?https:http).request(opts,function(res) { - (node.ret === "bin") ? res.setEncoding('binary') : res.setEncoding('utf8'); + if (req) { + req.abort(); + } + req = ((/^https/.test(urltotest))?https:http).request(opts,function(res) { + //(node.ret === "bin") ? res.setEncoding('binary') : res.setEncoding('utf8'); msg.statusCode = res.statusCode; msg.headers = res.headers; msg.responseUrl = res.responseUrl; - msg.payload = ""; + msg.payload = Buffer.from(''); // msg.url = url; // revert when warning above finally removed + + res.setEncoding(null); + delete res._readableState.decoder; + res.on('data',function(chunk) { - msg.payload += chunk; - }); - res.on('end',function() { - if (node.metric()) { - // Calculate request time - var diff = process.hrtime(preRequestTimestamp); - var ms = diff[0] * 1e3 + diff[1] * 1e-6; - var metricRequestDurationMillis = ms.toFixed(3); - node.metric("duration.millis", msg, metricRequestDurationMillis); - if (res.client && res.client.bytesRead) { - node.metric("size.bytes", msg, res.client.bytesRead); + var nextPart = 0; + + if (!boundary) { + var contentType = this.headers['content-type']; + if (contentType) { + // Automatically check whether multipart streaming is required + if (/multipart/.test(contentType)) { + // Automatically detect the required boundary (that will be used between parts of the stream) + boundary = (contentType.match(/.*;\sboundary=(.*)/) || [null, null])[1]; + + if(!boundary) { + node.error(RED._("httpin.errors.no-boundary"),msg); + return; + } + + // A boundary needs to start with -- (even if not specified in the http header variable) + if (!boundary.startsWith('--')) { + boundary = '--' + boundary; + } + + // Every part contains of headers and a body (content) separated by two EOL (end of line) symbols. + // The end of line can be LF (linefeed \n), CR (carriage return \r), CRLF (carriage return linefeed \r\n). + // When the stream starts, the EOL should be determined. + var eolSymbols = (chunk.toString().match(/(?:\r\r|\n\n|\r\n\r\n)/g) || []); + + if (eolSymbols.indexOf('\r\n\r\n') >= 0) { + headerBodySeparator = '\r\n\r\n'; + } + else if (eolSymbols.indexOf('\r\r') >= 0) { + headerBodySeparator = '\r\r'; + } + else if (eolSymbols.indexOf('\n\n') >= 0) { + headerBodySeparator = '\n\n'; + } + + if(!headerBodySeparator) { + node.error(RED._("httpin.errors.no-separator"),msg); + return; + } + } } } - if (node.ret === "bin") { - msg.payload = new Buffer(msg.payload,"binary"); + + // Append the chunk to other (non-processed) chunk data + chunkBuffer = Buffer.concat([chunkBuffer, chunk]); + chunk = null; + + if (boundary) { + while(true) { + // Parts are separated by boundaries, so try to isolate parts in the received chunks. + var bodyEnd = chunkBuffer.indexOf(boundary, nextPart); + + if (bodyEnd == -1) { + // Store the remaining (incomplete) part in the chunk buffer, to be processed when the next chunk arrives + chunkBuffer = chunkBuffer.slice(nextPart, chunkBuffer.length); + break; + } + + nextPart = bodyEnd + boundary.length; + + // Find the part body (that arrives after the part header) + // The header 'Content length' is optional, so it cannot be used here + var bodyStart = chunkBuffer.indexOf(headerBodySeparator) + headerBodySeparator.length; + + // Trim optional CR or LF characters at the start of the body + for (var i = bodyStart; i <= bodyEnd; i++) { + if (chunkBuffer[i] !== '\n' && chunkBuffer[i] !== '\r') { + break; + } + bodyStart++; + } + + // Trim optional CR or LF characters at the end of the body + for (var i = bodyEnd - 1; i >= bodyStart; i--) { + if (chunkBuffer[i] !== '\n' && chunkBuffer[i] !== '\r') { + break; + } + bodyEnd--; + } + + if (bodyEnd - bodyStart > 0) { + // Send the body to the output port of this node + msg.payload = chunkBuffer.slice(bodyStart, bodyEnd); + handleMsg(msg); + } + } } - else if (node.ret === "obj") { - try { msg.payload = JSON.parse(msg.payload); } - catch(e) { node.warn(RED._("httpin.errors.json-error")); } - } - node.send(msg); - node.status({}); + }); + res.on('end',function() { + handleMsg(msg); }); }); req.setTimeout(node.reqTimeout, function() { @@ -224,3 +326,4 @@ module.exports = function(RED) { } }); } +