class Fluent::Plugin::SrvServiceDiscovery

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/sd_srv.rb, line 50
# Sets up a new instance: runs the superclass initializer (with no
# arguments), then starts with no SRV target; #configure assigns
# @target later.
def initialize
  super()
  @target = nil
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/sd_srv.rb, line 55
# Applies the plugin configuration.
#
# After delegating to the superclass, this:
# * builds the SRV query name "_<service>._<proto>.<hostname>" into @target,
# * creates the Resolv::DNS resolver stored in @dns_resolve, and
# * performs an initial SRV fetch, storing the result in @services.
#
# conf:: configuration object; forwarded unchanged to the superclass.
def configure(conf)
  super

  @target = "_#{@service}._#{@proto}.#{@hostname}"

  server = @dns_server_host
  @dns_resolve =
    case
    when server.nil?
      # No explicit server configured: use the resolver's own defaults.
      Resolv::DNS.new
    when server.include?(':')
      # "host:port" form, e.g. 127.0.0.1:8600 — split on the first colon only.
      address, port_text = server.split(':', 2)
      Resolv::DNS.new(nameserver_port: [[address, port_text.to_i]])
    else
      # Bare host: query it on the resolver's default port.
      Resolv::DNS.new(nameserver: server)
    end

  @services = fetch_srv_record
end
start(queue) click to toggle source
Calls superclass method
# File lib/fluent/plugin/sd_srv.rb, line 72
# Starts the plugin.
#
# Registers a timer through timer_execute, keyed by the SRV target and
# using @interval, whose callback calls refresh_srv_records with the given
# queue. Afterwards the superclass #start is invoked with no arguments and
# its result is returned.
#
# queue:: object handed to refresh_srv_records on every timer callback.
def start(queue)
  timer_key = :"sd_srv_record_#{@target}"
  timer_execute(timer_key, @interval) { refresh_srv_records(queue) }

  super()
end

Private Instance Methods

dns_lookup!(host) click to toggle source
# File lib/fluent/plugin/sd_srv.rb, line 129
# Resolves +host+ through the configured resolver (@dns_resolve) and
# returns the result of its getaddress call, i.e. a single address.
# Errors raised by the resolver propagate to the caller.
#
# TODO: results are not cached; every call asks the resolver again.
def dns_lookup!(host)
  resolver = @dns_resolve
  resolver.getaddress(host) # only the first address is used for now
end
fetch_srv_record() click to toggle source
# File lib/fluent/plugin/sd_srv.rb, line 113
# Queries the SRV records for @target and converts them to Service objects.
#
# For every record, the service host is either the record's target name or,
# when @dns_lookup is truthy, the address returned by dns_lookup! for that
# target. The record's target is always passed as the fourth Service
# argument.
#
# Returns an Array of Service objects ordered by ascending SRV priority
# (empty when the resolver returns no records).
def fetch_srv_record
  records = @dns_resolve.getresources(@target, Resolv::DNS::Resource::IN::SRV)

  prioritized = records.map do |record|
    endpoint = @dns_lookup ? dns_lookup!(record.target) : record.target
    [
      record.priority,
      Service.new(
        :srv,
        endpoint.to_s,
        record.port.to_i,
        record.target.to_s,
        record.weight,
        false, # NOTE(review): presumably the "standby" flag — confirm against Service's definition
        @username,
        @password,
        @shared_key
      )
    ]
  end

  # Order by priority, then drop the priority and keep only the services.
  prioritized.sort_by(&:first).map(&:last)
end
refresh_srv_records(queue) click to toggle source
# File lib/fluent/plugin/sd_srv.rb, line 82
# Re-fetches the SRV records and pushes the changes onto +queue+.
#
# * If fetching raises, the error is logged through @log and nothing changes.
# * If the fetch yields nil or an empty list, nothing changes (the current
#   @services are kept).
# * Otherwise a service-in message is queued for every newly seen service,
#   followed by a service-out message for every service that disappeared,
#   and @services is replaced by the fresh list.
#
# queue:: receives the messages via #push.
#
# Returns the Array of queued messages, or nil when it bails out early.
def refresh_srv_records(queue)
  begin
    latest = fetch_srv_record
  rescue => e
    @log.error("sd_srv: #{e}")
    return
  end

  return if latest.nil? || latest.empty?

  # Joins are emitted before drains so that at least one server is
  # available at all times while the change set is applied.
  joined = (latest - @services).map { |svc| ServiceDiscovery.service_in_msg(svc) }
  drained = (@services - latest).map { |svc| ServiceDiscovery.service_out_msg(svc) }

  @services = latest

  (joined + drained).each { |message| queue.push(message) }
end