From 5f1513bed3ca80accd6258b2bf771f963d412024 Mon Sep 17 00:00:00 2001 From: Dave Conway-Jones Date: Thu, 17 Dec 2015 17:15:01 +0000 Subject: [PATCH] update Stomp to support v1.1 Thanks @ozomer - to close #141 --- io/stomp/18-stomp.html | 26 +++++++- io/stomp/18-stomp.js | 142 +++++++++++++++++++++++------------------ io/stomp/package.json | 4 +- 3 files changed, 107 insertions(+), 65 deletions(-) diff --git a/io/stomp/18-stomp.html b/io/stomp/18-stomp.html index 779bcb4f..708d148c 100644 --- a/io/stomp/18-stomp.html +++ b/io/stomp/18-stomp.html @@ -128,6 +128,26 @@ + +
+ + +
+
+ + +
+
+ + +
+
+ + +
@@ -139,7 +159,11 @@ category: 'config', defaults: { server: {required:true}, - port: {value:61618,required:true,validate:RED.validators.number()}, + port: {value:61613,required:true,validate:RED.validators.number()}, + protocolversion: {value:"1.0",required:true}, + vhost: {}, + reconnectretries: {value:"0",required:true,validate:RED.validators.number()}, + reconnectdelay: {value:"0.5",required:true,validate:RED.validators.number()}, name: {} }, credentials: { diff --git a/io/stomp/18-stomp.js b/io/stomp/18-stomp.js index b70a3b9c..41c484c6 100644 --- a/io/stomp/18-stomp.js +++ b/io/stomp/18-stomp.js @@ -23,6 +23,10 @@ module.exports = function(RED) { RED.nodes.createNode(this,n); this.server = n.server; this.port = n.port; + this.protocolversion = n.protocolversion; + this.vhost = n.vhost; + this.reconnectretries = n.reconnectretries; + this.reconnectdelay = n.reconnectdelay * 1000; this.name = n.name; this.username = this.credentials.user; this.password = this.credentials.password; @@ -40,59 +44,64 @@ module.exports = function(RED) { this.topic = n.topic; this.serverConfig = RED.nodes.getNode(this.server); - this.host = this.serverConfig.server; - this.port = this.serverConfig.port; - this.userid = this.serverConfig.username; - this.password = this.serverConfig.password; + this.stompClientOpts = { + address: this.serverConfig.server, + port: this.serverConfig.port * 1, + user: this.serverConfig.username, + pass: this.serverConfig.password, + protocolVersion: this.serverConfig.protocolversion, + reconnectOpts: { + retries: this.serverConfig.reconnectretries * 1, + delay: this.serverConfig.reconnectdelay * 1 + } + }; + if (this.serverConfig.vhost) { + this.stompClientOpts.vhost = this.serverConfig.vhost; + } var node = this; var msg = {topic:this.topic}; - var closing = false; + node.client = new StompClient(node.stompClientOpts); - node.client = new StompClient(node.host, node.port, node.userid, node.password, '1.0'); - node.status({fill:"grey",shape:"ring",text:"connecting"}); + node.client.on("connect", function() { + node.status({fill:"green",shape:"dot",text:"connected"}); + }); - var doConnect = function() { - node.client.connect(function(sessionId) { - node.status({fill:"green",shape:"dot",text:"connected"}); - node.log('subscribed to: '+node.topic); - node.client.subscribe(node.topic, function(body, headers) { - try { - msg.payload = JSON.parse(body); - } - catch(e) { - msg.payload = body; - } - msg.headers = headers; - msg.topic = node.topic; - node.send(msg); - }); - }, function(error) { - node.status({fill:"grey",shape:"dot",text:"error"}); - node.warn(error); - }); - } - - node.client.on("disconnect", function() { - node.status({fill:"red",shape:"ring",text:"disconnected"}); - if (!closing) { - setTimeout( function () { doConnect(); }, 15000); - } + node.client.on("reconnecting", function() { + node.status({fill:"red",shape:"ring",text:"reconnecting"}); + node.warn("reconnecting"); }); node.client.on("error", function(error) { node.status({fill:"grey",shape:"dot",text:"error"}); - node.log(error); + node.warn(error); }); - doConnect(); + node.status({fill:"grey",shape:"ring",text:"connecting"}); + node.client.connect(function(sessionId) { + node.log('subscribing to: '+node.topic); + node.client.subscribe(node.topic, function(body, headers) { + try { + msg.payload = JSON.parse(body); + } + catch(e) { + msg.payload = body; + } + msg.headers = headers; + msg.topic = node.topic; + node.send(msg); + }); + }, function(error) { + node.status({fill:"grey",shape:"dot",text:"error"}); + node.warn(error); + }); node.on("close", function(done) { - closing = true; if (node.client) { - node.client.disconnect(function() { done(); }); + // disconnect can accept a callback - but it is not always called. + node.client.disconnect(); } - else { done(); } + done(); }); } RED.nodes.registerType("stomp in",StompInNode); @@ -104,34 +113,43 @@ module.exports = function(RED) { this.topic = n.topic; this.serverConfig = RED.nodes.getNode(this.server); - this.host = this.serverConfig.server; - this.port = this.serverConfig.port; - this.userid = this.serverConfig.username; - this.password = this.serverConfig.password; + this.stompClientOpts = { + address: this.serverConfig.server, + port: this.serverConfig.port * 1, + user: this.serverConfig.username, + pass: this.serverConfig.password, + protocolVersion: this.serverConfig.protocolversion, + reconnectOpts: { + retries: this.serverConfig.reconnectretries * 1, + delay: this.serverConfig.reconnectdelay * 1 + } + }; + if (this.serverConfig.vhost) { + this.stompClientOpts.vhost = this.serverConfig.vhost; + } var node = this; - var msg = {topic:this.topic}; - var closing = false; + node.client = new StompClient(node.stompClientOpts); - node.client = new StompClient(node.host, node.port, node.userid, node.password, '1.0'); - node.status({fill:"grey",shape:"ring",text:"connecting"}); + node.client.on("connect", function() { + node.status({fill:"green",shape:"dot",text:"connected"}); + }); - node.client.connect( function(sessionId) { - node.status({fill:"green",shape:"dot",text:"connected"}); - }, function(error) { + node.client.on("reconnecting", function() { + node.status({fill:"red",shape:"ring",text:"reconnecting"}); + node.warn("reconnecting"); + }); + + node.client.on("error", function(error) { node.status({fill:"grey",shape:"dot",text:"error"}); node.warn(error); }); - node.client.on("disconnect", function() { - node.status({fill:"red",shape:"ring",text:"disconnected"}); - if (!closing) { - setTimeout( function () { node.client.connect(); }, 15000); - } - }); - - node.client.on("error", function(error) { - node.log(error); + node.status({fill:"grey",shape:"ring",text:"connecting"}); + node.client.connect(function(sessionId) { + }, function(error) { + node.status({fill:"grey",shape:"dot",text:"error"}); + node.warn(error); }); node.on("input", function(msg) { @@ -139,13 +157,13 @@ module.exports = function(RED) { }); node.on("close", function(done) { - closing = true; if (node.client) { - node.client.disconnect(function() { done(); }); + // disconnect can accept a callback - but it is not always called. + node.client.disconnect(); } - else { done(); } + done(); }); } RED.nodes.registerType("stomp out",StompOutNode); -} +}; diff --git a/io/stomp/package.json b/io/stomp/package.json index fd620344..093b840c 100644 --- a/io/stomp/package.json +++ b/io/stomp/package.json @@ -1,9 +1,9 @@ { "name" : "node-red-node-stomp", - "version" : "0.0.7", + "version" : "0.0.8", "description" : "A Node-RED node to publish and subscribe to/from a Stomp server", "dependencies" : { - "stomp-client" : "0.6.*" + "stomp-client" : "0.8.*" }, "repository" : { "type":"git",