mirror of
				https://github.com/node-red/node-red.git
				synced 2025-03-01 10:36:34 +00:00 
			
		
		
		
	Merge pull request #3530 from Steve-Mcl/mqtt-payload-auto-parsing
MQTT payload auto parsing improvements
This commit is contained in:
		@@ -65,6 +65,11 @@
 | 
			
		||||
        /* opacity: 0.3;
 | 
			
		||||
        pointer-events: none; */
 | 
			
		||||
    }
 | 
			
		||||
    .form-row.form-row-mqtt-datatype-tip > .form-tips {
 | 
			
		||||
        width: calc(70% - 18px);
 | 
			
		||||
        display: inline-block;
 | 
			
		||||
        margin-top: -8px;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
</style>
 | 
			
		||||
 | 
			
		||||
@@ -121,6 +126,7 @@
 | 
			
		||||
    <div class="form-row">
 | 
			
		||||
        <label for="node-input-datatype"><i class="fa fa-sign-out"></i> <span data-i18n="mqtt.label.output"></span></label>
 | 
			
		||||
        <select id="node-input-datatype" style="width:70%;">
 | 
			
		||||
            <option value="auto-detect" data-i18n="mqtt.output.auto-detect"></option>
 | 
			
		||||
            <option value="auto" data-i18n="mqtt.output.auto"></option>
 | 
			
		||||
            <option value="buffer" data-i18n="mqtt.output.buffer"></option>
 | 
			
		||||
            <option value="utf8" data-i18n="mqtt.output.string"></option>
 | 
			
		||||
@@ -128,6 +134,10 @@
 | 
			
		||||
            <option value="base64" data-i18n="mqtt.output.base64"></option>
 | 
			
		||||
        </select>
 | 
			
		||||
    </div>
 | 
			
		||||
    <div class="form-row form-row-mqtt-datatype-tip">
 | 
			
		||||
        <label>   </label>
 | 
			
		||||
        <div class="form-tips" id="mqtt-in-datatype-depreciated-tip"><span data-i18n="mqtt.label.auto-mode-depreciated"></span></div>
 | 
			
		||||
    </div>
 | 
			
		||||
    <div class="form-row">
 | 
			
		||||
        <label for="node-input-name"><i class="fa fa-tag"></i> <span data-i18n="common.label.name"></span></label>
 | 
			
		||||
        <input type="text" id="node-input-name" data-i18n="[placeholder]common.label.name">
 | 
			
		||||
@@ -750,7 +760,7 @@
 | 
			
		||||
                }
 | 
			
		||||
            },
 | 
			
		||||
            qos: {value: "2"},
 | 
			
		||||
            datatype: {value:"auto",required:true},
 | 
			
		||||
            datatype: {value:"auto-detect",required:true},
 | 
			
		||||
            broker: {type:"mqtt-broker", required:true},
 | 
			
		||||
            // subscriptionIdentifier: {value:0},
 | 
			
		||||
            nl: {value:false},
 | 
			
		||||
@@ -787,6 +797,16 @@
 | 
			
		||||
                $("div.form-row-mqtt5").toggleClass("form-row-mqtt5-active",!!v5);
 | 
			
		||||
                $("div.form-row.form-row-mqtt-static").toggleClass("form-row-mqtt-static-disabled", !!dynamic)
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            $("#node-input-datatype").on("change", function() {
 | 
			
		||||
                if($(this).val() === "auto") {
 | 
			
		||||
                    $(".form-row.form-row-mqtt-datatype-tip").show();
 | 
			
		||||
                } else {
 | 
			
		||||
                    $(".form-row.form-row-mqtt-datatype-tip").hide();
 | 
			
		||||
                }
 | 
			
		||||
            })
 | 
			
		||||
            $("#node-input-datatype").trigger("change");
 | 
			
		||||
 | 
			
		||||
            $("#node-input-broker").on("change",function(d){
 | 
			
		||||
                updateVisibility();
 | 
			
		||||
            });
 | 
			
		||||
@@ -807,7 +827,7 @@
 | 
			
		||||
                $("#node-input-qos").val("2");
 | 
			
		||||
            }
 | 
			
		||||
            if (this.datatype === undefined) {
 | 
			
		||||
                $("#node-input-datatype").val("auto");
 | 
			
		||||
                $("#node-input-datatype").val("auto-detect");
 | 
			
		||||
            }
 | 
			
		||||
        },
 | 
			
		||||
        oneditsave: function() {
 | 
			
		||||
 
 | 
			
		||||
@@ -20,7 +20,30 @@ module.exports = function(RED) {
 | 
			
		||||
    var isUtf8 = require('is-utf8');
 | 
			
		||||
    var HttpsProxyAgent = require('https-proxy-agent');
 | 
			
		||||
    var url = require('url');
 | 
			
		||||
 | 
			
		||||
    const knownMediaTypes = {
 | 
			
		||||
        "text/css":"string",
 | 
			
		||||
        "text/html":"string",
 | 
			
		||||
        "text/plain":"string",
 | 
			
		||||
        "text/html":"string",
 | 
			
		||||
        "application/json":"json",
 | 
			
		||||
        "application/octet-stream":"buffer",
 | 
			
		||||
        "application/pdf":"buffer",
 | 
			
		||||
        "application/x-gtar":"buffer",
 | 
			
		||||
        "application/x-gzip":"buffer",
 | 
			
		||||
        "application/x-tar":"buffer",
 | 
			
		||||
        "application/xml":"string",
 | 
			
		||||
        "application/zip":"buffer",
 | 
			
		||||
        "audio/aac":"buffer",
 | 
			
		||||
        "audio/ac3":"buffer",
 | 
			
		||||
        "audio/basic":"buffer",
 | 
			
		||||
        "audio/mp4":"buffer",
 | 
			
		||||
        "audio/ogg":"buffer",
 | 
			
		||||
        "image/bmp":"buffer",
 | 
			
		||||
        "image/gif":"buffer",
 | 
			
		||||
        "image/jpeg":"buffer",
 | 
			
		||||
        "image/tiff":"buffer",
 | 
			
		||||
        "image/png":"buffer",
 | 
			
		||||
    }
 | 
			
		||||
    //#region "Supporting functions"
 | 
			
		||||
    function matchTopic(ts,t) {
 | 
			
		||||
        if (ts == "#") {
 | 
			
		||||
@@ -188,24 +211,7 @@ module.exports = function(RED) {
 | 
			
		||||
     */
 | 
			
		||||
    function subscriptionHandler(node, datatype ,topic, payload, packet) {
 | 
			
		||||
        const v5 = node.brokerConn.options && node.brokerConn.options.protocolVersion == 5;
 | 
			
		||||
 | 
			
		||||
        if (datatype === "buffer") {
 | 
			
		||||
            // payload = payload;
 | 
			
		||||
        } else if (datatype === "base64") {
 | 
			
		||||
            payload = payload.toString('base64');
 | 
			
		||||
        } else if (datatype === "utf8") {
 | 
			
		||||
            payload = payload.toString('utf8');
 | 
			
		||||
        } else if (datatype === "json") {
 | 
			
		||||
            if (isUtf8(payload)) {
 | 
			
		||||
                payload = payload.toString();
 | 
			
		||||
                try { payload = JSON.parse(payload); }
 | 
			
		||||
                catch(e) { node.error(RED._("mqtt.errors.invalid-json-parse"),{payload:payload, topic:topic, qos:packet.qos, retain:packet.retain}); return; }
 | 
			
		||||
            }
 | 
			
		||||
            else { node.error((RED._("mqtt.errors.invalid-json-string")),{payload:payload, topic:topic, qos:packet.qos, retain:packet.retain}); return; }
 | 
			
		||||
        } else {
 | 
			
		||||
            if (isUtf8(payload)) { payload = payload.toString(); }
 | 
			
		||||
        }
 | 
			
		||||
        var msg = {topic:topic, payload:payload, qos:packet.qos, retain:packet.retain};
 | 
			
		||||
        var msg = {topic:topic, payload:null, qos:packet.qos, retain:packet.retain};
 | 
			
		||||
        if(v5 && packet.properties) {
 | 
			
		||||
            setStrProp(packet.properties, msg, "responseTopic");
 | 
			
		||||
            setBufferProp(packet.properties, msg, "correlationData");
 | 
			
		||||
@@ -215,6 +221,76 @@ module.exports = function(RED) {
 | 
			
		||||
            setStrProp(packet.properties, msg, "reasonString");
 | 
			
		||||
            setUserProperties(packet.properties.userProperties, msg);
 | 
			
		||||
        }
 | 
			
		||||
        const v5isUtf8 = v5 ? msg.payloadFormatIndicator === true : null;
 | 
			
		||||
        const v5HasMediaType = v5 ? !!msg.contentType : null;
 | 
			
		||||
        const v5MediaTypeLC = v5 ? (msg.contentType + "").toLowerCase() : null;
 | 
			
		||||
 | 
			
		||||
        if (datatype === "buffer") {
 | 
			
		||||
            // payload = payload;
 | 
			
		||||
        } else if (datatype === "base64") {
 | 
			
		||||
            payload = payload.toString('base64');
 | 
			
		||||
        } else if (datatype === "utf8") {
 | 
			
		||||
            payload = payload.toString('utf8');
 | 
			
		||||
        } else if (datatype === "json") {
 | 
			
		||||
            if (v5isUtf8 || isUtf8(payload)) {
 | 
			
		||||
                try {
 | 
			
		||||
                    payload = JSON.parse(payload.toString());
 | 
			
		||||
                } catch (e) {
 | 
			
		||||
                    node.error(RED._("mqtt.errors.invalid-json-parse"), { payload: payload, topic: topic, qos: packet.qos, retain: packet.retain }); return;
 | 
			
		||||
                }
 | 
			
		||||
            } else {
 | 
			
		||||
                node.error((RED._("mqtt.errors.invalid-json-string")), { payload: payload, topic: topic, qos: packet.qos, retain: packet.retain }); return;
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            //"auto" (legacy) or "auto-detect" (new default)
 | 
			
		||||
            if (v5isUtf8 || v5HasMediaType) {
 | 
			
		||||
                const outputType = knownMediaTypes[v5MediaTypeLC]
 | 
			
		||||
                switch (outputType) {
 | 
			
		||||
                    case "string":
 | 
			
		||||
                        payload = payload.toString();
 | 
			
		||||
                        break;
 | 
			
		||||
                    case "buffer":
 | 
			
		||||
                        //no change
 | 
			
		||||
                        break;
 | 
			
		||||
                    case "json":
 | 
			
		||||
                        try {
 | 
			
		||||
                            //since v5 type states this should be JSON, parse it & error out if NOT JSON
 | 
			
		||||
                            payload = payload.toString()
 | 
			
		||||
                            const obj = JSON.parse(payload);
 | 
			
		||||
                            if (datatype === "auto-detect") {
 | 
			
		||||
                                payload = obj; //as mode is "auto-detect", return the parsed JSON
 | 
			
		||||
                            }
 | 
			
		||||
                        } catch (e) {
 | 
			
		||||
                            node.error(RED._("mqtt.errors.invalid-json-parse"), { payload: payload, topic: topic, qos: packet.qos, retain: packet.retain }); return;
 | 
			
		||||
                        }
 | 
			
		||||
                        break;
 | 
			
		||||
                    default:
 | 
			
		||||
                        if (v5isUtf8 || isUtf8(payload)) {
 | 
			
		||||
                            payload = payload.toString(); //auto String
 | 
			
		||||
                            if (datatype === "auto-detect") {
 | 
			
		||||
                                try {
 | 
			
		||||
                                    payload = JSON.parse(payload); //auto to parsed object (attempt)
 | 
			
		||||
                                } catch (e) {
 | 
			
		||||
                                    /* mute error - it simply isnt JSON, just leave payload as a string */
 | 
			
		||||
                                }
 | 
			
		||||
                            }
 | 
			
		||||
                        }
 | 
			
		||||
                        break;
 | 
			
		||||
                }
 | 
			
		||||
            } else if (isUtf8(payload)) {
 | 
			
		||||
                payload = payload.toString(); //auto String
 | 
			
		||||
                if (datatype === "auto-detect") {
 | 
			
		||||
                    try {
 | 
			
		||||
                        payload = JSON.parse(payload);
 | 
			
		||||
                    } catch (e) {
 | 
			
		||||
                        /* mute error - it simply isnt JSON, just leave payload as a string */
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            } //else { 
 | 
			
		||||
                //leave as buffer
 | 
			
		||||
            //}
 | 
			
		||||
        }
 | 
			
		||||
        msg.payload = payload;
 | 
			
		||||
        if ((node.brokerConn.broker === "localhost")||(node.brokerConn.broker === "127.0.0.1")) {
 | 
			
		||||
            msg._topic = topic;
 | 
			
		||||
        }
 | 
			
		||||
 
 | 
			
		||||
@@ -423,6 +423,7 @@
 | 
			
		||||
            "string": "Ein String",
 | 
			
		||||
            "base64": "Ein Base64-kodierter String",
 | 
			
		||||
            "auto": "Auto-Erkennung (string oder buffer)",
 | 
			
		||||
            "auto-detect": "Auto-Erkennung (parsed JSON-Objekt, string oder buffer)",
 | 
			
		||||
            "json": "Ein analysiertes (parsed) JSON-Objekt"
 | 
			
		||||
        },
 | 
			
		||||
        "true": "wahr",
 | 
			
		||||
 
 | 
			
		||||
@@ -424,7 +424,8 @@
 | 
			
		||||
            "action": "Action",
 | 
			
		||||
            "staticTopic": "Subscribe to single topic",
 | 
			
		||||
            "dynamicTopic": "Dynamic subscription",
 | 
			
		||||
            "auto-connect": "Connect automatically"
 | 
			
		||||
            "auto-connect": "Connect automatically",
 | 
			
		||||
            "auto-mode-depreciated": "This option is depreciated. Please use the new auto-detect mode."
 | 
			
		||||
        },
 | 
			
		||||
        "sections-label":{
 | 
			
		||||
            "birth-message": "Message sent on connection (birth message)",
 | 
			
		||||
@@ -455,6 +456,7 @@
 | 
			
		||||
            "string": "a String",
 | 
			
		||||
            "base64": "a Base64 encoded string",
 | 
			
		||||
            "auto": "auto-detect (string or buffer)",
 | 
			
		||||
            "auto-detect": "auto-detect (parsed JSON object, string or buffer)",
 | 
			
		||||
            "json": "a parsed JSON object"
 | 
			
		||||
        },
 | 
			
		||||
        "true": "true",
 | 
			
		||||
 
 | 
			
		||||
@@ -455,6 +455,7 @@
 | 
			
		||||
            "string": "文字列",
 | 
			
		||||
            "base64": "Base64文字列",
 | 
			
		||||
            "auto": "自動判定(文字列もしくはバイナリバッファ)",
 | 
			
		||||
            "auto-detect": "自動判定(JSONオブジェクト、文字列もしくはバイナリバッファ)",
 | 
			
		||||
            "json": "JSONオブジェクト"
 | 
			
		||||
        },
 | 
			
		||||
        "true": "する",
 | 
			
		||||
 
 | 
			
		||||
@@ -362,6 +362,7 @@
 | 
			
		||||
            "string": "문자열",
 | 
			
		||||
            "base64": "Base64문자열",
 | 
			
		||||
            "auto": "자동판정(문자열혹은 바이너리버퍼)",
 | 
			
		||||
            "auto-detect": "자동판정(JSON오브젝트, 문자열혹은 바이너리버퍼)",
 | 
			
		||||
            "json": "JSON오브젝트"
 | 
			
		||||
        },
 | 
			
		||||
        "true": "한다",
 | 
			
		||||
 
 | 
			
		||||
@@ -385,6 +385,7 @@
 | 
			
		||||
            "string": "строка",
 | 
			
		||||
            "base64": "строка в кодировке Base64",
 | 
			
		||||
            "auto": "автоопределение (строка или буфер)",
 | 
			
		||||
            "auto-detect": "автоопределение (разобрать объект JSON, строка или буфер)",
 | 
			
		||||
            "json": "объект JSON"
 | 
			
		||||
        },
 | 
			
		||||
        "true": "да",
 | 
			
		||||
 
 | 
			
		||||
@@ -382,6 +382,7 @@
 | 
			
		||||
            "string": "字符串",
 | 
			
		||||
            "base64": "Base64编码字符串",
 | 
			
		||||
            "auto": "自动检测 (字符串或buffer)",
 | 
			
		||||
            "auto-detect": "自动检测 (已解析的JSON对象、字符串或buffer)",
 | 
			
		||||
            "json": "解析的JSON对象"
 | 
			
		||||
        },
 | 
			
		||||
        "true": "是",
 | 
			
		||||
 
 | 
			
		||||
@@ -386,6 +386,7 @@
 | 
			
		||||
            "string": "字串",
 | 
			
		||||
            "base64": "Base64編碼字串",
 | 
			
		||||
            "auto": "自動檢測 (字符串或buffer)",
 | 
			
		||||
            "auto-detect": "自动检测 (已解析的JSON对象、字符串或buffer)",
 | 
			
		||||
            "json": "解析的JSON對象"
 | 
			
		||||
        },
 | 
			
		||||
        "true": "是",
 | 
			
		||||
 
 | 
			
		||||
@@ -4,6 +4,7 @@
 | 
			
		||||
"use strict";
 | 
			
		||||
const should = require("should");
 | 
			
		||||
const helper = require("node-red-node-test-helper");
 | 
			
		||||
const { doesNotThrow } = require("should");
 | 
			
		||||
const mqttNodes = require("nr-test-utils").require("@node-red/nodes/core/network/10-mqtt.js");
 | 
			
		||||
const BROKER_HOST = process.env.MQTT_BROKER_SERVER || "localhost";
 | 
			
		||||
const BROKER_PORT = process.env.MQTT_BROKER_PORT || 1883;
 | 
			
		||||
@@ -92,7 +93,8 @@ describe('MQTT Nodes', function () {
 | 
			
		||||
        options.expectMsg = Object.assign({}, options.sendMsg);
 | 
			
		||||
        testSendRecv({}, { datatype: "auto", topicType: "static" }, {}, options, { done: done });
 | 
			
		||||
    });
 | 
			
		||||
    itConditional('should send JSON and receive string (auto)', function (done) {
 | 
			
		||||
    //Prior to V3, "auto" mode would only parse to string or buffer.
 | 
			
		||||
    itConditional('should send JSON and receive string (auto mode)', function (done) {
 | 
			
		||||
        if (skipTests) { return this.skip() }
 | 
			
		||||
        this.timeout = 2000;
 | 
			
		||||
        const options = {}
 | 
			
		||||
@@ -104,7 +106,44 @@ describe('MQTT Nodes', function () {
 | 
			
		||||
        options.expectMsg = Object.assign({}, options.sendMsg);
 | 
			
		||||
        testSendRecv({}, { datatype: "auto", topicType: "static" }, {}, options, { done: done });
 | 
			
		||||
    })
 | 
			
		||||
    itConditional('should send JSON and receive string (utf8)', function (done) {
 | 
			
		||||
    //In V3, "auto" mode should try to parse JSON, then string and fall back to buffer
 | 
			
		||||
    itConditional('should send JSON and receive object (auto-detect mode)', function (done) {
 | 
			
		||||
        if (skipTests) { return this.skip() }
 | 
			
		||||
        this.timeout = 2000;
 | 
			
		||||
        const options = {}
 | 
			
		||||
        options.sendMsg = {
 | 
			
		||||
            topic: nextTopic(),
 | 
			
		||||
            payload: '{"prop":"value1", "num":1}',
 | 
			
		||||
            qos: 1
 | 
			
		||||
        }
 | 
			
		||||
        options.expectMsg = Object.assign({}, options.sendMsg);
 | 
			
		||||
        options.expectMsg.payload = JSON.parse(options.sendMsg.payload);
 | 
			
		||||
        testSendRecv({}, { datatype: "auto-detect", topicType: "static" }, {}, options, { done: done });
 | 
			
		||||
    })
 | 
			
		||||
    itConditional('should send invalid JSON and receive string (auto mode)', function (done) {
 | 
			
		||||
        if (skipTests) { return this.skip() }
 | 
			
		||||
        this.timeout = 2000;
 | 
			
		||||
        const options = {}
 | 
			
		||||
        options.sendMsg = {
 | 
			
		||||
            topic: nextTopic(),
 | 
			
		||||
            payload: '{prop:"value3", "num":3}'// send invalid JSON ...
 | 
			
		||||
        }
 | 
			
		||||
        options.expectMsg = Object.assign({}, options.sendMsg);//expect same payload
 | 
			
		||||
        testSendRecv({}, { datatype: "auto", topicType: "static" }, {}, options, { done: done });
 | 
			
		||||
    });
 | 
			
		||||
    itConditional('should send invalid JSON and receive string (auto-detect mode)', function (done) {
 | 
			
		||||
        if (skipTests) { return this.skip() }
 | 
			
		||||
        this.timeout = 2000;
 | 
			
		||||
        const options = {}
 | 
			
		||||
        options.sendMsg = {
 | 
			
		||||
            topic: nextTopic(),
 | 
			
		||||
            payload: '{prop:"value3", "num":3}'// send invalid JSON ...
 | 
			
		||||
        }
 | 
			
		||||
        options.expectMsg = Object.assign({}, options.sendMsg);//expect same payload
 | 
			
		||||
        testSendRecv({}, { datatype: "auto-detect", topicType: "static" }, {}, options, { done: done });
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    itConditional('should send JSON and receive string (utf8 mode)', function (done) {
 | 
			
		||||
        if (skipTests) { return this.skip() }
 | 
			
		||||
        this.timeout = 2000;
 | 
			
		||||
        const options = {}
 | 
			
		||||
@@ -116,7 +155,7 @@ describe('MQTT Nodes', function () {
 | 
			
		||||
        options.expectMsg = Object.assign({}, options.sendMsg);
 | 
			
		||||
        testSendRecv({}, { datatype: "utf8", topicType: "static" }, {}, options, { done: done });
 | 
			
		||||
    });
 | 
			
		||||
    itConditional('should send JSON and receive Object (json)', function (done) {
 | 
			
		||||
    itConditional('should send JSON and receive Object (json mode)', function (done) {
 | 
			
		||||
        if (skipTests) { return this.skip() }
 | 
			
		||||
        this.timeout = 2000;
 | 
			
		||||
        const options = {}
 | 
			
		||||
@@ -127,7 +166,31 @@ describe('MQTT Nodes', function () {
 | 
			
		||||
        options.expectMsg = Object.assign({}, options.sendMsg, { payload: { "prop": "value3", "num": 3 } });//expect an object
 | 
			
		||||
        testSendRecv({}, { datatype: "json", topicType: "static" }, {}, options, { done: done });
 | 
			
		||||
    });
 | 
			
		||||
    itConditional('should send String and receive Buffer (buffer)', function (done) {
 | 
			
		||||
    itConditional('should send invalid JSON and raise error (json mode)', function (done) {
 | 
			
		||||
        if (skipTests) { return this.skip() }
 | 
			
		||||
        this.timeout = 2000;
 | 
			
		||||
        const options = {}
 | 
			
		||||
        options.sendMsg = {
 | 
			
		||||
            topic: nextTopic(),
 | 
			
		||||
            payload: '{prop:"value3", "num":3}', // send invalid JSON ...
 | 
			
		||||
        }
 | 
			
		||||
        const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
 | 
			
		||||
        hooks.afterLoad = (helperNode, mqttBroker, mqttIn, mqttOut) => {
 | 
			
		||||
            helperNode.on("input", function (msg) {
 | 
			
		||||
                try {
 | 
			
		||||
                    msg.should.have.a.property("error").type("object");
 | 
			
		||||
                    msg.error.should.have.a.property("source").type("object");
 | 
			
		||||
                    msg.error.source.should.have.a.property("id", mqttIn.id);
 | 
			
		||||
                    done();
 | 
			
		||||
                } catch (err) {
 | 
			
		||||
                    done(err)
 | 
			
		||||
                }
 | 
			
		||||
            });
 | 
			
		||||
            return true; //handled
 | 
			
		||||
        }
 | 
			
		||||
        testSendRecv({}, { datatype: "json", topicType: "static" }, {}, options, hooks);
 | 
			
		||||
    });
 | 
			
		||||
    itConditional('should send String and receive Buffer (buffer mode)', function (done) {
 | 
			
		||||
        if (skipTests) { return this.skip() }
 | 
			
		||||
        this.timeout = 2000;
 | 
			
		||||
        const options = {}
 | 
			
		||||
@@ -138,7 +201,7 @@ describe('MQTT Nodes', function () {
 | 
			
		||||
        options.expectMsg = Object.assign({}, options.sendMsg, { payload: Buffer.from(options.sendMsg.payload) });//expect Buffer.from(msg.payload)
 | 
			
		||||
        testSendRecv({}, { datatype: "buffer", topicType: "static" }, {}, options, { done: done });
 | 
			
		||||
    });
 | 
			
		||||
    itConditional('should send utf8 Buffer and receive String (auto)', function (done) {
 | 
			
		||||
    itConditional('should send utf8 Buffer and receive String (auto mode)', function (done) {
 | 
			
		||||
        if (skipTests) { return this.skip() }
 | 
			
		||||
        this.timeout = 2000;
 | 
			
		||||
        const options = {}
 | 
			
		||||
@@ -149,7 +212,7 @@ describe('MQTT Nodes', function () {
 | 
			
		||||
        options.expectMsg = Object.assign({}, options.sendMsg, { payload: "x y z" });//set expected payload to "x y z"
 | 
			
		||||
        testSendRecv({}, { datatype: "auto", topicType: "static" }, {}, options, { done: done });
 | 
			
		||||
    });
 | 
			
		||||
    itConditional('should send non utf8 Buffer and receive Buffer (auto)', function (done) {
 | 
			
		||||
    itConditional('should send non utf8 Buffer and receive Buffer (auto mode)', function (done) {
 | 
			
		||||
        if (skipTests) { return this.skip() }
 | 
			
		||||
        this.timeout = 2000;
 | 
			
		||||
        const options = {}
 | 
			
		||||
@@ -158,7 +221,7 @@ describe('MQTT Nodes', function () {
 | 
			
		||||
            topic: nextTopic(),
 | 
			
		||||
            payload: Buffer.from([0xC0, 0xC1, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF]) //non valid UTF8
 | 
			
		||||
        }
 | 
			
		||||
        options.expectMsg = Object.assign({}, options.sendMsg);
 | 
			
		||||
        options.expectMsg = Object.assign({}, options.sendMsg, {payload: Buffer.from([0xC0, 0xC1, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF])});
 | 
			
		||||
        testSendRecv({}, { datatype: "auto", topicType: "static" }, {}, options, hooks);
 | 
			
		||||
    });
 | 
			
		||||
    itConditional('should send/receive all v5 flags and settings', function (done) {
 | 
			
		||||
@@ -168,16 +231,16 @@ describe('MQTT Nodes', function () {
 | 
			
		||||
        const options = {}
 | 
			
		||||
        const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
 | 
			
		||||
        options.sendMsg = {
 | 
			
		||||
            topic: t + "/command", payload: Buffer.from("v5"), qos: 1, retain: true,
 | 
			
		||||
            topic: t + "/command", payload: Buffer.from('{"version":"v5"}'), qos: 1, retain: true,
 | 
			
		||||
            responseTopic: t + "/response",
 | 
			
		||||
            userProperties: { prop1: "val1" },
 | 
			
		||||
            contentType: "application/json",
 | 
			
		||||
            contentType: "text/plain",
 | 
			
		||||
            correlationData: Buffer.from([1, 2, 3]),
 | 
			
		||||
            payloadFormatIndicator: true,
 | 
			
		||||
            messageExpiryInterval: 2000,
 | 
			
		||||
        }
 | 
			
		||||
        options.expectMsg = Object.assign({}, options.sendMsg);
 | 
			
		||||
        options.expectMsg.payload = options.expectMsg.payload.toString(); //auto mode + payloadFormatIndicator should make a string
 | 
			
		||||
        options.expectMsg.payload = options.expectMsg.payload.toString(); //auto mode + payloadFormatIndicator + contentType: "text/plain" should make a string
 | 
			
		||||
        delete options.expectMsg.payloadFormatIndicator; //Seems mqtt.js only publishes payloadFormatIndicator the will msg
 | 
			
		||||
        const inOptions = {
 | 
			
		||||
            datatype: "auto", topicType: "static",
 | 
			
		||||
@@ -185,6 +248,109 @@ describe('MQTT Nodes', function () {
 | 
			
		||||
        }
 | 
			
		||||
        testSendRecv({ protocolVersion: 5 }, inOptions, {}, options, hooks);
 | 
			
		||||
    });
 | 
			
		||||
    itConditional('should send regular string with v5 media type "text/plain" and receive a string (auto mode)', function (done) {
 | 
			
		||||
        if (skipTests) { return this.skip() }
 | 
			
		||||
        this.timeout = 2000;
 | 
			
		||||
        const options = {}
 | 
			
		||||
        const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
 | 
			
		||||
        options.sendMsg = {
 | 
			
		||||
            topic: nextTopic(), payload: "abc", contentType: "text/plain"
 | 
			
		||||
        }
 | 
			
		||||
        options.expectMsg = Object.assign({}, options.sendMsg);
 | 
			
		||||
        testSendRecv({ protocolVersion: 5 }, { datatype: "auto", topicType: "static" }, {}, options, hooks);
 | 
			
		||||
    });
 | 
			
		||||
    itConditional('should send JSON with v5 media type "text/plain" and receive a string (auto mode)', function (done) {
 | 
			
		||||
        if (skipTests) { return this.skip() }
 | 
			
		||||
        this.timeout = 2000;
 | 
			
		||||
        const options = {}
 | 
			
		||||
        const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
 | 
			
		||||
        options.sendMsg = {
 | 
			
		||||
            topic: nextTopic(), payload: '{"prop":"val"}', contentType: "text/plain"
 | 
			
		||||
        }
 | 
			
		||||
        options.expectMsg = Object.assign({}, options.sendMsg);
 | 
			
		||||
        testSendRecv({ protocolVersion: 5 }, { datatype: "auto", topicType: "static" }, {}, options, hooks);
 | 
			
		||||
    });
 | 
			
		||||
    itConditional('should send JSON with v5 media type "text/plain" and receive a string (auto-detect mode)', function (done) {
 | 
			
		||||
        if (skipTests) { return this.skip() }
 | 
			
		||||
        this.timeout = 2000;
 | 
			
		||||
        const options = {}
 | 
			
		||||
        const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
 | 
			
		||||
        options.sendMsg = {
 | 
			
		||||
            topic: nextTopic(), payload: '{"prop":"val"}', contentType: "text/plain"
 | 
			
		||||
        }
 | 
			
		||||
        options.expectMsg = Object.assign({}, options.sendMsg);
 | 
			
		||||
        testSendRecv({ protocolVersion: 5 }, { datatype: "auto-detect", topicType: "static" }, {}, options, hooks);
 | 
			
		||||
    });
 | 
			
		||||
    itConditional('should send JSON with v5 media type "application/json" and receive an object (auto-detect mode)', function (done) {
 | 
			
		||||
        if (skipTests) { return this.skip() }
 | 
			
		||||
        this.timeout = 2000;
 | 
			
		||||
        const options = {}
 | 
			
		||||
        const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
 | 
			
		||||
        options.sendMsg = {
 | 
			
		||||
            topic: nextTopic(), payload: '{"prop":"val"}', contentType: "application/json",
 | 
			
		||||
        }
 | 
			
		||||
        options.expectMsg = Object.assign({}, options.sendMsg, { payload: JSON.parse(options.sendMsg.payload)});
 | 
			
		||||
        testSendRecv({ protocolVersion: 5 }, { datatype: "auto-detect", topicType: "static" }, {}, options, hooks);
 | 
			
		||||
    });
 | 
			
		||||
    itConditional('should send invalid JSON with v5 media type "application/json" and raise an error (auto mode)', function (done) {
 | 
			
		||||
        if (skipTests) { return this.skip() }
 | 
			
		||||
        this.timeout = 2000;
 | 
			
		||||
        const options = {}
 | 
			
		||||
        options.sendMsg = {
 | 
			
		||||
            topic: nextTopic(),
 | 
			
		||||
            payload: '{prop:"value3", "num":3}', contentType: "application/json", // send invalid JSON ...
 | 
			
		||||
        }
 | 
			
		||||
        const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
 | 
			
		||||
        hooks.afterLoad = (helperNode, mqttBroker, mqttIn, mqttOut) => {
 | 
			
		||||
            helperNode.on("input", function (msg) {
 | 
			
		||||
                try {
 | 
			
		||||
                    msg.should.have.a.property("error").type("object");
 | 
			
		||||
                    msg.error.should.have.a.property("source").type("object");
 | 
			
		||||
                    msg.error.source.should.have.a.property("id", mqttIn.id);
 | 
			
		||||
                    done();
 | 
			
		||||
                } catch (err) {
 | 
			
		||||
                    done(err)
 | 
			
		||||
                }
 | 
			
		||||
            });
 | 
			
		||||
            return true; //handled
 | 
			
		||||
        }
 | 
			
		||||
        testSendRecv({ protocolVersion: 5 }, { datatype: "auto", topicType: "static" }, {}, options, hooks);
 | 
			
		||||
    });
 | 
			
		||||
    
 | 
			
		||||
    itConditional('should send buffer with v5 media type "application/json" and receive an object (auto-detect mode)', function (done) {
 | 
			
		||||
        if (skipTests) { return this.skip() }
 | 
			
		||||
        this.timeout = 2000;
 | 
			
		||||
        const options = {}
 | 
			
		||||
        const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
 | 
			
		||||
        options.sendMsg = {
 | 
			
		||||
            topic: nextTopic(), payload: Buffer.from([0x7b,0x22,0x70,0x72,0x6f,0x70,0x22,0x3a,0x22,0x76,0x61,0x6c,0x22,0x7d]), contentType: "application/json",
 | 
			
		||||
        }
 | 
			
		||||
        options.expectMsg = Object.assign({}, options.sendMsg, { payload: {"prop":"val"}});
 | 
			
		||||
        testSendRecv({ protocolVersion: 5 }, { datatype: "auto-detect", topicType: "static" }, {}, options, hooks);
 | 
			
		||||
    });
 | 
			
		||||
    itConditional('should send buffer with v5 media type "text/plain" and receive a string (auto mode)', function (done) {
 | 
			
		||||
        if (skipTests) { return this.skip() }
 | 
			
		||||
        this.timeout = 2000;
 | 
			
		||||
        const options = {}
 | 
			
		||||
        const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
 | 
			
		||||
        options.sendMsg = {
 | 
			
		||||
            topic: nextTopic(), payload: Buffer.from([0x7b,0x22,0x70,0x72,0x6f,0x70,0x22,0x3a,0x22,0x76,0x61,0x6c,0x22,0x7d]), contentType: "text/plain",
 | 
			
		||||
        }
 | 
			
		||||
        options.expectMsg = Object.assign({}, options.sendMsg, { payload: '{"prop":"val"}'});
 | 
			
		||||
        testSendRecv({ protocolVersion: 5 }, { datatype: "auto", topicType: "static" }, {}, options, hooks);
 | 
			
		||||
    });
 | 
			
		||||
    itConditional('should send buffer with v5 media type "application/zip" and receive a buffer (auto mode)', function (done) {
 | 
			
		||||
        if (skipTests) { return this.skip() }
 | 
			
		||||
        this.timeout = 2000;
 | 
			
		||||
        const options = {}
 | 
			
		||||
        const hooks = { done: done, beforeLoad: null, afterLoad: null, afterConnect: null }
 | 
			
		||||
        options.sendMsg = {
 | 
			
		||||
            topic: nextTopic(), payload: Buffer.from([0x7b,0x22,0x70,0x72,0x6f,0x70,0x22,0x3a,0x22,0x76,0x61,0x6c,0x22,0x7d]), contentType: "application/zip",
 | 
			
		||||
        }
 | 
			
		||||
        options.expectMsg = Object.assign({}, options.sendMsg, { payload: Buffer.from([0x7b,0x22,0x70,0x72,0x6f,0x70,0x22,0x3a,0x22,0x76,0x61,0x6c,0x22,0x7d])});
 | 
			
		||||
        testSendRecv({ protocolVersion: 5 }, { datatype: "auto", topicType: "static" }, {}, options, hooks);
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    itConditional('should subscribe dynamically via action', function (done) {
 | 
			
		||||
        if (skipTests) { return this.skip() }
 | 
			
		||||
        this.timeout = 2000;
 | 
			
		||||
@@ -463,14 +629,16 @@ function buildBasicMQTTSendRecvFlow(brokerOptions, inOptions, outOptions) {
 | 
			
		||||
    const inNode = buildMQTTInNode(inOptions.id, inOptions.name, inOptions.broker || broker.id, inOptions.topic, inOptions, ["helper.node"]);
 | 
			
		||||
    const outNode = buildMQTTOutNode(outOptions.id, outOptions.name, outOptions.broker || broker.id, outOptions.topic, outOptions);
 | 
			
		||||
    const helper = buildNode("helper", "helper.node", "helper_node", {});
 | 
			
		||||
    const catchNode = buildNode("catch", "catch.node", "catch_node", {"scope": ["mqtt.in"]}, ["helper.node"]);
 | 
			
		||||
    return {
 | 
			
		||||
        nodes: {
 | 
			
		||||
            [broker.name]: broker,
 | 
			
		||||
            [inNode.name]: inNode,
 | 
			
		||||
            [outNode.name]: outNode,
 | 
			
		||||
            [helper.name]: helper,
 | 
			
		||||
            [catchNode.name]: catchNode,
 | 
			
		||||
        },
 | 
			
		||||
        flow: [broker, inNode, outNode, helper]
 | 
			
		||||
        flow: [broker, inNode, outNode, helper, catchNode]
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user