class Fluent::Plugin::ForwardOutput

Constants

FORWARD_HEADER

MessagePack FixArray length is 3

LISTEN_PORT

Attributes

nodes[R]
read_interval[R]
recover_sample_size[R]

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::Compat::Output::new
# File lib/fluent/plugin/out_forward.rb, line 160
# Sets up the plugin's instance state with inert defaults.
# Everything except the node list, the watcher interval and the flush flag
# starts out as nil.
def initialize
  super

  # Collected node objects (filled in by #configure).
  @nodes = []

  # When true, #try_flush is skipped for ack-enabled outputs.
  @suspend_flush = false
  @keep_alive_watcher_interval = 5 # TODO

  # Runtime handles, created later on demand.
  @loop = @thread = @usock = nil

  # Metric objects, created in #configure.
  @healthy_nodes_count_metrics = @registered_nodes_count_metrics = nil
end

Public Instance Methods

after_shutdown() click to toggle source
Calls superclass method Fluent::Plugin::Output#after_shutdown
# File lib/fluent/plugin/out_forward.rb, line 346
# Shutdown hook: when ack responses are required, runs one final ack
# collection pass (#last_ack) before handing over to the superclass.
def after_shutdown
  if @require_ack_response
    last_ack
  end

  super
end
before_shutdown() click to toggle source
Calls superclass method Fluent::Plugin::Output#before_shutdown
# File lib/fluent/plugin/out_forward.rb, line 341
# Shutdown hook: after the superclass has run, raises the @suspend_flush flag.
# #try_flush and #try_write consult this flag (together with
# @require_ack_response) to change their behavior during shutdown.
def before_shutdown
  super
  @suspend_flush = true
end
close() click to toggle source
Calls superclass method Fluent::Plugin::Output#close
# File lib/fluent/plugin/out_forward.rb, line 324
# Closes the socket held in @usock (if any) and then delegates to the superclass.
def close
  begin
    @usock.close if @usock
  rescue StandardError
    # Deliberately ignored: this socket will not be used anyway.
  end

  super
end
configure(conf) click to toggle source
Calls superclass method Fluent::Compat::Output#configure
# File lib/fluent/plugin/out_forward.rb, line 174
# Validates the forward output configuration and builds the runtime helpers.
#
# After delegating to the superclass this method:
# * requires @chunk_key_tag to be set,
# * derives @read_interval and @recover_sample_size,
# * rewrites the deprecated :tcp heartbeat type to :transport,
# * validates the TLS related parameters when @transport is :tls,
# * creates the ack handler (only when @require_ack_response is set),
#   the optional socket cache and the connection manager,
# * configures service discovery and records each discovered server in @nodes,
# * creates the healthy/registered node count metrics.
#
# conf:: the configuration object; it is passed to compat_parameters_convert
#        and (implicitly) to the superclass.
#
# Raises Fluent::ConfigError when a parameter or parameter combination is
# rejected by one of the checks below.
def configure(conf)
  # NOTE(review): presumably maps old-style buffer parameters onto a buffer
  # section with 'tag' as the default chunk key -- confirm against the helper.
  compat_parameters_convert(conf, :buffer, default_chunk_key: 'tag')

  super

  unless @chunk_key_tag
    raise Fluent::ConfigError, "buffer chunk key must include 'tag' for forward output"
  end

  # @read_interval_msec is divided by 1000.0, i.e. converted from milliseconds to (Float) seconds.
  @read_interval = @read_interval_msec / 1000.0
  # Ratio of the recover wait to the heartbeat interval.
  @recover_sample_size = @recover_wait / @heartbeat_interval

  if @heartbeat_type == :tcp
    log.warn "'heartbeat_type tcp' is deprecated. use 'transport' instead."
    @heartbeat_type = :transport
  end

  if @dns_round_robin && @heartbeat_type == :udp
    raise Fluent::ConfigError, "forward output heartbeat type must be 'transport' or 'none' to use dns_round_robin option"
  end

  if @transport == :tls
    # socket helper adds CA cert or signed certificate to same cert store internally so unify it in this place.
    if @tls_cert_path && !@tls_cert_path.empty?
      @tls_ca_cert_path = @tls_cert_path
    end
    # Every configured certificate path must exist and be readable.
    if @tls_ca_cert_path && !@tls_ca_cert_path.empty?
      @tls_ca_cert_path.each do |path|
        raise Fluent::ConfigError, "specified cert path does not exist:#{path}" unless File.exist?(path)
        raise Fluent::ConfigError, "specified cert path is not readable:#{path}" unless File.readable?(path)
      end
    end

    # Insecure mode turns off hostname verification and accepts self-signed certificates.
    if @tls_insecure_mode
      log.warn "TLS transport is configured in insecure way"
      @tls_verify_hostname = false
      @tls_allow_self_signed_cert = true
    end

    # On Windows a cert path and a logical store name are mutually exclusive;
    # elsewhere the store name and thumbprint parameters are rejected outright.
    if Fluent.windows?
      if (@tls_cert_path || @tls_ca_cert_path) && @tls_cert_logical_store_name
        raise Fluent::ConfigError, "specified both cert path and tls_cert_logical_store_name is not permitted"
      end
    else
      raise Fluent::ConfigError, "This parameter is for only Windows" if @tls_cert_logical_store_name
      raise Fluent::ConfigError, "This parameter is for only Windows" if @tls_cert_thumbprint
    end
  end

  # The ack handler and the socket cache are optional: each stays nil when the
  # corresponding feature (@require_ack_response / @keepalive) is off.
  @ack_handler = @require_ack_response ? AckHandler.new(timeout: @ack_response_timeout, log: @log, read_length: @read_length) : nil
  socket_cache = @keepalive ? SocketCache.new(@keepalive_timeout, @log) : nil
  @connection_manager = ConnectionManager.new(
    log: @log,
    secure: !!@security,
    connection_factory: method(:create_transfer_socket),
    socket_cache: socket_cache,
  )

  # Services are turned into node objects through #build_node.
  service_discovery_configure(
    :out_forward_service_discovery_watcher,
    static_default_service_directive: 'server',
    load_balancer: LoadBalancer.new(log),
    custom_build_method: method(:build_node),
  )

  service_discovery_services.each do |server|
    # it's only for test
    @nodes << server
    unless @heartbeat_type == :none
      begin
        server.validate_host_resolution!
      rescue => e
        # Re-raise unless the user opted into ignoring network errors at startup;
        # otherwise warn and disable the node.
        raise unless @ignore_network_errors_at_startup
        log.warn "failed to resolve node name when configured", server: (server.name || server.host), error: e
        server.disable!
      end
    end
  end

  # Unless running as a secondary output: when this plugin is set to gzip but the
  # buffer is text, the buffer is switched to gzip; in the opposite case only a hint is logged.
  unless @as_secondary
    if @compress == :gzip && @buffer.compress == :text
      @buffer.compress = :gzip
    elsif @compress == :text && @buffer.compress == :gzip
      log.info "buffer is compressed.  If you also want to save the bandwidth of a network, Add `compress` configuration in <match>"
    end
  end

  if service_discovery_services.empty?
    raise Fluent::ConfigError, "forward output plugin requires at least one node is required. Add <server> or <service_discovery>"
  end

  # keepalive_timeout has no effect while keepalive itself is disabled.
  if !@keepalive && @keepalive_timeout
    log.warn('The value of keepalive_timeout is ignored. if you want to use keepalive, please add `keepalive true` to your conf.')
  end

  raise Fluent::ConfigError, "ack_response_timeout must be a positive integer" if @ack_response_timeout < 1
  # Metrics updated by #statistics.
  @healthy_nodes_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "healthy_nodes_count", help_text: "Number of count healthy nodes", prefer_gauge: true)
  @registered_nodes_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "registered_nodes_count", help_text: "Number of count registered nodes", prefer_gauge: true)

end
create_transfer_socket(host, port, hostname, &block) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 379
# Opens a transfer socket to host:port using the configured transport.
#
# host::     destination host, passed through to the socket helper
# port::     destination port, passed through to the socket helper
# hostname:: name handed to the TLS helper as `fqdn` (unused for plain TCP)
# block::    forwarded unchanged to the socket helper
#
# Returns whatever socket_create_tls / socket_create_tcp returns.
# Raises RuntimeError when @transport is neither :tls nor :tcp.
def create_transfer_socket(host, port, hostname, &block)
  # Timeout options shared by both transports.
  timeouts = {
    send_timeout: @send_timeout,
    recv_timeout: @ack_response_timeout,
    connect_timeout: @connect_timeout,
  }

  case @transport
  when :tls
    tls_options = {
      version: @tls_version,
      ciphers: @tls_ciphers,
      insecure: @tls_insecure_mode,
      verify_fqdn: @tls_verify_hostname,
      fqdn: hostname,
      allow_self_signed_cert: @tls_allow_self_signed_cert,
      cert_paths: @tls_ca_cert_path,
      cert_path: @tls_client_cert_path,
      private_key_path: @tls_client_private_key_path,
      private_key_passphrase: @tls_client_private_key_passphrase,
      cert_thumbprint: @tls_cert_thumbprint,
      cert_logical_store_name: @tls_cert_logical_store_name,
      cert_use_enterprise_store: @tls_cert_use_enterprise_store,

      # Enabling SO_LINGER causes tcp port exhaustion on Windows.
      # This is because dynamic ports are only 16384 (from 49152 to 65535) and
      # expiring SO_LINGER enabled ports should wait 4 minutes
      # where set by TcpTimeDelay. Its default value is 4 minutes.
      # So, we should disable SO_LINGER on Windows to prevent flood of waiting ports.
      linger_timeout: Fluent.windows? ? nil : @send_timeout,
    }
    socket_create_tls(host, port, **tls_options, **timeouts, &block)
  when :tcp
    # NOTE(review): unlike the TLS branch, the linger timeout is not disabled
    # on Windows here -- confirm whether that difference is intentional.
    socket_create_tcp(host, port, linger_timeout: @send_timeout, **timeouts, &block)
  else
    raise "BUG: unknown transport protocol #{@transport}"
  end
end
forward_header() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 445
# Returns the FORWARD_HEADER constant of this class.
def forward_header
  FORWARD_HEADER
end
last_ack() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 356
# Runs a single ack collection pass.
# The delayed commit timeout is adjusted first because #ack_select_interval
# derives its value from @delayed_commit_timeout.
def last_ack
  overwrite_delayed_commit_timeout
  interval = ack_select_interval
  ack_check(interval)
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 275
# Always returns true.
# NOTE(review): presumably tells the plugin framework that this output may be
# used with multiple workers -- confirm against the Output base class.
def multi_workers_ready?
  true
end
overwrite_delayed_commit_timeout() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 283
# Replaces @delayed_commit_timeout with @ack_response_timeout + 2.
#
# The method is invoked more than once (from #start and from every #last_ack).
# It is a no-op when @delayed_commit_timeout already equals
# @ack_response_timeout (left untouched, as before) or already holds the
# adjusted value, so the info message is logged only when the value changes
# instead of on every call.
#
# Returns the new timeout when it was overwritten, nil otherwise.
def overwrite_delayed_commit_timeout
  # Output#start sets @delayed_commit_timeout by @buffer_config.delayed_commit_timeout
  # But it should be overwritten by ack_response_timeout to rollback chunks after timeout
  return if @delayed_commit_timeout == @ack_response_timeout

  adjusted_timeout = @ack_response_timeout + 2 # minimum ack_reader IO.select interval is 1s
  # Already overwritten by an earlier call: nothing to log or assign again.
  return if @delayed_commit_timeout == adjusted_timeout

  log.info "delayed_commit_timeout is overwritten by ack_response_timeout"
  @delayed_commit_timeout = adjusted_timeout
end
prefer_delayed_commit() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 279
# Returns the value of @require_ack_response, i.e. delayed commit is
# preferred exactly when ack responses are required.
def prefer_delayed_commit
  @require_ack_response
end
start() click to toggle source
Calls superclass method Fluent::Compat::Output#start
# File lib/fluent/plugin/out_forward.rb, line 292
# Starts the plugin's background activities after the superclass has started:
# * for UDP heartbeats, a UDP socket plus a receiver for heartbeat responses,
# * a periodic heartbeat timer (unless the heartbeat type is :none),
# * the ack reader thread (when ack responses are required),
# * an optional connection check of every service,
# * a periodic purge of obsolete keepalive sockets (when keepalive is enabled).
#
# Raises Fluent::UnrecoverableError when @verify_connection_at_startup is set
# and a service fails its connection check.
def start
  super

  # :udp is never :none, so the socket setup can be tested on its own.
  if @heartbeat_type == :udp
    @usock = socket_create_udp(service_discovery_services.first.host, service_discovery_services.first.port, nonblock: true)
    server_create_udp(:out_forward_heartbeat_receiver, 0, socket: @usock, max_bytes: @read_length, &method(:on_udp_heatbeat_response_recv))
  end
  if @heartbeat_type != :none
    timer_execute(:out_forward_heartbeat_request, @heartbeat_interval, &method(:on_heartbeat_timer))
  end

  if @require_ack_response
    overwrite_delayed_commit_timeout
    thread_create(:out_forward_receiving_ack, &method(:ack_reader))
  end

  if @verify_connection_at_startup
    service_discovery_services.each do |service|
      begin
        service.verify_connection
      rescue StandardError => error
        log.fatal "forward's connection setting error: #{error.message}"
        raise Fluent::UnrecoverableError, error.message
      end
    end
  end

  timer_execute(:out_forward_keep_alived_socket_watcher, @keep_alive_watcher_interval, &method(:on_purge_obsolete_socks)) if @keepalive
end
statistics() click to toggle source
Calls superclass method Fluent::Plugin::Output#statistics
# File lib/fluent/plugin/out_forward.rb, line 423
# Refreshes the node count metrics and returns the output statistics.
#
# The registered count is the number of discovered services; the healthy count
# is incremented once for each service reporting available?.
#
# Returns a Hash with the single key 'output': the superclass' "output"
# statistics merged with 'healthy_nodes_count' and 'registered_nodes_count'.
def statistics
  base_stats = super
  known_services = service_discovery_services

  @healthy_nodes_count_metrics.set(0)
  @registered_nodes_count_metrics.set(known_services.size)
  known_services.each do |service|
    @healthy_nodes_count_metrics.inc if service.available?
  end

  node_counts = {
    'healthy_nodes_count' => @healthy_nodes_count_metrics.get,
    'registered_nodes_count' => @registered_nodes_count_metrics.get,
  }
  { 'output' => base_stats["output"].merge(node_counts) }
end
stop() click to toggle source
Calls superclass method Fluent::Plugin::Output#stop
# File lib/fluent/plugin/out_forward.rb, line 333
# Stops the plugin: runs the superclass' stop first, then stops the
# connection manager when keepalive is enabled.
def stop
  super

  @connection_manager.stop if @keepalive
end
try_flush() click to toggle source
Calls superclass method Fluent::Plugin::Output#try_flush
# File lib/fluent/plugin/out_forward.rb, line 351
# Flushes through the superclass unless flushing is suspended.
# Flushing counts as suspended only when ack responses are required AND
# @suspend_flush is set; in that case nothing happens and nil is returned.
def try_flush
  flush_suspended = @require_ack_response && @suspend_flush
  super unless flush_suspended
end
try_write(chunk) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 368
# Sends one chunk to a node chosen by service discovery.
#
# An empty chunk is committed right away and nil is returned. Otherwise the
# chunk is sent with its metadata tag, and - when ack responses are required
# and flushing is suspended - a final ack pass (#last_ack) follows.
#
# chunk:: the buffer chunk to deliver
def try_write(chunk)
  log.trace "writing a chunk to destination", chunk_id: dump_unique_id_hex(chunk.unique_id)

  if chunk.empty?
    commit_write(chunk.unique_id)
    nil
  else
    destination_tag = chunk.metadata.tag
    service_discovery_select_service do |node|
      node.send_data(destination_tag, chunk)
    end
    last_ack if @require_ack_response && @suspend_flush
  end
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 361
# Sends a chunk, tagged with its metadata tag, to a node chosen by service
# discovery. Empty chunks are skipped (nil is returned).
#
# chunk:: the buffer chunk to deliver
def write(chunk)
  unless chunk.empty?
    destination_tag = chunk.metadata.tag
    service_discovery_select_service do |node|
      node.send_data(destination_tag, chunk)
    end
  end
end

Private Instance Methods

ack_check(select_interval) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 516
# Collects pending ack responses and settles the corresponding chunks.
#
# For every response the socket is handed back to the connection manager, then:
# * SUCCESS           -> the chunk is committed,
# * FAILED            -> the node is disabled and the chunk rolled back,
# * CHUNKID_UNMATCHED -> the chunk is rolled back,
# * anything else     -> a warning is logged and the chunk is rolled back
#                        if a chunk id is present.
# Rollbacks never update the retry state (update_retry: false).
#
# select_interval:: passed through to the ack handler's collect_response
def ack_check(select_interval)
  @ack_handler.collect_response(select_interval) do |chunk_id, node, sock, result|
    @connection_manager.close(sock)

    case result
    when AckHandler::Result::SUCCESS
      next commit_write(chunk_id)
    when AckHandler::Result::FAILED
      node.disable!
    when AckHandler::Result::CHUNKID_UNMATCHED
      # Nothing extra to do; the chunk is rolled back below.
    else
      log.warn("BUG: invalid status #{result} #{chunk_id}")
      next unless chunk_id
    end

    # Every non-success outcome that reaches this point rolls the chunk back.
    rollback_write(chunk_id, update_retry: false)
  end
end
ack_reader() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 508
# Body of the ack receiving thread: keeps running ack collection passes for
# as long as the current thread is marked as running. The select interval is
# computed once, before the loop starts.
def ack_reader
  interval = ack_select_interval
  ack_check(interval) while thread_current_running?
end
ack_select_interval() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 500
# Interval handed to the ack collection pass.
# Returns 1 when @delayed_commit_timeout is greater than 3, otherwise one
# third of @delayed_commit_timeout (as a Float).
def ack_select_interval
  return 1 if @delayed_commit_timeout > 3

  @delayed_commit_timeout / 3.0
end
build_node(server) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 451
# Builds the node object for one configured server and logs its registration.
#
# The display name is the server's name, falling back to "host:port".
# A NoneHeartbeatNode is created when the heartbeat type is :none, a Node
# otherwise; both receive a fresh FailureDetector, the shared connection
# manager and the (possibly nil) ack handler.
#
# server:: object responding to name, host, port and weight
def build_node(server)
  display_name = server.name || "#{server.host}:#{server.port}"
  log.info "adding forwarding server '#{display_name}'", host: server.host, port: server.port, weight: server.weight, plugin_id: plugin_id

  detector = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f)
  node_class = @heartbeat_type == :none ? NoneHeartbeatNode : Node
  node_class.new(self, server, failure: detector, connection_manager: @connection_manager, ack_handler: @ack_handler)
end
on_heartbeat_timer() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 463
# Timer callback: sends a heartbeat to every discovered service and ticks it.
#
# Errors raised while sending are logged at debug level and never abort the
# loop; each node is ticked regardless. When any send_heartbeat or tick call
# returns a truthy value, the services are rebalanced once at the end.
def on_heartbeat_timer
  rebalance_needed = false

  service_discovery_services.each do |node|
    begin
      log.trace "sending heartbeat", host: node.host, port: node.port, heartbeat_type: @heartbeat_type
      # Share the UDP socket with the node when one exists.
      node.usock = @usock if @usock
      rebalance_needed = true if node.send_heartbeat
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR, Errno::ECONNREFUSED, Errno::ETIMEDOUT => error
      log.debug "failed to send heartbeat packet", host: node.host, port: node.port, heartbeat_type: @heartbeat_type, error: error
    rescue => error
      log.debug "unexpected error happen during heartbeat", host: node.host, port: node.port, heartbeat_type: @heartbeat_type, error: error
    end

    # tick must run for every node, even after a failed heartbeat.
    rebalance_needed = true if node.tick
  end

  service_discovery_rebalance if rebalance_needed
end
on_purge_obsolete_socks() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 496
# Timer callback (registered in #start when keepalive is enabled): asks the
# connection manager to purge its obsolete sockets.
def on_purge_obsolete_socks
  @connection_manager.purge_obsolete_socks
end
on_udp_heatbeat_response_recv(data, sock) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 484
# Callback for an incoming UDP heartbeat response.
#
# The sender is matched against the discovered services by packed socket
# address. A matching node has its heartbeat recorded, and the services are
# rebalanced when that call returns a truthy value. A response from an
# unknown address is only logged as a warning.
#
# data:: the received payload (not inspected)
# sock:: object exposing remote_host and remote_port of the sender
def on_udp_heatbeat_response_recv(data, sock)
  sender_addr = Socket.pack_sockaddr_in(sock.remote_port, sock.remote_host)
  matched_node = service_discovery_services.find { |candidate| candidate.sockaddr == sender_addr }

  unless matched_node
    return log.warn("Unknown heartbeat response received from #{sock.remote_host}:#{sock.remote_port}. It may service out")
  end

  service_discovery_rebalance if matched_node.heartbeat
end