From 33d0d12bc828ab74bd43aeb6a0053269dd3ff360 Mon Sep 17 00:00:00 2001 From: Dave Conway-Jones Date: Tue, 23 Oct 2018 23:04:36 +0100 Subject: [PATCH 1/2] More resilient binding to correct port for udp, give input side priority --- .../@node-red/nodes/core/io/32-udp.js | 198 ++++++++++-------- 1 file changed, 105 insertions(+), 93 deletions(-) diff --git a/packages/node_modules/@node-red/nodes/core/io/32-udp.js b/packages/node_modules/@node-red/nodes/core/io/32-udp.js index 400f4bf18..d23ac6950 100644 --- a/packages/node_modules/@node-red/nodes/core/io/32-udp.js +++ b/packages/node_modules/@node-red/nodes/core/io/32-udp.js @@ -58,17 +58,36 @@ module.exports = function(RED) { if (process.version.indexOf("v0.10") === 0) { opts = node.ipv; } var server; - if (!udpInputPortsInUse.hasOwnProperty(this.port)) { + if (!udpInputPortsInUse.hasOwnProperty(node.port)) { server = dgram.createSocket(opts); // default to udp4 - udpInputPortsInUse[this.port] = server; + server.bind(node.port, function() { + if (node.multicast == "true") { + server.setBroadcast(true); + server.setMulticastLoopback(false); + try { + server.setMulticastTTL(128); + server.addMembership(node.group,node.iface); + if (node.iface) { node.status({text:n.iface+" : "+node.iface}); } + node.log(RED._("udp.status.mc-group",{group:node.group})); + } catch (e) { + if (e.errno == "EINVAL") { + node.error(RED._("udp.errors.bad-mcaddress")); + } else if (e.errno == "ENODEV") { + node.error(RED._("udp.errors.interface")); + } else { + node.error(RED._("udp.errors.error",{error:e.errno})); + } + } + } + }); + udpInputPortsInUse[node.port] = server; } else { node.log(RED._("udp.errors.alreadyused",{port:node.port})); - server = udpInputPortsInUse[this.port]; // re-use existing + server = udpInputPortsInUse[node.port]; // re-use existing + if (node.iface) { node.status({text:n.iface+" : "+node.iface}); } } - if (process.version.indexOf("v0.10") === 0) { opts = node.ipv; } - server.on("error", function (err) { if ((err.code == "EACCES") && (node.port < 1024)) { node.error(RED._("udp.errors.access-error")); @@ -92,39 +111,24 @@ module.exports = function(RED) { server.on('listening', function () { var address = server.address(); - node.log(RED._("udp.status.listener-at",{host:address.address,port:address.port})); - if (node.multicast == "true") { - server.setBroadcast(true); - try { - server.setMulticastTTL(128); - server.addMembership(node.group,node.iface); - node.log(RED._("udp.status.mc-group",{group:node.group})); - } catch (e) { - if (e.errno == "EINVAL") { - node.error(RED._("udp.errors.bad-mcaddress")); - } else if (e.errno == "ENODEV") { - node.error(RED._("udp.errors.interface")); - } else { - node.error(RED._("udp.errors.error",{error:e.errno})); - } - } - } + node.log(RED._("udp.status.listener-at",{host:node.iface||address.address,port:address.port})); + }); node.on("close", function() { - if (udpInputPortsInUse.hasOwnProperty(node.port)) { - delete udpInputPortsInUse[node.port]; - } try { + if (node.multicast == "true") { server.dropMembership(node.group); } server.close(); node.log(RED._("udp.status.listener-stopped")); } catch (err) { //node.error(err); } + if (udpInputPortsInUse.hasOwnProperty(node.port)) { + delete udpInputPortsInUse[node.port]; + } + node.status({}); }); - try { server.bind(node.port,node.iface); } - catch(e) { } // Don't worry if already bound } RED.httpAdmin.get('/udp-ports/:id', RED.auth.needsPermission('udp-ports.read'), function(req,res) { res.json(Object.keys(udpInputPortsInUse)); @@ -132,6 +136,7 @@ module.exports = function(RED) { RED.nodes.registerType("udp in",UDPin); + // The Output Node function UDPout(n) { RED.nodes.createNode(this,n); @@ -169,92 +174,99 @@ module.exports = function(RED) { } var opts = {type:node.ipv, reuseAddr:true}; - if (process.version.indexOf("v0.10") === 0) { opts = node.ipv; } var sock; var p = this.outport || this.port || "0"; - if (udpInputPortsInUse[p]) { - sock = udpInputPortsInUse[p]; - node.log(RED._("udp.status.re-use",{outport:node.outport,host:node.addr,port:node.port})); - } - else { - sock = dgram.createSocket(opts); // default to udp4 - sock.on("error", function(err) { - // Any async error will also get reported in the sock.send call. - // This handler is needed to ensure the error marked as handled to - // prevent it going to the global error handler and shutting node-red - // down. - }); - udpInputPortsInUse[p] = sock; - - if (node.multicast != "false") { - sock.bind(node.outport, function() { // have to bind before you can enable broadcast... - sock.setBroadcast(true); // turn on broadcast - if (node.multicast == "multi") { - try { - sock.setMulticastTTL(128); - sock.addMembership(node.addr,node.iface); // Add to the multicast group - node.log(RED._("udp.status.mc-ready",{iface:node.iface,outport:node.outport,host:node.addr,port:node.port})); - } catch (e) { - if (e.errno == "EINVAL") { - node.error(RED._("udp.errors.bad-mcaddress")); - } else if (e.errno == "ENODEV") { - node.error(RED._("udp.errors.interface")); - } else { - node.error(RED._("udp.errors.error",{error:e.errno})); + node.tout = setTimeout(function() { + if (udpInputPortsInUse[p]) { + sock = udpInputPortsInUse[p]; + node.log(RED._("udp.status.re-use",{outport:node.outport,host:node.addr,port:node.port})); + if (node.iface) { node.status({text:n.iface+" : "+node.iface}); } + } + else { + sock = dgram.createSocket(opts); // default to udp4 + if (node.multicast != "false") { + sock.bind(node.outport, function() { // have to bind before you can enable broadcast... + sock.setBroadcast(true); // turn on broadcast + sock.setMulticastLoopback(false); // turn off loopback + if (node.multicast == "multi") { + try { + sock.setMulticastTTL(128); + sock.addMembership(node.addr,node.iface); // Add to the multicast group + if (node.iface) { node.status({text:n.iface+" : "+node.iface}); } + node.log(RED._("udp.status.mc-ready",{iface:node.iface,outport:node.outport,host:node.addr,port:node.port})); + } catch (e) { + if (e.errno == "EINVAL") { + node.error(RED._("udp.errors.bad-mcaddress")); + } else if (e.errno == "ENODEV") { + node.error(RED._("udp.errors.interface")); + } else { + node.error(RED._("udp.errors.error",{error:e.errno})); + } } + } else { + node.log(RED._("udp.status.bc-ready",{outport:node.outport,host:node.addr,port:node.port})); } - } else { - node.log(RED._("udp.status.bc-ready",{outport:node.outport,host:node.addr,port:node.port})); - } + }); + } else if ((node.outport !== "") && (!udpInputPortsInUse[node.outport])) { + sock.bind(node.outport); + node.log(RED._("udp.status.ready",{outport:node.outport,host:node.addr,port:node.port})); + } else { + node.log(RED._("udp.status.ready-nolocal",{host:node.addr,port:node.port})); + } + sock.on("error", function(err) { + // Any async error will also get reported in the sock.send call. + // This handler is needed to ensure the error marked as handled to + // prevent it going to the global error handler and shutting node-red + // down. }); - } else if ((node.outport !== "") && (!udpInputPortsInUse[node.outport])) { - sock.bind(node.outport); - node.log(RED._("udp.status.ready",{outport:node.outport,host:node.addr,port:node.port})); - } else { - node.log(RED._("udp.status.ready-nolocal",{host:node.addr,port:node.port})); + udpInputPortsInUse[p] = sock; } } - node.on("input", function(msg) { - if (msg.hasOwnProperty("payload")) { - var add = node.addr || msg.ip || ""; - var por = node.port || msg.port || 0; - if (add === "") { - node.warn(RED._("udp.errors.ip-notset")); - } else if (por === 0) { - node.warn(RED._("udp.errors.port-notset")); - } else if (isNaN(por) || (por < 1) || (por > 65535)) { - node.warn(RED._("udp.errors.port-invalid")); - } else { - var message; - if (node.base64) { - message = Buffer.from(msg.payload, 'base64'); - } else if (msg.payload instanceof Buffer) { - message = msg.payload; + node.on("input", function(msg) { + if (msg.hasOwnProperty("payload")) { + var add = node.addr || msg.ip || ""; + var por = node.port || msg.port || 0; + if (add === "") { + node.warn(RED._("udp.errors.ip-notset")); + } else if (por === 0) { + node.warn(RED._("udp.errors.port-notset")); + } else if (isNaN(por) || (por < 1) || (por > 65535)) { + node.warn(RED._("udp.errors.port-invalid")); } else { - message = Buffer.from(""+msg.payload); - } - sock.send(message, 0, message.length, por, add, function(err, bytes) { - if (err) { - node.error("udp : "+err,msg); + var message; + if (node.base64) { + message = Buffer.from(msg.payload, 'base64'); + } else if (msg.payload instanceof Buffer) { + message = msg.payload; + } else { + message = Buffer.from(""+msg.payload); } - message = null; - }); + sock.send(message, 0, message.length, por, add, function(err, bytes) { + if (err) { + node.error("udp : "+err,msg); + } + message = null; + }); + } } - } - }); + }); + }, 150); node.on("close", function() { - if (udpInputPortsInUse.hasOwnProperty(p)) { - delete udpInputPortsInUse[p]; - } + if (node.tout) { clearTimeout(node.tout); } try { + if (node.multicast == "multi") { sock.dropMembership(node.group); } sock.close(); node.log(RED._("udp.status.output-stopped")); } catch (err) { //node.error(err); } + if (udpInputPortsInUse.hasOwnProperty(p)) { + delete udpInputPortsInUse[p]; + } + node.status({}); }); } RED.nodes.registerType("udp out",UDPout); From c2c6e6080e7d28cf4e9aacd1bd05b866f67d30e9 Mon Sep 17 00:00:00 2001 From: Dave Conway-Jones Date: Tue, 23 Oct 2018 23:27:49 +0100 Subject: [PATCH 2/2] remove extraneous brace from udp node --- packages/node_modules/@node-red/nodes/core/io/32-udp.js | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/node_modules/@node-red/nodes/core/io/32-udp.js b/packages/node_modules/@node-red/nodes/core/io/32-udp.js index d23ac6950..19001eff4 100644 --- a/packages/node_modules/@node-red/nodes/core/io/32-udp.js +++ b/packages/node_modules/@node-red/nodes/core/io/32-udp.js @@ -222,7 +222,6 @@ module.exports = function(RED) { }); udpInputPortsInUse[p] = sock; } - } node.on("input", function(msg) { if (msg.hasOwnProperty("payload")) {