Migrate to new node function style

This commit is contained in:
Nick O'Leary
2014-05-03 23:32:04 +01:00
parent 5afc5857c4
commit ff49d2b217
37 changed files with 3194 additions and 3170 deletions

View File

@@ -14,122 +14,124 @@
* limitations under the License.
**/
var RED = require(process.env.NODE_RED_HOME+"/red/red");
var connectionPool = require("./lib/mqttConnectionPool");
var util = require("util");
function MQTTBrokerNode(n) {
RED.nodes.createNode(this,n);
this.broker = n.broker;
this.port = n.port;
this.clientid = n.clientid;
var credentials = RED.nodes.getCredentials(n.id);
if (credentials) {
this.username = credentials.user;
this.password = credentials.password;
}
}
RED.nodes.registerType("mqtt-broker",MQTTBrokerNode);
var querystring = require('querystring');
RED.httpAdmin.get('/mqtt-broker/:id',function(req,res) {
var credentials = RED.nodes.getCredentials(req.params.id);
if (credentials) {
res.send(JSON.stringify({user:credentials.user,hasPassword:(credentials.password&&credentials.password!="")}));
} else {
res.send(JSON.stringify({}));
module.exports = function(RED) {
var connectionPool = require("./lib/mqttConnectionPool");
var util = require("util");
function MQTTBrokerNode(n) {
RED.nodes.createNode(this,n);
this.broker = n.broker;
this.port = n.port;
this.clientid = n.clientid;
var credentials = RED.nodes.getCredentials(n.id);
if (credentials) {
this.username = credentials.user;
this.password = credentials.password;
}
}
});
RED.httpAdmin.delete('/mqtt-broker/:id',function(req,res) {
RED.nodes.deleteCredentials(req.params.id);
res.send(200);
});
RED.httpAdmin.post('/mqtt-broker/:id',function(req,res) {
var body = "";
req.on('data', function(chunk) {
body+=chunk;
RED.nodes.registerType("mqtt-broker",MQTTBrokerNode);
var querystring = require('querystring');
RED.httpAdmin.get('/mqtt-broker/:id',function(req,res) {
var credentials = RED.nodes.getCredentials(req.params.id);
if (credentials) {
res.send(JSON.stringify({user:credentials.user,hasPassword:(credentials.password&&credentials.password!="")}));
} else {
res.send(JSON.stringify({}));
}
});
req.on('end', function(){
var newCreds = querystring.parse(body);
var credentials = RED.nodes.getCredentials(req.params.id)||{};
if (newCreds.user == null || newCreds.user == "") {
delete credentials.user;
} else {
credentials.user = newCreds.user;
}
if (newCreds.password == "") {
delete credentials.password;
} else {
credentials.password = newCreds.password||credentials.password;
}
RED.nodes.addCredentials(req.params.id,credentials);
RED.httpAdmin.delete('/mqtt-broker/:id',function(req,res) {
RED.nodes.deleteCredentials(req.params.id);
res.send(200);
});
});
function MQTTInNode(n) {
RED.nodes.createNode(this,n);
this.topic = n.topic;
this.broker = n.broker;
this.brokerConfig = RED.nodes.getNode(this.broker);
if (this.brokerConfig) {
this.client = connectionPool.get(this.brokerConfig.broker,this.brokerConfig.port,this.brokerConfig.clientid,this.brokerConfig.username,this.brokerConfig.password);
var node = this;
this.client.subscribe(this.topic,2,function(topic,payload,qos,retain) {
var msg = {topic:topic,payload:payload,qos:qos,retain:retain};
if ((node.brokerConfig.broker == "localhost")||(node.brokerConfig.broker == "127.0.0.1")) {
msg._topic = topic;
}
node.send(msg);
RED.httpAdmin.post('/mqtt-broker/:id',function(req,res) {
var body = "";
req.on('data', function(chunk) {
body+=chunk;
});
this.client.connect();
} else {
this.error("missing broker configuration");
}
}
RED.nodes.registerType("mqtt in",MQTTInNode);
MQTTInNode.prototype.close = function() {
if (this.client) {
this.client.disconnect();
}
}
function MQTTOutNode(n) {
RED.nodes.createNode(this,n);
this.topic = n.topic;
this.broker = n.broker;
this.brokerConfig = RED.nodes.getNode(this.broker);
if (this.brokerConfig) {
this.client = connectionPool.get(this.brokerConfig.broker,this.brokerConfig.port,this.brokerConfig.clientid,this.brokerConfig.username,this.brokerConfig.password);
this.on("input",function(msg) {
if (msg != null) {
if (this.topic) {
msg.topic = this.topic;
}
this.client.publish(msg);
req.on('end', function(){
var newCreds = querystring.parse(body);
var credentials = RED.nodes.getCredentials(req.params.id)||{};
if (newCreds.user == null || newCreds.user == "") {
delete credentials.user;
} else {
credentials.user = newCreds.user;
}
if (newCreds.password == "") {
delete credentials.password;
} else {
credentials.password = newCreds.password||credentials.password;
}
RED.nodes.addCredentials(req.params.id,credentials);
res.send(200);
});
this.client.connect();
} else {
this.error("missing broker configuration");
}
}
RED.nodes.registerType("mqtt out",MQTTOutNode);
MQTTOutNode.prototype.close = function() {
if (this.client) {
this.client.disconnect();
});
function MQTTInNode(n) {
RED.nodes.createNode(this,n);
this.topic = n.topic;
this.broker = n.broker;
this.brokerConfig = RED.nodes.getNode(this.broker);
if (this.brokerConfig) {
this.client = connectionPool.get(this.brokerConfig.broker,this.brokerConfig.port,this.brokerConfig.clientid,this.brokerConfig.username,this.brokerConfig.password);
var node = this;
this.client.subscribe(this.topic,2,function(topic,payload,qos,retain) {
var msg = {topic:topic,payload:payload,qos:qos,retain:retain};
if ((node.brokerConfig.broker == "localhost")||(node.brokerConfig.broker == "127.0.0.1")) {
msg._topic = topic;
}
node.send(msg);
});
this.client.connect();
} else {
this.error("missing broker configuration");
}
}
RED.nodes.registerType("mqtt in",MQTTInNode);
MQTTInNode.prototype.close = function() {
if (this.client) {
this.client.disconnect();
}
}
function MQTTOutNode(n) {
RED.nodes.createNode(this,n);
this.topic = n.topic;
this.broker = n.broker;
this.brokerConfig = RED.nodes.getNode(this.broker);
if (this.brokerConfig) {
this.client = connectionPool.get(this.brokerConfig.broker,this.brokerConfig.port,this.brokerConfig.clientid,this.brokerConfig.username,this.brokerConfig.password);
this.on("input",function(msg) {
if (msg != null) {
if (this.topic) {
msg.topic = this.topic;
}
this.client.publish(msg);
}
});
this.client.connect();
} else {
this.error("missing broker configuration");
}
}
RED.nodes.registerType("mqtt out",MQTTOutNode);
MQTTOutNode.prototype.close = function() {
if (this.client) {
this.client.disconnect();
}
}
}

View File

@@ -14,232 +14,234 @@
* limitations under the License.
**/
var RED = require(process.env.NODE_RED_HOME+"/red/red");
var util = require("util");
var http = require("follow-redirects").http;
var https = require("follow-redirects").https;
var urllib = require("url");
var express = require("express");
var getBody = require('raw-body');
var mustache = require("mustache");
var querystring = require("querystring");
var cors = require('cors');
var jsonParser = express.json();
var urlencParser = express.urlencoded();
function rawBodyParser(req, res, next) {
if (req._body) return next();
req.body = "";
req._body = true;
getBody(req, {
limit: '1mb',
length: req.headers['content-length'],
encoding: 'utf8'
}, function (err, buf) {
if (err) return next(err);
req.body = buf;
next();
});
}
function HTTPIn(n) {
RED.nodes.createNode(this,n);
if (RED.settings.httpNodeRoot !== false) {
this.url = n.url;
this.method = n.method;
var node = this;
this.errorHandler = function(err,req,res,next) {
node.warn(err);
res.send(500);
};
this.callback = function(req,res) {
if (node.method == "post") {
node.send({req:req,res:res,payload:req.body});
} else if (node.method == "get") {
node.send({req:req,res:res,payload:req.query});
} else {
node.send({req:req,res:res});
}
}
var corsHandler = function(req,res,next) { next(); }
if (RED.settings.httpNodeCors) {
corsHandler = cors(RED.settings.httpNodeCors);
RED.httpNode.options(this.url,corsHandler);
}
if (this.method == "get") {
RED.httpNode.get(this.url,corsHandler,this.callback,this.errorHandler);
} else if (this.method == "post") {
RED.httpNode.post(this.url,corsHandler,jsonParser,urlencParser,rawBodyParser,this.callback,this.errorHandler);
} else if (this.method == "put") {
RED.httpNode.put(this.url,corsHandler,jsonParser,urlencParser,rawBodyParser,this.callback,this.errorHandler);
} else if (this.method == "delete") {
RED.httpNode.delete(this.url,corsHandler,this.callback,errorHandler);
}
this.on("close",function() {
var routes = RED.httpNode.routes[this.method];
for (var i = 0; i<routes.length; i++) {
if (routes[i].path == this.url) {
routes.splice(i,1);
//break;
module.exports = function(RED) {
var util = require("util");
var http = require("follow-redirects").http;
var https = require("follow-redirects").https;
var urllib = require("url");
var express = require("express");
var getBody = require('raw-body');
var mustache = require("mustache");
var querystring = require("querystring");
var cors = require('cors');
var jsonParser = express.json();
var urlencParser = express.urlencoded();
function rawBodyParser(req, res, next) {
if (req._body) return next();
req.body = "";
req._body = true;
getBody(req, {
limit: '1mb',
length: req.headers['content-length'],
encoding: 'utf8'
}, function (err, buf) {
if (err) return next(err);
req.body = buf;
next();
});
}
function HTTPIn(n) {
RED.nodes.createNode(this,n);
if (RED.settings.httpNodeRoot !== false) {
this.url = n.url;
this.method = n.method;
var node = this;
this.errorHandler = function(err,req,res,next) {
node.warn(err);
res.send(500);
};
this.callback = function(req,res) {
if (node.method == "post") {
node.send({req:req,res:res,payload:req.body});
} else if (node.method == "get") {
node.send({req:req,res:res,payload:req.query});
} else {
node.send({req:req,res:res});
}
}
var corsHandler = function(req,res,next) { next(); }
if (RED.settings.httpNodeCors) {
var routes = RED.httpNode.routes['options'];
corsHandler = cors(RED.settings.httpNodeCors);
RED.httpNode.options(this.url,corsHandler);
}
if (this.method == "get") {
RED.httpNode.get(this.url,corsHandler,this.callback,this.errorHandler);
} else if (this.method == "post") {
RED.httpNode.post(this.url,corsHandler,jsonParser,urlencParser,rawBodyParser,this.callback,this.errorHandler);
} else if (this.method == "put") {
RED.httpNode.put(this.url,corsHandler,jsonParser,urlencParser,rawBodyParser,this.callback,this.errorHandler);
} else if (this.method == "delete") {
RED.httpNode.delete(this.url,corsHandler,this.callback,errorHandler);
}
this.on("close",function() {
var routes = RED.httpNode.routes[this.method];
for (var i = 0; i<routes.length; i++) {
if (routes[i].path == this.url) {
routes.splice(i,1);
//break;
}
}
}
});
} else {
this.warn("Cannot create http-in node when httpNodeRoot set to false");
}
}
RED.nodes.registerType("http in",HTTPIn);
function HTTPOut(n) {
RED.nodes.createNode(this,n);
var node = this;
this.on("input",function(msg) {
if (msg.res) {
if (msg.headers) {
msg.res.set(msg.headers);
}
var statusCode = msg.statusCode || 200;
if (typeof msg.payload == "object" && !Buffer.isBuffer(msg.payload)) {
msg.res.jsonp(statusCode,msg.payload);
} else {
msg.res.send(statusCode,msg.payload);
}
} else {
node.warn("No response object");
}
});
}
RED.nodes.registerType("http response",HTTPOut);
function HTTPRequest(n) {
RED.nodes.createNode(this,n);
var nodeUrl = n.url;
var isTemplatedUrl = (nodeUrl||"").indexOf("{{") != -1;
var nodeMethod = n.method || "GET";
var node = this;
var credentials = RED.nodes.getCredentials(n.id);
this.on("input",function(msg) {
var url;
if (msg.url) {
url = msg.url;
} else if (isTemplatedUrl) {
url = mustache.render(nodeUrl,msg);
} else {
url = nodeUrl;
}
var method = (msg.method||nodeMethod).toUpperCase();
var opts = urllib.parse(url);
opts.method = method;
opts.headers = {};
if (msg.headers) {
for (var v in msg.headers) {
opts.headers[v.toLowerCase()] = msg.headers[v];
}
}
if (credentials) {
opts.auth = credentials.user+":"+(credentials.password||"");
}
var payload = null;
if (msg.payload && (method == "POST" || method == "PUT") ) {
if (typeof msg.payload === "string" || Buffer.isBuffer(msg.payload)) {
payload = msg.payload;
} else if (typeof msg.payload == "number") {
payload = msg.payload+"";
} else {
if (opts.headers['content-type'] == 'application/x-www-form-urlencoded') {
payload = querystring.stringify(msg.payload);
} else {
payload = JSON.stringify(msg.payload);
if (opts.headers['content-type'] == null) {
opts.headers['content-type'] = "application/json";
if (RED.settings.httpNodeCors) {
var routes = RED.httpNode.routes['options'];
for (var i = 0; i<routes.length; i++) {
if (routes[i].path == this.url) {
routes.splice(i,1);
//break;
}
}
}
if (opts.headers['content-length'] == null) {
opts.headers['content-length'] = Buffer.byteLength(payload);
});
} else {
this.warn("Cannot create http-in node when httpNodeRoot set to false");
}
}
RED.nodes.registerType("http in",HTTPIn);
function HTTPOut(n) {
RED.nodes.createNode(this,n);
var node = this;
this.on("input",function(msg) {
if (msg.res) {
if (msg.headers) {
msg.res.set(msg.headers);
}
var statusCode = msg.statusCode || 200;
if (typeof msg.payload == "object" && !Buffer.isBuffer(msg.payload)) {
msg.res.jsonp(statusCode,msg.payload);
} else {
msg.res.send(statusCode,msg.payload);
}
} else {
node.warn("No response object");
}
var req = ((/^https/.test(url))?https:http).request(opts,function(res) {
res.setEncoding('utf8');
msg.statusCode = res.statusCode;
msg.headers = res.headers;
msg.payload = "";
res.on('data',function(chunk) {
msg.payload += chunk;
});
}
RED.nodes.registerType("http response",HTTPOut);
function HTTPRequest(n) {
RED.nodes.createNode(this,n);
var nodeUrl = n.url;
var isTemplatedUrl = (nodeUrl||"").indexOf("{{") != -1;
var nodeMethod = n.method || "GET";
var node = this;
var credentials = RED.nodes.getCredentials(n.id);
this.on("input",function(msg) {
var url;
if (msg.url) {
url = msg.url;
} else if (isTemplatedUrl) {
url = mustache.render(nodeUrl,msg);
} else {
url = nodeUrl;
}
var method = (msg.method||nodeMethod).toUpperCase();
var opts = urllib.parse(url);
opts.method = method;
opts.headers = {};
if (msg.headers) {
for (var v in msg.headers) {
opts.headers[v.toLowerCase()] = msg.headers[v];
}
}
if (credentials) {
opts.auth = credentials.user+":"+(credentials.password||"");
}
var payload = null;
if (msg.payload && (method == "POST" || method == "PUT") ) {
if (typeof msg.payload === "string" || Buffer.isBuffer(msg.payload)) {
payload = msg.payload;
} else if (typeof msg.payload == "number") {
payload = msg.payload+"";
} else {
if (opts.headers['content-type'] == 'application/x-www-form-urlencoded') {
payload = querystring.stringify(msg.payload);
} else {
payload = JSON.stringify(msg.payload);
if (opts.headers['content-type'] == null) {
opts.headers['content-type'] = "application/json";
}
}
}
if (opts.headers['content-length'] == null) {
opts.headers['content-length'] = Buffer.byteLength(payload);
}
}
var req = ((/^https/.test(url))?https:http).request(opts,function(res) {
res.setEncoding('utf8');
msg.statusCode = res.statusCode;
msg.headers = res.headers;
msg.payload = "";
res.on('data',function(chunk) {
msg.payload += chunk;
});
res.on('end',function() {
node.send(msg);
});
});
res.on('end',function() {
req.on('error',function(err) {
msg.payload = err.toString();
msg.statusCode = err.code;
node.send(msg);
});
});
req.on('error',function(err) {
msg.payload = err.toString();
msg.statusCode = err.code;
node.send(msg);
});
if (payload) {
req.write(payload);
}
req.end();
});
}
RED.nodes.registerType("http request",HTTPRequest);
RED.httpAdmin.get('/http-request/:id',function(req,res) {
var credentials = RED.nodes.getCredentials(req.params.id);
if (credentials) {
res.send(JSON.stringify({user:credentials.user,hasPassword:(credentials.password&&credentials.password!="")}));
} else {
res.send(JSON.stringify({}));
if (payload) {
req.write(payload);
}
req.end();
});
}
});
RED.httpAdmin.delete('/http-request/:id',function(req,res) {
RED.nodes.deleteCredentials(req.params.id);
res.send(200);
});
RED.httpAdmin.post('/http-request/:id',function(req,res) {
var body = "";
req.on('data', function(chunk) {
body+=chunk;
RED.nodes.registerType("http request",HTTPRequest);
RED.httpAdmin.get('/http-request/:id',function(req,res) {
var credentials = RED.nodes.getCredentials(req.params.id);
if (credentials) {
res.send(JSON.stringify({user:credentials.user,hasPassword:(credentials.password&&credentials.password!="")}));
} else {
res.send(JSON.stringify({}));
}
});
req.on('end', function(){
var newCreds = querystring.parse(body);
var credentials = RED.nodes.getCredentials(req.params.id)||{};
if (newCreds.user == null || newCreds.user == "") {
delete credentials.user;
} else {
credentials.user = newCreds.user;
}
if (newCreds.password == "") {
delete credentials.password;
} else {
credentials.password = newCreds.password||credentials.password;
}
RED.nodes.addCredentials(req.params.id,credentials);
RED.httpAdmin.delete('/http-request/:id',function(req,res) {
RED.nodes.deleteCredentials(req.params.id);
res.send(200);
});
});
RED.httpAdmin.post('/http-request/:id',function(req,res) {
var body = "";
req.on('data', function(chunk) {
body+=chunk;
});
req.on('end', function(){
var newCreds = querystring.parse(body);
var credentials = RED.nodes.getCredentials(req.params.id)||{};
if (newCreds.user == null || newCreds.user == "") {
delete credentials.user;
} else {
credentials.user = newCreds.user;
}
if (newCreds.password == "") {
delete credentials.password;
} else {
credentials.password = newCreds.password||credentials.password;
}
RED.nodes.addCredentials(req.params.id,credentials);
res.send(200);
});
});
}

View File

@@ -14,159 +14,160 @@
* limitations under the License.
**/
// Require main module
var RED = require(process.env.NODE_RED_HOME+"/red/red"),
ws = require("ws"),
inspect = require("sys").inspect;
// A node red node that sets up a local websocket server
function WebSocketListenerNode(n) {
// Create a RED node
RED.nodes.createNode(this,n);
var node = this;
// Store local copies of the node configuration (as defined in the .html)
node.path = n.path;
node.wholemsg = (n.wholemsg === "true");
module.exports = function(RED) {
node._inputNodes = []; // collection of nodes that want to receive events
var path = RED.settings.httpNodeRoot || "/";
path = path + (path.slice(-1) == "/" ? "":"/") + (node.path.charAt(0) == "/" ? node.path.substring(1) : node.path);
// Workaround https://github.com/einaros/ws/pull/253
// Listen for 'newListener' events from RED.server
node._serverListeners = {};
var storeListener = function(/*String*/event,/*function*/listener){
if(event == "error" || event == "upgrade" || event == "listening"){
node._serverListeners[event] = listener;
}
}
node._clients = {};
var ws = require("ws"),
inspect = require("sys").inspect;
RED.server.addListener('newListener',storeListener);
// Create a WebSocket Server
node.server = new ws.Server({server:RED.server,path:path});
// Workaround https://github.com/einaros/ws/pull/253
// Stop listening for new listener events
RED.server.removeListener('newListener',storeListener);
node.server.on('connection', function(socket){
var id = (1+Math.random()*4294967295).toString(16);
node._clients[id] = socket;
// A node red node that sets up a local websocket server
function WebSocketListenerNode(n) {
// Create a RED node
RED.nodes.createNode(this,n);
var node = this;
// Store local copies of the node configuration (as defined in the .html)
node.path = n.path;
node.wholemsg = (n.wholemsg === "true");
socket.on('close',function() {
delete node._clients[id];
});
socket.on('message',function(data,flags){
node.handleEvent(id,socket,'message',data,flags);
});
socket.on('error', function(err) {
node.warn("An error occured on the ws connection: "+inspect(err));
});
});
node.on("close", function() {
node._inputNodes = []; // collection of nodes that want to receive events
var path = RED.settings.httpNodeRoot || "/";
path = path + (path.slice(-1) == "/" ? "":"/") + (node.path.charAt(0) == "/" ? node.path.substring(1) : node.path);
// Workaround https://github.com/einaros/ws/pull/253
// Remove listeners from RED.server
var listener = null;
for(var event in node._serverListeners){
listener = node._serverListeners[event];
if(typeof listener === "function"){
RED.server.removeListener(event,listener);
// Listen for 'newListener' events from RED.server
node._serverListeners = {};
var storeListener = function(/*String*/event,/*function*/listener){
if(event == "error" || event == "upgrade" || event == "listening"){
node._serverListeners[event] = listener;
}
}
node._serverListeners = {};
node.server.close();
node._inputNodes = [];
});
}
RED.nodes.registerType("websocket-listener",WebSocketListenerNode);
WebSocketListenerNode.prototype.registerInputNode = function(/*Node*/handler){
this._inputNodes.push(handler);
}
WebSocketListenerNode.prototype.handleEvent = function(id,/*socket*/socket,/*String*/event,/*Object*/data,/*Object*/flags){
var msg;
if (this.wholemsg) {
msg = JSON.parse(data);
} else {
msg = {
payload:data
node._clients = {};
RED.server.addListener('newListener',storeListener);
// Create a WebSocket Server
node.server = new ws.Server({server:RED.server,path:path});
// Workaround https://github.com/einaros/ws/pull/253
// Stop listening for new listener events
RED.server.removeListener('newListener',storeListener);
node.server.on('connection', function(socket){
var id = (1+Math.random()*4294967295).toString(16);
node._clients[id] = socket;
socket.on('close',function() {
delete node._clients[id];
});
socket.on('message',function(data,flags){
node.handleEvent(id,socket,'message',data,flags);
});
socket.on('error', function(err) {
node.warn("An error occured on the ws connection: "+inspect(err));
});
});
node.on("close", function() {
// Workaround https://github.com/einaros/ws/pull/253
// Remove listeners from RED.server
var listener = null;
for(var event in node._serverListeners){
listener = node._serverListeners[event];
if(typeof listener === "function"){
RED.server.removeListener(event,listener);
}
}
node._serverListeners = {};
node.server.close();
node._inputNodes = [];
});
}
RED.nodes.registerType("websocket-listener",WebSocketListenerNode);
WebSocketListenerNode.prototype.registerInputNode = function(/*Node*/handler){
this._inputNodes.push(handler);
}
WebSocketListenerNode.prototype.handleEvent = function(id,/*socket*/socket,/*String*/event,/*Object*/data,/*Object*/flags){
var msg;
if (this.wholemsg) {
msg = JSON.parse(data);
} else {
msg = {
payload:data
};
}
msg._session = {type:"websocket",id:id};
for (var i = 0; i < this._inputNodes.length; i++) {
this._inputNodes[i].send(msg);
};
}
msg._session = {type:"websocket",id:id};
for (var i = 0; i < this._inputNodes.length; i++) {
this._inputNodes[i].send(msg);
};
}
WebSocketListenerNode.prototype.broadcast = function(data){
for(var i in this.server.clients){
this.server.clients[i].send(data);
};
}
WebSocketListenerNode.prototype.send = function(id,data){
var session = this._clients[id];
if (session) {
session.send(data);
WebSocketListenerNode.prototype.broadcast = function(data){
for(var i in this.server.clients){
this.server.clients[i].send(data);
};
}
}
function WebSocketInNode(n) {
RED.nodes.createNode(this,n);
this.server = n.server;
var node = this;
this.serverConfig = RED.nodes.getNode(this.server);
if (this.serverConfig) {
this.serverConfig.registerInputNode(this);
} else {
this.error("Missing server configuration");
}
}
RED.nodes.registerType("websocket in",WebSocketInNode);
function WebSocketOutNode(n) {
RED.nodes.createNode(this,n);
var node = this;
this.server = n.server;
this.serverConfig = RED.nodes.getNode(this.server);
if (!this.serverConfig) {
this.error("Missing server configuration");
}
this.on("input", function(msg) {
var payload;
if (this.serverConfig.wholemsg) {
delete msg._session;
payload = JSON.stringify(msg);
} else {
payload = msg.payload;
if (Buffer.isBuffer(payload)) {
payload = payload.toString();
} else if (typeof payload === "object") {
payload = JSON.stringify(payload);
} else if (typeof payload !== "string") {
payload = ""+payload;
}
WebSocketListenerNode.prototype.send = function(id,data){
var session = this._clients[id];
if (session) {
session.send(data);
}
if (msg._session && msg._session.type == "websocket") {
node.serverConfig.send(msg._session.id,payload);
}
function WebSocketInNode(n) {
RED.nodes.createNode(this,n);
this.server = n.server;
var node = this;
this.serverConfig = RED.nodes.getNode(this.server);
if (this.serverConfig) {
this.serverConfig.registerInputNode(this);
} else {
node.serverConfig.broadcast(payload,function(error){
if(!!error){
node.warn("An error occurred while sending:" + inspect(error));
this.error("Missing server configuration");
}
}
RED.nodes.registerType("websocket in",WebSocketInNode);
function WebSocketOutNode(n) {
RED.nodes.createNode(this,n);
var node = this;
this.server = n.server;
this.serverConfig = RED.nodes.getNode(this.server);
if (!this.serverConfig) {
this.error("Missing server configuration");
}
this.on("input", function(msg) {
var payload;
if (this.serverConfig.wholemsg) {
delete msg._session;
payload = JSON.stringify(msg);
} else {
payload = msg.payload;
if (Buffer.isBuffer(payload)) {
payload = payload.toString();
} else if (typeof payload === "object") {
payload = JSON.stringify(payload);
} else if (typeof payload !== "string") {
payload = ""+payload;
}
});
}
});
}
if (msg._session && msg._session.type == "websocket") {
node.serverConfig.send(msg._session.id,payload);
} else {
node.serverConfig.broadcast(payload,function(error){
if(!!error){
node.warn("An error occurred while sending:" + inspect(error));
}
});
}
});
}
RED.nodes.registerType("websocket out",WebSocketOutNode);
}
RED.nodes.registerType("websocket out",WebSocketOutNode);

View File

@@ -14,29 +14,30 @@
* limitations under the License.
**/
var RED = require(process.env.NODE_RED_HOME+"/red/red");
var notify = require("fs.notify");
var fs = require("fs");
var sep = require("path").sep;
function WatchNode(n) {
RED.nodes.createNode(this,n);
this.files = n.files.split(",");
for (var f in this.files) {
this.files[f] = this.files[f].trim();
}
this.p = (this.files.length == 1) ? this.files[0] : JSON.stringify(this.files);
var node = this;
var notifications = new notify(node.files);
notifications.on('change', function (file, event, path) {
if (fs.statSync(path).isDirectory()) { path = path + sep + file; }
var msg = { payload: path, topic: node.p, file: file};
node.send(msg);
});
this.close = function() {
notifications.close();
module.exports = function(RED) {
var notify = require("fs.notify");
var fs = require("fs");
var sep = require("path").sep;
function WatchNode(n) {
RED.nodes.createNode(this,n);
this.files = n.files.split(",");
for (var f in this.files) {
this.files[f] = this.files[f].trim();
}
this.p = (this.files.length == 1) ? this.files[0] : JSON.stringify(this.files);
var node = this;
var notifications = new notify(node.files);
notifications.on('change', function (file, event, path) {
if (fs.statSync(path).isDirectory()) { path = path + sep + file; }
var msg = { payload: path, topic: node.p, file: file};
node.send(msg);
});
this.close = function() {
notifications.close();
}
}
RED.nodes.registerType("watch",WatchNode);
}
RED.nodes.registerType("watch",WatchNode);

View File

@@ -14,199 +14,200 @@
* limitations under the License.
**/
var RED = require(process.env.NODE_RED_HOME+"/red/red");
var settings = RED.settings;
var events = require("events");
var util = require("util");
var serialp = require("serialport");
// TODO: 'serialPool' should be encapsulated in SerialPortNode
function SerialPortNode(n) {
RED.nodes.createNode(this,n);
this.serialport = n.serialport;
this.newline = n.newline;
this.addchar = n.addchar || "false";
this.serialbaud = parseInt(n.serialbaud) || 57600;
this.databits = parseInt(n.databits) || 8;
this.parity = n.parity || "none";
this.stopbits = parseInt(n.stopbits) || 1;
}
RED.nodes.registerType("serial-port",SerialPortNode);
function SerialOutNode(n) {
RED.nodes.createNode(this,n);
this.serial = n.serial;
this.serialConfig = RED.nodes.getNode(this.serial);
if (this.serialConfig) {
var node = this;
node.port = serialPool.get(this.serialConfig.serialport,
this.serialConfig.serialbaud,
this.serialConfig.databits,
this.serialConfig.parity,
this.serialConfig.stopbits,
this.serialConfig.newline);
node.addCh = "";
if (node.serialConfig.addchar == "true") {
node.addCh = this.serialConfig.newline.replace("\\n","\n").replace("\\r","\r");
}
node.on("input",function(msg) {
var payload = msg.payload;
if (!Buffer.isBuffer(payload)) {
if (typeof payload === "object") {
payload = JSON.stringify(payload);
} else {
payload = new String(payload);
}
payload += node.addCh;
} else if (node.addCh !== "") {
payload = Buffer.concat([payload,new Buffer(node.addCh)]);
}
node.port.write(payload,function(err,res) {
if (err) {
node.error(err);
}
});
});
} else {
this.error("missing serial config");
module.exports = function(RED) {
var settings = RED.settings;
var events = require("events");
var util = require("util");
var serialp = require("serialport");
// TODO: 'serialPool' should be encapsulated in SerialPortNode
function SerialPortNode(n) {
RED.nodes.createNode(this,n);
this.serialport = n.serialport;
this.newline = n.newline;
this.addchar = n.addchar || "false";
this.serialbaud = parseInt(n.serialbaud) || 57600;
this.databits = parseInt(n.databits) || 8;
this.parity = n.parity || "none";
this.stopbits = parseInt(n.stopbits) || 1;
}
this.on("close", function() {
RED.nodes.registerType("serial-port",SerialPortNode);
function SerialOutNode(n) {
RED.nodes.createNode(this,n);
this.serial = n.serial;
this.serialConfig = RED.nodes.getNode(this.serial);
if (this.serialConfig) {
serialPool.close(this.serialConfig.serialport);
}
});
}
RED.nodes.registerType("serial out",SerialOutNode);
function SerialInNode(n) {
RED.nodes.createNode(this,n);
this.serial = n.serial;
this.serialConfig = RED.nodes.getNode(this.serial);
if (this.serialConfig) {
var node = this;
node.port = serialPool.get(this.serialConfig.serialport,
this.serialConfig.serialbaud,
this.serialConfig.databits,
this.serialConfig.parity,
this.serialConfig.stopbits,
this.serialConfig.newline);
this.port.on('data', function(msg) {
node.send({ "payload": msg });
});
} else {
this.error("missing serial config");
}
this.on("close", function() {
if (this.serialConfig) {
try {
serialPool.close(this.serialConfig.serialport);
} catch(err) {
var node = this;
node.port = serialPool.get(this.serialConfig.serialport,
this.serialConfig.serialbaud,
this.serialConfig.databits,
this.serialConfig.parity,
this.serialConfig.stopbits,
this.serialConfig.newline);
node.addCh = "";
if (node.serialConfig.addchar == "true") {
node.addCh = this.serialConfig.newline.replace("\\n","\n").replace("\\r","\r");
}
}
});
}
RED.nodes.registerType("serial in",SerialInNode);
var serialPool = function() {
var connections = {};
return {
get:function(port,baud,databits,parity,stopbits,newline,callback) {
var id = port;
if (!connections[id]) {
connections[id] = function() {
var obj = {
_emitter: new events.EventEmitter(),
serial: null,
_closing: false,
tout: null,
on: function(a,b) { this._emitter.on(a,b); },
close: function(cb) { this.serial.close(cb); },
write: function(m,cb) { this.serial.write(m,cb); },
node.on("input",function(msg) {
var payload = msg.payload;
if (!Buffer.isBuffer(payload)) {
if (typeof payload === "object") {
payload = JSON.stringify(payload);
} else {
payload = new String(payload);
}
newline = newline.replace("\\n","\n").replace("\\r","\r");
var setupSerial = function() {
if (newline == "") {
obj.serial = new serialp.SerialPort(port,{
baudrate: baud,
databits: databits,
parity: parity,
stopbits: stopbits,
parser: serialp.parsers.raw
},true, function(err, results) { if (err) obj.serial.emit('error',err); });
payload += node.addCh;
} else if (node.addCh !== "") {
payload = Buffer.concat([payload,new Buffer(node.addCh)]);
}
node.port.write(payload,function(err,res) {
if (err) {
node.error(err);
}
});
});
} else {
this.error("missing serial config");
}
this.on("close", function() {
if (this.serialConfig) {
serialPool.close(this.serialConfig.serialport);
}
});
}
RED.nodes.registerType("serial out",SerialOutNode);
function SerialInNode(n) {
RED.nodes.createNode(this,n);
this.serial = n.serial;
this.serialConfig = RED.nodes.getNode(this.serial);
if (this.serialConfig) {
var node = this;
node.port = serialPool.get(this.serialConfig.serialport,
this.serialConfig.serialbaud,
this.serialConfig.databits,
this.serialConfig.parity,
this.serialConfig.stopbits,
this.serialConfig.newline);
this.port.on('data', function(msg) {
node.send({ "payload": msg });
});
} else {
this.error("missing serial config");
}
this.on("close", function() {
if (this.serialConfig) {
try {
serialPool.close(this.serialConfig.serialport);
} catch(err) {
}
}
});
}
RED.nodes.registerType("serial in",SerialInNode);
var serialPool = function() {
var connections = {};
return {
get:function(port,baud,databits,parity,stopbits,newline,callback) {
var id = port;
if (!connections[id]) {
connections[id] = function() {
var obj = {
_emitter: new events.EventEmitter(),
serial: null,
_closing: false,
tout: null,
on: function(a,b) { this._emitter.on(a,b); },
close: function(cb) { this.serial.close(cb); },
write: function(m,cb) { this.serial.write(m,cb); },
}
else {
obj.serial = new serialp.SerialPort(port,{
baudrate: baud,
databits: databits,
parity: parity,
stopbits: stopbits,
parser: serialp.parsers.readline(newline)
},true, function(err, results) { if (err) obj.serial.emit('error',err); });
}
obj.serial.on('error', function(err) {
util.log("[serial] serial port "+port+" error "+err);
obj.tout = setTimeout(function() {
setupSerial();
}, settings.serialReconnectTime);
});
obj.serial.on('close', function() {
if (!obj._closing) {
util.log("[serial] serial port "+port+" closed unexpectedly");
newline = newline.replace("\\n","\n").replace("\\r","\r");
var setupSerial = function() {
if (newline == "") {
obj.serial = new serialp.SerialPort(port,{
baudrate: baud,
databits: databits,
parity: parity,
stopbits: stopbits,
parser: serialp.parsers.raw
},true, function(err, results) { if (err) obj.serial.emit('error',err); });
}
else {
obj.serial = new serialp.SerialPort(port,{
baudrate: baud,
databits: databits,
parity: parity,
stopbits: stopbits,
parser: serialp.parsers.readline(newline)
},true, function(err, results) { if (err) obj.serial.emit('error',err); });
}
obj.serial.on('error', function(err) {
util.log("[serial] serial port "+port+" error "+err);
obj.tout = setTimeout(function() {
setupSerial();
}, settings.serialReconnectTime);
}
});
obj.serial.on('open',function() {
util.log("[serial] serial port "+port+" opened at "+baud+" baud "+databits+""+parity.charAt(0).toUpperCase()+stopbits);
if (obj.tout) { clearTimeout(obj.tout); }
//obj.serial.flush();
obj._emitter.emit('ready');
});
obj.serial.on('data',function(d) {
if (typeof d !== "string") {
d = d.toString();
for (i=0; i<d.length; i++) {
obj._emitter.emit('data',d.charAt(i));
});
obj.serial.on('close', function() {
if (!obj._closing) {
util.log("[serial] serial port "+port+" closed unexpectedly");
obj.tout = setTimeout(function() {
setupSerial();
}, settings.serialReconnectTime);
}
}
else {
obj._emitter.emit('data',d);
}
});
obj.serial.on('open',function() {
util.log("[serial] serial port "+port+" opened at "+baud+" baud "+databits+""+parity.charAt(0).toUpperCase()+stopbits);
if (obj.tout) { clearTimeout(obj.tout); }
//obj.serial.flush();
obj._emitter.emit('ready');
});
obj.serial.on('data',function(d) {
if (typeof d !== "string") {
d = d.toString();
for (i=0; i<d.length; i++) {
obj._emitter.emit('data',d.charAt(i));
}
}
else {
obj._emitter.emit('data',d);
}
});
}
setupSerial();
return obj;
}();
}
return connections[id];
},
close: function(port) {
if (connections[port]) {
if (connections[port].tout != null) clearTimeout(connections[port].tout);
connections[port]._closing = true;
try {
connections[port].close(function() {
util.log("[serial] serial port closed");
});
}
setupSerial();
return obj;
}();
} catch(err) { };
}
delete connections[port];
}
return connections[id];
},
close: function(port) {
if (connections[port]) {
if (connections[port].tout != null) clearTimeout(connections[port].tout);
connections[port]._closing = true;
try {
connections[port].close(function() {
util.log("[serial] serial port closed");
});
} catch(err) { };
}
delete connections[port];
}
}
}();
RED.httpAdmin.get("/serialports",function(req,res) {
serialp.list(function (err, ports) {
res.writeHead(200, {'Content-Type': 'text/plain'});
res.write(JSON.stringify(ports));
res.end();
}();
RED.httpAdmin.get("/serialports",function(req,res) {
serialp.list(function (err, ports) {
res.writeHead(200, {'Content-Type': 'text/plain'});
res.write(JSON.stringify(ports));
res.end();
});
});
});
}

View File

@@ -14,229 +14,208 @@
* limitations under the License.
**/
var RED = require(process.env.NODE_RED_HOME+"/red/red");
var reconnectTime = RED.settings.socketReconnectTime||10000;
var socketTimeout = RED.settings.socketTimeout||null;
var net = require('net');
var connectionPool = {};
function TcpIn(n) {
RED.nodes.createNode(this,n);
this.host = n.host;
this.port = n.port * 1;
this.topic = n.topic;
this.stream = (!n.datamode||n.datamode=='stream'); /* stream,single*/
this.datatype = n.datatype||'buffer'; /* buffer,utf8,base64 */
this.newline = (n.newline||"").replace("\\n","\n").replace("\\r","\r");
this.base64 = n.base64;
this.server = (typeof n.server == 'boolean')?n.server:(n.server == "server");
this.closing = false;
var node = this;
if (!node.server) {
var buffer = null;
var client;
var reconnectTimeout;
function setupTcpClient() {
node.log("connecting to "+node.host+":"+node.port);
var id = (1+Math.random()*4294967295).toString(16);
client = net.connect(node.port, node.host, function() {
buffer = (node.datatype == 'buffer')? new Buffer(0):"";
node.log("connected to "+node.host+":"+node.port);
});
connectionPool[id] = client;
client.on('data', function (data) {
if (node.datatype != 'buffer') {
data = data.toString(node.datatype);
}
if (node.stream) {
if ((node.datatype) === "utf8" && node.newline != "") {
buffer = buffer+data;
var parts = buffer.split(node.newline);
for (var i = 0;i<parts.length-1;i+=1) {
var msg = {topic:node.topic, payload:parts[i]};
msg._session = {type:"tcp",id:id};
node.send(msg);
}
buffer = parts[parts.length-1];
} else {
var msg = {topic:node.topic, payload:data};
msg._session = {type:"tcp",id:id};
node.send(msg);
}
} else {
if ((typeof data) === "string") {
buffer = buffer+data;
} else {
buffer = Buffer.concat([buffer,data],buffer.length+data.length);
}
}
});
client.on('end', function() {
if (!node.stream || (node.datatype == "utf8" && node.newline != "" && buffer.length > 0)) {
var msg = {topic:node.topic,payload:buffer};
msg._session = {type:"tcp",id:id};
node.send(msg);
buffer = null;
}
});
client.on('close', function() {
delete connectionPool[id];
node.log("connection lost to "+node.host+":"+node.port);
if (!node.closing) {
reconnectTimeout = setTimeout(setupTcpClient, reconnectTime);
}
});
client.on('error', function(err) {
node.log(err);
});
}
setupTcpClient();
this.on('close', function() {
this.closing = true;
client.end();
clearTimeout(reconnectTimeout);
});
} else {
var server = net.createServer(function (socket) {
if (socketTimeout !== null) { socket.setTimeout(socketTimeout); }
var id = (1+Math.random()*4294967295).toString(16);
connectionPool[id] = socket;
var buffer = (node.datatype == 'buffer')? new Buffer(0):"";
socket.on('data', function (data) {
if (node.datatype != 'buffer') {
data = data.toString(node.datatype);
}
if (node.stream) {
if ((typeof data) === "string" && node.newline != "") {
buffer = buffer+data;
var parts = buffer.split(node.newline);
for (var i = 0;i<parts.length-1;i+=1) {
var msg = {topic:node.topic, payload:parts[i],ip:socket.remoteAddress,port:socket.remotePort};
msg._session = {type:"tcp",id:id};
node.send(msg);
}
buffer = parts[parts.length-1];
} else {
var msg = {topic:node.topic, payload:data};
msg._session = {type:"tcp",id:id};
node.send(msg);
}
} else {
if ((typeof data) === "string") {
buffer = buffer+data;
} else {
buffer = Buffer.concat([buffer,data],buffer.length+data.length);
}
}
});
socket.on('end', function() {
if (!node.stream || (node.datatype == "utf8" && node.newline != "" && buffer.length > 0)) {
var msg = {topic:node.topic,payload:buffer};
msg._session = {type:"tcp",id:id};
node.send(msg);
buffer = null;
}
});
socket.on('timeout', function() {
node.log('timeout closed socket port '+node.port);
socket.end();
});
socket.on('close', function() {
delete connectionPool[id];
});
socket.on('error',function(err) {
node.log(err);
});
});
server.on('error', function(err) {
if (err) {
node.error('unable to listen on port '+node.port+' : '+err);
}
});
server.listen(node.port, function(err) {
if (err) {
node.error('unable to listen on port '+node.port+' : '+err);
} else {
node.log('listening on port '+node.port);
module.exports = function(RED) {
var reconnectTime = RED.settings.socketReconnectTime||10000;
var socketTimeout = RED.settings.socketTimeout||null;
var net = require('net');
node.on('close', function() {
node.closing = true;
server.close();
node.log('stopped listening on port '+node.port);
var connectionPool = {};
function TcpIn(n) {
RED.nodes.createNode(this,n);
this.host = n.host;
this.port = n.port * 1;
this.topic = n.topic;
this.stream = (!n.datamode||n.datamode=='stream'); /* stream,single*/
this.datatype = n.datatype||'buffer'; /* buffer,utf8,base64 */
this.newline = (n.newline||"").replace("\\n","\n").replace("\\r","\r");
this.base64 = n.base64;
this.server = (typeof n.server == 'boolean')?n.server:(n.server == "server");
this.closing = false;
var node = this;
if (!node.server) {
var buffer = null;
var client;
var reconnectTimeout;
function setupTcpClient() {
node.log("connecting to "+node.host+":"+node.port);
var id = (1+Math.random()*4294967295).toString(16);
client = net.connect(node.port, node.host, function() {
buffer = (node.datatype == 'buffer')? new Buffer(0):"";
node.log("connected to "+node.host+":"+node.port);
});
connectionPool[id] = client;
client.on('data', function (data) {
if (node.datatype != 'buffer') {
data = data.toString(node.datatype);
}
if (node.stream) {
if ((node.datatype) === "utf8" && node.newline != "") {
buffer = buffer+data;
var parts = buffer.split(node.newline);
for (var i = 0;i<parts.length-1;i+=1) {
var msg = {topic:node.topic, payload:parts[i]};
msg._session = {type:"tcp",id:id};
node.send(msg);
}
buffer = parts[parts.length-1];
} else {
var msg = {topic:node.topic, payload:data};
msg._session = {type:"tcp",id:id};
node.send(msg);
}
} else {
if ((typeof data) === "string") {
buffer = buffer+data;
} else {
buffer = Buffer.concat([buffer,data],buffer.length+data.length);
}
}
});
client.on('end', function() {
if (!node.stream || (node.datatype == "utf8" && node.newline != "" && buffer.length > 0)) {
var msg = {topic:node.topic,payload:buffer};
msg._session = {type:"tcp",id:id};
node.send(msg);
buffer = null;
}
});
client.on('close', function() {
delete connectionPool[id];
node.log("connection lost to "+node.host+":"+node.port);
if (!node.closing) {
reconnectTimeout = setTimeout(setupTcpClient, reconnectTime);
}
});
client.on('error', function(err) {
node.log(err);
});
}
});
}
}
RED.nodes.registerType("tcp in",TcpIn);
function TcpOut(n) {
RED.nodes.createNode(this,n);
this.host = n.host;
this.port = n.port * 1;
this.base64 = n.base64;
this.beserver = n.beserver;
this.name = n.name;
this.closing = false;
var node = this;
if (!node.beserver||node.beserver=="client") {
var reconnectTimeout;
var client = null;
var connected = false;
function setupTcpClient() {
node.log("connecting to "+node.host+":"+node.port);
client = net.connect(node.port, node.host, function() {
connected = true;
node.log("connected to "+node.host+":"+node.port);
setupTcpClient();
this.on('close', function() {
this.closing = true;
client.end();
clearTimeout(reconnectTimeout);
});
client.on('error', function (err) {
node.log('error : '+err);
} else {
var server = net.createServer(function (socket) {
if (socketTimeout !== null) { socket.setTimeout(socketTimeout); }
var id = (1+Math.random()*4294967295).toString(16);
connectionPool[id] = socket;
var buffer = (node.datatype == 'buffer')? new Buffer(0):"";
socket.on('data', function (data) {
if (node.datatype != 'buffer') {
data = data.toString(node.datatype);
}
if (node.stream) {
if ((typeof data) === "string" && node.newline != "") {
buffer = buffer+data;
var parts = buffer.split(node.newline);
for (var i = 0;i<parts.length-1;i+=1) {
var msg = {topic:node.topic, payload:parts[i],ip:socket.remoteAddress,port:socket.remotePort};
msg._session = {type:"tcp",id:id};
node.send(msg);
}
buffer = parts[parts.length-1];
} else {
var msg = {topic:node.topic, payload:data};
msg._session = {type:"tcp",id:id};
node.send(msg);
}
} else {
if ((typeof data) === "string") {
buffer = buffer+data;
} else {
buffer = Buffer.concat([buffer,data],buffer.length+data.length);
}
}
});
socket.on('end', function() {
if (!node.stream || (node.datatype == "utf8" && node.newline != "" && buffer.length > 0)) {
var msg = {topic:node.topic,payload:buffer};
msg._session = {type:"tcp",id:id};
node.send(msg);
buffer = null;
}
});
socket.on('timeout', function() {
node.log('timeout closed socket port '+node.port);
socket.end();
});
socket.on('close', function() {
delete connectionPool[id];
});
socket.on('error',function(err) {
node.log(err);
});
});
client.on('end', function (err) {
server.on('error', function(err) {
if (err) {
node.error('unable to listen on port '+node.port+' : '+err);
}
});
client.on('close', function() {
node.log("connection lost to "+node.host+":"+node.port);
connected = false;
client.destroy();
if (!node.closing) {
reconnectTimeout = setTimeout(setupTcpClient,reconnectTime);
server.listen(node.port, function(err) {
if (err) {
node.error('unable to listen on port '+node.port+' : '+err);
} else {
node.log('listening on port '+node.port);
node.on('close', function() {
node.closing = true;
server.close();
node.log('stopped listening on port '+node.port);
});
}
});
}
setupTcpClient();
node.on("input", function(msg) {
if (connected && msg.payload != null) {
if (Buffer.isBuffer(msg.payload)) {
client.write(msg.payload);
} else if (typeof msg.payload === "string" && node.base64) {
client.write(new Buffer(msg.payload,'base64'));
} else {
client.write(new Buffer(""+msg.payload));
}
}
RED.nodes.registerType("tcp in",TcpIn);
function TcpOut(n) {
RED.nodes.createNode(this,n);
this.host = n.host;
this.port = n.port * 1;
this.base64 = n.base64;
this.beserver = n.beserver;
this.name = n.name;
this.closing = false;
var node = this;
if (!node.beserver||node.beserver=="client") {
var reconnectTimeout;
var client = null;
var connected = false;
function setupTcpClient() {
node.log("connecting to "+node.host+":"+node.port);
client = net.connect(node.port, node.host, function() {
connected = true;
node.log("connected to "+node.host+":"+node.port);
});
client.on('error', function (err) {
node.log('error : '+err);
});
client.on('end', function (err) {
});
client.on('close', function() {
node.log("connection lost to "+node.host+":"+node.port);
connected = false;
client.destroy();
if (!node.closing) {
reconnectTimeout = setTimeout(setupTcpClient,reconnectTime);
}
});
}
});
node.on("close", function() {
this.closing = true;
client.end();
clearTimeout(reconnectTimeout);
});
} else if (node.beserver == "reply") {
node.on("input",function(msg) {
if (msg._session && msg._session.type == "tcp") {
var client = connectionPool[msg._session.id];
if (client) {
setupTcpClient();
node.on("input", function(msg) {
if (connected && msg.payload != null) {
if (Buffer.isBuffer(msg.payload)) {
client.write(msg.payload);
} else if (typeof msg.payload === "string" && node.base64) {
@@ -245,62 +224,84 @@ function TcpOut(n) {
client.write(new Buffer(""+msg.payload));
}
}
}
});
} else {
var connectedSockets = [];
var server = net.createServer(function (socket) {
if (socketTimeout !== null) { socket.setTimeout(socketTimeout); }
var remoteDetails = socket.remoteAddress+":"+socket.remotePort;
node.log("connection from "+remoteDetails);
connectedSockets.push(socket);
socket.on('timeout', function() {
node.log('timeout closed socket port '+node.port);
socket.end();
});
socket.on('close',function() {
node.log("connection closed from "+remoteDetails);
connectedSockets.splice(connectedSockets.indexOf(socket),1);
node.on("close", function() {
this.closing = true;
client.end();
clearTimeout(reconnectTimeout);
});
socket.on('error',function() {
node.log("socket error from "+remoteDetails);
connectedSockets.splice(connectedSockets.indexOf(socket),1);
});
});
node.on("input", function(msg) {
if (msg.payload != null) {
var buffer;
if (Buffer.isBuffer(msg.payload)) {
buffer = msg.payload;
} else if (typeof msg.payload === "string" && node.base64) {
buffer = new Buffer(msg.payload,'base64');
} else {
buffer = new Buffer(""+msg.payload);
} else if (node.beserver == "reply") {
node.on("input",function(msg) {
if (msg._session && msg._session.type == "tcp") {
var client = connectionPool[msg._session.id];
if (client) {
if (Buffer.isBuffer(msg.payload)) {
client.write(msg.payload);
} else if (typeof msg.payload === "string" && node.base64) {
client.write(new Buffer(msg.payload,'base64'));
} else {
client.write(new Buffer(""+msg.payload));
}
}
}
for (var i = 0; i<connectedSockets.length;i+=1) {
connectedSockets[i].write(buffer);
}
}
});
server.on('error', function(err) {
if (err) {
node.error('unable to listen on port '+node.port+' : '+err);
}
});
server.listen(node.port, function(err) {
if (err) {
node.error('unable to listen on port '+node.port+' : '+err);
} else {
node.log('listening on port '+node.port);
node.on('close', function() {
server.close();
node.log('stopped listening on port '+node.port);
});
} else {
var connectedSockets = [];
var server = net.createServer(function (socket) {
if (socketTimeout !== null) { socket.setTimeout(socketTimeout); }
var remoteDetails = socket.remoteAddress+":"+socket.remotePort;
node.log("connection from "+remoteDetails);
connectedSockets.push(socket);
socket.on('timeout', function() {
node.log('timeout closed socket port '+node.port);
socket.end();
});
}
});
socket.on('close',function() {
node.log("connection closed from "+remoteDetails);
connectedSockets.splice(connectedSockets.indexOf(socket),1);
});
socket.on('error',function() {
node.log("socket error from "+remoteDetails);
connectedSockets.splice(connectedSockets.indexOf(socket),1);
});
});
node.on("input", function(msg) {
if (msg.payload != null) {
var buffer;
if (Buffer.isBuffer(msg.payload)) {
buffer = msg.payload;
} else if (typeof msg.payload === "string" && node.base64) {
buffer = new Buffer(msg.payload,'base64');
} else {
buffer = new Buffer(""+msg.payload);
}
for (var i = 0; i<connectedSockets.length;i+=1) {
connectedSockets[i].write(buffer);
}
}
});
server.on('error', function(err) {
if (err) {
node.error('unable to listen on port '+node.port+' : '+err);
}
});
server.listen(node.port, function(err) {
if (err) {
node.error('unable to listen on port '+node.port+' : '+err);
} else {
node.log('listening on port '+node.port);
node.on('close', function() {
server.close();
node.log('stopped listening on port '+node.port);
});
}
});
}
}
RED.nodes.registerType("tcp out",TcpOut);
}
RED.nodes.registerType("tcp out",TcpOut);

View File

@@ -14,102 +14,53 @@
* limitations under the License.
**/
var RED = require(process.env.NODE_RED_HOME+"/red/red");
var dgram = require('dgram');
// The Input Node
function UDPin(n) {
RED.nodes.createNode(this,n);
this.group = n.group;
this.port = n.port;
this.datatype = n.datatype;
this.iface = n.iface || null;
this.multicast = n.multicast;
var node = this;
var server = dgram.createSocket('udp4');
server.on("error", function (err) {
if ((err.code == "EACCES") && (node.port < 1024)) {
node.error("UDP access error, you may need root access for ports below 1024");
} else {
node.error("UDP error : "+err.code);
}
server.close();
});
server.on('message', function (message, remote) {
var msg;
if (node.datatype =="base64") {
msg = { payload:message.toString('base64'), fromip:remote.address+':'+remote.port };
} else if (node.datatype =="utf8") {
msg = { payload:message.toString('utf8'), fromip:remote.address+':'+remote.port };
} else {
msg = { payload:message, fromip:remote.address+':'+remote.port, ip:remote.address, port:remote.port };
}
node.send(msg);
});
server.on('listening', function () {
var address = server.address();
node.log('udp listener at ' + address.address + ":" + address.port);
if (node.multicast == "true") {
server.setBroadcast(true);
try {
server.setMulticastTTL(128);
server.addMembership(node.group,node.iface);
node.log("udp multicast group "+node.group);
} catch (e) {
if (e.errno == "EINVAL") {
node.error("Bad Multicast Address");
} else if (e.errno == "ENODEV") {
node.error("Must be ip address of the required interface");
} else {
node.error("Error :"+e.errno);
}
module.exports = function(RED) {
var dgram = require('dgram');
// The Input Node
function UDPin(n) {
RED.nodes.createNode(this,n);
this.group = n.group;
this.port = n.port;
this.datatype = n.datatype;
this.iface = n.iface || null;
this.multicast = n.multicast;
var node = this;
var server = dgram.createSocket('udp4');
server.on("error", function (err) {
if ((err.code == "EACCES") && (node.port < 1024)) {
node.error("UDP access error, you may need root access for ports below 1024");
} else {
node.error("UDP error : "+err.code);
}
}
});
node.on("close", function() {
try {
server.close();
node.log('udp listener stopped');
} catch (err) {
node.error(err);
}
});
server.bind(node.port,node.iface);
}
RED.nodes.registerType("udp in",UDPin);
// The Output Node
function UDPout(n) {
RED.nodes.createNode(this,n);
//this.group = n.group;
this.port = n.port;
this.outport = n.outport||"";
this.base64 = n.base64;
this.addr = n.addr;
this.iface = n.iface || null;
this.multicast = n.multicast;
var node = this;
var sock = dgram.createSocket('udp4'); // only use ipv4 for now
if (node.multicast != "false") {
if (node.outport == "") { node.outport = node.port; }
sock.bind(node.outport, function() { // have to bind before you can enable broadcast...
sock.setBroadcast(true); // turn on broadcast
if (node.multicast == "multi") {
});
server.on('message', function (message, remote) {
var msg;
if (node.datatype =="base64") {
msg = { payload:message.toString('base64'), fromip:remote.address+':'+remote.port };
} else if (node.datatype =="utf8") {
msg = { payload:message.toString('utf8'), fromip:remote.address+':'+remote.port };
} else {
msg = { payload:message, fromip:remote.address+':'+remote.port, ip:remote.address, port:remote.port };
}
node.send(msg);
});
server.on('listening', function () {
var address = server.address();
node.log('udp listener at ' + address.address + ":" + address.port);
if (node.multicast == "true") {
server.setBroadcast(true);
try {
sock.setMulticastTTL(128);
sock.addMembership(node.addr,node.iface); // Add to the multicast group
node.log('udp multicast ready : '+node.outport+' -> '+node.addr+":"+node.port);
server.setMulticastTTL(128);
server.addMembership(node.group,node.iface);
node.log("udp multicast group "+node.group);
} catch (e) {
if (e.errno == "EINVAL") {
if (e.errno == "EINVAL") {
node.error("Bad Multicast Address");
} else if (e.errno == "ENODEV") {
node.error("Must be ip address of the required interface");
@@ -117,52 +68,102 @@ function UDPout(n) {
node.error("Error :"+e.errno);
}
}
} else {
node.log('udp broadcast ready : '+node.outport+' -> '+node.addr+":"+node.port);
}
});
} else if (node.outport != "") {
sock.bind(node.outport);
node.log('udp ready : '+node.outport+' -> '+node.addr+":"+node.port);
} else {
node.log('udp ready : '+node.addr+":"+node.port);
}
node.on("input", function(msg) {
if (msg.payload != null) {
var add = node.addr || msg.ip || "";
var por = node.port || msg.port || 0;
if (add == "") {
node.warn("udp: ip address not set");
} else if (por == 0) {
node.warn("udp: port not set");
} else if (isNaN(por) || (por < 1) || (por > 65535)) {
node.warn("udp: port number not valid");
} else {
var message;
if (node.base64) {
message = new Buffer(b64string, 'base64');
} else if (msg.payload instanceof Buffer) {
message = msg.payload;
} else {
message = new Buffer(""+msg.payload);
}
sock.send(message, 0, message.length, por, add, function(err, bytes) {
if (err) {
node.error("udp : "+err);
}
});
node.on("close", function() {
try {
server.close();
node.log('udp listener stopped');
} catch (err) {
node.error(err);
}
});
server.bind(node.port,node.iface);
}
RED.nodes.registerType("udp in",UDPin);
// The Output Node
function UDPout(n) {
RED.nodes.createNode(this,n);
//this.group = n.group;
this.port = n.port;
this.outport = n.outport||"";
this.base64 = n.base64;
this.addr = n.addr;
this.iface = n.iface || null;
this.multicast = n.multicast;
var node = this;
var sock = dgram.createSocket('udp4'); // only use ipv4 for now
if (node.multicast != "false") {
if (node.outport == "") { node.outport = node.port; }
sock.bind(node.outport, function() { // have to bind before you can enable broadcast...
sock.setBroadcast(true); // turn on broadcast
if (node.multicast == "multi") {
try {
sock.setMulticastTTL(128);
sock.addMembership(node.addr,node.iface); // Add to the multicast group
node.log('udp multicast ready : '+node.outport+' -> '+node.addr+":"+node.port);
} catch (e) {
if (e.errno == "EINVAL") {
node.error("Bad Multicast Address");
} else if (e.errno == "ENODEV") {
node.error("Must be ip address of the required interface");
} else {
node.error("Error :"+e.errno);
}
}
} else {
node.log('udp broadcast ready : '+node.outport+' -> '+node.addr+":"+node.port);
}
});
} else if (node.outport != "") {
sock.bind(node.outport);
node.log('udp ready : '+node.outport+' -> '+node.addr+":"+node.port);
} else {
node.log('udp ready : '+node.addr+":"+node.port);
}
});
node.on("close", function() {
try {
sock.close();
node.log('udp output stopped');
} catch (err) {
node.error(err);
}
});
node.on("input", function(msg) {
if (msg.payload != null) {
var add = node.addr || msg.ip || "";
var por = node.port || msg.port || 0;
if (add == "") {
node.warn("udp: ip address not set");
} else if (por == 0) {
node.warn("udp: port not set");
} else if (isNaN(por) || (por < 1) || (por > 65535)) {
node.warn("udp: port number not valid");
} else {
var message;
if (node.base64) {
message = new Buffer(b64string, 'base64');
} else if (msg.payload instanceof Buffer) {
message = msg.payload;
} else {
message = new Buffer(""+msg.payload);
}
sock.send(message, 0, message.length, por, add, function(err, bytes) {
if (err) {
node.error("udp : "+err);
}
});
}
}
});
node.on("close", function() {
try {
sock.close();
node.log('udp output stopped');
} catch (err) {
node.error(err);
}
});
}
RED.nodes.registerType("udp out",UDPout);
}
RED.nodes.registerType("udp out",UDPout);