class MCollective::Connector::Activemq

Handles sending and receiving messages over the Stomp protocol, specifically for ActiveMQ servers. It takes advantage of ActiveMQ-specific features and enhancements to the Stomp protocol. For best results in a clustered environment, use ActiveMQ 5.5.0 or newer.

This plugin takes an entirely different approach to dealing with ActiveMQ from the more generic stomp connector.

- Agents use /topic/<collective>.<agent>.agent
- Replies use temp-topics so they are private and transient.
- Point-to-point messages are supported by subscribing to
  /queue/<collective>.nodes with the selector "mc_identity = 'identity'"
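
As a rough illustration of the naming scheme in the list above, the sketch below builds the agent topic, the shared node queue and the selector string. The helper names agent_topic, nodes_queue and identity_selector are hypothetical and not part of the plugin, and the collective, agent and identity values are made-up examples.

```ruby
# Hypothetical helpers (not plugin API) that reproduce the naming scheme above.

# Topic that the agents of a given collective listen on.
def agent_topic(collective, agent)
  "/topic/#{collective}.#{agent}.agent"
end

# Queue shared by all nodes of a collective for point-to-point messages.
def nodes_queue(collective)
  "/queue/#{collective}.nodes"
end

# Selector a node subscribes with so it only receives messages addressed to it.
def identity_selector(identity)
  "mc_identity = '#{identity}'"
end

puts agent_topic("mcollective", "rpcutil")
puts nodes_queue("mcollective")
puts identity_selector("node1.example.net")
```

Every node subscribes to the same queue name, so it is the selector, not the destination, that keeps a node from consuming messages meant for its neighbours.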

The use of temp-topics for the replies is a huge improvement over the old style. In the old way every client received the replies for all clients that were active at that time, so each client had to decrypt and validate every message just to determine whether it should be ignored. This was computationally expensive, and on large busy networks the messages were sent all over the place, across broker boundaries.

The new way means replies go point-to-point back to whoever made the request. Clients only get their own replies, and this is a private channel that casual observers cannot just snoop into.

This plugin supports 1.1.6 and newer of the Stomp rubygem.

connector = activemq
plugin.activemq.pool.size = 2

plugin.activemq.pool.1.host = stomp1.your.net
plugin.activemq.pool.1.port = 61613
plugin.activemq.pool.1.user = you
plugin.activemq.pool.1.password = secret
plugin.activemq.pool.1.ssl = true
plugin.activemq.pool.1.ssl.cert = /path/to/your.cert
plugin.activemq.pool.1.ssl.key = /path/to/your.key
plugin.activemq.pool.1.ssl.ca = /path/to/your.ca
plugin.activemq.pool.1.ssl.fallback = true
plugin.activemq.pool.1.ssl.ciphers = TLSv1:!MD5:!LOW:!EXPORT

plugin.activemq.pool.2.host = stomp2.your.net
plugin.activemq.pool.2.port = 61613
plugin.activemq.pool.2.user = you
plugin.activemq.pool.2.password = secret
plugin.activemq.pool.2.ssl = false

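With this configuration you can also supply the credentials through the STOMP_USER and STOMP_PASSWORD environment variables instead of the user and password settings; the environment is checked first. The port defaults to 61613 if not specified.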
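
The lookup order can be sketched as follows. This is a standalone approximation of what get_env_or_option (documented further down) does, not the plugin code itself: the function name env_or_option is hypothetical, the environment is passed in as a plain hash so the example does not depend on the real ENV, and the configuration values are made up.

```ruby
# Sketch of the lookup order: environment, then plugin config, then default.
# `env` is an ordinary Hash standing in for ENV.
def env_or_option(env, env_key, pluginconf, opt, default = nil)
  return env[env_key] if env.include?(env_key)
  return pluginconf[opt] if pluginconf.include?(opt)
  return default if default

  raise "No #{env_key} environment or plugin.#{opt} configuration option given"
end

# Made-up configuration values for illustration; note that no port is set.
pluginconf = {
  "activemq.pool.1.host"     => "stomp1.your.net",
  "activemq.pool.1.user"     => "you",
  "activemq.pool.1.password" => "secret",
}

from_config = env_or_option({}, "STOMP_USER", pluginconf, "activemq.pool.1.user")
from_env    = env_or_option({"STOMP_USER" => "ops"}, "STOMP_USER", pluginconf, "activemq.pool.1.user")
port        = Integer(pluginconf.fetch("activemq.pool.1.port", 61613))

puts from_config
puts from_env
puts port
```

Because the environment is consulted before the per-pool settings, a STOMP_USER or STOMP_PASSWORD value applies to every pool member.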

The ssl options are only usable with version 1.2.2 or newer of the Stomp gem. When they are set, full SSL validation is done and you will only be able to connect to an ActiveMQ server that has a cert signed by the same CA. If you only set ssl = true and do not supply the cert, key and ca properties, or if you have an older gem, the connection falls back to unverified mode only if ssl.fallback is true; otherwise the connection attempt fails.
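
The decision described above can be summarised with a small sketch. It is a simplification under stated assumptions: the function name ssl_mode and its return symbols are hypothetical, and the real ssl_parameters method also checks that the files exist and that the Stomp gem provides SSL parameter support.

```ruby
# Simplified sketch of the verified/unverified decision. Only the presence of
# the three settings is considered here; file existence and gem version checks
# done by the real connector are left out.
def ssl_mode(cert: nil, key: nil, ca: nil, fallback: false)
  return :verified if cert && key && ca
  return :unverified if fallback

  raise "cert, key and ca has to be supplied for verified SSL mode"
end

puts ssl_mode(cert: "/path/to/your.cert", key: "/path/to/your.key", ca: "/path/to/your.ca")
puts ssl_mode(fallback: true)
```

Calling ssl_mode with neither the three files nor fallback raises, which corresponds to a connection that refuses to start rather than silently dropping verification.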

In addition you can set the following options for the rubygem:

plugin.activemq.initial_reconnect_delay = 0.01
plugin.activemq.max_reconnect_delay = 30.0
plugin.activemq.use_exponential_back_off = true
plugin.activemq.back_off_multiplier = 2
plugin.activemq.max_reconnect_attempts = 0
plugin.activemq.randomize = false
plugin.activemq.timeout = -1

You can set the initial connection timeout, which applies when your Stomp server is simply unreachable. Once it expires the connector fails over to the next server in the pool:

plugin.activemq.connect_timeout = 30

ActiveMQ JMS message priorities can be set:

plugin.activemq.priority = 4

This plugin supports Stomp protocol 1.1 when combined with the stomp gem version 1.2.10 or newer. To enable network heartbeats, which help keep the connection alive across NAT and aggressive session-tracking firewalls, you can set:

plugin.activemq.heartbeat_interval = 30

This causes a heartbeat to be sent at 30 second intervals and one to be expected from the broker every 30 seconds. The shortest supported period is 30 seconds; a lower value is forced to 30 seconds.

After 2 failures to receive a heartbeat the connection will be reset via the normal failover mechanism.

By default, if heartbeat_interval is set the connector requests Stomp 1.1 but supports fallback to 1.0. You can enable strict Stomp 1.1-only operation:

plugin.activemq.stomp_1_0_fallback = 0
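
To make the effect of these two settings concrete, the sketch below reproduces the header arithmetic from the connection_headers source shown later on this page. The function name heartbeat_headers is hypothetical, and the host header and the gem version check are left out.

```ruby
# Standalone sketch of how the heartbeat settings become STOMP CONNECT headers.
# interval_seconds corresponds to plugin.activemq.heartbeat_interval and
# fallback to plugin.activemq.stomp_1_0_fallback.
def heartbeat_headers(interval_seconds, fallback: true)
  headers = {"accept-version" => "1.0"}
  return headers unless interval_seconds > 0

  # Values below 30 seconds are forced up to the minimum.
  interval_seconds = 30 if interval_seconds < 30

  # The header is expressed in milliseconds, with a 500ms margin either side.
  ms = interval_seconds * 1000
  headers["heart-beat"] = "%d,%d" % [ms + 500, ms - 500]
  headers["accept-version"] = fallback ? "1.1,1.0" : "1.1"
  headers
end

puts heartbeat_headers(30).inspect
puts heartbeat_headers(10, fallback: false).inspect
puts heartbeat_headers(0).inspect
```

With the interval left at 0 only Stomp 1.0 is requested and no heart-beat header is sent.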

Attributes

connection[R]

Public Class Methods

new()
    # File lib/mcollective/connector/activemq.rb
    def initialize
      @config = Config.instance
      @subscriptions = []
      @msgpriority = 0
      @base64 = false
      @use_exponential_back_off = get_bool_option("activemq.use_exponential_back_off", "true")
      @initial_reconnect_delay = Float(get_option("activemq.initial_reconnect_delay", 0.01))
      @back_off_multiplier = Integer(get_option("activemq.back_off_multiplier", 2))
      @max_reconnect_delay = Float(get_option("activemq.max_reconnect_delay", 30.0))
      @reconnect_delay = @initial_reconnect_delay

      Log.info("ActiveMQ connector initialized.  Using stomp-gem #{stomp_version}")
    end

Public Instance Methods

connect(connector = ::Stomp::Connection)

Connects to the ActiveMQ middleware

    # File lib/mcollective/connector/activemq.rb
    def connect(connector = ::Stomp::Connection)
      if @connection
        Log.debug("Already connected, not re-initializing connection")
        return
      end

      begin
        @base64 = get_bool_option("activemq.base64", "false")
        @msgpriority = get_option("activemq.priority", 0).to_i

        pools = Integer(get_option("activemq.pool.size"))
        hosts = []
        middleware_user = ''
        middleware_password = ''
        prompt_for_username = get_bool_option("activemq.prompt_user", "false")
        prompt_for_password = get_bool_option("activemq.prompt_password", "false")

        if prompt_for_username
          Log.debug("No previous user exists and activemq.prompt_user is set to true")
          print "Please enter user to connect to middleware: "
          middleware_user = STDIN.gets.chomp
        end

        if prompt_for_password
          Log.debug("No previous password exists and activemq.prompt_password is set to true")
          middleware_password = MCollective::Util.get_hidden_input("Please enter password: ")
          print "\n"
        end

        1.upto(pools) do |poolnum|
          host = {}

          host[:host] = get_option("activemq.pool.#{poolnum}.host")
          host[:port] = get_option("activemq.pool.#{poolnum}.port", 61613).to_i
          host[:ssl] = get_bool_option("activemq.pool.#{poolnum}.ssl", "false")

          # read user from environment or config file, falling back to the prompted value
          host[:login] = get_env_or_option("STOMP_USER", "activemq.pool.#{poolnum}.user", middleware_user)
          if prompt_for_username and host[:login] != middleware_user
            Log.info("Using #{host[:login]} from config file to connect to #{host[:host]}. "+
                     "plugin.activemq.prompt_user should be set to false to remove the prompt.")
          end

          # read password from environment or config file, falling back to the prompted value
          host[:passcode] = get_env_or_option("STOMP_PASSWORD", "activemq.pool.#{poolnum}.password", middleware_password)
          if prompt_for_password and host[:passcode] != middleware_password
            Log.info("Using password from config file to connect to #{host[:host]}. "+
                     "plugin.activemq.prompt_password should be set to false to remove the prompt.")
          end

          # if ssl is enabled set :ssl to the hash of parameters
          if host[:ssl]
            host[:ssl] = ssl_parameters(poolnum, get_bool_option("activemq.pool.#{poolnum}.ssl.fallback", "false"))
          end

          Log.debug("Adding #{host[:host]}:#{host[:port]} to the connection pool")
          hosts << host
        end

        raise "No hosts found for the ActiveMQ connection pool" if hosts.size == 0

        connection = {:hosts => hosts}

        # Various STOMP gem options. The defaults here match the defaults of gem
        # version 1.1.6; their meaning can be guessed, the documentation isn't clear
        connection[:use_exponential_back_off] = @use_exponential_back_off
        connection[:initial_reconnect_delay] = @initial_reconnect_delay
        connection[:back_off_multiplier] = @back_off_multiplier
        connection[:max_reconnect_delay] = @max_reconnect_delay
        connection[:max_reconnect_attempts] = Integer(get_option("activemq.max_reconnect_attempts", 0))
        connection[:randomize] = get_bool_option("activemq.randomize", "false")
        connection[:backup] = get_bool_option("activemq.backup", "false")
        connection[:timeout] = Integer(get_option("activemq.timeout", -1))
        connection[:connect_timeout] = Integer(get_option("activemq.connect_timeout", 30))
        connection[:reliable] = true
        connection[:connect_headers] = connection_headers
        connection[:max_hbrlck_fails] = Integer(get_option("activemq.max_hbrlck_fails", 0))
        connection[:max_hbread_fails] = Integer(get_option("activemq.max_hbread_fails", 2))

        connection[:logger] = EventLogger.new

        @connection = connector.new(connection)

      rescue ClientTimeoutError => e
        raise e
      rescue Exception => e
        raise("Could not connect to ActiveMQ Server: #{e}")
      end
    end

connection_headers()
    # File lib/mcollective/connector/activemq.rb
    def connection_headers
      headers = {:"accept-version" => "1.0"}

      heartbeat_interval = Integer(get_option("activemq.heartbeat_interval", 0))
      stomp_1_0_fallback = get_bool_option("activemq.stomp_1_0_fallback", true)

      headers[:host] = get_option("activemq.vhost", "mcollective")

      if heartbeat_interval > 0
        unless stomp_version_supports_heartbeat?
          raise("Setting STOMP 1.1 properties like heartbeat intervals require at least version 1.2.10 of the STOMP gem")
        end

        if heartbeat_interval < 30
          # interpolate the configured value into the warning
          Log.warn("Connection heartbeat is set to %d, forcing to minimum value of 30s" % heartbeat_interval)
          heartbeat_interval = 30
        end

        heartbeat_interval = heartbeat_interval * 1000
        headers[:"heart-beat"] = "%d,%d" % [heartbeat_interval + 500, heartbeat_interval - 500]

        if stomp_1_0_fallback
          headers[:"accept-version"] = "1.1,1.0"
        else
          headers[:"accept-version"] = "1.1"
        end
      else
        if stomp_version_supports_heartbeat?
          Log.info("Connecting without STOMP 1.1 heartbeats, if you are using ActiveMQ 5.8 or newer consider setting plugin.activemq.heartbeat_interval")
        end
      end

      headers
    end

disconnect()

Disconnects from the ActiveMQ connection

    # File lib/mcollective/connector/activemq.rb
    def disconnect
      Log.debug("Disconnecting from ActiveMQ")
      @connection.disconnect
      @connection = nil
    end

exponential_back_off()

Calculate the exponential backoff needed

    # File lib/mcollective/connector/activemq.rb
    def exponential_back_off
      if !@use_exponential_back_off
        return nil
      end

      backoff = @reconnect_delay

      # calculate next delay
      @reconnect_delay = @reconnect_delay * @back_off_multiplier

      # cap at max reconnect delay
      if @reconnect_delay > @max_reconnect_delay
        @reconnect_delay = @max_reconnect_delay
      end

      return backoff
    end
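
To see what sequence of delays this produces, here is a standalone sketch that repeats the same calculation outside the connector. The function name back_off_delays is hypothetical; the defaults are the option values shown in the configuration earlier (initial delay 0.01 seconds, multiplier 2, maximum 30 seconds).

```ruby
# Standalone sketch of the delay sequence: each call returns the current delay
# and then multiplies it, capping the result at the maximum.
def back_off_delays(count, initial: 0.01, multiplier: 2, max: 30.0)
  delay = initial
  Array.new(count) do
    current = delay
    delay = [delay * multiplier, max].min
    current
  end
end

delays = back_off_delays(14)
puts delays.inspect
```

Under these defaults the delay doubles from 0.01 seconds until it would exceed 30 seconds, after which every further attempt waits the maximum.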

get_bool_option(val, default)

Looks up a boolean value in the config

    # File lib/mcollective/connector/activemq.rb
    def get_bool_option(val, default)
      Util.str_to_bool(@config.pluginconf.fetch(val, default))
    end

get_cert_file(poolnum)

Returns the name of the certificate file used by ActiveMQ. Will first check if an environment variable MCOLLECTIVE_ACTIVEMQ_POOLX_SSL_CERT exists, where X is the ActiveMQ pool number. If the environment variable doesn't exist, it will try to load the value from the config.

    # File lib/mcollective/connector/activemq.rb
    def get_cert_file(poolnum)
      ENV["MCOLLECTIVE_ACTIVEMQ_POOL%s_SSL_CERT" % poolnum] || get_option("activemq.pool.#{poolnum}.ssl.cert", false)
    end

get_env_or_option(env, opt, default=nil)

Looks in the environment first, then in the config file, for a specific option. Accepts an optional default.

Raises an exception when it can't find a value anywhere

    # File lib/mcollective/connector/activemq.rb
    def get_env_or_option(env, opt, default=nil)
      return ENV[env] if ENV.include?(env)
      return @config.pluginconf[opt] if @config.pluginconf.include?(opt)
      return default if default

      raise("No #{env} environment or plugin.#{opt} configuration option given")
    end

get_key_file(poolnum)

Returns the name of the private key file used by ActiveMQ. Will first check if an environment variable MCOLLECTIVE_ACTIVEMQ_POOLX_SSL_KEY exists, where X is the ActiveMQ pool number. If the environment variable doesn't exist, it will try to load the value from the config.

    # File lib/mcollective/connector/activemq.rb
    def get_key_file(poolnum)
      ENV["MCOLLECTIVE_ACTIVEMQ_POOL%s_SSL_KEY" % poolnum] || get_option("activemq.pool.#{poolnum}.ssl.key", false)
    end

get_option(opt, default=nil)

Looks for a config option. Accepts an optional default.

Raises an exception when it can't find a value anywhere

    # File lib/mcollective/connector/activemq.rb
    def get_option(opt, default=nil)
      return @config.pluginconf[opt] if @config.pluginconf.include?(opt)
      return default unless default.nil?

      raise("No plugin.#{opt} configuration option given")
    end
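
One detail worth noticing is that get_option treats any non-nil default as usable, while get_env_or_option only returns a truthy default. The sketch below contrasts the two checks using hypothetical standalone functions (option and option_truthy_default) and an empty hash in place of the plugin configuration; it is an illustration of the difference, not plugin code.

```ruby
# Mirrors the default check in get_option: any non-nil default is returned,
# including false.
def option(conf, opt, default = nil)
  return conf[opt] if conf.include?(opt)
  return default unless default.nil?

  raise "No plugin.#{opt} configuration option given"
end

# Mirrors the default check in get_env_or_option: only truthy defaults count.
def option_truthy_default(conf, opt, default = nil)
  return conf[opt] if conf.include?(opt)
  return default if default

  raise "No plugin.#{opt} configuration option given"
end

conf = {} # empty stand-in for the plugin configuration

puts option(conf, "activemq.pool.1.ssl.ca", false).inspect

begin
  option_truthy_default(conf, "activemq.pool.1.ssl.ca", false)
rescue RuntimeError => e
  puts "raised: #{e.message}"
end
```

This is why the SSL helpers on this page can pass false as a default to get_option and test the result, instead of having to rescue an exception for each missing setting.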

headers_for(msg, identity=nil)
    # File lib/mcollective/connector/activemq.rb
    def headers_for(msg, identity=nil)
      headers = {}

      headers = {"priority" => @msgpriority} if @msgpriority > 0

      headers["timestamp"] = (Time.now.utc.to_i * 1000).to_s

      # set the expires header based on the TTL, we build a small additional
      # timeout of 10 seconds in here to allow for network latency etc
      headers["expires"] = ((Time.now.utc.to_i + msg.ttl + 10) * 1000).to_s

      if [:request, :direct_request].include?(msg.type)
        target = make_target(msg.agent, :reply, msg.collective)

        if msg.reply_to
          headers["reply-to"] = msg.reply_to
        else
          headers["reply-to"] = target[:name]
        end

        headers["mc_identity"] = identity if msg.type == :direct_request
      end

      headers["mc_sender"] = Config.instance.identity

      return headers
    end
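
The timestamp and expires headers are both millisecond values derived from the current UTC time. A worked example with a fixed clock makes the arithmetic easier to follow; the function name expiry_headers is hypothetical and the 60 second TTL is an arbitrary example value.

```ruby
# Standalone sketch of the timestamp/expires arithmetic. The clock is passed
# in so the result is reproducible.
def expiry_headers(ttl, now = Time.now.utc)
  {
    "timestamp" => (now.to_i * 1000).to_s,
    # TTL plus a 10 second allowance for network latency, in milliseconds
    "expires"   => ((now.to_i + ttl + 10) * 1000).to_s,
  }
end

fixed_now = Time.at(1_000_000).utc
puts expiry_headers(60, fixed_now).inspect
```

With a clock fixed at 1,000,000 seconds after the epoch and a TTL of 60, the message expires 70 seconds after its timestamp.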

make_target(agent, type, collective)
    # File lib/mcollective/connector/activemq.rb
    def make_target(agent, type, collective)
      raise("Unknown target type #{type}") unless [:directed, :broadcast, :reply, :request, :direct_request].include?(type)
      raise("Unknown collective '#{collective}' known collectives are '#{@config.collectives.join ', '}'") unless @config.collectives.include?(collective)

      agents_multiplex = get_bool_option("activemq.agents_multiplex", "false")
      target = {:name => nil, :headers => {}}

      case type
        when :reply
          target[:name] = ["/queue/" + collective, :reply, "#{Config.instance.identity}_#{$$}", Client.request_sequence].join(".")

        when :broadcast
          if agents_multiplex
            target[:name] = ["/topic/" + collective, :agents].join(".")
          else
            target[:name] = ["/topic/" + collective, agent, :agent].join(".")
          end

        when :request
          if agents_multiplex
            target[:name] = ["/topic/" + collective, :agents].join(".")
          else
            target[:name] = ["/topic/" + collective, agent, :agent].join(".")
          end

        when :direct_request
          target[:name] = ["/queue/" + collective, :nodes].join(".")

        when :directed
          target[:name] = ["/queue/" + collective, :nodes].join(".")
          target[:headers]["selector"] = "mc_identity = '#{@config.identity}'"
          target[:id] = "%s_directed_to_identity" % collective
      end

      target[:id] = target[:name] unless target[:id]

      target
    end
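
As a sketch of what the :reply and :directed branches produce, the example below rebuilds both targets with plain arguments. The function names reply_target and directed_target are hypothetical, and the identity, process id and sequence number are made-up stand-ins for Config.instance.identity, $$ and Client.request_sequence. Note that in the source shown here the reply destination is built under /queue/.

```ruby
# Standalone sketch of the :reply target name. Array#join converts the symbol
# and the integer to strings.
def reply_target(collective, identity, pid, sequence)
  ["/queue/" + collective, :reply, "#{identity}_#{pid}", sequence].join(".")
end

# Standalone sketch of the :directed target, including selector and fixed id.
def directed_target(collective, identity)
  {
    :name    => ["/queue/" + collective, :nodes].join("."),
    :headers => {"selector" => "mc_identity = '#{identity}'"},
    :id      => "%s_directed_to_identity" % collective,
  }
end

puts reply_target("mcollective", "node1.example.net", 4242, 7)
puts directed_target("mcollective", "node1.example.net").inspect
```

The reply name embeds the sender's identity, process id and request sequence, so each request gets its own reply destination, while the directed subscription uses a fixed id per collective.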

publish(msg)

Sends a message to the ActiveMQ connection

    # File lib/mcollective/connector/activemq.rb
    def publish(msg)
      msg.base64_encode! if @base64

      target = target_for(msg)

      if msg.type == :direct_request
        msg.discovered_hosts.each do |node|
          target[:headers] = headers_for(msg, node)

          Log.debug("Sending a direct message to ActiveMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'")

          @connection.publish(target[:name], msg.payload, target[:headers])
        end
      else
        target[:headers].merge!(headers_for(msg))

        Log.debug("Sending a broadcast message to ActiveMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'")

        @connection.publish(target[:name], msg.payload, target[:headers])
      end
    end

receive()

Receives a message from the ActiveMQ connection

    # File lib/mcollective/connector/activemq.rb
    def receive
      Log.debug("Waiting for a message from ActiveMQ")

      # When the Stomp library > 1.2.0 is mid reconnecting due to its reliable connection
      # handling it sets the connection to closed.  If we happen to be receiving at just
      # that time we will get an exception warning about the closed connection so handling
      # that here with a sleep and a retry.
      begin
        msg = @connection.receive
      rescue ::Stomp::Error::NoCurrentConnection
        sleep 1
        retry
      end

      # In older stomp gems an attempt to receive after failed authentication can return nil
      if msg.nil?
        raise MessageNotReceived.new(exponential_back_off), "No message received from ActiveMQ."
      end

      # We expect all messages we get to be of STOMP frame type MESSAGE, raise on unexpected types
      if msg.command != 'MESSAGE'
        Log.warn("Unexpected '#{msg.command}' frame.  Headers: #{msg.headers.inspect} Body: #{msg.body.inspect}")
        raise UnexpectedMessageType.new(exponential_back_off),
          "Received frame of type '#{msg.command}' expected 'MESSAGE'"
      end

      Message.new(msg.body, msg, :base64 => @base64, :headers => msg.headers)
    end

ssl_parameters(poolnum, fallback)

Sets the SSL parameters for a specific connection

    # File lib/mcollective/connector/activemq.rb
    def ssl_parameters(poolnum, fallback)
      params = {
        :cert_file => get_cert_file(poolnum),
        :key_file  => get_key_file(poolnum),
        :ts_files  => get_option("activemq.pool.#{poolnum}.ssl.ca", false),
        :ciphers   => get_option("activemq.pool.#{poolnum}.ssl.ciphers", false),
      }

      raise "cert, key and ca has to be supplied for verified SSL mode" unless params[:cert_file] && params[:key_file] && params[:ts_files]

      raise "Cannot find certificate file #{params[:cert_file]}" unless File.exist?(params[:cert_file])
      raise "Cannot find key file #{params[:key_file]}" unless File.exist?(params[:key_file])

      params[:ts_files].split(",").each do |ca|
        raise "Cannot find CA file #{ca}" unless File.exist?(ca)
      end

      begin
        ::Stomp::SSLParams.new(params)
      rescue NameError
        raise "Stomp gem >= 1.2.2 is needed"
      end

    rescue Exception => e
      if fallback
        Log.warn("Failed to set full SSL verified mode, falling back to unverified: #{e.class}: #{e}")
        return true
      else
        Log.error("Failed to set full SSL verified mode: #{e.class}: #{e}")
        raise(e)
      end
    end

stomp_version()
    # File lib/mcollective/connector/activemq.rb
    def stomp_version
      ::Stomp::Version::STRING
    end

stomp_version_supports_heartbeat?()
    # File lib/mcollective/connector/activemq.rb
    def stomp_version_supports_heartbeat?
      return Util.versioncmp(stomp_version, "1.2.10") >= 0
    end
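
The check relies on a version-aware comparison rather than a plain string comparison. The sketch below shows why that matters. It swaps in Gem::Version from RubyGems for MCollective's Util.versioncmp, purely so the example runs on its own; the function name supports_heartbeat? is hypothetical.

```ruby
require "rubygems"

# Standalone sketch of the version check, using Gem::Version in place of
# Util.versioncmp.
def supports_heartbeat?(stomp_version)
  Gem::Version.new(stomp_version) >= Gem::Version.new("1.2.10")
end

puts supports_heartbeat?("1.2.2")
puts supports_heartbeat?("1.2.10")

# A plain string comparison gets this wrong: "1.2.10" sorts before "1.2.2"
# because the characters "1" and "2" are compared, not the numbers 10 and 2.
puts("1.2.10" < "1.2.2")
```

Comparing the version segments numerically is what lets 1.2.10 count as newer than 1.2.2.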

subscribe(agent, type, collective)

Subscribe to a topic or queue

    # File lib/mcollective/connector/activemq.rb
    def subscribe(agent, type, collective)
      source = make_target(agent, type, collective)

      unless @subscriptions.include?(source[:id])
        Log.debug("Subscribing to #{source[:name]} with headers #{source[:headers].inspect.chomp}")
        @connection.subscribe(source[:name], source[:headers], source[:id])
        @subscriptions << source[:id]
      end
    rescue ::Stomp::Error::DuplicateSubscription
      Log.error("Received subscription request for #{source.inspect.chomp} but already had a matching subscription, ignoring")
    end

target_for(msg)
    # File lib/mcollective/connector/activemq.rb
    def target_for(msg)
      if msg.type == :reply
        target = {:name => msg.request.headers["reply-to"], :headers => {}}
      elsif [:request, :direct_request].include?(msg.type)
        target = make_target(msg.agent, msg.type, msg.collective)
      else
        raise "Don't know how to create a target for message type #{msg.type}"
      end

      return target
    end

unsubscribe(agent, type, collective)

Unsubscribe from a topic or queue

    # File lib/mcollective/connector/activemq.rb
    def unsubscribe(agent, type, collective)
      source = make_target(agent, type, collective)

      Log.debug("Unsubscribing from #{source[:name]}")
      @connection.unsubscribe(source[:name], source[:headers], source[:id])
      @subscriptions.delete(source[:id])
    end