class Fluent::Plugin::FileServiceDiscovery
Constants
- DEFAULT_FILE_TYPE
- DEFAULT_SD_FILE_PATH
- DEFAUT_WEIGHT
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/sd_file.rb, line 38 def initialize super @file_type = nil end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/sd_file.rb, line 44 def configure(conf) super unless File.exist?(@path) raise Fluent::ConfigError, "sd_file: path=#{@path} not found" end @file_type = File.basename(@path).split('.', 2).last.to_sym unless %i[yaml yml json].include?(@file_type) @file_type = DEFAULT_FILE_TYPE end @services = fetch_server_info end
start(queue)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/sd_file.rb, line 59 def start(queue) watcher = StatWatcher.new(@path, @log) do |_prev, _cur| refresh_file(queue) end event_loop_attach(watcher) super() end
Private Instance Methods
fetch_server_info()
click to toggle source
# File lib/fluent/plugin/sd_file.rb, line 115 def fetch_server_info config_data = begin File.open(@path, "r:#{@conf_encoding}:utf-8", &:read) rescue => e raise Fluent::ConfigError, "sd_file: path=#{@path} couldn't open #{e}" end parser.call(config_data).map do |s| Service.new( :file, s.fetch('host'), s.fetch('port'), s['name'], s.fetch('weight', DEFAUT_WEIGHT), s['standby'], s['username'], s['password'], s['shared_key'], ) end rescue KeyError => e raise Fluent::ConfigError, "#{e}. Service must have `host` and `port`" end
parser()
click to toggle source
# File lib/fluent/plugin/sd_file.rb, line 70 def parser @parser ||= case @file_type when :yaml, :yml require 'yaml' -> (v) { YAML.safe_load(v).map } when :json require 'json' -> (v) { JSON.parse(v) } end end
refresh_file(queue)
click to toggle source
# File lib/fluent/plugin/sd_file.rb, line 82 def refresh_file(queue) s = begin fetch_server_info rescue => e @log.error("sd_file: #{e}") return end if s.nil? # if any error occurs, skip this turn return end diff = [] join = s - @services # Need service_in first to guarantee that server exist at least one all time. join.each do |j| diff << ServiceDiscovery.service_in_msg(j) end drain = @services - s drain.each do |d| diff << ServiceDiscovery.service_out_msg(d) end @services = s diff.each do |a| queue.push(a) end end