2014-05-19 22:07:20 +02:00
module . exports = function ( RED ) {
"use strict" ;
var StompClient = require ( 'stomp-client' ) ;
2023-05-30 12:05:25 +02:00
// ----------------------------------------------
// ------------------- State --------------------
// ----------------------------------------------
/**
 * Show the status that matches the connection flags of `node`.
 * `connecting` takes precedence over `connected`; when neither flag is set the
 * disconnected status is shown.
 * @param {object} node Node whose `connecting`/`connected` flags are inspected
 * @param {boolean} allNodes Passed through unchanged to the selected status setter
 */
function updateStatus(node, allNodes) {
    if (node.connecting) {
        setStatusConnecting(node, allNodes);
        return;
    }
    if (node.connected) {
        setStatusConnected(node, allNodes);
        return;
    }
    setStatusDisconnected(node, allNodes);
}
/**
 * Show the "disconnected" status (red ring).
 * @param {object} node Node to update, or the node whose `users` are updated
 * @param {boolean} allNodes When truthy, update every own entry of `node.users` instead of `node` itself
 */
function setStatusDisconnected(node, allNodes) {
    // A fresh status object is created for every call to `status`
    const disconnected = () => ({ fill: "red", shape: "ring", text: "node-red:common.status.disconnected" });

    if (!allNodes) {
        node.status(disconnected());
        return;
    }

    for (const userId in node.users) {
        if (!hasProperty(node.users, userId)) {
            continue;
        }
        node.users[userId].status(disconnected());
    }
}
/**
 * Show the "connecting" status (yellow ring).
 * @param {object} node Node to update, or the node whose `users` are updated
 * @param {boolean} allNodes When truthy, update every own entry of `node.users` instead of `node` itself
 */
function setStatusConnecting(node, allNodes) {
    const showConnecting = function (target) {
        target.status({ fill: "yellow", shape: "ring", text: "node-red:common.status.connecting" });
    };

    if (allNodes) {
        const registered = node.users;
        for (const key in registered) {
            if (hasProperty(registered, key)) {
                showConnecting(registered[key]);
            }
        }
        return;
    }

    showConnecting(node);
}
/**
 * Show the "connected" status (green dot).
 * @param {object} node Node to update, or the node whose `users` are updated
 * @param {boolean} allNodes When truthy, update every own entry of `node.users` instead of `node` itself
 */
function setStatusConnected(node, allNodes) {
    if (!allNodes) {
        node.status({ fill: "green", shape: "dot", text: "node-red:common.status.connected" });
        return;
    }

    for (const name in node.users) {
        // Skip anything that is only inherited through the prototype chain
        const isOwn = hasProperty(node.users, name);
        if (isOwn) {
            const user = node.users[name];
            user.status({ fill: "green", shape: "dot", text: "node-red:common.status.connected" });
        }
    }
}
/**
 * Show the "error" status (red dot).
 * @param {object} node Node to update, or the node whose `users` are updated
 * @param {boolean} allNodes When truthy, update every own entry of `node.users` instead of `node` itself
 */
function setStatusError(node, allNodes) {
    const errorStatus = () => ({ fill: "red", shape: "dot", text: "error" });

    if (allNodes) {
        for (const userId in node.users) {
            if (hasProperty(node.users, userId)) {
                node.users[userId].status(errorStatus());
            }
        }
    } else {
        node.status(errorStatus());
    }
}
// ----------------------------------------------
// ------------------- Nodes --------------------
// ----------------------------------------------
2014-05-19 22:07:20 +02:00
/**
 * Config node that owns the connection to a STOMP server and shares it between
 * the processing nodes (in/out/ack) that register themselves on it.
 * @param {object} n Node configuration as stored in the flow
 */
function StompServerNode(n) {
    RED.nodes.createNode(this, n);

    const node = this;
    // To keep track of processing nodes that use this config node for their connection
    node.users = {};
    // Config node state
    node.connected = false;
    node.connecting = false;
    /** Flag to avoid race conditions between `deregister` and the `close` event of the config node (ex. on redeploy) */
    node.closing = false;
    /** Options to pass to the stomp-client API */
    node.options = {};
    node.sessionId = null;
    node.subscribtionIndex = 1;
    node.subscriptionIds = {};
    /** @type { StompClient } */
    node.client = null;

    /**
     * Callbacks handed to `connect` while a connection attempt was already in progress.
     * They are run exactly once, as soon as the client reports `connect` or `reconnect`.
     */
    let pendingConnectCallbacks = [];

    /** Run and clear all callbacks that were waiting for the connection to be established. */
    const flushPendingConnectCallbacks = function() {
        const pending = pendingConnectCallbacks;
        pendingConnectCallbacks = [];
        pending.forEach(function(pendingCallback) {
            pendingCallback();
        });
    };

    /**
     * Apply configuration values and rebuild `node.options`.
     * @param {object} options Object holding (a subset of) the configuration properties
     * @param {boolean} init When truthy every configuration property is copied, even when missing from `options`,
     *                       and `username`/`password` are taken from the credentials only
     */
    node.setOptions = function(options, init) {
        if (!options || typeof options !== "object") {
            return; // Nothing to change
        }

        // Apply property changes (only if the property exists in the options object)
        setIfHasProperty(options, node, "server", init);
        setIfHasProperty(options, node, "port", init);
        setIfHasProperty(options, node, "protocolversion", init);
        setIfHasProperty(options, node, "vhost", init);
        setIfHasProperty(options, node, "reconnectretries", init);
        setIfHasProperty(options, node, "reconnectdelay", init);

        if (node.credentials) {
            node.username = node.credentials.user;
            node.password = node.credentials.password;
        }
        if (!init && hasProperty(options, "username")) {
            node.username = options.username;
        }
        if (!init && hasProperty(options, "password")) {
            node.password = options.password;
        }

        // Build options for passing to the stomp-client API
        node.options = {
            address: node.server,
            port: node.port * 1,
            user: node.username,
            pass: node.password,
            protocolVersion: node.protocolversion,
            reconnectOpts: {
                retries: node.reconnectretries * 1,
                delay: node.reconnectdelay * 1000
            },
            vhost: node.vhost
        };
    }
    node.setOptions(n, true);

    /**
     * Register a STOMP processing node to the connection.
     * The callback runs once the connection is usable: immediately when already connected,
     * otherwise when the pending connection attempt succeeds. It is skipped when the
     * processing node has been deregistered in the meantime.
     * @param { StompInNode | StompOutNode | StompAckNode } stompNode The STOMP processing node to register
     * @param { Function } callback
     */
    node.register = function(stompNode, callback = () => {}) {
        node.users[stompNode.id] = stompNode;

        // Never run the callback on behalf of a node that is no longer registered
        const whenConnected = function() {
            if (node.users[stompNode.id] === stompNode) {
                callback();
            }
        };

        // Auto connect when first STOMP processing node is added; while the connection is
        // still being established, wait for it instead of calling back too early
        if (Object.keys(node.users).length === 1 || node.connecting) {
            node.connect(whenConnected);
        } else {
            callback();
        }
    }

    /**
     * Remove registered STOMP processing nodes from the connection.
     * @param { StompInNode | StompOutNode | StompAckNode } stompNode The STOMP processing node to unregister
     * @param { Boolean } autoDisconnect Automatically disconnect from the STOMP server when no processing nodes are registered to the connection
     * @param { Function } callback
     */
    node.deregister = function(stompNode, autoDisconnect, callback = () => {}) {
        delete node.users[stompNode.id];
        if (autoDisconnect && !node.closing && node.connected && Object.keys(node.users).length === 0) {
            node.disconnect(callback);
        } else {
            callback();
        }
    }

    /**
     * Whether a new connection can be made.
     * @returns `true` or `false`
     */
    node.canConnect = function() {
        return !node.connected && !node.connecting;
    }

    /**
     * Connect to the STOMP server.
     * @param { Function } callback Called on `connect` and on every `reconnect` of the client created by this call.
     *                              When a connection attempt is already running it is queued and called once;
     *                              when already connected it is called immediately.
     */
    node.connect = function(callback = () => {}) {
        if (node.canConnect()) {
            node.closing = false;
            node.connecting = true;
            setStatusConnecting(node, true);

            try {
                // Remove left over client if needed
                if (node.client) {
                    node.client.disconnect();
                    node.client = null;
                }

                node.client = new StompClient(node.options);

                node.client.on("connect", function() {
                    node.closing = false;
                    node.connecting = false;
                    node.connected = true;

                    callback();
                    flushPendingConnectCallbacks();

                    node.log("Connected to STOMP server", { sessionId: node.sessionId, url: `${node.options.address}:${node.options.port}`, protocolVersion: node.options.protocolVersion });
                    setStatusConnected(node, true);
                });

                node.client.on("reconnect", function(sessionId, numOfRetries) {
                    node.closing = false;
                    node.connecting = false;
                    node.connected = true;
                    node.sessionId = sessionId;

                    callback();
                    flushPendingConnectCallbacks();

                    node.log("Reconnected to STOMP server", { sessionId: node.sessionId, url: `${node.options.address}:${node.options.port}`, protocolVersion: node.options.protocolVersion, retries: numOfRetries });
                    setStatusConnected(node, true);
                });

                node.client.on("reconnecting", function() {
                    node.warn("reconnecting");
                    node.connecting = true;
                    node.connected = false;

                    node.log("Reconnecting to STOMP server...", { url: `${node.options.address}:${node.options.port}`, protocolVersion: node.options.protocolVersion });
                    setStatusConnecting(node, true);
                });

                node.client.on("error", function(err) {
                    node.error(err);
                    setStatusError(node, true);
                });

                node.client.connect(function(sessionId) {
                    node.sessionId = sessionId;
                });
            } catch (err) {
                // Without this reset `canConnect` would stay false and no new attempt could ever be made
                node.connecting = false;
                node.error(err);
                setStatusError(node, true);
            }
        } else if (node.connecting) {
            // An attempt is already running: calling back now would be too early (not connected yet)
            pendingConnectCallbacks.push(callback);
        } else {
            node.log("Not connecting to STOMP server, already connected");
            callback();
        }
    }

    /**
     * Disconnect from the STOMP server.
     * Unsubscribes from all known queues first and gives the client 2 seconds to confirm the disconnect.
     * @param { Function } callback Called when done, also when there was nothing to disconnect
     */
    node.disconnect = function(callback = () => {}) {
        const waitDisconnect = (client, timeout) => {
            return new Promise((resolve, reject) => {
                // Set flag to avoid race conditions for disconnect as every node tries to call it directly or indirectly using deregister
                node.closing = true;
                const t = setTimeout(() => {
                    reject();
                }, timeout);
                client.disconnect(() => {
                    clearTimeout(t);
                    resolve();
                });
            });
        }

        if (!node.client) {
            node.warn("Can't disconnect, connection not initialized.");
            callback();
        } else if (node.closing || !node.connected) {
            // Disconnection already in progress or not connected
            callback();
        } else {
            // Must happen before `closing` is set, `unsubscribe` is a no-op on the wire while closing
            node.log("Unsubscribing from STOMP queue's...");
            const subscribedQueues = Object.keys(node.subscriptionIds);
            subscribedQueues.forEach(function(queue) {
                node.unsubscribe(queue);
            });

            node.log('Disconnecting from STOMP server...');
            waitDisconnect(node.client, 2000).then(() => {
                node.log("Disconnected from STOMP server", { sessionId: node.sessionId, url: `${node.options.address}:${node.options.port}`, protocolVersion: node.options.protocolVersion })
            }).catch(() => {
                node.log("Disconnect timeout closing node...");
            }).finally(() => {
                node.sessionId = null;
                node.subscribtionIndex = 1;
                node.subscriptionIds = {};
                node.connected = false;
                node.connecting = false;
                setStatusDisconnected(node, true);
                callback();
            });
        }
    }

    /**
     * Subscribe to a given STOMP queue.
     * @param { String } queue The queue to subscribe to
     * @param { "auto" | "client" | "client-individual" } acknowledgment Can be `auto`, `client` or `client-individual` (the latter is replaced by `client` for protocol version 1.0)
     * @param { Function } callback Called with `{ headers, topic, payload }` for every received message; the payload is JSON-parsed when possible
     */
    node.subscribe = function(queue, acknowledgment, callback) {
        node.log(`Subscribing to: ${queue}`);

        if (node.connected && !node.closing) {
            if (!node.subscriptionIds[queue]) {
                node.subscriptionIds[queue] = node.subscribtionIndex++;
            }

            const headers = {
                id: node.subscriptionIds[queue],
                // Only set client-individual if not v1.0
                ack: acknowledgment === "client-individual" && node.options.protocolVersion === "1.0" ? "client" : acknowledgment
            }

            node.client.subscribe(queue, headers, function(body, responseHeaders) {
                let msg = { headers: responseHeaders, topic: queue };
                try {
                    msg.payload = JSON.parse(body);
                } catch {
                    // Not JSON, pass the body on untouched
                    msg.payload = body;
                }
                callback(msg);
            });
        } else {
            node.error("Can't subscribe, not connected");
        }
    }

    /**
     * Unsubscribe from a STOMP queue.
     * The subscription id that was sent when subscribing is added as `id` header,
     * unless the caller supplies its own `id`.
     * @param { String } queue The STOMP queue to unsubscribe from
     * @param { Object } headers Headers to add to the unsubscribe message
     */
    node.unsubscribe = function(queue, headers = {}) {
        const subscriptionId = node.subscriptionIds[queue];
        delete node.subscriptionIds[queue];

        if (node.connected && !node.closing) {
            // The subscription was made with an `id`, so the UNSUBSCRIBE frame has to carry the same one
            const unsubscribeHeaders = subscriptionId === undefined ? headers : { id: subscriptionId, ...headers };
            node.client.unsubscribe(queue, unsubscribeHeaders);
            node.log(`Unsubscribed from ${queue}`, headers);
        }
    }

    /**
     * Publish a STOMP message on a queue.
     * @param { String } queue The STOMP queue to publish to
     * @param { any } message The message to send
     * @param { Object } headers STOMP headers to add to the SEND command
     */
    node.publish = function(queue, message, headers = {}) {
        if (node.connected && !node.closing) {
            node.client.publish(queue, message, headers);
        } else {
            node.error("Can't publish, not connected");
        }
    }

    /**
     * Acknowledge (a) message(s) that was received from the specified queue.
     * @param { String } queue The queue/topic to send an acknowledgment for
     * @param { String } messageId ID of the message that was received from the server, which can be found in the response header as `message-id`
     * @param { String } transaction Optional transaction name
     */
    node.ack = function(queue, messageId, transaction = undefined) {
        if (node.connected && !node.closing) {
            node.client.ack(messageId, node.subscriptionIds[queue], transaction);
        } else {
            node.error("Can't send acknowledgment, not connected");
        }
    }

    node.on("close", function(done) {
        node.disconnect(function() { done(); });
    });
}
2023-05-30 12:05:25 +02:00
// Register the config node type; `user` and `password` are declared as its credentials
RED.nodes.registerType("stomp-server", StompServerNode, {
    credentials: {
        user: { type: "text" },
        password: { type: "password" }
    }
});
/**
 * Processing node that subscribes to a STOMP queue through its server config node
 * and sends every received message on as a Node-RED message.
 * @param {object} n Node configuration (`server`, `topic`, `ack`)
 */
function StompInNode(n) {
    RED.nodes.createNode(this, n);
    /** @type { StompInNode } */
    const node = this;
    node.server = n.server;
    /** @type { StompServerNode } */
    node.serverConnection = RED.nodes.getNode(node.server);
    node.topic = n.topic;
    node.ack = n.ack;

    if (node.serverConnection) {
        setStatusDisconnected(node);

        // Without a topic there is nothing to subscribe to, so the node does not register
        if (node.topic) {
            node.serverConnection.register(node, function() {
                // The config node may call back after this node was closed (its reference is
                // cleared on close); subscribing then would dereference `null`
                if (!node.serverConnection) {
                    return;
                }
                node.serverConnection.subscribe(node.topic, node.ack, function(msg) {
                    node.send(msg);
                });
            });

            if (node.serverConnection.connected) {
                setStatusConnected(node);
            }
        }
    } else {
        node.error("Missing server config");
    }

    node.on("close", function(removed, done) {
        if (node.serverConnection) {
            // Only unsubscribe when a subscription could have been made
            if (node.topic) {
                node.serverConnection.unsubscribe(node.topic);
            }
            node.serverConnection.deregister(node, true, done);
            node.serverConnection = null;
        } else {
            done();
        }
    });
}
// Register the "stomp in" node type with StompInNode as its constructor
RED . nodes . registerType ( "stomp in" , StompInNode ) ;
/**
 * Processing node that publishes incoming messages to a STOMP queue through its
 * server config node.
 * @param {object} n Node configuration (`server`, `topic`)
 */
function StompOutNode(n) {
    RED.nodes.createNode(this, n);
    /** @type { StompOutNode } */
    const node = this;
    node.server = n.server;
    /** @type { StompServerNode } */
    node.serverConnection = RED.nodes.getNode(node.server);
    node.topic = n.topic;

    if (node.serverConnection) {
        setStatusDisconnected(node);

        node.on("input", function(msg, send, done) {
            // The configured topic wins over the topic of the message
            const topic = node.topic || msg.topic;
            // `topic` is undefined when neither is set, so it must be checked before reading `length`
            const hasTopic = topic != null && topic.length > 0;
            // Only undefined/null count as "no payload"; 0, false and "" can be published
            const hasPayload = msg.payload !== undefined && msg.payload !== null;

            if (hasTopic && hasPayload) {
                let body;
                try {
                    body = JSON.stringify(msg.payload);
                } catch {
                    // Not serializable as JSON, fall back to the string representation
                    body = `${msg.payload}`;
                }
                node.serverConnection.publish(topic, body, msg.headers || {});
            } else if (!hasTopic) {
                node.warn('No valid publish topic');
            } else {
                node.warn('Payload or topic is undefined/null');
            }
            done();
        });

        node.serverConnection.register(node);
        if (node.serverConnection.connected) {
            setStatusConnected(node);
        }
    } else {
        node.error("Missing server config");
    }

    node.on("close", function(removed, done) {
        if (node.serverConnection) {
            node.serverConnection.deregister(node, true, done);
            node.serverConnection = null;
        } else {
            done();
        }
    });
}
// Register the "stomp out" node type with StompOutNode as its constructor
RED . nodes . registerType ( "stomp out" , StompOutNode ) ;
2014-05-19 22:07:20 +02:00
2023-05-30 12:05:25 +02:00
/**
 * Processing node that acknowledges a received message through its server config node.
 * The message id is read from `msg.messageId`, the optional transaction from `msg.transaction`.
 * @param {object} n Node configuration (`server`, `topic`)
 */
function StompAckNode(n) {
    RED.nodes.createNode(this, n);
    /** @type { StompAckNode } */
    const node = this;
    node.server = n.server;
    /** @type { StompServerNode } */
    node.serverConnection = RED.nodes.getNode(node.server);
    node.topic = n.topic;

    if (node.serverConnection) {
        setStatusDisconnected(node);

        node.on("input", function(msg, send, done) {
            // The configured topic wins over the topic of the message
            const topic = node.topic || msg.topic;
            // `topic` is undefined when neither is set, so it must be checked before reading `length`
            if (topic != null && topic.length > 0) {
                node.serverConnection.ack(topic, msg.messageId, msg.transaction);
            } else {
                node.warn('No valid publish topic');
            }
            done();
        });

        node.serverConnection.register(node);
        if (node.serverConnection.connected) {
            setStatusConnected(node);
        }
    } else {
        node.error("Missing server config");
    }

    node.on("close", function(removed, done) {
        if (node.serverConnection) {
            node.serverConnection.deregister(node, true, done);
            node.serverConnection = null;
        } else {
            done();
        }
    });
}
2023-05-30 12:05:25 +02:00
// Register the "stomp ack" node type with StompAckNode as its constructor
RED . nodes . registerType ( "stomp ack" , StompAckNode ) ;
2014-05-19 22:07:20 +02:00
2015-12-17 18:15:01 +01:00
} ;
2023-05-30 12:05:25 +02:00
// ----------------------------------------------
// ----------------- Helpers --------------------
// ----------------------------------------------
/**
 * Helper function for applying changes to an object's properties ONLY when the src object actually has the property.
 * This avoids setting a `dst` property to null/undefined when the `src` object doesn't have the named property.
 * @param {object} src Source object containing properties
 * @param {object} dst Destination object to set property
 * @param {string} propName The property name to set in the Destination object
 * @param {boolean} force force the dst property to be updated/created even if the src object does not have the property
 */
function setIfHasProperty(src, dst, propName, force) {
    // Nothing to do without a source, a destination and a property name
    if (!src || !dst || !propName) {
        return;
    }
    // Unless forced, leave `dst` untouched when `src` does not own the property
    if (!force && !hasProperty(src, propName)) {
        return;
    }
    dst[propName] = src[propName];
}
/**
 * Helper function to test whether an object has a property of its own
 * @param {object} obj Object to test
 * @param {string} propName Name of property to find
 * @returns true if object has property `propName`
 */
function hasProperty(obj, propName) {
    // `obj` may define (or lack) its own `hasOwnProperty`, so always borrow the
    // implementation from Object.prototype instead of calling it on `obj`
    const ownPropertyCheck = Object.prototype.hasOwnProperty;
    return ownPropertyCheck.call(obj, propName);
}