Use v5 properties to aid auto parsing payload

- closes #3421
- fixes bug in `function setBoolProp()`
This commit is contained in:
Steve-Mcl 2022-03-04 08:44:21 +00:00
parent a7932da207
commit 8f5d3dc49c
1 changed files with 77 additions and 19 deletions

View File

@ -20,7 +20,30 @@ module.exports = function(RED) {
var isUtf8 = require('is-utf8');
var HttpsProxyAgent = require('https-proxy-agent');
var url = require('url');
const knownMediaTypes = {
"text/css":"string",
"text/html":"string",
"text/plain":"string",
"text/html":"string",
"application/json":"object",
"application/octet-stream":"buffer",
"application/pdf":"buffer",
"application/x-gtar":"buffer",
"application/x-gzip":"buffer",
"application/x-tar":"buffer",
"application/xml":"string",
"application/zip":"buffer",
"audio/aac":"buffer",
"audio/ac3":"buffer",
"audio/basic":"buffer",
"audio/mp4":"buffer",
"audio/ogg":"buffer",
"image/bmp":"buffer",
"image/gif":"buffer",
"image/jpeg":"buffer",
"image/tiff":"buffer",
"image/png":"buffer",
}
//#region "Supporting functions"
function matchTopic(ts,t) {
if (ts == "#") {
@ -188,24 +211,7 @@ module.exports = function(RED) {
*/
function subscriptionHandler(node, datatype ,topic, payload, packet) {
const v5 = node.brokerConn.options && node.brokerConn.options.protocolVersion == 5;
if (datatype === "buffer") {
// payload = payload;
} else if (datatype === "base64") {
payload = payload.toString('base64');
} else if (datatype === "utf8") {
payload = payload.toString('utf8');
} else if (datatype === "json") {
if (isUtf8(payload)) {
payload = payload.toString();
try { payload = JSON.parse(payload); }
catch(e) { node.error(RED._("mqtt.errors.invalid-json-parse"),{payload:payload, topic:topic, qos:packet.qos, retain:packet.retain}); return; }
}
else { node.error((RED._("mqtt.errors.invalid-json-string")),{payload:payload, topic:topic, qos:packet.qos, retain:packet.retain}); return; }
} else {
if (isUtf8(payload)) { payload = payload.toString(); }
}
var msg = {topic:topic, payload:payload, qos:packet.qos, retain:packet.retain};
var msg = {topic:topic, payload:null, qos:packet.qos, retain:packet.retain};
if(v5 && packet.properties) {
setStrProp(packet.properties, msg, "responseTopic");
setBufferProp(packet.properties, msg, "correlationData");
@ -215,6 +221,58 @@ module.exports = function(RED) {
setStrProp(packet.properties, msg, "reasonString");
setUserProperties(packet.properties.userProperties, msg);
}
const v5isUtf8 = v5 ? msg.payloadFormatIndicator === true : null;
const v5HasMediaType = v5 ? !!msg.contentType : null;
const v5MediaTypeLC = v5 ? (msg.contentType + "").toLowerCase() : null;
if (datatype === "buffer") {
// payload = payload;
} else if (datatype === "base64") {
payload = payload.toString('base64');
} else if (datatype === "utf8") {
payload = payload.toString('utf8');
} else if (datatype === "json") {
if (v5isUtf8 || isUtf8(payload)) {
try {
payload = JSON.parse(payload.toString());
}
catch(e) {
node.error(RED._("mqtt.errors.invalid-json-parse"),{payload:payload, topic:topic, qos:packet.qos, retain:packet.retain}); return;
}
}
else { node.error((RED._("mqtt.errors.invalid-json-string")),{payload:payload, topic:topic, qos:packet.qos, retain:packet.retain}); return; }
} else {
//auto
if(v5isUtf8 || v5HasMediaType) {
const outputType = knownMediaTypes[v5MediaTypeLC]
switch (outputType) {
case "string":
payload = payload.toString();
break;
case "buffer":
//no change
break;
case "object":
try {
payload = JSON.parse(payload.toString());
}
catch(e) {
node.error(RED._("mqtt.errors.invalid-json-parse"),{payload:payload, topic:topic, qos:packet.qos, retain:packet.retain}); return;
}
break;
default:
if (v5isUtf8 || isUtf8(payload)) {
payload = payload.toString(); //auto String
}
break;
}
} else if (isUtf8(payload)) {
payload = payload.toString(); //auto String
} //else {
//leave as buffer
//}
}
msg.payload = payload;
if ((node.brokerConn.broker === "localhost")||(node.brokerConn.broker === "127.0.0.1")) {
msg._topic = topic;
}