class Fluent::PluginHelper::ServiceDiscovery::Manager
Public Class Methods
new(log:, load_balancer: nil, custom_build_method: nil)
click to toggle source
# File lib/fluent/plugin_helper/service_discovery/manager.rb, line 24 def initialize(log:, load_balancer: nil, custom_build_method: nil) @log = log @load_balancer = load_balancer || RoundRobinBalancer.new @custom_build_method = custom_build_method @discoveries = [] @services = {} @queue = Queue.new @static_config = true end
Public Instance Methods
configure(configs, parent: nil)
click to toggle source
# File lib/fluent/plugin_helper/service_discovery/manager.rb, line 35 def configure(configs, parent: nil) configs.each do |config| type, conf = if config.has_key?(:conf) # for compatibility with initial API [config[:type], config[:conf]] else [config['@type'], config] end sd = Fluent::Plugin.new_sd(type, parent: parent) sd.configure(conf) sd.services.each do |s| @services[s.discovery_id] = build_service(s) end @discoveries << sd if @static_config && type.to_sym != :static @static_config = false end end rebalance end
rebalance()
click to toggle source
# File lib/fluent/plugin_helper/service_discovery/manager.rb, line 103 def rebalance @load_balancer.rebalance(services) end
run_once()
click to toggle source
# File lib/fluent/plugin_helper/service_discovery/manager.rb, line 77 def run_once # Don't care race in this loop intentionally s = @queue.size if s == 0 return end s.times do msg = @queue.pop unless msg.is_a?(Fluent::Plugin::ServiceDiscovery::DiscoveryMessage) @log.warn("BUG: #{msg}") next end begin handle_message(msg) rescue => e @log.error(e) end end rebalance end
select_service(&block)
click to toggle source
# File lib/fluent/plugin_helper/service_discovery/manager.rb, line 107 def select_service(&block) @load_balancer.select_service(&block) end
services()
click to toggle source
# File lib/fluent/plugin_helper/service_discovery/manager.rb, line 111 def services @services.values end
start()
click to toggle source
# File lib/fluent/plugin_helper/service_discovery/manager.rb, line 63 def start @discoveries.each do |d| d.start(@queue) end end
static_config?()
click to toggle source
# File lib/fluent/plugin_helper/service_discovery/manager.rb, line 59 def static_config? @static_config end
Private Instance Methods
build_service(n)
click to toggle source
# File lib/fluent/plugin_helper/service_discovery/manager.rb, line 140 def build_service(n) @custom_build_method ? @custom_build_method.call(n) : n end
handle_message(msg)
click to toggle source
# File lib/fluent/plugin_helper/service_discovery/manager.rb, line 117 def handle_message(msg) service = msg.service case msg.type when Fluent::Plugin::ServiceDiscovery::SERVICE_IN if (n = build_service(service)) @log.info("Service in: name=#{service.name} #{service.host}:#{service.port}") @services[service.discovery_id] = n else raise "failed to build service in name=#{service.name} #{service.host}:#{service.port}" end when Fluent::Plugin::ServiceDiscovery::SERVICE_OUT s = @services.delete(service.discovery_id) if s @log.info("Service out: name=#{service.name} #{service.host}:#{service.port}") else @log.warn("Not found service: name=#{service.name} #{service.host}:#{service.port}") end else @log.error("BUG: unknow message type: #{msg.type}") end end