mirror of
https://github.com/node-red/node-red.git
synced 2023-10-10 13:36:53 +02:00
parent
9cc04da7b2
commit
f54f863611
@ -62,18 +62,6 @@
|
|||||||
<option value="obj" data-i18n="httpin.json"></option>
|
<option value="obj" data-i18n="httpin.json"></option>
|
||||||
</select>
|
</select>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
<div class="form-row">
|
|
||||||
<input type="checkbox" id="node-input-throttling" style="display: inline-block; width: auto; vertical-align: top;">
|
|
||||||
<label for="node-input-throttling" style="width: 70%;"><span data-i18n="httpin.throttling"></span></label>
|
|
||||||
<div style="margin-left: 20px" class="node-input-throttling-row hide">
|
|
||||||
<div class="form-row">
|
|
||||||
<label for="node-input-interval"><i class="fa fa-clock-o"></i> <span data-i18n="httpin.interval"></span></label>
|
|
||||||
<input type="text" id="node-input-interval">
|
|
||||||
</div>
|
|
||||||
</div>
|
|
||||||
</div>
|
|
||||||
|
|
||||||
<div class="form-row">
|
<div class="form-row">
|
||||||
<label for="node-input-name"><i class="fa fa-tag"></i> <span data-i18n="common.label.name"></span></label>
|
<label for="node-input-name"><i class="fa fa-tag"></i> <span data-i18n="common.label.name"></span></label>
|
||||||
<input type="text" id="node-input-name" data-i18n="[placeholder]common.label.name">
|
<input type="text" id="node-input-name" data-i18n="[placeholder]common.label.name">
|
||||||
@ -104,7 +92,6 @@
|
|||||||
<li><code>headers</code> is an object containing the response headers</li>
|
<li><code>headers</code> is an object containing the response headers</li>
|
||||||
<li><code>responseUrl</code> is the url of the server that responds</li>
|
<li><code>responseUrl</code> is the url of the server that responds</li>
|
||||||
</ul>
|
</ul>
|
||||||
<p>The <b>interval</b> can be specified for (multipart) streams, to limit the number parts received: this is the number of milliseconds between parts in the stream.</p>
|
|
||||||
<p><b>Note</b>: If you need to configure a proxy please add <b>http_proxy=...</b> to your environment variables and restart Node-RED.</p>
|
<p><b>Note</b>: If you need to configure a proxy please add <b>http_proxy=...</b> to your environment variables and restart Node-RED.</p>
|
||||||
</script>
|
</script>
|
||||||
|
|
||||||
@ -118,7 +105,6 @@
|
|||||||
ret: {value:"txt"},
|
ret: {value:"txt"},
|
||||||
url:{value:"",validate:function(v) { return (v.trim().length === 0) || (v.indexOf("://") === -1) || (v.trim().indexOf("http") === 0)} },
|
url:{value:"",validate:function(v) { return (v.trim().length === 0) || (v.indexOf("://") === -1) || (v.trim().indexOf("http") === 0)} },
|
||||||
tls: {type:"tls-config",required: false}
|
tls: {type:"tls-config",required: false}
|
||||||
throttling: {value:0}
|
|
||||||
},
|
},
|
||||||
credentials: {
|
credentials: {
|
||||||
user: {type:"text"},
|
user: {type:"text"},
|
||||||
@ -153,21 +139,6 @@
|
|||||||
}
|
}
|
||||||
$("#node-input-useAuth").change();
|
$("#node-input-useAuth").change();
|
||||||
|
|
||||||
$("#node-input-throttling").change(function() {
|
|
||||||
if ($(this).is(":checked")) {
|
|
||||||
$(".node-input-throttling-row").show();
|
|
||||||
} else {
|
|
||||||
$(".node-input-throttling-row").hide();
|
|
||||||
$('#node-input-interval').val('0');
|
|
||||||
}
|
|
||||||
});
|
|
||||||
if (this.interval) {
|
|
||||||
$('#node-input-throttling').prop('checked', true);
|
|
||||||
} else {
|
|
||||||
$('#node-input-throttling').prop('checked', false);
|
|
||||||
}
|
|
||||||
$("#node-input-throttling").change();
|
|
||||||
|
|
||||||
function updateTLSOptions() {
|
function updateTLSOptions() {
|
||||||
if ($("#node-input-usetls").is(':checked')) {
|
if ($("#node-input-usetls").is(':checked')) {
|
||||||
$("#node-row-tls").show();
|
$("#node-row-tls").show();
|
||||||
|
@ -32,9 +32,6 @@ module.exports = function(RED) {
|
|||||||
var tlsNode = RED.nodes.getNode(n.tls);
|
var tlsNode = RED.nodes.getNode(n.tls);
|
||||||
}
|
}
|
||||||
this.ret = n.ret || "txt";
|
this.ret = n.ret || "txt";
|
||||||
this.prevReq = null;
|
|
||||||
this.throttling = n.throttling;
|
|
||||||
this.throttleInterval = n.interval;
|
|
||||||
if (RED.settings.httpRequestTimeout) { this.reqTimeout = parseInt(RED.settings.httpRequestTimeout) || 120000; }
|
if (RED.settings.httpRequestTimeout) { this.reqTimeout = parseInt(RED.settings.httpRequestTimeout) || 120000; }
|
||||||
else { this.reqTimeout = 120000; }
|
else { this.reqTimeout = 120000; }
|
||||||
|
|
||||||
@ -44,86 +41,10 @@ module.exports = function(RED) {
|
|||||||
if (process.env.no_proxy != null) { noprox = process.env.no_proxy.split(","); }
|
if (process.env.no_proxy != null) { noprox = process.env.no_proxy.split(","); }
|
||||||
if (process.env.NO_PROXY != null) { noprox = process.env.NO_PROXY.split(","); }
|
if (process.env.NO_PROXY != null) { noprox = process.env.NO_PROXY.split(","); }
|
||||||
|
|
||||||
function handleMsg(msg, boundary, preRequestTimestamp, currentStatus) {
|
|
||||||
if (node.metric()) {
|
|
||||||
// Calculate request time
|
|
||||||
var diff = process.hrtime(preRequestTimestamp);
|
|
||||||
var ms = diff[0] * 1e3 + diff[1] * 1e-6;
|
|
||||||
var metricRequestDurationMillis = ms.toFixed(3);
|
|
||||||
node.metric("duration.millis", msg, metricRequestDurationMillis);
|
|
||||||
if (res.client && res.client.bytesRead) {
|
|
||||||
node.metric("size.bytes", msg, res.client.bytesRead);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Convert the payload to the required return type
|
|
||||||
msg.payload = Buffer.concat(msg.payload); // bin
|
|
||||||
if (node.ret !== "bin") {
|
|
||||||
msg.payload = msg.payload.toString('utf8'); // txt
|
|
||||||
|
|
||||||
if (node.ret === "obj") {
|
|
||||||
try { msg.payload = JSON.parse(msg.payload); } // obj
|
|
||||||
catch(e) { node.warn(RED._("httpin.errors.json-error")); }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// In case of multipart streaming, all End-of-line characters should be removed (both from the
|
|
||||||
// end and the start). Otherwise the data will be considered corrupt. These characters are
|
|
||||||
// remainings from the boundaries and part headers ...
|
|
||||||
if (boundary) {
|
|
||||||
var begin = 0;
|
|
||||||
var end = msg.payload.length - 1;
|
|
||||||
|
|
||||||
// Trim CR or LF characters at the end of the payload
|
|
||||||
for (var i = end; i >= begin; i--) {
|
|
||||||
if (msg.payload[i] !== '\n' && msg.payload[i] !== '\r') {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
end--;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Trim optional CR or LF characters at the start of the current body
|
|
||||||
for (var i = begin; i <= end; i++) {
|
|
||||||
if (msg.payload[i] !== '\n' && msg.payload[i] !== '\r') {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
begin++;
|
|
||||||
}
|
|
||||||
|
|
||||||
msg.payload = msg.payload.slice(begin, end);
|
|
||||||
|
|
||||||
if (msg.payload.length == 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
node.send(msg);
|
|
||||||
|
|
||||||
if (!boundary) {
|
|
||||||
node.status({});
|
|
||||||
}
|
|
||||||
else if ((Date.now() - currentStatus.timestamp) > 1000) {
|
|
||||||
// For multipart streaming, the node status is inverted every second (to let user know it is still busy processing)
|
|
||||||
if (currentStatus.value === "{}") {
|
|
||||||
currentStatus.value = {fill:"blue",shape:"dot",text:"httpin.status.streaming"};
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
currentStatus.value = "{}";
|
|
||||||
}
|
|
||||||
node.status(currentStatus.value);
|
|
||||||
currentStatus.timestamp = Date.now();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
this.on("input",function(msg) {
|
this.on("input",function(msg) {
|
||||||
var boundary = "";
|
|
||||||
var headerBodySeparator = "";
|
|
||||||
var headerSeparator = "";
|
|
||||||
var searchString = "";
|
|
||||||
var currentStatus = {timestamp:0, value:'{}'};
|
|
||||||
var preRequestTimestamp = process.hrtime();
|
var preRequestTimestamp = process.hrtime();
|
||||||
node.status({fill:"blue",shape:"dot",text:"httpin.status.requesting"});
|
node.status({fill:"blue",shape:"dot",text:"httpin.status.requesting"});
|
||||||
var url = msg.url || nodeUrl;
|
var url = nodeUrl || msg.url;
|
||||||
if (msg.url && nodeUrl && (nodeUrl !== msg.url)) { // revert change below when warning is finally removed
|
if (msg.url && nodeUrl && (nodeUrl !== msg.url)) { // revert change below when warning is finally removed
|
||||||
node.warn(RED._("common.errors.nooverride"));
|
node.warn(RED._("common.errors.nooverride"));
|
||||||
}
|
}
|
||||||
@ -239,16 +160,7 @@ module.exports = function(RED) {
|
|||||||
if (tlsNode) {
|
if (tlsNode) {
|
||||||
tlsNode.addTLSOptions(opts);
|
tlsNode.addTLSOptions(opts);
|
||||||
}
|
}
|
||||||
//var abortStream = msg.hasOwnProperty("abort_stream") && msg.abort_stream == true;
|
|
||||||
if (node.prevReq /*|| abortStream*/) {
|
|
||||||
// If a previous request is still busy (endless) streaming, then stop it.
|
|
||||||
node.prevReq.abort();
|
|
||||||
}
|
|
||||||
var req = ((/^https/.test(urltotest))?https:http).request(opts,function(res) {
|
var req = ((/^https/.test(urltotest))?https:http).request(opts,function(res) {
|
||||||
var partCurrent = [];
|
|
||||||
var partHeader = [];
|
|
||||||
var partBody = [];
|
|
||||||
|
|
||||||
// Force NodeJs to return a Buffer (instead of a string)
|
// Force NodeJs to return a Buffer (instead of a string)
|
||||||
// See https://github.com/nodejs/node/issues/6038
|
// See https://github.com/nodejs/node/issues/6038
|
||||||
res.setEncoding(null);
|
res.setEncoding(null);
|
||||||
@ -261,179 +173,33 @@ module.exports = function(RED) {
|
|||||||
|
|
||||||
// msg.url = url; // revert when warning above finally removed
|
// msg.url = url; // revert when warning above finally removed
|
||||||
res.on('data',function(chunk) {
|
res.on('data',function(chunk) {
|
||||||
var searchIndex = -1;
|
msg.payload.push(chunk);
|
||||||
|
|
||||||
if (!boundary) {
|
|
||||||
// -----------------------------------------------------------------------------------------
|
|
||||||
// Automatically check whether multipart streaming is required (at the start of the stream)
|
|
||||||
// -----------------------------------------------------------------------------------------
|
|
||||||
var contentType = this.headers['content-type'];
|
|
||||||
|
|
||||||
if (!/multipart/.test(contentType)) {
|
|
||||||
node.error(RED._("httpin.errors.no-multipart"),msg);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Automatically detect the required boundary (that will be used between parts of the stream)
|
|
||||||
boundary = (contentType.match(/.*;\sboundary=(.*)/) || [null, null])[1];
|
|
||||||
|
|
||||||
if(!boundary) {
|
|
||||||
node.error(RED._("httpin.errors.no-boundary"),msg);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// A boundary needs to start with -- (even if -- is absent in the http header)
|
|
||||||
if (!boundary.startsWith('--')) {
|
|
||||||
boundary = '--' + boundary;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Every part contains one or more headers and one body (content).
|
|
||||||
// Headers and body are separated by two EOL (end of line) symbols.
|
|
||||||
// Those EOL symbols can be LF (linefeed \n) or CR (carriage return \r) or CRLF (carriage return linefeed \r\n).
|
|
||||||
// Determine the EOL symbols at the start of the stream.
|
|
||||||
var eolSymbols = (chunk.toString().match(/(?:\r\r|\n\n|\r\n\r\n)/g) || []);
|
|
||||||
|
|
||||||
if (eolSymbols.indexOf('\r\n\r\n') >= 0) {
|
|
||||||
headerBodySeparator = '\r\n\r\n';
|
|
||||||
}
|
|
||||||
else if (eolSymbols.indexOf('\r\r') >= 0) {
|
|
||||||
headerBodySeparator = '\r\r';
|
|
||||||
}
|
|
||||||
else if (eolSymbols.indexOf('\n\n') >= 0) {
|
|
||||||
headerBodySeparator = '\n\n';
|
|
||||||
}
|
|
||||||
|
|
||||||
if(!headerBodySeparator) {
|
|
||||||
node.error(RED._("httpin.errors.no-separator"),msg);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// The header separator is only one half of the header body separator;
|
|
||||||
headerSeparator = headerBodySeparator.slice(0, headerBodySeparator.length/2);
|
|
||||||
|
|
||||||
// Store the current request only in case streaming is detected, so it could be aborted afterwards
|
|
||||||
node.prevReq = req;
|
|
||||||
|
|
||||||
// The boundary should arrive at the start of the stream, so let's start searching for it
|
|
||||||
searchString = boundary;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!boundary) {
|
|
||||||
// When no multipart streaming (i.e. request with single response part)
|
|
||||||
msg.payload.push(chunk);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
// -----------------------------------------------------------------------------------------
|
|
||||||
// Stream the data in the new chunk
|
|
||||||
// -----------------------------------------------------------------------------------------
|
|
||||||
var checkOverlap = partBody.length > 0;
|
|
||||||
|
|
||||||
while (true) {
|
|
||||||
if (searchString == boundary) {
|
|
||||||
partCurrent = partBody;
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
partCurrent = partHeader;
|
|
||||||
}
|
|
||||||
|
|
||||||
// When starting with a new chunk and a previous chunk is available, check whether the search string is
|
|
||||||
// splitted across two chunks. Indeed data is splitted into chunks by the transport layer, which has
|
|
||||||
// no knowledge of the protocol being used (so vital data might be splitted).
|
|
||||||
if (checkOverlap == true) {
|
|
||||||
checkOverlap = false;
|
|
||||||
|
|
||||||
// For a searchString of N characters, create a new buffer containing the last N-1 characters
|
|
||||||
// of the previous chunk and N-1 characters of the current chunk.
|
|
||||||
var previousChunk = partCurrent[partCurrent.length - 1];
|
|
||||||
var previousTrail = previousChunk.slice(previousChunk.length - searchString.length + 1);
|
|
||||||
var currentLead = chunk.slice(0, searchString.length-1);
|
|
||||||
var chunkOverlap = Buffer.concat([previousTrail, currentLead]);
|
|
||||||
|
|
||||||
searchIndex = chunkOverlap.indexOf(searchString);
|
|
||||||
if (searchIndex >= 0) {
|
|
||||||
// Cut off the previous body chunk at the position where the search string starts
|
|
||||||
partCurrent[partCurrent.length - 1] = previousChunk.slice(0, previousChunk.length - searchString.length + searchIndex + 1);
|
|
||||||
|
|
||||||
// Adjust the start of the current chunk
|
|
||||||
chunk = chunk.slice(searchIndex + 1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
// Try to find the search string in the current chunk
|
|
||||||
searchIndex = chunk.indexOf(searchString);
|
|
||||||
|
|
||||||
if (searchIndex >= 0) {
|
|
||||||
// Store the part of the chunk data preceding the position where the search string starts
|
|
||||||
partCurrent.push(chunk.slice(0, searchIndex));
|
|
||||||
|
|
||||||
// Adjust the start of the current chunk
|
|
||||||
chunk = chunk.slice(searchIndex + searchString.length);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
// Search string not found in this chunk, so store the chunk and proceed to the next chunk
|
|
||||||
partCurrent.push(chunk);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (searchIndex >= 0) {
|
|
||||||
if (searchString == boundary) {
|
|
||||||
// Clone the msg (without payload for speed)
|
|
||||||
var newMsg = RED.util.cloneMessage(msg);
|
|
||||||
|
|
||||||
// The part headers will be put in msg.content, with a JSON format like {Content-Type:..., Content-Length:... etc}.
|
|
||||||
// Note that part headers are optional, even the header "Content-Type" (which defaults to text/plain).
|
|
||||||
var comma = '';
|
|
||||||
var quote = '';
|
|
||||||
var content = '{';
|
|
||||||
Buffer.concat(partHeader).toString('utf8').trim().split(headerSeparator).forEach(function(entry) {
|
|
||||||
var entryArray = entry.split(":");
|
|
||||||
if (entryArray.length == 2) {
|
|
||||||
isNaN(entryArray[1]) ? quote = '"' : quote = '';
|
|
||||||
content += comma + '"' + entryArray[0].trim() + '": ' + quote + entryArray[1].trim() + quote;
|
|
||||||
comma = ', ';
|
|
||||||
}
|
|
||||||
});
|
|
||||||
content += '}';
|
|
||||||
newMsg.content = JSON.parse(content);
|
|
||||||
|
|
||||||
// If a part body has been found, let's put a message on the output port
|
|
||||||
newMsg.payload = partBody;
|
|
||||||
handleMsg(newMsg, boundary, preRequestTimestamp,currentStatus);
|
|
||||||
|
|
||||||
// Everything has been send, so start collecting data all over again ...
|
|
||||||
partHeader = [];
|
|
||||||
partBody = [];
|
|
||||||
|
|
||||||
// If a (non-zero) throttling interval is specified, the upload should be pauzed during that interval.
|
|
||||||
// If the message contains a throttling interval, it will override the node's throttling interval.
|
|
||||||
var throttleInterval = msg.throttle || node.throttleInterval;
|
|
||||||
if (throttleInterval && throttleInterval !== 0) {
|
|
||||||
res.pause();
|
|
||||||
setTimeout(function () {
|
|
||||||
res.resume();
|
|
||||||
}, throttleInterval);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Boundary found, so from here on we will try to find a headerbodyseparator
|
|
||||||
searchString = headerBodySeparator;
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
// HeaderBodySeparator found, so from here on we will try to find a boundary
|
|
||||||
searchString = boundary;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
res.on('end',function() {
|
res.on('end',function() {
|
||||||
if(boundary) {
|
if (node.metric()) {
|
||||||
// If streaming is interrupted, the last part might not be complete (so skip it)
|
// Calculate request time
|
||||||
node.status({});
|
var diff = process.hrtime(preRequestTimestamp);
|
||||||
|
var ms = diff[0] * 1e3 + diff[1] * 1e-6;
|
||||||
|
var metricRequestDurationMillis = ms.toFixed(3);
|
||||||
|
node.metric("duration.millis", msg, metricRequestDurationMillis);
|
||||||
|
if (res.client && res.client.bytesRead) {
|
||||||
|
node.metric("size.bytes", msg, res.client.bytesRead);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else {
|
|
||||||
handleMsg(msg, boundary, preRequestTimestamp, currentStatus);
|
// Convert the payload to the required return type
|
||||||
|
msg.payload = Buffer.concat(msg.payload); // bin
|
||||||
|
if (node.ret !== "bin") {
|
||||||
|
msg.payload = msg.payload.toString('utf8'); // txt
|
||||||
|
|
||||||
|
if (node.ret === "obj") {
|
||||||
|
try { msg.payload = JSON.parse(msg.payload); } // obj
|
||||||
|
catch(e) { node.warn(RED._("httpin.errors.json-error")); }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
node.send(msg);
|
||||||
|
node.status({});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
req.setTimeout(node.reqTimeout, function() {
|
req.setTimeout(node.reqTimeout, function() {
|
||||||
@ -457,11 +223,6 @@ module.exports = function(RED) {
|
|||||||
});
|
});
|
||||||
|
|
||||||
this.on("close",function() {
|
this.on("close",function() {
|
||||||
if (node.prevReq) {
|
|
||||||
// At (re)deploy make sure the streaming is closed, otherwise e.g. it keeps sending data across already removed wires
|
|
||||||
node.prevReq.abort();
|
|
||||||
}
|
|
||||||
|
|
||||||
node.status({});
|
node.status({});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -341,8 +341,6 @@
|
|||||||
},
|
},
|
||||||
"setby": "- set by msg.method -",
|
"setby": "- set by msg.method -",
|
||||||
"basicauth": "Use basic authentication",
|
"basicauth": "Use basic authentication",
|
||||||
"throttling": "Limit (multipart) stream rate",
|
|
||||||
"interval": "Interval",
|
|
||||||
"use-tls": "Enable secure (SSL/TLS) connection",
|
"use-tls": "Enable secure (SSL/TLS) connection",
|
||||||
"tls-config":"TLS Configuration",
|
"tls-config":"TLS Configuration",
|
||||||
"utf8": "a UTF-8 string",
|
"utf8": "a UTF-8 string",
|
||||||
@ -364,8 +362,7 @@
|
|||||||
"invalid-transport":"non-http transport requested"
|
"invalid-transport":"non-http transport requested"
|
||||||
},
|
},
|
||||||
"status": {
|
"status": {
|
||||||
"requesting": "requesting",
|
"requesting": "requesting"
|
||||||
"streaming": "streaming"
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"websocket": {
|
"websocket": {
|
||||||
|
Loading…
Reference in New Issue
Block a user