class Fluent::Plugin::ForwardOutput::LoadBalancer

Public Class Methods

new(log) click to toggle source
# File lib/fluent/plugin/out_forward/load_balancer.rb, line 23
# Sets up an empty balancer.
#
# @param log the logger object stored in @log for later use
def initialize(log)
  @log = log
  # Guards @weight_array and the round-robin cursor @rr.
  @mutex = Mutex.new
  @weight_array = []
  @rr = 0
  # One seed is drawn here and kept for the lifetime of the instance.
  @rand_seed = Random.new_seed
end

Public Instance Methods

rebalance(nodes)
Alias for: rebuild_weight_array
rebuild_weight_array(nodes) click to toggle source
# File lib/fluent/plugin/out_forward/load_balancer.rb, line 58
# Recomputes the array of nodes that round-robin selection walks over and
# stores it in @weight_array.
#
# Nodes whose weight is not positive are ignored. Each remaining node appears
# in the array a number of times proportional to its weight. Available standby
# nodes are added when regular nodes are unavailable.
#
# @param nodes [Array] objects responding to weight, standby?, available?,
#   host and port
# @return [Array] the newly stored weight array (empty when no node qualifies)
def rebuild_weight_array(nodes)
  weighted = nodes.select { |node| node.weight > 0 }
  standby_nodes, regular_nodes = weighted.partition(&:standby?)

  # Total weight of the regular nodes that are currently unavailable.
  lost_weight = regular_nodes.reduce(0) do |total, node|
    node.available? ? total : total + node.weight
  end
  @log.debug("rebuilding weight array", lost_weight: lost_weight)

  if lost_weight > 0
    # Bring in available standby nodes until the lost weight is covered.
    standby_nodes.each do |standby|
      next unless standby.available?

      regular_nodes << standby
      @log.warn "using standby node #{standby.host}:#{standby.port}", weight: standby.weight
      lost_weight -= standby.weight
      break if lost_weight <= 0
    end
  end

  if regular_nodes.empty?
    @log.warn('No nodes are available')
    @mutex.synchronize { @weight_array = [] }
    return @weight_array
  end

  # Divide every weight by the common divisor to keep the array short.
  divisor = regular_nodes.reduce(0) { |acc, node| acc.gcd(node.weight) }
  slots = regular_nodes.flat_map { |node| Array.new(node.weight / divisor, node) }

  # for load balancing during detecting crashed servers
  factor = (regular_nodes.size * 6) / slots.size
  slots *= factor if factor > 1

  # Shuffle with the seed fixed at construction, so the same input always
  # produces the same order for this instance.
  rng = Random.new(@rand_seed)
  slots.sort_by! { rng.rand }

  @mutex.synchronize { @weight_array = slots }
end
Also aliased as: rebalance
select_healthy_node() { |node| ... } click to toggle source
# File lib/fluent/plugin/out_forward/load_balancer.rb, line 31
# Picks the next available node in round-robin order and yields it to the
# given block, moving on to the following node whenever the block raises.
#
# The number of attempts is bounded by the size of @weight_array on entry.
#
# @yield [node] the candidate node; the block's value becomes the result
# @return [Array] two elements: the block's return value and the node used
# @raise [StandardError] the most recent error raised by the block, when at
#   least one attempt was made and all of them failed
# @raise [NoNodesAvailable] when no available node could be tried
def select_healthy_node
  error = nil

  # The size only bounds the number of attempts, so it does not matter if
  # @weight_array is replaced by one of a different length while looping.
  wlen = @weight_array.size
  wlen.times do
    node = @mutex.synchronize do
      # Re-read the size under the lock: @weight_array can be reassigned to an
      # empty array after wlen was taken, and a modulo by zero would raise
      # ZeroDivisionError instead of the errors documented above.
      size = @weight_array.size
      if size.zero?
        nil
      else
        index = @rr % size
        @rr = (index + 1) % size
        @weight_array[index]
      end
    end
    break unless node
    next unless node.available?

    begin
      ret = yield node
      return ret, node
    rescue => e
      # Keep only the latest failure and fall through to the next node.
      error = e
    end
  end

  raise error if error
  raise NoNodesAvailable, "no nodes are available"
end
Also aliased as: select_service
select_service()
Alias for: select_healthy_node