class MCollective::RPC::Client
The main component of the Simple RPC
client system, this wraps around MCollective::Client
and just brings in a lot of convention and standard approaches.
Attributes
Public Class Methods
Creates a stub for a remote agent. You can pass in an options array in the flags, which will then be used; otherwise it will just create a default options array with filtering enabled based on the standard command line use.
rpc = RPC::Client.new("rpctest", :configfile => "client.cfg", :options => options)
You typically would not call this directly; you'd use MCollective::RPC#rpcclient
instead which is a wrapper around this that can be used as a Mixin
# File lib/mcollective/rpc/client.rb, line 20
#
# Builds a client stub for +agent+.
#
# Options come from, in order of preference: flags[:options], the class-level
# cache of previously parsed options (stored as a Marshal dump), or a fresh
# command line parse. When the command line is parsed an optional block is
# yielded the parser and the options hash so callers can add their own
# switches before the SimpleRPC ones are added.
#
# A DDL for the agent is loaded unconditionally here, so DDL.new failing is
# fatal to construction.
def initialize(agent, flags = {})
  if flags.include?(:options)
    initial_options = flags[:options]
  elsif @@initial_options
    initial_options = Marshal.load(@@initial_options)
  else
    defaults = {:verbose => false,
                :progress_bar => true,
                :mcollective_limit_targets => false,
                :batch_size => nil,
                :batch_sleep_time => 1}

    oparser = MCollective::Optionparser.new(defaults, "filter")

    initial_options = oparser.parse do |parser, opts|
      yield(parser, opts) if block_given?

      Helpers.add_simplerpc_options(parser, opts)
    end

    # remember the parsed options so later instances skip the parse
    @@initial_options = Marshal.dump(initial_options)
  end

  @initial_options = initial_options

  @config = initial_options[:config]
  @client = MCollective::Client.new(@initial_options)

  @stats = Stats.new
  @agent = agent
  @timeout = initial_options[:timeout] || 5
  @verbose = initial_options[:verbose]
  @filter = initial_options[:filter] || Util.empty_filter
  @discovered_agents = nil
  @progress = initial_options[:progress_bar]
  @limit_targets = initial_options[:mcollective_limit_targets]
  @limit_method = Config.instance.rpclimitmethod
  @limit_seed = initial_options[:limit_seed] || nil
  @output_format = initial_options[:output_format] || :console
  @force_direct_request = false
  @reply_to = initial_options[:reply_to]

  # remember whether the discovery method was chosen by the user or
  # simply inherited from the configuration
  @discovery_method = initial_options[:discovery_method]
  @default_discovery_method = !@discovery_method
  @discovery_method ||= Config.instance.default_discovery_method

  @discovery_options = initial_options[:discovery_options] || []
  @force_display_mode = initial_options[:force_display_mode] || false

  @batch_size = initial_options[:batch_size] || Config.instance.default_batch_size
  @batch_sleep_time = Float(initial_options[:batch_sleep_time] || Config.instance.default_batch_sleep_time)
  @batch_mode = determine_batch_mode(@batch_size)

  agent_filter agent

  @discovery_timeout = @initial_options.fetch(:disctimeout, nil) || Config.instance.discovery_timeout

  @collective = @client.collective
  @ttl = initial_options[:ttl] || Config.instance.ttl
  @publish_timeout = initial_options[:publish_timeout] || Config.instance.publish_timeout
  @threaded = initial_options[:threaded] || Config.instance.threaded

  # If the timeout is still the default 5 seconds, replace it with the
  # timeout from the agent DDL plus the discovery timeout. The discovery
  # timeout tends to be tuned for local network conditions and fact source
  # speed, which would otherwise not be accounted for and some results
  # might get missed. A timeout other than 5 is left alone so CLI
  # overrides still apply.
  #
  # DDLs are required, failure to find a DDL is fatal
  @ddl = DDL.new(agent)
  @stats.ddl = @ddl
  @timeout = @ddl.meta[:timeout] + discovery_timeout if @timeout == 5

  # stderr, stdout and stdin can be overridden for testing, or for web
  # apps that do not want output written to the real file handles. The
  # process-wide handles are switched to sync mode when they are used.
  @stderr = initial_options[:stderr] || STDERR.tap { |io| io.sync = true }
  @stdout = initial_options[:stdout] || STDOUT.tap { |io| io.sync = true }
  @stdin = initial_options[:stdin] || STDIN
end
Public Instance Methods
Sets the agent filter
# File lib/mcollective/rpc/client.rb, line 437
#
# Adds +agent+ to the agent filter (no duplicates, nil entries dropped)
# and clears the cached discovery via reset.
def agent_filter(agent)
  agents = @filter["agent"] | [agent]
  agents.compact!
  @filter["agent"] = agents

  reset
end
# File lib/mcollective/rpc/client.rb, line 731
#
# Feeds +reply+ into the aggregate functions held by +aggregate+.
#
# Returns +aggregate+ on success and nil when no aggregate was supplied.
# If the aggregate functions raise a StandardError the failure is logged
# and nil is returned, which callers use to disable summaries for the
# rest of the request.
#
# Only StandardError is rescued: signals and exit requests (Interrupt,
# SystemExit, NoMemoryError ...) must not be swallowed while processing
# replies.
def aggregate_reply(reply, aggregate)
  return nil unless aggregate

  aggregate.call_functions(reply)
  aggregate
rescue StandardError => e
  Log.error("Failed to calculate aggregate summaries for reply from %s, calculating summaries disabled: %s: %s (%s)" % [reply[:senderid], e.backtrace.first, e.to_s, e.class])
  nil
end
Sets the batch size, if the size is set to 0 that will disable batch mode
# File lib/mcollective/rpc/client.rb, line 648
#
# Stores a new batch size and recalculates whether batch mode is on.
# Raises unless direct addressing is enabled in the configuration; the
# size itself is checked by validate_batch_size before it is stored.
def batch_size=(limit)
  raise "Can only set batch size if direct addressing is supported" unless Config.instance.direct_addressing

  validate_batch_size(limit)

  @batch_size = limit
  @batch_mode = determine_batch_mode(limit)
end
# File lib/mcollective/rpc/client.rb, line 659
#
# Stores the pause between batches as a Float. Raises unless direct
# addressing is enabled; Float() raises for values that are not numeric.
def batch_sleep_time=(time)
  unless Config.instance.direct_addressing
    raise "Can only set batch sleep time if direct addressing is supported"
  end

  @batch_sleep_time = Float(time)
end
Handles traditional calls to the remote agents with full stats blocks, non blocks and everything else supported.
Other methods of calling the nodes can reuse this code by for example specifying custom options and discovery data
# File lib/mcollective/rpc/client.rb, line 913
#
# Sends +action+ with +args+ to the agent and gathers the replies.
#
# +opts+ is the options hash attached to the message. +disc+ is either
# :auto, meaning run discover, or a list of hosts to use as the discovery
# result. Returns the stats object when a block is given, otherwise a flat
# array of results. Requests with :process_results false, or a client with
# a reply-to set, are handed to fire_and_forget_request instead.
def call_agent(action, args, opts, disc=:auto, &block)
  # Specific reply-to requests are treated like fire and forget since
  # the client will never get the responses
  return fire_and_forget_request(action, args) if args[:process_results] == false || @reply_to

  args[:process_results] = true

  # A supplied host list sets the force_direct_request hint, which tells
  # the message object to be a direct request one
  if disc == :auto
    discovered = discover
  else
    @force_direct_request = true if Config.instance.direct_addressing
    discovered = disc
  end

  req = new_request(action.to_s, args)

  message_options = {:agent => @agent, :type => :request, :collective => @collective, :filter => opts[:filter], :options => opts}
  message = Message.new(req, nil, message_options)
  message.discovered_hosts = discovered.clone

  results = []
  respcount = 0

  if discovered.size > 0
    message.type = :direct_request if @force_direct_request

    # the progress twirl is only shown when results are collected for the caller
    if @progress && !block
      twirl = Progress.new
      @stdout.puts
      @stdout.print twirl.twirl(respcount, discovered.size)
    end

    aggregate = load_aggregate_functions(action, @ddl)

    @client.req(message) do |resp|
      respcount += 1

      if block
        aggregate = process_results_with_block(action, resp, block, aggregate)
      else
        @stdout.print twirl.twirl(respcount, discovered.size) if @progress

        result, aggregate = process_results_without_block(resp, action, aggregate)

        results << result
      end
    end

    results.sort! if @initial_options[:sort]

    if aggregate
      @stats.aggregate_summary = aggregate.summarize
      @stats.aggregate_failures = aggregate.failed
    end

    @stats.client_stats = @client.stats
  else
    @stderr.print("\nNo request sent, we did not discover any nodes.")
  end

  @stats.finish_request

  RPC.stats(@stats)

  @stdout.print("\n\n") if @progress

  block ? stats : [results].flatten
end
Calls an agent in a way very similar to call_agent
but it supports batching the queries to the network.
The result sets, stats, block handling etc is all exactly like you would expect from normal call_agent.
This is used by method_missing
and works only with direct addressing mode
# File lib/mcollective/rpc/client.rb, line 808
#
# Sends +action+ to the discovered nodes in groups of +batch_size+,
# sleeping +sleep_time+ seconds between groups.
#
# +batch_size+ may be an Integer, a numeric String, or a percentage String
# such as "10%" which is taken as a share of the discovered nodes (rounded
# up). Returns the stats object when a block is given, otherwise a flat
# array of results.
#
# Raises when direct addressing is disabled or when :process_results is
# false, since fire and forget requests cannot be batched.
def call_agent_batched(action, args, opts, batch_size, sleep_time, &block)
  raise "Batched requests requires direct addressing" unless Config.instance.direct_addressing
  raise "Cannot bypass result processing for batched requests" if args[:process_results] == false
  validate_batch_size(batch_size)

  sleep_time = Float(sleep_time)

  Log.debug("Calling #{agent}##{action} in batches of #{batch_size} with sleep time of #{sleep_time}")

  @force_direct_request = true

  discovered = discover
  results = []
  respcount = 0

  if discovered.size > 0
    req = new_request(action.to_s, args)

    aggregate = load_aggregate_functions(action, @ddl)

    if @progress && !block_given?
      twirl = Progress.new
      @stdout.puts
      @stdout.print twirl.twirl(respcount, discovered.size)
    end

    # Match against the string form: batch_size is frequently an Integer
    # and Integer has no =~ method on Ruby 3.2 and later.
    percentage = /^(\d+)%$/.match(batch_size.to_s)

    if percentage
      # batch size as a percentage of the discovered array's size; base 10
      # is forced so "010%" is not read as octal
      batch_size = (discovered.size / 100.0 * Integer(percentage[1], 10)).ceil
    else
      batch_size = Integer(batch_size)
    end

    @stats.requestid = nil
    processed_nodes = 0

    discovered.in_groups_of(batch_size) do |hosts|
      message = Message.new(req, nil, {:agent => @agent,
                                       :type => :direct_request,
                                       :collective => @collective,
                                       :filter => opts[:filter],
                                       :options => opts})

      # first time round we let the Message object create a request id
      # we then re-use it for future requests to keep auditing sane etc
      @stats.requestid = message.create_reqid unless @stats.requestid
      message.requestid = @stats.requestid

      message.discovered_hosts = hosts.clone.compact

      @client.req(message) do |resp|
        respcount += 1

        if block_given?
          aggregate = process_results_with_block(action, resp, block, aggregate)
        else
          @stdout.print twirl.twirl(respcount, discovered.size) if @progress

          result, aggregate = process_results_without_block(resp, action, aggregate)

          results << result
        end
      end

      results.sort! if @initial_options[:sort]

      # fold this batch's client statistics into the request wide totals
      @stats.noresponsefrom.concat @client.stats[:noresponsefrom]
      @stats.unexpectedresponsefrom.concat @client.stats[:unexpectedresponsefrom]
      @stats.responses += @client.stats[:responses]
      @stats.blocktime += @client.stats[:blocktime] + sleep_time
      @stats.totaltime += @client.stats[:totaltime]
      @stats.discoverytime += @client.stats[:discoverytime]

      # no need to sleep once the final batch has been sent
      processed_nodes += hosts.length
      sleep sleep_time if discovered.length > processed_nodes
    end

    @stats.aggregate_summary = aggregate.summarize if aggregate
    @stats.aggregate_failures = aggregate.failed if aggregate
  else
    @stderr.print("\nNo request sent, we did not discover any nodes.")
  end

  @stats.finish_request

  RPC.stats(@stats)

  @stdout.print("\n") if @progress

  if block_given?
    stats
  else
    [results].flatten
  end
end
Sets the class filter
# File lib/mcollective/rpc/client.rb, line 413
#
# Adds +klass+ to the cf_class filter (no duplicates, nil entries dropped)
# and clears the cached discovery via reset.
def class_filter(klass)
  classes = @filter["cf_class"] | [klass]
  classes.compact!
  @filter["cf_class"] = classes

  reset
end
Sets the collective we are communicating with
# File lib/mcollective/rpc/client.rb, line 607
#
# Switches the client to collective +c+. The name must be one of the
# collectives listed in the configuration, otherwise an error is raised.
# The underlying client receives fresh options and the cached discovery
# is cleared.
def collective=(c)
  known = Config.instance.collectives
  raise "Unknown collective #{c}" unless known.include?(c)

  @collective = c
  @client.options = options

  reset
end
Set a compound filter
# File lib/mcollective/rpc/client.rb, line 451
#
# Turns the compound filter expression +filter+ into a call stack using
# Matcher.create_compound_callstack, adds it to the compound filter and
# clears the cached discovery via reset.
def compound_filter(filter)
  callstack = Matcher.create_compound_callstack(filter)
  @filter["compound"] = @filter["compound"] | [callstack]

  reset
end
Constructs custom requests with custom filters and discovery data. The idea is that this would be used in web applications where you might be using a cached copy of data provided by a registration agent to figure out on your own what nodes will be responding and what your filter would be.
This will help you essentially short circuit the traditional cycle of:
mc discover / call / wait for discovered nodes
by doing discovery however you like, constructing a filter and a list of nodes you expect responses from.
Other than that it will work exactly like a normal call: blocks will behave the same way, stats will be handled the same way, etc.
If you just wanted to contact one machine for example with a client that already has other filter options setup you can do:
puppet.custom_request("runonce", {}, ["your.box.com"], {:identity => "your.box.com"})
This will do runonce action on just 'your.box.com', no discovery will be done and after receiving just one response it will stop waiting for responses
If direct_addressing is enabled in the config file you can provide an empty hash as a filter; this will force that request to be a directly addressed request, which technically does not need filters. If you try to use this mode with direct addressing disabled an exception will be raised.
# File lib/mcollective/rpc/client.rb, line 311
#
# Calls +action+ using a caller supplied filter and list of expected
# nodes instead of doing discovery.
#
# +expected_agents+ is a node name or an array of them. +filter+ is a hash
# keyed by "identity", "fact", "agent", "cf_class" and "compound"; other
# keys are ignored. The agent this client is a stub for is always added
# to the agent filter.
#
# An empty filter is only allowed when direct addressing is enabled,
# otherwise an error is raised to avoid addressing every node.
#
# Returns whatever call_agent returns, or the result of
# fire_and_forget_request when :process_results is false or a reply-to
# is set.
def custom_request(action, args, expected_agents, filter = {}, &block)
  validate_request(action, args)

  if filter == {} && !Config.instance.direct_addressing
    raise "Attempted to do a filterless custom_request without direct_addressing enabled, preventing unexpected call to all nodes"
  end

  @stats.reset

  custom_filter = Util.empty_filter
  custom_options = options.clone

  # merge the supplied filter with the standard empty one
  # we could just use the merge method but I want to be sure
  # we dont merge in stuff that isnt actually valid
  ["identity", "fact", "agent", "cf_class", "compound"].each do |ftype|
    if filter.include?(ftype)
      custom_filter[ftype] = [filter[ftype], custom_filter[ftype]].flatten
    end
  end

  # ensure that all filters at least restrict the call to the agent we're a proxy for
  custom_filter["agent"] << @agent unless custom_filter["agent"].include?(@agent)
  custom_options[:filter] = custom_filter

  # Fake out the stats discovery would have put there
  @stats.discovered_agents([expected_agents].flatten)

  # Handle fire and forget requests
  #
  # If a specific reply-to was set then from the client perspective this should
  # be a fire and forget request too since no response will ever reach us - it
  # will go to the reply-to destination
  if args[:process_results] == false || @reply_to
    return fire_and_forget_request(action, args, custom_filter)
  end

  # Now do a call pretty much exactly like in method_missing except with our own
  # options and discovery magic
  return call_agent(action, args, custom_options, [expected_agents].flatten) unless block_given?

  # Result processing picks how to call the block from its arity. A two
  # argument block is passed straight through so it receives both the raw
  # response and the result. Every other block keeps the historic
  # behaviour of being called with the response only.
  handler = block.arity == 2 ? block : proc { |resp| block.call(resp) }

  call_agent(action, args, custom_options, [expected_agents].flatten, &handler)
end
Detects data on STDIN and sets the STDIN discovery method
IF the discovery method hasn't been explicitly overridden
and we're not being run interactively, and someone has piped us some data
Then we assume it's a discovery list - this can be either:
- list of hosts in plaintext - JSON that came from another rpc or printrpc
Then we override discovery to try to grok the data on STDIN
# File lib/mcollective/rpc/client.rb, line 479
#
# Switches discovery to the 'stdin' method with the 'auto' option when
# the discovery method is still the default one, stdin is not a terminal
# and stdin has data waiting. Does nothing otherwise.
def detect_and_set_stdin_discovery
  return unless default_discovery_method

  # check for a terminal first so eof? is never asked of an interactive stdin
  return if @stdin.tty?
  return if @stdin.eof?

  self.discovery_method = 'stdin'
  self.discovery_options = 'auto'
end
Disconnects cleanly from the middleware
# File lib/mcollective/rpc/client.rb, line 131
#
# Asks the underlying MCollective client to disconnect and returns
# whatever that call returns.
def disconnect
  @client.disconnect
end
Does discovery based on the filters set, if a discovery was previously done return that else do a new discovery.
Alternatively if identity filters are given and none of them are regular expressions then just use the provided data as discovered data, avoiding discovery
Discovery
can be forced if direct_addressing is enabled by passing in an array of nodes with :nodes or JSON data like those produced by mcollective RPC
JSON output using :json
Will show a message indicating it's doing discovery if running verbose or if the :verbose flag is passed in.
Use reset to force a new discovery
# File lib/mcollective/rpc/client.rb, line 501
#
# Returns the list of discovered nodes, doing discovery only when no
# cached result exists.
#
# Accepted flags:
#
#   :verbose - overrides the client's verbose setting for this call
#   :nodes   - an array of node names to use as the discovery result
#   :hosts   - older name for :nodes; wins when both are given
#   :json    - JSON data to extract the node names from
#
# Any other flag raises. Supplying :nodes, :hosts or :json drops the
# cached discovery and requires direct addressing.
#
# The +flags+ hash passed by the caller is never modified.
def discover(flags={})
  flags.keys.each do |key|
    raise "Unknown option #{key} passed to discover" unless [:verbose, :hosts, :nodes, :json].include?(key)
  end

  verbose = flags.include?(:verbose) ? flags[:verbose] : @verbose

  # progress messages only make sense for console output
  verbose = false unless @output_format == :console

  # flags[:nodes] and flags[:hosts] are the same thing, we should never have
  # allowed :hosts as that was inconsistent with the established terminology
  nodes = flags.include?(:hosts) ? flags[:hosts] : flags[:nodes]
  json = flags[:json]

  reset if nodes || json

  unless @discovered_agents
    # if either hosts or JSON is supplied try to figure out discovery data from there
    # if direct_addressing is not enabled this is a critical error as the user might
    # not have supplied filters so raise an exception
    if nodes || json
      raise "Can only supply discovery data if direct_addressing is enabled" unless Config.instance.direct_addressing

      hosts = if nodes
                Helpers.extract_hosts_from_array(nodes)
              else
                Helpers.extract_hosts_from_json(json)
              end

      raise "Could not find any hosts in discovery data provided" if hosts.empty?

      @discovered_agents = hosts
      @force_direct_request = true
    else
      identity_filter_discovery_optimization
    end
  end

  # All else fails we do it the hard way using a traditional broadcast
  unless @discovered_agents
    @stats.time_discovery :start

    @client.options = options

    # if compound filters are used the only real option is to use the mc
    # discovery plugin since its the only capable of using data queries etc
    # and we do not want to degrade that experience just to allow compounds
    # on other discovery plugins the UX would be too bad raising complex sets
    # of errors etc.
    @client.discoverer.force_discovery_method_by_filter(options[:filter])

    if verbose
      actual_timeout = @client.discoverer.discovery_timeout(discovery_timeout, options[:filter])

      if actual_timeout > 0
        @stderr.print("Discovering hosts using the %s method for %d second(s) .... " % [@client.discoverer.discovery_method, actual_timeout])
      else
        @stderr.print("Discovering hosts using the %s method .... " % [@client.discoverer.discovery_method])
      end
    end

    # if the requested limit is a pure number and not a percent
    # and if we're configured to use the first found hosts as the
    # limit method then pass in the limit thus minimizing the amount
    # of work we do in the discover phase and speeding it up significantly
    filter = @filter.merge({'collective' => @collective})
    if @limit_method == :first && @limit_targets.is_a?(Integer)
      @discovered_agents = @client.discover(filter, discovery_timeout, @limit_targets)
    else
      @discovered_agents = @client.discover(filter, discovery_timeout)
    end

    @stderr.puts(@discovered_agents.size) if verbose

    @force_direct_request = @client.discoverer.force_direct_mode?

    @stats.time_discovery :end
  end

  @stats.discovered_agents(@discovered_agents)
  RPC.discovered(@discovered_agents)

  @discovered_agents
end
Sets the discovery method. If we change the method there are a number of steps to take:
- set the new method - if discovery options were provided, re-set those to initially provided ones else clear them as they might now apply to a different provider - update the client options so it knows there is a new discovery method in force - reset discovery data forcing a discover on the next request
The remaining item is the discovery timeout, we leave that as is since that is the user supplied timeout either via initial options or via specifically setting it on the client.
# File lib/mcollective/rpc/client.rb, line 392
#
# Selects a new discovery method and marks it as explicitly chosen.
#
# Discovery options go back to the ones given in the initial options, or
# are emptied when none were given, as they might apply to a different
# provider. The underlying client then receives fresh options and the
# cached discovery is cleared.
def discovery_method=(method)
  @default_discovery_method = false
  @discovery_method = method

  initial = @initial_options[:discovery_options]

  if initial
    @discovery_options = initial
  else
    @discovery_options.clear
  end

  @client.options = options

  reset
end
# File lib/mcollective/rpc/client.rb, line 407
#
# Stores the discovery options as a flat array, so a single value or
# nested arrays are both accepted, and clears the cached discovery.
def discovery_options=(options)
  wrapped = [options]
  @discovery_options = wrapped.flatten

  reset
end
# File lib/mcollective/rpc/client.rb, line 359
#
# The discovery timeout set on this client, or the timeout from the DDL
# of the active discovery plugin when none has been set.
def discovery_timeout
  @discovery_timeout || @client.discoverer.ddl.meta[:timeout]
end
# File lib/mcollective/rpc/client.rb, line 364
#
# Sets the discovery timeout (stored as a Float) and recalculates the
# overall request timeout as the agent DDL timeout plus the discovery
# timeout.
#
# Setting a discovery timeout on the client therefore replaces any
# timeout that was supplied to the constructor.
def discovery_timeout=(timeout)
  @discovery_timeout = Float(timeout)

  ddl_timeout = @ddl.meta[:timeout]
  @timeout = ddl_timeout + discovery_timeout
end
Sets the fact filter
# File lib/mcollective/rpc/client.rb, line 420
#
# Adds a fact filter and clears the cached discovery.
#
# With only +fact+ given it is parsed as a complete fact expression; with
# a +value+ the expression is built as fact, operator, value. Expressions
# that Util.parse_fact_string rejects (returns false for) are not added.
# A nil or false +fact+ is ignored entirely.
def fact_filter(fact, value=nil, operator="=")
  return if fact.nil?
  return if fact == false

  expression = value.nil? ? fact : "#{fact}#{operator}#{value}"
  parsed = Util.parse_fact_string(expression)

  @filter["fact"] = @filter["fact"] | [parsed] unless parsed == false

  @filter["fact"].compact!
  reset
end
for requests that do not care for results just return the request id and don't do any of the response processing.
We send the :process_results flag to the nodes so they can make decisions based on that.
Should only be called via method_missing
# File lib/mcollective/rpc/client.rb, line 759
#
# Sends +action+ without waiting for or processing any replies and
# returns whatever the client's sendreq returns.
#
# +filter+ defaults to the client's current filter. The message becomes a
# direct request, addressed to the discovered hosts, when direct requests
# are being forced on this client or by the discovery plugin.
def fire_and_forget_request(action, args, filter=nil)
  validate_request(action, args)

  identity_filter_discovery_optimization

  req = new_request(action.to_s, args)

  filter ||= options[:filter]

  message_options = {:agent => @agent, :type => :request, :collective => @collective, :filter => filter, :options => options}
  message = Message.new(req, nil, message_options)
  message.reply_to = @reply_to if @reply_to

  if @force_direct_request || @client.discoverer.force_direct_mode?
    message.discovered_hosts = discover.clone
    message.type = :direct_request
  end

  client.sendreq(message, nil)
end
Returns help for an agent if a DDL
was found
# File lib/mcollective/rpc/client.rb, line 136
#
# Renders the agent's help by passing +template+ to the DDL's help
# method and returns the result.
def help(template)
  ddl = @ddl
  ddl.help(template)
end
Sets the identity filter
# File lib/mcollective/rpc/client.rb, line 444
#
# Adds +identity+ to the identity filter (no duplicates, nil entries
# dropped) and clears the cached discovery via reset.
def identity_filter(identity)
  identities = @filter["identity"] | [identity]
  identities.compact!
  @filter["identity"] = identities

  reset
end
if an identity filter is supplied and it is all strings no regex we can use that as discovery data, technically the identity filter is then redundant if we are in direct addressing mode and we could empty it out but this use case should only really be for a few -I's on the CLI
For safety we leave the filter in place for now, that way we can support this enhancement also in broadcast mode.
This is only needed for the 'mc' discovery method, other methods might change the concept of identity to mean something else so we should pass the full identity filter to them
# File lib/mcollective/rpc/client.rb, line 790
#
# Uses the identity filter as the discovery result when that is safe.
#
# This only happens for the "mc" discovery method, and only when at
# least one identity is set and none of them is a regular expression
# (starts with a slash). In that case the identities become the
# discovered agents, and direct requests are forced when direct
# addressing is enabled. The filter itself is left untouched.
def identity_filter_discovery_optimization
  identities = options[:filter]["identity"]

  return nil unless identities.size > 0 && @discovery_method == "mc"
  return nil if identities.any? { |identity| identity.match("^\/") }

  @discovered_agents = identities.clone
  @force_direct_request = true if Config.instance.direct_addressing
end
Sets and sanity checks the limit_method
variable used to determine how to limit targets if limit_targets
is set
# File lib/mcollective/rpc/client.rb, line 639
#
# Stores how limited targets are chosen. Accepts :random or :first,
# either as a Symbol or as anything that responds to to_sym; any other
# value raises.
def limit_method=(method)
  chosen = method.is_a?(Symbol) ? method : method.to_sym

  unless [:random, :first].include?(chosen)
    raise "Unknown limit method #{chosen} must be :random or :first"
  end

  @limit_method = chosen
end
Sets and sanity checks the limit_targets
variable used to restrict how many nodes we'll target Limit targets can be reset by passing nil or false
# File lib/mcollective/rpc/client.rb, line 618
#
# Sets how many nodes a request may target.
#
#   nil / false  - removes the limit
#   "10"         - stored as the Integer 10
#   "10%"        - stored unchanged as a percentage String
#   anything else is converted with Integer()
#
# Strings must consist solely of digits, optionally followed by percent
# signs; anything else raises. The whole string is checked, so input with
# embedded newlines is rejected, and numbers are always read as decimal
# so "010" means ten rather than octal eight.
def limit_targets=(limit)
  unless limit
    @limit_targets = nil
    return
  end

  if limit.is_a?(String)
    raise "Invalid limit specified: #{limit} valid limits are /^\\d+%*$/" unless limit =~ /\A\d+%*\z/

    # percentages stay as strings, they are resolved against the number
    # of discovered nodes when the targets are picked
    @limit_targets = limit.include?("%") ? limit : Integer(limit, 10)
  else
    @limit_targets = Integer(limit)
  end
end
# File lib/mcollective/rpc/client.rb, line 720
#
# Builds an Aggregate for +action+ from its DDL interface.
#
# Returns nil when there is no DDL, when the action's interface has no
# :aggregate entry, or when anything raises a StandardError along the
# way, in which case the failure is logged.
def load_aggregate_functions(action, ddl)
  return nil unless ddl

  interface_keys = ddl.action_interface(action).keys
  return nil unless interface_keys.include?(:aggregate)

  Aggregate.new(ddl.action_interface(action))
rescue => e
  Log.error("Failed to load aggregate functions, calculating summaries disabled: %s: %s (%s)" % [e.backtrace.first, e.to_s, e.class])
  nil
end
Magic handler to invoke remote methods
Once the stub is created using the constructor or the RPC#rpcclient
helper you can call remote actions easily:
ret = rpc.echo(:msg => "hello world")
This will call the 'echo' action of the 'rpctest' agent and return the result as an array, the array will be a simplified result set from the usual full MCollective::Client#req
with additional error codes and error text:
{
:sender => "remote.box.com", :statuscode => 0, :statusmsg => "OK", :data => "hello world"
}
If :statuscode is 0 then everything went fine; if it's 1 then you supplied the correct arguments etc. but the request could not be completed, and you'll find a human-parsable reason in :statusmsg.
Codes 2 to 5 maps directly to UnknownRPCAction
, MissingRPCData
, InvalidRPCData
and UnknownRPCError
see below for a description of those, in each case :statusmsg would be the reason for failure.
To get access to the full result of the MCollective::Client#req
calls you can pass in a block:
rpc.echo(:msg => "hello world") do |resp| pp resp end
In this case resp will be the result from MCollective::Client#req
. Instead of returning simple text and codes as above you'll also need to handle the following exceptions:
UnknownRPCAction
- There is no matching action on the agent MissingRPCData
- You did not supply all the needed parameters for the action InvalidRPCData
- The data you did supply did not pass validation UnknownRPCError
- Some other error prevented the agent from running
During calls a progress indicator will be shown of how many results we've received against how many nodes were discovered, you can disable this by setting progress to false:
rpc.progress = false
This supports a 2nd mode where it will send the SimpleRPC request and never handle the responses. It's a bit like UDP, it sends the request with the filter attached and you only get back the requestid, you have no indication about results.
You can invoke this using:
puts rpc.echo(:process_results => false)
This will output just the request id.
Batched processing is supported:
printrpc rpc.ping(:batch_size => 5)
This will do everything exactly as normal but communicate to only 5 agents at a time
# File lib/mcollective/rpc/client.rb, line 241
#
# Treats any unknown method as a remote action: +method_name+ is the
# action and the first argument, a hash, its arguments.
#
# The :batch_size and :batch_sleep_time entries are removed from the
# arguments hash and override the client wide batch settings for this
# call only; a batch size of 0 disables batching for the call.
#
# Dispatch order:
#
#   1. limit_targets set - pick that many nodes and do a custom_request
#      with an identity filter matching exactly those nodes
#   2. batch mode        - call_agent_batched
#   3. otherwise         - call_agent with normal discovery
def method_missing(method_name, *args, &block)
  # set args to an empty hash if nothings given
  args = args[0]
  args = {} if args.nil?

  action = method_name.to_s

  @stats.reset

  validate_request(action, args)

  # a batch size or sleep time passed with the call wins over the
  # global settings
  batch_size = args.delete(:batch_size) || @batch_size
  batch_sleep_time = args.delete(:batch_sleep_time) || @batch_sleep_time

  # Batch mode is decided purely from the effective batch size, this
  # allows a batch_size of 0 to disable batch mode for this call only
  batch_mode = determine_batch_mode(batch_size)

  # Handle single target requests by doing discovery and picking
  # a random node. Then do a custom request specifying a filter
  # that will only match the one node.
  if @limit_targets
    target_nodes = pick_nodes_from_discovered(@limit_targets)
    Log.debug("Picked #{target_nodes.join(',')} as limited target(s)")

    # node names are escaped so that characters such as "." only match
    # themselves and the filter cannot match nodes that were not picked
    escaped_nodes = target_nodes.map { |node| Regexp.escape(node) }

    custom_request(action, args, target_nodes, {"identity" => /^(#{escaped_nodes.join('|')})$/}, &block)
  elsif batch_mode
    call_agent_batched(action, args, options, batch_size, batch_sleep_time, &block)
  else
    call_agent(action, args, options, :auto, &block)
  end
end
Creates a suitable request hash for the SimpleRPC agent.
You'd use this if you ever wanted to take care of sending requests on your own - perhaps via Client#sendreq if you didn't care for responses.
In that case you can just do:
msg = your_rpc.new_request("some_action", :foo => :bar) filter = your_rpc.filter your_rpc.client.sendreq(msg, msg[:agent], filter)
This will send a SimpleRPC request to the action some_action with arguments :foo = :bar. It will return immediately and you will have no indication at all whether the request was received or not.
Clearly the use of this technique should be limited and done only if your code requires such a thing
# File lib/mcollective/rpc/client.rb, line 159
#
# Builds the request hash for +action+ with +data+ as its arguments.
#
# The caller id is taken from the security plugin and the request is
# refused with an error when the plugin reports that id as invalid.
def new_request(action, data)
  security = PluginManager["security_plugin"]
  callerid = security.callerid

  unless PluginManager["security_plugin"].valid_callerid?(callerid)
    raise 'callerid received from security plugin is not valid'
  end

  {
    :agent => @agent,
    :action => action,
    :caller => callerid,
    :data => data
  }
end
Provides a normal options hash like you would get from Optionparser
# File lib/mcollective/rpc/client.rb, line 590
#
# Snapshot of the client's current settings as an options hash. A new
# hash is built on every call; the filter and discovery options in it are
# the client's own objects, not copies.
def options
  {
    :disctimeout => discovery_timeout,
    :timeout => @timeout,
    :verbose => @verbose,
    :filter => @filter,
    :collective => @collective,
    :output_format => @output_format,
    :ttl => @ttl,
    :discovery_method => @discovery_method,
    :discovery_options => @discovery_options,
    :force_display_mode => @force_display_mode,
    :config => @config,
    :publish_timeout => @publish_timeout,
    :threaded => @threaded
  }
end
Pick a number of nodes from the discovered nodes
The count should be a string that can be either just a number or a percentage like 10%
It will select nodes from the discovered list based on the rpclimitmethod configuration option which can be either :first or anything else
- :first would be a simple way to do a distance based selection - anything else will just pick nodes at random - if random is chosen and a limit seed is set, then srand is seeded for the generator and reset afterwards
# File lib/mcollective/rpc/client.rb, line 679
#
# Picks +count+ nodes from the discovered nodes.
#
# +count+ is an Integer, a numeric String, or a percentage String such as
# "10%". A percentage that works out to less than one node still picks
# one. When +count+ covers every discovered node the whole discovered
# list is returned.
#
# With the :first limit method the leading nodes are returned; any other
# limit method picks at random without repeats. When a limit seed is set
# the candidates are sorted and the random generator is seeded with it,
# so the same seed picks the same nodes, and the generator is re-seeded
# afterwards.
def pick_nodes_from_discovered(count)
  nodes = discover

  # Compare against the string form: count is an Integer once it has been
  # through limit_targets= and Integer has no =~ on Ruby 3.2 and later.
  if count.to_s =~ /%$/
    pct = Integer((nodes.size * (count.to_f / 100)))
    count = pct == 0 ? 1 : pct
  else
    count = Integer(count)
  end

  return nodes if nodes.size <= count

  return nodes[0, count] if @limit_method == :first

  # we delete from the discovered list because we want
  # to be sure there is no chance that the same node will
  # be randomly picked twice. So we have to clone the
  # discovered list else this method will only ever work
  # once per discovery cycle and not actually return the
  # right nodes.
  haystack = nodes.clone

  if @limit_seed
    haystack.sort!
    srand(@limit_seed)
  end

  result = []

  count.times do
    rnd = rand(haystack.size)
    result << haystack.delete_at(rnd)
  end

  # Reset random number generator to fresh seed
  # As our seed from options is most likely short
  srand if @limit_seed

  [result].flatten
end
process client requests by calling a block on each result in this mode we do not do anything fancy with the result objects and we raise exceptions if there are problems with the data
# File lib/mcollective/rpc/client.rb, line 1017
#
# Handles one reply for a request that was made with a block.
#
# The reply is recorded in the stats, fed to the aggregate functions when
# there are any, and then passed to +block+: a one argument block gets
# the raw response, a two argument block gets the raw response and the
# result built from it. Blocks of any other arity are not called.
#
# Returns the aggregate, which is nil when aggregation is not in use or
# has been disabled after a failure.
def process_results_with_block(action, resp, block, aggregate)
  @stats.node_responded(resp[:senderid])

  result = rpc_result_from_reply(@agent, action, resp)
  aggregate = aggregate_reply(result, aggregate) if aggregate

  if result[:statuscode] == 0
    @stats.ok
  else
    @stats.fail
  end

  # time spent inside the caller's block is tracked separately
  @stats.time_block_execution :start

  arity = block.arity
  if arity == 1
    block.call(resp)
  elsif arity == 2
    block.call(resp, result)
  end

  @stats.time_block_execution :end

  aggregate
end
Handles result sets that have no block associated, sets fails and ok in the stats object and returns a hash of the response to send to the caller
# File lib/mcollective/rpc/client.rb, line 997
#
# Handles one reply for a request that was made without a block.
#
# The reply is recorded in the stats, counted as ok when its status code
# is 0 and as a failure for every other code, and fed to the aggregate
# functions when there are any.
#
# Returns a two element array: the result and the aggregate.
def process_results_without_block(resp, action, aggregate)
  @stats.node_responded(resp[:senderid])

  result = rpc_result_from_reply(@agent, action, resp)
  aggregate = aggregate_reply(result, aggregate) if aggregate

  result[:statuscode] == 0 ? @stats.ok : @stats.fail

  [result, aggregate]
end
Resets various internal parts of the class, most importantly it clears out the cached discovery
# File lib/mcollective/rpc/client.rb, line 458
#
# Forgets the cached discovery result, so the next call to discover does
# a fresh discovery. Returns nil.
def reset
  @discovered_agents = nil
end
Reset the filter to an empty one
# File lib/mcollective/rpc/client.rb, line 463
#
# Replaces the filter with an empty one and then re-adds the agent this
# client is a stub for. agent_filter also clears the cached discovery.
def reset_filter
  @filter = Util.empty_filter

  agent_filter(@agent)
end
# File lib/mcollective/rpc/client.rb, line 741
#
# Builds a Result for +agent+ and +action+ from a raw reply.
#
# Every field is looked up under its String key first and falls back to
# the Symbol key when the String key is absent. This applies both to the
# reply itself ("senderid", "body") and to the body ("statuscode",
# "statusmsg", "data").
def rpc_result_from_reply(agent, action, reply)
  lookup = lambda do |source, key|
    source.include?(key) ? source[key] : source[key.to_sym]
  end

  body = lookup.call(reply, "body")

  fields = {
    :sender => lookup.call(reply, "senderid"),
    :statuscode => lookup.call(body, "statuscode"),
    :statusmsg => lookup.call(body, "statusmsg"),
    :data => lookup.call(body, "data")
  }

  Result.new(agent, action, fields)
end
For the provided arguments and action the input arguments get modified by supplying any defaults provided in the DDL
for arguments that were not supplied in the request
We then pass the modified arguments to the DDL
for validation
# File lib/mcollective/rpc/client.rb, line 175
#
# Validates +args+ for +action+ against the agent DDL.
#
# The DDL is first asked to fill in defaults for arguments that were not
# supplied, which modifies +args+ in place, and then to validate the
# request. Raises when the client has no DDL.
def validate_request(action, args)
  unless @ddl
    raise "No DDL found for agent %s cannot validate inputs" % @agent
  end

  @ddl.set_default_input_arguments(action, args)
  @ddl.validate_rpc_request(action, args)
end