module Fluent::PluginHelper::Server

Constants

CONNECTION_PROTOCOLS
PEERADDR_FAILED

Use the string “?” for the port, not an integer or nil. “?” is clearer than -1 or nil in the log.

PROTOCOLS
SERVER_TRANSPORT_PARAMS
ServerInfo

Attributes

_servers[R]

stop: [-]; shutdown: detach server event handler from event loop (event_loop); close: close listening sockets; terminate: remove all server instances

Public Class Methods

included(mod) click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 297
# Hook invoked when this helper is mixed into +mod+: makes sure +mod+ also
# includes ServerTransportParams.
def self.included(mod)
  mod.include(ServerTransportParams)
end
new() click to toggle source
Calls superclass method Fluent::PluginHelper::EventLoop::new
# File lib/fluent/plugin_helper/server.rb, line 301
# Runs the superclass initializer, then sets up this helper's bookkeeping:
# the list of created servers, the list of tracked connections, and the
# mutex used around those lists.
def initialize
  super
  @_servers, @_server_connections = [], []
  @_server_mutex = Mutex.new
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin_helper/server.rb, line 308
# Configures the plugin through the superclass, then validates the server
# certificate options when a transport section is present and its protocol
# is :tls. Nothing is validated for any other protocol.
def configure(conf)
  super

  return unless @transport_config
  return unless @transport_config.protocol == :tls

  cert_option_server_validate!(@transport_config)
end
server_attach(title, proto, port, bind, shared, server) click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 206
# Records the server as a ServerInfo entry in @_servers and attaches it to
# the event loop. Returns whatever event_loop_attach returns.
def server_attach(title, proto, port, bind, shared, server)
  info = ServerInfo.new(title, proto, port, bind, shared, server)
  @_servers.push(info)
  event_loop_attach(server)
end
server_create(title, port, proto: nil, bind: '0.0.0.0', shared: true, socket: nil, backlog: nil, tls_options: nil, max_bytes: nil, flags: 0, **socket_options, &callback) click to toggle source

server_create(:title, @port) do |data|
  # ...
end

server_create(:title, @port) do |data, conn|
  # ...
end

server_create(:title, @port, proto: :udp, max_bytes: 2048) do |data, sock|
  sock.remote_host
  sock.remote_port
  # ...
end

# File lib/fluent/plugin_helper/server.rb, line 122
# Creates a data-oriented server and attaches it to the event loop.
#
# The given block handles received data and must take 1 or 2 arguments.
#
# title::       Symbol naming the server.
# port::        Integer port.
# proto::       one of PROTOCOLS; when omitted, :tls if the transport
#               section is configured for TLS, otherwise :tcp.
# socket::      pre-built socket; accepted for :udp only.
# backlog::     rejected for :udp.
# tls_options:: accepted for :tls only.
# max_bytes::   required for :udp, rejected for every other protocol.
# flags::       must stay 0 unless proto is :udp.
# Any other keyword is collected into socket_options.
#
# Raises ArgumentError for invalid arguments, and RuntimeError for :unix
# (not implemented) or an unknown protocol.
def server_create(title, port, proto: nil, bind: '0.0.0.0', shared: true, socket: nil, backlog: nil, tls_options: nil, max_bytes: nil, flags: 0, **socket_options, &callback)
  unless proto
    proto = if @transport_config && @transport_config.protocol == :tls
              :tls
            else
              :tcp
            end
  end

  raise ArgumentError, "BUG: title must be a symbol" unless title.is_a?(Symbol)
  raise ArgumentError, "BUG: port must be an integer" unless port.is_a?(Integer)
  raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto)

  if socket && proto != :udp
    raise ArgumentError, "BUG: socket option is available only for udp"
  end
  if tls_options && proto != :tls
    raise ArgumentError, "BUG: tls_options is available only for tls"
  end

  if callback.nil?
    raise ArgumentError, "BUG: block not specified which handles received data"
  end
  unless [1, 2].include?(callback.arity)
    raise ArgumentError, "BUG: block must have 1 or 2 arguments"
  end

  # default linger_timeout only for server
  stream_proto = proto == :tcp || proto == :tls
  socket_options[:linger_timeout] ||= 0 if stream_proto

  # When the caller supplies its own socket, no options are validated and no
  # setter is built.
  socket_option_setter = nil
  unless socket
    socket_option_validate!(proto, **socket_options)
    socket_option_setter = lambda { |sock| socket_option_set(sock, **socket_options) }
  end

  # options to listen/accept connections
  if backlog && !(stream_proto || proto == :unix)
    raise ArgumentError, "BUG: backlog is available for tcp/tls"
  end
  # UDP options
  unless proto == :udp
    raise ArgumentError, "BUG: max_bytes is available only for udp" if max_bytes
    raise ArgumentError, "BUG: flags is available only for udp" if flags != 0
  end

  server =
    case proto
    when :tcp
      server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter) { |conn| conn.data(&callback) }
    when :tls
      transport_config =
        if tls_options
          server_create_transport_section_object(tls_options)
        elsif @transport_config && @transport_config.protocol == :tls
          @transport_config
        else
          raise ArgumentError, "BUG: TLS transport specified, but certification options are not specified"
        end
      server_create_for_tls_connection(shared, bind, port, transport_config, backlog, socket_option_setter) { |conn| conn.data(&callback) }
    when :udp
      raise ArgumentError, "BUG: max_bytes must be specified for UDP" unless max_bytes
      if socket
        udp_sock = socket
        close_on_stop = false
      else
        udp_sock = server_create_udp_socket(shared, bind, port)
        socket_option_setter.call(udp_sock)
        close_on_stop = true
      end
      EventHandler::UDPServer.new(udp_sock, max_bytes, flags, close_on_stop, @log, @under_plugin_development, &callback)
    when :unix
      raise "not implemented yet"
    else
      raise "BUG: unknown protocol #{proto}"
    end

  server_attach(title, proto, port, bind, shared, server)
end
server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: true, backlog: nil, tls_options: nil, **socket_options, &block) click to toggle source

server_create_connection(:title, @port) do |conn|
  # on connection
  source_addr = conn.remote_host
  source_port = conn.remote_port
  conn.data do |data|
    # on data
    conn.write resp # ...
    conn.close
  end
end

# File lib/fluent/plugin_helper/server.rb, line 70
# Creates a connection-oriented server and attaches it to the event loop.
#
# The given block handles each connection and must take exactly one
# argument.
#
# title::       Symbol naming the server.
# port::        Integer port.
# proto::       must be in both PROTOCOLS and CONNECTION_PROTOCOLS; when
#               omitted, :tls if the transport section is configured for
#               TLS, otherwise :tcp.
# tls_options:: accepted for :tls only.
# Any other keyword is collected into socket_options and validated.
#
# Raises ArgumentError for invalid arguments, and RuntimeError for :unix
# (not implemented) or an unknown protocol.
def server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: true, backlog: nil, tls_options: nil, **socket_options, &block)
  unless proto
    proto = if @transport_config && @transport_config.protocol == :tls
              :tls
            else
              :tcp
            end
  end

  raise ArgumentError, "BUG: title must be a symbol" unless title.is_a?(Symbol)
  raise ArgumentError, "BUG: port must be an integer" unless port.is_a?(Integer)
  raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto)
  unless CONNECTION_PROTOCOLS.include?(proto)
    raise ArgumentError, "BUG: cannot create connection for UDP"
  end

  if tls_options && proto != :tls
    raise ArgumentError, "BUG: tls_options is available only for tls"
  end

  if block.nil?
    raise ArgumentError, "BUG: block not specified which handles connection"
  end
  if block.arity != 1
    raise ArgumentError, "BUG: block must have just one argument"
  end

  # default linger_timeout only for server
  socket_options[:linger_timeout] ||= 0 if proto == :tcp || proto == :tls

  socket_option_validate!(proto, **socket_options)
  socket_option_setter = lambda { |sock| socket_option_set(sock, **socket_options) }

  server =
    case proto
    when :tcp
      server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block)
    when :tls
      transport_config =
        if tls_options
          server_create_transport_section_object(tls_options)
        elsif @transport_config && @transport_config.protocol == :tls
          @transport_config
        else
          raise ArgumentError, "BUG: TLS transport specified, but certification options are not specified"
        end
      server_create_for_tls_connection(shared, bind, port, transport_config, backlog, socket_option_setter, &block)
    when :unix
      raise "not implemented yet"
    else
      raise "unknown protocol #{proto}"
    end

  server_attach(title, proto, port, bind, shared, server)
end
server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block) click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 211
# Builds a Coolio::TCPServer on a listening TCP socket, using
# EventHandler::TCPServer as the handler class.
#
# Accepted connections that are not already closing are recorded in
# @_server_connections. The close callback handed to the server removes
# them again. Both updates happen under @_server_mutex. When +backlog+ is
# given, the server is told to listen with it. Returns the server.
def server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block)
  listener = server_create_tcp_socket(shared, bind, port)
  socket_option_setter.call(listener)

  close_callback = lambda do |closed_conn|
    @_server_mutex.synchronize { @_server_connections.delete(closed_conn) }
  end

  server = Coolio::TCPServer.new(listener, nil, EventHandler::TCPServer, socket_option_setter, close_callback, @log, @under_plugin_development, block) do |conn|
    next if conn.closing

    @_server_mutex.synchronize { @_server_connections << conn }
  end

  server.tap { |srv| srv.listen(backlog) if backlog }
end
server_create_for_tls_connection(shared, bind, port, conf, backlog, socket_option_setter, &block) click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 226
# Builds a Coolio::TCPServer on a listening TCP socket, using
# EventHandler::TLSServer as the handler class and a context created from
# +conf+ (its version, insecure and ciphers values, plus conf itself).
#
# Accepted connections that are not already closing are recorded in
# @_server_connections. The close callback handed to the server removes
# them again. Both updates happen under @_server_mutex. When +backlog+ is
# given, the server is told to listen with it. Returns the server.
def server_create_for_tls_connection(shared, bind, port, conf, backlog, socket_option_setter, &block)
  tls_context = cert_option_create_context(conf.version, conf.insecure, conf.ciphers, conf)
  listener = server_create_tcp_socket(shared, bind, port)
  socket_option_setter.call(listener)

  close_callback = lambda do |closed_conn|
    @_server_mutex.synchronize { @_server_connections.delete(closed_conn) }
  end

  server = Coolio::TCPServer.new(listener, nil, EventHandler::TLSServer, tls_context, socket_option_setter, close_callback, @log, @under_plugin_development, block) do |conn|
    next if conn.closing

    @_server_mutex.synchronize { @_server_connections << conn }
  end

  server.tap { |srv| srv.listen(backlog) if backlog }
end
server_create_tcp(title, port, **kwargs, &callback) click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 188
# Shorthand for server_create with proto defaulting to :tcp. A :proto key in
# +kwargs+ still takes precedence, as it is merged after the default.
def server_create_tcp(title, port, **kwargs, &callback)
  options = { proto: :tcp }.merge(kwargs)
  server_create(title, port, **options, &callback)
end
server_create_tcp_socket(shared, bind, port) click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 351
# Returns a listening TCP socket for bind/port with O_NONBLOCK set.
#
# When +shared+ is truthy, the socket comes from the socket manager client.
# Otherwise it is opened directly in this process.
def server_create_tcp_socket(shared, bind, port)
  if shared
    listener = server_socket_manager_client.listen_tcp(bind, port)
  else
    # TCPServer.new doesn't set IPV6_V6ONLY flag, so use Addrinfo class instead.
    # backlog will be set by the caller, we don't need to set backlog here
    raw = Addrinfo.tcp(bind, port).listen
    # keep the descriptor open: it is re-wrapped as a TCPServer below
    raw.autoclose = false
    listener = TCPServer.for_fd(raw.fileno)
  end

  # close-on-exec is set by default in Ruby 2.0 or later (, and it's unavailable on Windows)
  listener.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # nonblock
  listener
end
server_create_tls(title, port, **kwargs, &callback) click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 196
# Shorthand for server_create with proto defaulting to :tls. A :proto key in
# +kwargs+ still takes precedence, as it is merged after the default.
def server_create_tls(title, port, **kwargs, &callback)
  options = { proto: :tls }.merge(kwargs)
  server_create(title, port, **options, &callback)
end
server_create_transport_section_object(opts) click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 252
# Creates a :transport configuration section and copies into it every
# parameter named in SERVER_TRANSPORT_PARAMS that is present as a key in
# +opts+. Keys of +opts+ outside that list are ignored. Returns the section.
def server_create_transport_section_object(opts)
  configured_section_create(:transport).tap do |section|
    SERVER_TRANSPORT_PARAMS.each do |name|
      section[name] = opts[name] if opts.has_key?(name)
    end
  end
end
server_create_udp(title, port, **kwargs, &callback) click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 192
# Shorthand for server_create with proto defaulting to :udp. A :proto key in
# +kwargs+ still takes precedence, as it is merged after the default.
def server_create_udp(title, port, **kwargs, &callback)
  options = { proto: :udp }.merge(kwargs)
  server_create(title, port, **options, &callback)
end
server_create_udp_socket(shared, bind, port) click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 366
# Returns a bound UDP socket for bind/port with O_NONBLOCK set.
#
# When +shared+ is truthy, the socket comes from the socket manager client.
# Otherwise it is opened directly in this process.
def server_create_udp_socket(shared, bind, port)
  if shared
    udp = server_socket_manager_client.listen_udp(bind, port)
  else
    # UDPSocket.new doesn't set IPV6_V6ONLY flag, so use Addrinfo class instead.
    raw = Addrinfo.udp(bind, port).bind
    # keep the descriptor open: it is re-wrapped as a UDPSocket below
    raw.autoclose = false
    udp = UDPSocket.for_fd(raw.fileno)
  end

  # close-on-exec is set by default in Ruby 2.0 or later (, and it's unavailable on Windows)
  udp.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # nonblock
  udp
end
server_create_unix(title, port, **kwargs, &callback) click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 200
# Shorthand for server_create with proto defaulting to :unix. A :proto key
# in +kwargs+ still takes precedence, as it is merged after the default.
def server_create_unix(title, port, **kwargs, &callback)
  options = { proto: :unix }.merge(kwargs)
  server_create(title, port, **options, &callback)
end
server_socket_manager_client() click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 343
# Builds a ServerEngine socket manager client from the
# SERVERENGINE_SOCKETMANAGER_PATH environment variable. On Windows the value
# is converted to an Integer first; elsewhere it is passed through as read.
def server_socket_manager_client
  manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
  manager_path = manager_path.to_i if Fluent.windows?
  ServerEngine::SocketManager::Client.new(manager_path)
end
server_wait_until_start() click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 48
# Intentionally empty: this helper does no waiting of its own at startup.
def server_wait_until_start
  # event_loop_wait_until_start works well for this
end
server_wait_until_stop() click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 52
# Polls every 0.1 seconds until no registered server reports attached?, then
# closes each server, ignoring any StandardError raised while closing.
def server_wait_until_stop
  sleep 0.1 until @_servers.none? { |info| info.server.attached? }

  @_servers.each do |info|
    begin
      info.server.close
    rescue StandardError
      nil
    end
  end
end
shutdown() click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 330
# Closes every tracked client connection, then continues with the
# superclass shutdown.
#
# The close callback registered by server_create_for_tcp_connection and
# server_create_for_tls_connection deletes a connection from
# @_server_connections. If that callback runs while the live array is being
# iterated, `each` skips the element after every deletion and some
# connections are never closed. So a snapshot is iterated instead.
#
# The snapshot is taken under @_server_mutex, which guards the list
# elsewhere. The connections are closed outside the lock because the close
# callback takes the same mutex, and Ruby's Mutex is not reentrant.
#
# Errors raised by an individual close are ignored (best effort).
def shutdown
  connections = @_server_mutex.synchronize { @_server_connections.dup }
  connections.each do |conn|
    conn.close rescue nil
  end

  super
end
stop() click to toggle source
Calls superclass method
# File lib/fluent/plugin_helper/server.rb, line 318
# Under @_server_mutex, detaches every registered server that is still
# attached and closes it, ignoring any StandardError raised while closing.
# Then continues with the superclass stop.
def stop
  @_server_mutex.synchronize do
    @_servers.each do |info|
      if info.server.attached?
        info.server.detach
      end
      # to refuse more connections: (connected sockets are still alive here)
      begin
        info.server.close
      rescue StandardError
        nil
      end
    end
  end

  super
end
terminate() click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 338
# Drops every recorded server by resetting @_servers to an empty list, then
# continues with the superclass terminate.
def terminate
  @_servers = []
  super
end