diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index 1ea5445ce..3eb9d0516 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -20,7 +20,30 @@ module.exports = function(RED) { var isUtf8 = require('is-utf8'); var HttpsProxyAgent = require('https-proxy-agent'); var url = require('url'); - + const knownMediaTypes = { + "text/css":"string", + "text/html":"string", + "text/plain":"string", + "text/html":"string", + "application/json":"object", + "application/octet-stream":"buffer", + "application/pdf":"buffer", + "application/x-gtar":"buffer", + "application/x-gzip":"buffer", + "application/x-tar":"buffer", + "application/xml":"string", + "application/zip":"buffer", + "audio/aac":"buffer", + "audio/ac3":"buffer", + "audio/basic":"buffer", + "audio/mp4":"buffer", + "audio/ogg":"buffer", + "image/bmp":"buffer", + "image/gif":"buffer", + "image/jpeg":"buffer", + "image/tiff":"buffer", + "image/png":"buffer", + } //#region "Supporting functions" function matchTopic(ts,t) { if (ts == "#") { @@ -188,24 +211,7 @@ module.exports = function(RED) { */ function subscriptionHandler(node, datatype ,topic, payload, packet) { const v5 = node.brokerConn.options && node.brokerConn.options.protocolVersion == 5; - - if (datatype === "buffer") { - // payload = payload; - } else if (datatype === "base64") { - payload = payload.toString('base64'); - } else if (datatype === "utf8") { - payload = payload.toString('utf8'); - } else if (datatype === "json") { - if (isUtf8(payload)) { - payload = payload.toString(); - try { payload = JSON.parse(payload); } - catch(e) { node.error(RED._("mqtt.errors.invalid-json-parse"),{payload:payload, topic:topic, qos:packet.qos, retain:packet.retain}); return; } - } - else { node.error((RED._("mqtt.errors.invalid-json-string")),{payload:payload, topic:topic, qos:packet.qos, retain:packet.retain}); return; } - } else { - if (isUtf8(payload)) { payload = payload.toString(); } - } - var msg = {topic:topic, payload:payload, qos:packet.qos, retain:packet.retain}; + var msg = {topic:topic, payload:null, qos:packet.qos, retain:packet.retain}; if(v5 && packet.properties) { setStrProp(packet.properties, msg, "responseTopic"); setBufferProp(packet.properties, msg, "correlationData"); @@ -215,6 +221,58 @@ module.exports = function(RED) { setStrProp(packet.properties, msg, "reasonString"); setUserProperties(packet.properties.userProperties, msg); } + const v5isUtf8 = v5 ? msg.payloadFormatIndicator === true : null; + const v5HasMediaType = v5 ? !!msg.contentType : null; + const v5MediaTypeLC = v5 ? (msg.contentType + "").toLowerCase() : null; + + if (datatype === "buffer") { + // payload = payload; + } else if (datatype === "base64") { + payload = payload.toString('base64'); + } else if (datatype === "utf8") { + payload = payload.toString('utf8'); + } else if (datatype === "json") { + if (v5isUtf8 || isUtf8(payload)) { + try { + payload = JSON.parse(payload.toString()); + } + catch(e) { + node.error(RED._("mqtt.errors.invalid-json-parse"),{payload:payload, topic:topic, qos:packet.qos, retain:packet.retain}); return; + } + } + else { node.error((RED._("mqtt.errors.invalid-json-string")),{payload:payload, topic:topic, qos:packet.qos, retain:packet.retain}); return; } + } else { + //auto + if(v5isUtf8 || v5HasMediaType) { + const outputType = knownMediaTypes[v5MediaTypeLC] + switch (outputType) { + case "string": + payload = payload.toString(); + break; + case "buffer": + //no change + break; + case "object": + try { + payload = JSON.parse(payload.toString()); + } + catch(e) { + node.error(RED._("mqtt.errors.invalid-json-parse"),{payload:payload, topic:topic, qos:packet.qos, retain:packet.retain}); return; + } + break; + default: + if (v5isUtf8 || isUtf8(payload)) { + payload = payload.toString(); //auto String + } + break; + } + } else if (isUtf8(payload)) { + payload = payload.toString(); //auto String + } //else { + //leave as buffer + //} + } + msg.payload = payload; if ((node.brokerConn.broker === "localhost")||(node.brokerConn.broker === "127.0.0.1")) { msg._topic = topic; }