class MCollective::RPC::Client

The main component of the Simple RPC client system. It wraps MCollective::Client and adds a set of conventions and standard approaches on top of it.

Attributes

agent[R]
batch_mode[R]
batch_size[R]
batch_sleep_time[R]
client[R]
config[RW]
ddl[R]
default_discovery_method[R]
discovery_method[R]
discovery_options[R]
filter[RW]
limit_method[R]
limit_seed[R]
limit_targets[R]
output_format[R]
progress[RW]
reply_to[RW]
stats[R]
timeout[RW]
ttl[RW]
verbose[RW]

Public Class Methods

new(agent, flags = {}) { |parser, opts| ... } click to toggle source

Creates a stub for a remote agent. You can pass a set of options in the flags under :options, which will then be used; otherwise a default set of options is created, with filtering enabled, based on standard command line use.

rpc = RPC::Client.new("rpctest", :configfile => "client.cfg", :options => options)

You typically would not call this directly; you'd use MCollective::RPC#rpcclient instead, which is a wrapper around this that can be used as a Mixin.

    # File lib/mcollective/rpc/client.rb
 20 def initialize(agent, flags = {})
      # Builds a client stub for +agent+.
      #
      # Options are taken from flags[:options] when that key is present,
      # otherwise from the copy cached in @@initial_options by an earlier
      # instance, otherwise by running the option parser. When a block is
      # given it is yielded the parser and the options hash before the
      # standard SimpleRPC options are added.
 21   if flags.include?(:options)
 22     initial_options = flags[:options]
 23
 24   elsif @@initial_options
        # the cache holds a Marshal dump, loading it gives this instance
        # its own copy of the previously parsed options
 25     initial_options = Marshal.load(@@initial_options)
 26
 27   else
 28     oparser = MCollective::Optionparser.new({ :verbose => false,
 29                                               :progress_bar => true,
 30                                               :mcollective_limit_targets => false,
 31                                               :batch_size => nil,
 32                                               :batch_sleep_time => 1 },
 33                                             "filter")
 34
 35     initial_options = oparser.parse do |parser, opts|
 36       if block_given?
 37         yield(parser, opts)
 38       end
 39
 40       Helpers.add_simplerpc_options(parser, opts)
 41     end
 42
 43     @@initial_options = Marshal.dump(initial_options)
 44   end
 45
 46   @initial_options = initial_options
 47
 48   @config = initial_options[:config]
 49   @client = MCollective::Client.new(@initial_options)
 50
 51   @stats = Stats.new
 52   @agent = agent
 53   @timeout = initial_options[:timeout] || 5
 54   @verbose = initial_options[:verbose]
 55   @filter = initial_options[:filter] || Util.empty_filter
 56   @discovered_agents = nil
 57   @progress = initial_options[:progress_bar]
 58   @limit_targets = initial_options[:mcollective_limit_targets]
 59   @limit_method = Config.instance.rpclimitmethod
 60   @limit_seed = initial_options[:limit_seed] || nil
 61   @output_format = initial_options[:output_format] || :console
 62   @force_direct_request = false
 63   @reply_to = initial_options[:reply_to]
 64   @discovery_method = initial_options[:discovery_method]
      # remember whether the discovery method came from configuration
      # rather than from the supplied options
 65   if !@discovery_method
 66     @discovery_method = Config.instance.default_discovery_method
 67     @default_discovery_method = true
 68   else
 69     @default_discovery_method = false
 70   end
 71   @discovery_options = initial_options[:discovery_options] || []
 72   @force_display_mode = initial_options[:force_display_mode] || false
 73
 74   @batch_size = initial_options[:batch_size] || Config.instance.default_batch_size
 75   @batch_sleep_time = Float(initial_options[:batch_sleep_time] || Config.instance.default_batch_sleep_time)
 76   @batch_mode = determine_batch_mode(@batch_size)
 77
      # adds the agent to @filter and calls reset, so @filter must already
      # be assigned at this point
 78   agent_filter agent
 79
 80   @discovery_timeout = @initial_options.fetch(:disctimeout, nil) || Config.instance.discovery_timeout
 81
 82   @collective = @client.collective
 83   @ttl = initial_options[:ttl] || Config.instance.ttl
 84   @publish_timeout = initial_options[:publish_timeout] || Config.instance.publish_timeout
 85   @threaded = initial_options[:threaded] || Config.instance.threaded
 86
 87   # if we can find a DDL for the service override
 88   # the timeout of the client so we always magically
 89   # wait appropriate amounts of time.
 90   #
 91   # We add the discovery timeout to the ddl supplied
 92   # timeout as the discovery timeout tends to be tuned
 93   # for local network conditions and fact source speed
 94   # which would other wise not be accounted for and
 95   # some results might get missed.
 96   #
 97   # We do this only if the timeout is the default 5
 98   # seconds, so that users cli overrides will still
 99   # get applied
100   #
101   # DDLs are required, failure to find a DDL is fatal
102   @ddl = DDL.new(agent)
103   @stats.ddl = @ddl
      # NOTE(review): a timeout explicitly supplied as 5 cannot be told
      # apart from the default here and is overridden as well
104   @timeout = @ddl.meta[:timeout] + discovery_timeout if @timeout == 5
105
106   # allows stderr and stdout to be overridden for testing
107   # but also for web apps that might not want a bunch of stuff
108   # generated to actual file handles
109   if initial_options[:stderr]
110     @stderr = initial_options[:stderr]
111   else
112     @stderr = STDERR
113     @stderr.sync = true
114   end
115
116   if initial_options[:stdout]
117     @stdout = initial_options[:stdout]
118   else
119     @stdout = STDOUT
120     @stdout.sync = true
121   end
122
123   if initial_options[:stdin]
124     @stdin = initial_options[:stdin]
125   else
126     @stdin = STDIN
127   end
128 end

Public Instance Methods

agent_filter(agent) click to toggle source

Sets the agent filter

    # File lib/mcollective/rpc/client.rb
# Adds +agent+ to the agent filter (duplicates and nil entries are
# dropped) and then calls reset.
def agent_filter(agent)
  @filter["agent"] |= [agent]
  @filter["agent"].compact!
  reset
end
aggregate_reply(reply, aggregate) click to toggle source
    # File lib/mcollective/rpc/client.rb
# Feeds one reply into the aggregate functions.
#
# Returns +aggregate+ when the functions ran, nil when no aggregate was
# supplied or when a function raised. Failures are logged and, as the log
# message states, disable summary calculation.
#
# Only StandardError is rescued so that Interrupt, SystemExit and similar
# process level exceptions are not swallowed by a failing summary.
def aggregate_reply(reply, aggregate)
  return nil unless aggregate

  aggregate.call_functions(reply)
  aggregate
rescue StandardError => e
  Log.error("Failed to calculate aggregate summaries for reply from %s, calculating summaries disabled: %s: %s (%s)" % [reply[:senderid], e.backtrace.first, e.to_s, e.class])
  nil
end
batch_size=(limit) click to toggle source

Sets the batch size, if the size is set to 0 that will disable batch mode

    # File lib/mcollective/rpc/client.rb
# Sets the batch size after validating it and recomputes the batch mode
# from the new value. Raises when direct addressing is not enabled.
def batch_size=(limit)
  raise "Can only set batch size if direct addressing is supported" unless Config.instance.direct_addressing

  validate_batch_size(limit)

  @batch_size = limit
  @batch_mode = determine_batch_mode(limit)
end
batch_sleep_time=(time) click to toggle source
    # File lib/mcollective/rpc/client.rb
# Sets the pause between batches, converted with Float(). Raises when
# direct addressing is not enabled.
def batch_sleep_time=(time)
  unless Config.instance.direct_addressing
    raise "Can only set batch sleep time if direct addressing is supported"
  end

  @batch_sleep_time = Float(time)
end
call_agent(action, args, opts, disc=:auto, &block) click to toggle source

Handles traditional calls to the remote agents with full stats blocks, non blocks and everything else supported.

Other methods of calling the nodes can reuse this code by for example specifying custom options and discovery data

    # File lib/mcollective/rpc/client.rb
913 def call_agent(action, args, opts, disc=:auto, &block)
      # Sends +action+ with +args+ to the discovered nodes and processes
      # the replies.
      #
      # action - name of the remote action
      # args   - request arguments, :process_results is set to true on it
      #          unless the request is fire and forget
      # opts   - options passed to the Message, opts[:filter] is the filter
      # disc   - :auto to run discover, anything else is used as the list
      #          of discovered nodes
      #
      # Returns the result of fire_and_forget_request for fire and forget
      # calls, the stats when a block is given, otherwise a flattened
      # array of results.
914   # Handle fire and forget requests and make sure
915   # the :process_results value is set appropriately
916   #
917   # specific reply-to requests should be treated like
918   # fire and forget since the client will never get
919   # the responses
920   if args[:process_results] == false || @reply_to
921     return fire_and_forget_request(action, args)
922   else
923     args[:process_results] = true
924   end
925
926   # Do discovery when no specific discovery array is given
927   #
928   # If an array is given set the force_direct_request hint that
929   # will tell the message object to be a direct request one
930   if disc == :auto
931     discovered = discover
932   else
933     @force_direct_request = true if Config.instance.direct_addressing
934     discovered = disc
935   end
936
937   req = new_request(action.to_s, args)
938
939   message = Message.new(req, nil, {:agent => @agent, :type => :request, :collective => @collective, :filter => opts[:filter], :options => opts})
940   message.discovered_hosts = discovered.clone
941
942   results = []
943   respcount = 0
944
945   if discovered.size > 0
946     message.type = :direct_request if @force_direct_request
947
        # the progress indicator is only created when no block is given
948     if @progress && !block_given?
949       twirl = Progress.new
950       @stdout.puts
951       @stdout.print twirl.twirl(respcount, discovered.size)
952     end
953
954     aggregate = load_aggregate_functions(action, @ddl)
955
956     @client.req(message) do |resp|
957       respcount += 1
958
959       if block_given?
960         aggregate = process_results_with_block(action, resp, block, aggregate)
961       else
962         @stdout.print twirl.twirl(respcount, discovered.size) if @progress
963
964         result, aggregate = process_results_without_block(resp, action, aggregate)
965
966         results << result
967       end
968     end
969
970     if @initial_options[:sort]
971       results.sort!
972     end
973
974     @stats.aggregate_summary = aggregate.summarize if aggregate
975     @stats.aggregate_failures = aggregate.failed if aggregate
976     @stats.client_stats = @client.stats
977   else
978     @stderr.print("\nNo request sent, we did not discover any nodes.")
979   end
980
981   @stats.finish_request
982
983   RPC.stats(@stats)
984
985   @stdout.print("\n\n") if @progress
986
987   if block_given?
988     return stats
989   else
990     return [results].flatten
991   end
992 end
call_agent_batched(action, args, opts, batch_size, sleep_time, &block) click to toggle source

Calls an agent in a way very similar to call_agent but it supports batching the queries to the network.

The result sets, stats, block handling etc. are all exactly as you would expect from a normal call_agent.

This is used by method_missing and works only with direct addressing mode

    # File lib/mcollective/rpc/client.rb
# Calls +action+ on the discovered nodes in batches, using directly
# addressed requests and pausing between batches.
#
# action     - name of the remote action
# args       - request arguments; :process_results => false is rejected
# opts       - options passed to each Message, opts[:filter] is the filter
# batch_size - an Integer, a numeric String, or a percentage String such
#              as "10%" of the discovered nodes
# sleep_time - pause between batches, converted with Float()
#
# Returns the stats when a block is given, otherwise a flattened array of
# results. Raises when direct addressing is disabled.
def call_agent_batched(action, args, opts, batch_size, sleep_time, &block)
  raise "Batched requests requires direct addressing" unless Config.instance.direct_addressing
  raise "Cannot bypass result processing for batched requests" if args[:process_results] == false
  validate_batch_size(batch_size)

  sleep_time = Float(sleep_time)

  Log.debug("Calling #{agent}##{action} in batches of #{batch_size} with sleep time of #{sleep_time}")

  @force_direct_request = true

  discovered = discover
  results = []
  respcount = 0

  if discovered.size > 0
    req = new_request(action.to_s, args)

    aggregate = load_aggregate_functions(action, @ddl)

    if @progress && !block_given?
      twirl = Progress.new
      @stdout.puts
      @stdout.print twirl.twirl(respcount, discovered.size)
    end

    # Only strings are matched against the percentage pattern: Integer has
    # no =~ since Object#=~ was removed in Ruby 3.2. Strings are parsed
    # base 10 so zero padded values such as "08" are not read as octal.
    if batch_size.is_a?(String) && batch_size =~ /^(\d+)%$/
      # determine batch_size as a percentage of the discovered array's size
      batch_size = (discovered.size / 100.0 * Integer($1, 10)).ceil
    elsif batch_size.is_a?(String)
      batch_size = Integer(batch_size, 10)
    else
      batch_size = Integer(batch_size)
    end

    @stats.requestid = nil
    processed_nodes = 0

    discovered.in_groups_of(batch_size) do |hosts|
      message = Message.new(req, nil, {:agent => @agent,
                                       :type => :direct_request,
                                       :collective => @collective,
                                       :filter => opts[:filter],
                                       :options => opts})

      # first time round we let the Message object create a request id
      # we then re-use it for future requests to keep auditing sane etc
      @stats.requestid = message.create_reqid unless @stats.requestid
      message.requestid = @stats.requestid

      message.discovered_hosts = hosts.clone.compact

      @client.req(message) do |resp|
        respcount += 1

        if block_given?
          aggregate = process_results_with_block(action, resp, block, aggregate)
        else
          @stdout.print twirl.twirl(respcount, discovered.size) if @progress

          result, aggregate = process_results_without_block(resp, action, aggregate)

          results << result
        end
      end

      if @initial_options[:sort]
        results.sort!
      end

      # fold the per batch client stats into the request wide stats
      @stats.noresponsefrom.concat @client.stats[:noresponsefrom]
      @stats.unexpectedresponsefrom.concat @client.stats[:unexpectedresponsefrom]
      @stats.responses += @client.stats[:responses]
      @stats.blocktime += @client.stats[:blocktime] + sleep_time
      @stats.totaltime += @client.stats[:totaltime]
      @stats.discoverytime += @client.stats[:discoverytime]

      # no sleep after the final batch
      processed_nodes += hosts.length
      if (discovered.length > processed_nodes)
        sleep sleep_time
      end
    end

    @stats.aggregate_summary = aggregate.summarize if aggregate
    @stats.aggregate_failures = aggregate.failed if aggregate
  else
    @stderr.print("\nNo request sent, we did not discover any nodes.")
  end

  @stats.finish_request

  RPC.stats(@stats)

  @stdout.print("\n") if @progress

  if block_given?
    return stats
  else
    return [results].flatten
  end
end
class_filter(klass) click to toggle source

Sets the class filter

    # File lib/mcollective/rpc/client.rb
# Adds +klass+ to the class filter (duplicates and nil entries are
# dropped) and then calls reset.
def class_filter(klass)
  @filter["cf_class"] |= [klass]
  @filter["cf_class"].compact!
  reset
end
collective=(c) click to toggle source

Sets the collective we are communicating with

    # File lib/mcollective/rpc/client.rb
# Switches to collective +c+, pushes the current options to the wrapped
# client and calls reset. Raises for a collective that is not listed in
# the configuration.
def collective=(c)
  unless Config.instance.collectives.include?(c)
    raise "Unknown collective #{c}"
  end

  @collective = c
  @client.options = options
  reset
end
compound_filter(filter) click to toggle source

Set a compound filter

    # File lib/mcollective/rpc/client.rb
# Parses +filter+ into a compound callstack, adds it to the compound
# filter unless already present and then calls reset.
def compound_filter(filter)
  callstack = Matcher.create_compound_callstack(filter)
  @filter["compound"] |= [callstack]
  reset
end
custom_request(action, args, expected_agents, filter = {}, &block) click to toggle source

Constructs custom requests with custom filters and discovery data the idea is that this would be used in web applications where you might be using a cached copy of data provided by a registration agent to figure out on your own what nodes will be responding and what your filter would be.

This will help you essentially short circuit the traditional cycle of:

mc discover / call / wait for discovered nodes

by doing discovery however you like, constructing a filter and a list of nodes you expect responses from.

Other than that it will work exactly like a normal call: blocks will behave the same way, stats will be handled the same way, etc.

If you just wanted to contact one machine for example with a client that already has other filter options setup you can do:

puppet.custom_request("runonce", {}, ["your.box.com"], {"identity" => "your.box.com"})

This will do runonce action on just 'your.box.com', no discovery will be done and after receiving just one response it will stop waiting for responses

If direct_addressing is enabled in the config file you can provide an empty hash as a filter; this will force that request to be a directly addressed request, which technically does not need filters. If you try to use this mode with direct addressing disabled an exception will be raised.

    # File lib/mcollective/rpc/client.rb
# Sends +action+ to a caller supplied list of nodes using a caller
# supplied filter instead of running discovery.
#
# action          - name of the remote action
# args            - request arguments
# expected_agents - node or list of nodes expected to respond
# filter          - hash keyed by "identity", "fact", "agent", "cf_class"
#                   or "compound"; the same names are also accepted as
#                   symbols. Other keys are ignored.
#
# Returns whatever call_agent returns, or the result of
# fire_and_forget_request when :process_results is false or a reply-to
# is set. Raises when the filter is empty and direct addressing is
# disabled.
def custom_request(action, args, expected_agents, filter = {}, &block)
  validate_request(action, args)

  if filter == {} && !Config.instance.direct_addressing
    raise "Attempted to do a filterless custom_request without direct_addressing enabled, preventing unexpected call to all nodes"
  end

  @stats.reset

  custom_filter = Util.empty_filter
  custom_options = options.clone

  # merge the supplied filter with the standard empty one
  # we could just use the merge method but I want to be sure
  # we dont merge in stuff that isnt actually valid
  #
  # symbol keys are honoured as well as string keys, previously a filter
  # like {:identity => "node"} was silently dropped which left only the
  # agent filter in place
  ["identity", "fact", "agent", "cf_class", "compound"].each do |ftype|
    supplied = [ftype, ftype.to_sym].select { |key| filter.include?(key) }.map { |key| filter[key] }
    next if supplied.empty?

    custom_filter[ftype] = [supplied, custom_filter[ftype]].flatten
  end

  # ensure that all filters at least restrict the call to the agent we're a proxy for
  custom_filter["agent"] << @agent unless custom_filter["agent"].include?(@agent)
  custom_options[:filter] = custom_filter

  # Fake out the stats discovery would have put there
  @stats.discovered_agents([expected_agents].flatten)

  # Handle fire and forget requests
  #
  # If a specific reply-to was set then from the client perspective this should
  # be a fire and forget request too since no response will ever reach us - it
  # will go to the reply-to destination
  if args[:process_results] == false || @reply_to
    return fire_and_forget_request(action, args, custom_filter)
  end

  # Now do a call pretty much exactly like in method_missing except with our own
  # options and discovery magic
  #
  # the caller's block is wrapped so call_agent always receives a block
  # taking a single argument
  if block_given?
    call_agent(action, args, custom_options, [expected_agents].flatten) do |r|
      block.call(r)
    end
  else
    call_agent(action, args, custom_options, [expected_agents].flatten)
  end
end
detect_and_set_stdin_discovery() click to toggle source

Detects data on STDIN and sets the STDIN discovery method

IF the discovery method hasn't been explicitly overridden

and we're not being run interactively,
and someone has piped us some data

Then we assume it's a discovery list - this can be either:

- list of hosts in plaintext
- JSON that came from another rpc or printrpc

Then we override discovery to try to grok the data on STDIN

    # File lib/mcollective/rpc/client.rb
# Switches discovery to the 'stdin' method with the 'auto' option when the
# discovery method is still the default one and @stdin is a non-terminal
# stream that has data waiting.
def detect_and_set_stdin_discovery
  return unless self.default_discovery_method
  # tty? is checked first so eof? is never asked of a terminal
  return if @stdin.tty?
  return if @stdin.eof?

  self.discovery_method = 'stdin'
  self.discovery_options = 'auto'
end
disconnect() click to toggle source

Disconnects cleanly from the middleware

    # File lib/mcollective/rpc/client.rb
131 def disconnect
      # Delegates to the wrapped client held in @client and returns
      # whatever its disconnect returns.
132   @client.disconnect
133 end
discover(flags={}) click to toggle source

Does discovery based on the filters set, if a discovery was previously done return that else do a new discovery.

Alternatively if identity filters are given and none of them are regular expressions then just use the provided data as discovered data, avoiding discovery

Discovery can be forced if direct_addressing is enabled by passing in an array of nodes with :nodes or JSON data like those produced by mcollective RPC JSON output using :json

Will show a message indicating its doing discovery if running verbose or if the :verbose flag is passed in.

Use reset to force a new discovery

    # File lib/mcollective/rpc/client.rb
501 def discover(flags={})
      # Returns the list of discovered nodes, doing a discovery only when
      # none is cached in @discovered_agents.
      #
      # flags may contain:
      #   :verbose - overrides @verbose for this call
      #   :nodes   - list of nodes to use as discovery data (:hosts is
      #              accepted as an alias)
      #   :json    - JSON data to extract the nodes from
      #
      # Raises on unknown flags, when discovery data is supplied without
      # direct addressing, or when no hosts can be extracted from it.
502   flags.keys.each do |key|
503     raise "Unknown option #{key} passed to discover" unless [:verbose, :hosts, :nodes, :json].include?(key)
504   end
505
506   flags.include?(:verbose) ? verbose = flags[:verbose] : verbose = @verbose
507
      # progress text is only shown for console output
508   verbose = false unless @output_format == :console
509
510   # flags[:nodes] and flags[:hosts] are the same thing, we should never have
511   # allowed :hosts as that was inconsistent with the established terminology
512   flags[:nodes] = flags.delete(:hosts) if flags.include?(:hosts)
513
      # supplied discovery data always replaces previously cached data
514   reset if flags[:nodes] || flags[:json]
515
516   unless @discovered_agents
517     # if either hosts or JSON is supplied try to figure out discovery data from there
518     # if direct_addressing is not enabled this is a critical error as the user might
519     # not have supplied filters so raise an exception
520     if flags[:nodes] || flags[:json]
521       raise "Can only supply discovery data if direct_addressing is enabled" unless Config.instance.direct_addressing
522
523       hosts = []
524
525       if flags[:nodes]
526         hosts = Helpers.extract_hosts_from_array(flags[:nodes])
527       elsif flags[:json]
528         hosts = Helpers.extract_hosts_from_json(flags[:json])
529       end
530
531       raise "Could not find any hosts in discovery data provided" if hosts.empty?
532
533       @discovered_agents = hosts
534       @force_direct_request = true
535
536     else
537       identity_filter_discovery_optimization
538     end
539   end
540
541   # All else fails we do it the hard way using a traditional broadcast
542   unless @discovered_agents
543     @stats.time_discovery :start
544
545     @client.options = options
546
547     # if compound filters are used the only real option is to use the mc
548     # discovery plugin since its the only capable of using data queries etc
549     # and we do not want to degrade that experience just to allow compounds
550     # on other discovery plugins the UX would be too bad raising complex sets
551     # of errors etc.
552     @client.discoverer.force_discovery_method_by_filter(options[:filter])
553
554     if verbose
555       actual_timeout = @client.discoverer.discovery_timeout(discovery_timeout, options[:filter])
556
557       if actual_timeout > 0
558         @stderr.print("Discovering hosts using the %s method for %d second(s) .... " % [@client.discoverer.discovery_method, actual_timeout])
559       else
560         @stderr.print("Discovering hosts using the %s method .... " % [@client.discoverer.discovery_method])
561       end
562     end
563
564     # if the requested limit is a pure number and not a percent
565     # and if we're configured to use the first found hosts as the
566     # limit method then pass in the limit thus minimizing the amount
567     # of work we do in the discover phase and speeding it up significantly
568     filter = @filter.merge({'collective' => @collective})
569     if @limit_method == :first and @limit_targets.is_a?(Integer)
570       @discovered_agents = @client.discover(filter, discovery_timeout, @limit_targets)
571     else
572       @discovered_agents = @client.discover(filter, discovery_timeout)
573     end
574
575     @stderr.puts(@discovered_agents.size) if verbose
576
577     @force_direct_request = @client.discoverer.force_direct_mode?
578
579     @stats.time_discovery :end
580   end
581
      # record the node list in the stats and in the RPC module on every
      # call, cached or not
582   @stats.discovered_agents(@discovered_agents)
583   RPC.discovered(@discovered_agents)
584
585   @discovered_agents
586 end
discovery_method=(method) click to toggle source

Sets the discovery method. If we change the method there are a number of steps to take:

- set the new method
- if discovery options were provided, re-set those to initially
  provided ones else clear them as they might now apply to a
  different provider
- update the client options so it knows there is a new discovery
  method in force
- reset discovery data forcing a discover on the next request

The remaining item is the discovery timeout, we leave that as is since that is the user supplied timeout either via initial options or via specifically setting it on the client.

    # File lib/mcollective/rpc/client.rb
# Selects a new discovery method. The discovery options fall back to the
# ones from the initial options when those exist, otherwise the current
# options are emptied. The client options are then refreshed and reset
# is called.
def discovery_method=(method)
  @default_discovery_method = false
  @discovery_method = method

  initial_discovery_options = @initial_options[:discovery_options]

  if initial_discovery_options
    @discovery_options = initial_discovery_options
  else
    @discovery_options.clear
  end

  @client.options = options

  reset
end
discovery_options=(options) click to toggle source
    # File lib/mcollective/rpc/client.rb
# Stores the discovery options as a flat array (a single value is wrapped,
# nested arrays are flattened) and calls reset.
def discovery_options=(options)
  wrapped = [options]
  @discovery_options = wrapped.flatten
  reset
end
discovery_timeout() click to toggle source
    # File lib/mcollective/rpc/client.rb
# Returns the configured discovery timeout, falling back to the timeout
# in the discoverer's DDL metadata when none is set.
def discovery_timeout
  @discovery_timeout || @client.discoverer.ddl.meta[:timeout]
end
discovery_timeout=(timeout) click to toggle source
    # File lib/mcollective/rpc/client.rb
# Sets the discovery timeout, converted with Float(), and recomputes the
# request timeout.
def discovery_timeout=(timeout)
  @discovery_timeout = Float(timeout)

  # The request timeout is normally derived from the agent DDL timeout
  # plus the discovery timeout unless one was given to the constructor.
  # Setting a discovery timeout here deliberately overrides that: the
  # request timeout is recalculated from the DDL timeout and the new
  # discovery timeout.
  ddl_timeout = @ddl.meta[:timeout]
  @timeout = ddl_timeout + discovery_timeout
end
fact_filter(fact, value=nil, operator="=") click to toggle source

Sets the fact filter

    # File lib/mcollective/rpc/client.rb
# Adds a fact filter and calls reset.
#
# With only +fact+ given it is parsed as a complete fact expression,
# otherwise the expression is built from +fact+, +operator+ and +value+.
# Nothing happens when +fact+ is nil or false. An expression that fails
# to parse is not added.
def fact_filter(fact, value=nil, operator="=")
  return if fact.nil? || fact == false

  expression = value.nil? ? fact : "#{fact}#{operator}#{value}"
  parsed = Util.parse_fact_string(expression)

  @filter["fact"] |= [parsed] unless parsed == false

  @filter["fact"].compact!
  reset
end
fire_and_forget_request(action, args, filter=nil) click to toggle source

for requests that do not care for results just return the request id and don't do any of the response processing.

We send the :process_results flag along with the request to the nodes so they can make decisions based on it.

Should only be called via method_missing

    # File lib/mcollective/rpc/client.rb
# Sends +action+ without processing any replies and returns the result
# of the client's sendreq.
#
# +filter+ defaults to the filter in the current options. The message is
# sent as a direct request to the discovered hosts when direct requests
# are forced on this client or by the discoverer.
def fire_and_forget_request(action, args, filter=nil)
  validate_request(action, args)

  identity_filter_discovery_optimization

  req = new_request(action.to_s, args)

  filter ||= options[:filter]

  message_options = {:agent => @agent, :type => :request, :collective => @collective, :filter => filter, :options => options}
  message = Message.new(req, nil, message_options)
  message.reply_to = @reply_to if @reply_to

  direct = @force_direct_request || @client.discoverer.force_direct_mode?
  if direct
    message.discovered_hosts = discover.clone
    message.type = :direct_request
  end

  client.sendreq(message, nil)
end
help(template) click to toggle source

Returns help for an agent if a DDL was found

    # File lib/mcollective/rpc/client.rb
136 def help(template)
      # Delegates to the DDL held in @ddl, passing +template+ through
      # unchanged.
137   @ddl.help(template)
138 end
identity_filter(identity) click to toggle source

Sets the identity filter

    # File lib/mcollective/rpc/client.rb
# Adds +identity+ to the identity filter (duplicates and nil entries are
# dropped) and then calls reset.
def identity_filter(identity)
  @filter["identity"] |= [identity]
  @filter["identity"].compact!
  reset
end
identity_filter_discovery_optimization() click to toggle source

if an identity filter is supplied and it is all strings no regex we can use that as discovery data, technically the identity filter is then redundant if we are in direct addressing mode and we could empty it out but this use case should only really be for a few -I's on the CLI

For safety we leave the filter in place for now, that way we can support this enhancement also in broadcast mode.

This is only needed for the 'mc' discovery method, other methods might change the concept of identity to mean something else so we should pass the full identity filter to them

    # File lib/mcollective/rpc/client.rb
# When the 'mc' discovery method is in use and the identity filter holds
# only plain names (none starting with "/"), a copy of the identity
# filter is used as the discovered node list. Direct requests are forced
# when direct addressing is enabled.
def identity_filter_discovery_optimization
  return unless options[:filter]["identity"].size > 0 && @discovery_method == "mc"

  regex_count = options[:filter]["identity"].count { |i| i.match("^\/") }
  return unless regex_count == 0

  @discovered_agents = options[:filter]["identity"].clone
  @force_direct_request = true if Config.instance.direct_addressing
end
limit_method=(method) click to toggle source

Sets and sanity checks the limit_method variable used to determine how to limit targets if limit_targets is set

    # File lib/mcollective/rpc/client.rb
# Sets how targets are picked when limit_targets is in use. Anything that
# is not already a Symbol is converted with to_sym; only :random and
# :first are accepted, anything else raises.
def limit_method=(method)
  method = method.to_sym unless Symbol === method

  case method
  when :random, :first
    @limit_method = method
  else
    raise "Unknown limit method #{method} must be :random or :first"
  end
end
limit_targets=(limit) click to toggle source

Sets and sanity checks the limit_targets variable used to restrict how many nodes we'll target. Limit targets can be reset by passing nil or false.

    # File lib/mcollective/rpc/client.rb
# Sets how many nodes requests are limited to.
#
# limit - nil or false clears the limit; a String must be digits with an
#         optional trailing % and is stored as an Integer when it is a
#         plain number, otherwise as the String itself; any other value
#         is converted with Integer().
#
# Raises for a String that is not a number or percentage. The whole
# string is validated (not just one of its lines) and plain numbers are
# parsed base 10 so "08" and "010" mean 8 and 10.
def limit_targets=(limit)
  unless limit
    @limit_targets = nil
    return
  end

  unless limit.is_a?(String)
    @limit_targets = Integer(limit)
    return
  end

  raise "Invalid limit specified: #{limit} valid limits are /^\\d+%*$/" unless limit =~ /\A\d+%*\z/

  @limit_targets = limit.end_with?("%") ? limit : Integer(limit, 10)
end
load_aggregate_functions(action, ddl) click to toggle source
    # File lib/mcollective/rpc/client.rb
# Builds an Aggregate for +action+ when +ddl+ is given and the action's
# interface has an :aggregate entry. Returns nil otherwise, and also when
# anything raises while doing so (the failure is logged).
def load_aggregate_functions(action, ddl)
  if ddl && ddl.action_interface(action).key?(:aggregate)
    Aggregate.new(ddl.action_interface(action))
  end
rescue => e
  Log.error("Failed to load aggregate functions, calculating summaries disabled: %s: %s (%s)" % [e.backtrace.first, e.to_s, e.class])
  nil
end
method_missing(method_name, *args, &block) click to toggle source

Magic handler to invoke remote methods

Once the stub is created using the constructor or the RPC#rpcclient helper you can call remote actions easily:

ret = rpc.echo(:msg => "hello world")

This will call the 'echo' action of the 'rpctest' agent and return the result as an array, the array will be a simplified result set from the usual full MCollective::Client#req with additional error codes and error text:

{

:sender => "remote.box.com",
:statuscode => 0,
:statusmsg => "OK",
:data => "hello world"

}

If :statuscode is 0 then everything went fine. If it's 1 then you supplied the correct arguments etc. but the request could not be completed; you'll find a human-readable reason in :statusmsg.

Codes 2 to 5 maps directly to UnknownRPCAction, MissingRPCData, InvalidRPCData and UnknownRPCError see below for a description of those, in each case :statusmsg would be the reason for failure.

To get access to the full result of the MCollective::Client#req calls you can pass in a block:

rpc.echo(:msg => "hello world") do |resp|
   pp resp
end

In this case resp will be the result from MCollective::Client#req. Instead of returning simple text and codes as above you'll also need to handle the following exceptions:

UnknownRPCAction - There is no matching action on the agent; MissingRPCData - You did not supply all the needed parameters for the action; InvalidRPCData - The data you did supply did not pass validation; UnknownRPCError - Some other error prevented the agent from running.

During calls a progress indicator will be shown of how many results we've received against how many nodes were discovered, you can disable this by setting progress to false:

rpc.progress = false

This supports a 2nd mode where it will send the SimpleRPC request and never handle the responses. It's a bit like UDP, it sends the request with the filter attached and you only get back the requestid, you have no indication about results.

You can invoke this using:

puts rpc.echo(:process_results => false)

This will output just the request id.

Batched processing is supported:

printrpc rpc.ping(:batch_size => 5)

This will do everything exactly as normal but communicate to only 5 agents at a time

    # File lib/mcollective/rpc/client.rb
# Treats any unknown method as a remote action of the agent.
#
# method_name - name of the action to invoke
# args        - only the first argument is used, as the request argument
#               hash; :batch_size and :batch_sleep_time are removed from
#               it and override the client wide batch settings
#
# Dispatches to custom_request when limit_targets is set, to
# call_agent_batched when batching is active, otherwise to call_agent,
# and returns what that call returns.
def method_missing(method_name, *args, &block)
  # set args to an empty hash if nothings given
  args = args[0]
  args = {} if args.nil?

  action = method_name.to_s

  @stats.reset

  validate_request(action, args)

  # if a global batch size is set just use that else set it
  # in the case that it was passed as an argument
  batch_size = args.delete(:batch_size) || @batch_size
  batch_sleep_time = args.delete(:batch_sleep_time) || @batch_sleep_time

  # batch mode is decided purely from the effective batch size, so a
  # batch_size argument of 0 disables batching for this call only
  batch_mode = determine_batch_mode(batch_size)

  # Handle single target requests by doing discovery and picking
  # a random node.  Then do a custom request specifying a filter
  # that will only match the one node.
  if @limit_targets
    target_nodes = pick_nodes_from_discovered(@limit_targets)
    Log.debug("Picked #{target_nodes.join(',')} as limited target(s)")

    # node names are escaped so characters like "." only match themselves
    identity_pattern = target_nodes.map { |node| Regexp.escape(node) }.join('|')

    custom_request(action, args, target_nodes, {"identity" => /^(#{identity_pattern})$/}, &block)
  elsif batch_mode
    call_agent_batched(action, args, options, batch_size, batch_sleep_time, &block)
  else
    call_agent(action, args, options, :auto, &block)
  end
end
new_request(action, data) click to toggle source

Creates a suitable request hash for the SimpleRPC agent.

You'd use this if you ever wanted to take care of sending requests on your own - perhaps via Client#sendreq if you didn't care for responses.

In that case you can just do:

msg = your_rpc.new_request("some_action", :foo => :bar)
filter = your_rpc.filter

your_rpc.client.sendreq(msg, msg[:agent], filter)

This will send a SimpleRPC request to the action some_action with arguments :foo => :bar. It will return immediately and you will have no indication at all of whether the request was received or not.

Clearly the use of this technique should be limited and done only if your code requires such a thing

    # File lib/mcollective/rpc/client.rb
# Builds the request hash for the given action and data.
#
# The caller id is obtained from the "security_plugin" plugin and is
# checked with the plugin's valid_callerid? before it is used.
#
# Returns a hash with the :agent, :action, :caller and :data keys.
# Raises a RuntimeError when the caller id is not considered valid.
def new_request(action, data)
  caller_identity = PluginManager["security_plugin"].callerid

  unless PluginManager["security_plugin"].valid_callerid?(caller_identity)
    raise 'callerid received from security plugin is not valid'
  end

  {
    :agent  => @agent,
    :action => action,
    :caller => caller_identity,
    :data   => data
  }
end
options() click to toggle source

Provides a normal options hash like you would get from Optionparser

    # File lib/mcollective/rpc/client.rb
# Assembles an options hash from the current state of the client.
#
# :disctimeout comes from discovery_timeout, every other key holds the
# value of the instance variable with the same name.
def options
  opts = {:disctimeout => discovery_timeout}

  [:timeout, :verbose, :filter, :collective, :output_format, :ttl,
   :discovery_method, :discovery_options, :force_display_mode, :config,
   :publish_timeout, :threaded].each do |setting|
    opts[setting] = instance_variable_get("@#{setting}")
  end

  opts
end
pick_nodes_from_discovered(count) click to toggle source

Pick a number of nodes from the discovered nodes

The count should be a string that can be either just a number or a percentage like 10%

It will select nodes from the discovered list based on the rpclimitmethod configuration option which can be either :first or anything else

- :first would be a simple way to do a distance based
  selection
- anything else will just pick one at random
- if random chosen, and batch-seed set, then set srand
  for the generator, and reset afterwards
    # File lib/mcollective/rpc/client.rb
# Picks a number of nodes from the discovered nodes.
#
# count - how many nodes to pick, either a plain number ("5" or 5) or
#         a percentage of the discovered nodes like "10%".  A
#         percentage that works out to less than one node picks one.
#
# When no more than count nodes were discovered the discovered list
# itself is returned.  With @limit_method set to :first the leading
# count nodes are returned, any other value picks count different
# nodes at random.  If @limit_seed is set the random pick is made from
# the sorted list using a generator seeded with it.
def pick_nodes_from_discovered(count)
  nodes = discover

  # to_s so a numeric count does not need to support =~ itself
  if count.to_s =~ /%$/
    pct = Integer(nodes.size * (count.to_s.to_f / 100))
    count = pct == 0 ? 1 : pct
  else
    count = Integer(count)
  end

  return nodes if nodes.size <= count

  return nodes[0, count] if @limit_method == :first

  # we delete from the list while picking so that the same node can
  # never be picked twice, this is done on a clone so the discovered
  # list itself is left intact
  haystack = nodes.clone

  if @limit_seed
    haystack.sort!
    srand(@limit_seed)
  end

  result = []

  count.times do
    result << haystack.delete_at(rand(haystack.size))
  end

  # Reset random number generator to fresh seed
  # As our seed from options is most likely short
  srand if @limit_seed

  result.flatten
end
process_results_with_block(action, resp, block, aggregate) click to toggle source

Processes client requests by calling a block on each result. In this mode we do not do anything fancy with the result objects and we raise exceptions if there are problems with the data.

     # File lib/mcollective/rpc/client.rb
# Processes a single reply by handing it to the caller supplied block.
#
# action    - the action the reply is for
# resp      - the reply, its :senderid is recorded in the stats
# block     - called with (resp) when it takes one argument and with
#             (resp, result) when it takes two
# aggregate - when truthy it is updated through aggregate_reply
#
# Returns the aggregate, updated if one was given.
def process_results_with_block(action, resp, block, aggregate)
  @stats.node_responded(resp[:senderid])

  result = rpc_result_from_reply(@agent, action, resp)
  aggregate = aggregate_reply(result, aggregate) if aggregate

  if result[:statuscode] == 0
    @stats.ok
  else
    @stats.fail
  end

  @stats.time_block_execution :start

  begin
    # NOTE: a block with any other arity is not called at all
    case block.arity
    when 1
      block.call(resp)
    when 2
      block.call(resp, result)
    end
  ensure
    # always close the timing started above, also when the block raises
    @stats.time_block_execution :end
  end

  aggregate
end
process_results_without_block(resp, action, aggregate) click to toggle source

Handles result sets that have no block associated, records fails and oks in the stats object and returns the result together with the aggregate to the caller

     # File lib/mcollective/rpc/client.rb
 # Processes a single reply when the caller supplied no block.
 #
 # resp      - the reply, its :senderid is recorded in the stats
 # action    - the action the reply is for
 # aggregate - when truthy it is updated through aggregate_reply
 #
 # A status code of 0 is counted as ok in the stats, every other
 # status code is counted as a failure.
 #
 # Returns a two element array of the result and the aggregate.
 def process_results_without_block(resp, action, aggregate)
   @stats.node_responded(resp[:senderid])

   result = rpc_result_from_reply(@agent, action, resp)
   aggregate = aggregate_reply(result, aggregate) if aggregate

   if result[:statuscode] == 0
     @stats.ok
   else
     @stats.fail
   end

   [result, aggregate]
 end
reset() click to toggle source

Resets various internal parts of the class, most importantly it clears out the cached discovery

    # File lib/mcollective/rpc/client.rb
# Clears the cached discovery data by setting @discovered_agents to nil.
def reset
  @discovered_agents = nil
end
reset_filter() click to toggle source

Reset the filter to an empty one

    # File lib/mcollective/rpc/client.rb
# Replaces @filter with a new empty filter from Util.empty_filter and
# then calls agent_filter with the agent of this client.
#
# Returns whatever agent_filter returns.
def reset_filter
  @filter = Util.empty_filter

  agent_filter(@agent)
end
rpc_result_from_reply(agent, action, reply) click to toggle source
    # File lib/mcollective/rpc/client.rb
# Builds a Result for the agent and action from a reply.
#
# The sender id and body are read from the reply, the status code,
# status message and data are read from the body.  Each of them is
# read from the String key when the container includes it and from
# the Symbol key of the same name otherwise.
def rpc_result_from_reply(agent, action, reply)
  lookup = lambda do |container, key|
    container.include?(key) ? container[key] : container[key.to_sym]
  end

  sender = lookup.call(reply, "senderid")
  payload = lookup.call(reply, "body")

  fields = {
    :sender => sender,
    :statuscode => lookup.call(payload, "statuscode"),
    :statusmsg => lookup.call(payload, "statusmsg"),
    :data => lookup.call(payload, "data")
  }

  Result.new(agent, action, fields)
end
validate_request(action, args) click to toggle source

For the provided arguments and action the input arguments get modified by supplying any defaults provided in the DDL for arguments that were not supplied in the request

We then pass the modified arguments to the DDL for validation

    # File lib/mcollective/rpc/client.rb
# Lets the DDL fill in argument defaults and then validate the request.
#
# action - the action being called
# args   - the request arguments, handed to the DDL for both steps
#
# Returns whatever the DDL's validate_rpc_request returns.
# Raises a RuntimeError when no DDL is set for the agent.
def validate_request(action, args)
  unless @ddl
    raise "No DDL found for agent %s cannot validate inputs" % @agent
  end

  @ddl.set_default_input_arguments(action, args)

  @ddl.validate_rpc_request(action, args)
end