class Fluent::EngineClass

Constants

MAINLOOP_SLEEP_INTERVAL

Attributes

root_agent[R]
supervisor_mode[R]
system_config[R]

Public Class Methods

new() click to toggle source
# File lib/fluent/engine.rb, line 33
# Puts the engine into its default, not-yet-initialized state. No root agent
# or log event router exists until #init and #configure have been called.
def initialize
  # Lifecycle state.
  @engine_stopped = false
  @supervisor_mode = false
  @_worker_id = nil # filled in on first call to #worker_id

  # Flags that #init may override from the system configuration.
  @log_event_verbose = false
  @suppress_config_dump = false
  @without_source = false

  # Collaborators.
  @system_config = SystemConfig.new
  @root_agent = nil
  @fluent_log_event_router = nil
end

Public Instance Methods

add_plugin_dir(dir) click to toggle source
# File lib/fluent/engine.rb, line 118
# Deprecated forwarder: logs a deprecation warning, then hands +dir+ to
# Plugin.add_plugin_dir and returns whatever that call returns.
#
# @param dir the plugin directory to register
def add_plugin_dir(dir)
  deprecation_notice = 'Deprecated method: this method is going to be deleted. Use Fluent::Plugin.add_plugin_dir'
  $log.warn(deprecation_notice)

  Plugin.add_plugin_dir(dir)
end
configure(conf) click to toggle source
# File lib/fluent/engine.rb, line 104
# Configures the root agent with +conf+, builds the log event router for that
# agent, and (unless config dumps are suppressed) logs the configuration.
#
# @param conf the configuration to apply; its #to_s output is what gets logged
def configure(conf)
  @root_agent.configure(conf)

  router = FluentLogEventRouter.build(@root_agent)
  @fluent_log_event_router = router
  # Log events are switched on only when the router reports it can emit them.
  $log.enable_event(true) if router.emittable?

  $log.info(:supervisor, "using configuration file: #{conf.to_s.rstrip}") unless @suppress_config_dump
end
emit(tag, time, record) click to toggle source
# File lib/fluent/engine.rb, line 123
# Always fails: events must be emitted through a router, not through the
# engine. The arguments are accepted but never used.
#
# @raise [RuntimeError] on every call
def emit(tag, time, record)
  raise RuntimeError, "BUG: use router.emit instead of Engine.emit"
end
emit_array(tag, array) click to toggle source
# File lib/fluent/engine.rb, line 127
# Always fails: arrays of events must be emitted through a router, not
# through the engine. The arguments are accepted but never used.
#
# @raise [RuntimeError] on every call
def emit_array(tag, array)
  raise RuntimeError, "BUG: use router.emit_array instead of Engine.emit_array"
end
emit_stream(tag, es) click to toggle source
# File lib/fluent/engine.rb, line 131
# Always fails: event streams must be emitted through a router, not through
# the engine. The arguments are accepted but never used.
#
# @raise [RuntimeError] on every call
def emit_stream(tag, es)
  raise RuntimeError, "BUG: use router.emit_stream instead of Engine.emit_stream"
end
flush!() click to toggle source
# File lib/fluent/engine.rb, line 135
# Delegates to #flush! on the current root agent and returns its result.
def flush!
  @root_agent.flush!
end
init(system_config, supervisor_mode: false) click to toggle source
# File lib/fluent/engine.rb, line 52
# Binds the engine to +system_config+ and creates a fresh root agent.
#
# @param system_config the system configuration; read for suppress_config_dump,
#   without_source and log_event_verbose
# @param supervisor_mode [Boolean] stored as-is; see #worker_id
# @return [self]
def init(system_config, supervisor_mode: false)
  @system_config = system_config
  @supervisor_mode = supervisor_mode

  # A nil setting leaves the value the engine already holds untouched.
  suppress_dump = system_config.suppress_config_dump
  @suppress_config_dump = suppress_dump unless suppress_dump.nil?

  without_source = system_config.without_source
  @without_source = without_source unless without_source.nil?

  verbose = system_config.log_event_verbose
  @log_event_verbose = verbose unless verbose.nil?

  @root_agent = RootAgent.new(log: log, system_config: @system_config)

  self
end
log() click to toggle source
# File lib/fluent/engine.rb, line 66
# Returns the logger stored in the global $log.
def log
  $log
end
now() click to toggle source
# File lib/fluent/engine.rb, line 139
# Returns the current time as reported by Fluent::EventTime.now.
def now
  # TODO thread update
  Fluent::EventTime.now
end
parse_config(io, fname, basepath = Dir.pwd, v1_config = false) click to toggle source
# File lib/fluent/engine.rb, line 70
# Parses a configuration read from +io+.
#
# File names matching /\.rb$/ are handed to the Ruby DSL parser, which is
# required on demand; everything else goes through Config.parse.
#
# @param io the configuration source passed to the parser
# @param fname the configuration file name, used to pick the parser
# @param basepath directory the file name is relative to (defaults to Dir.pwd)
# @param v1_config forwarded unchanged to Config.parse
def parse_config(io, fname, basepath = Dir.pwd, v1_config = false)
  return Config.parse(io, fname, basepath, v1_config) unless fname =~ /\.rb$/

  require 'fluent/config/dsl'
  Config::DSL::Parser.parse(io, File.join(basepath, fname))
end
push_log_event(tag, time, record) click to toggle source
# File lib/fluent/engine.rb, line 209
# Packs the arguments into a [tag, time, record] triple and hands it to the
# log event router's #emit_event.
def push_log_event(tag, time, record)
  log_event = [tag, time, record]
  @fluent_log_event_router.emit_event(log_event)
end
reload_config(conf, supervisor: false) click to toggle source

@param conf [Fluent::Config] @param supervisor [Bool] @return nil

# File lib/fluent/engine.rb, line 167
# Swaps the running configuration for +conf+.
#
# A new RootAgent is created and configured while the old one is still in
# place. If configuring the new agent raises, the old agent is put back and
# the error is re-raised. In worker mode the old agent is then stopped and
# the new one started.
#
# @param conf the new configuration
# @param supervisor [Boolean] when true, only the old agent is shut down and
#   the method returns without stopping/starting worker phases
# @raise [Fluent::ConfigError] if a plugin found by the static analysis
#   responds to #reloadable_plugin? and returns false
def reload_config(conf, supervisor: false)
  # configure first to reduce down time while restarting
  new_agent = RootAgent.new(log: log, system_config: @system_config)
  ret = Fluent::StaticConfigAnalysis.call(conf, workers: system_config.workers)

  # Refuse the reload before touching any running state if any plugin opts out.
  # NOTE(review): the message below has a doubled "plugin" and an unmatched
  # ")"; left untouched here because it is runtime output.
  ret.all_plugins.each do |plugin|
    if plugin.respond_to?(:reloadable_plugin?) && !plugin.reloadable_plugin?
      raise Fluent::ConfigError, "Unreloadable plugin plugin: #{Fluent::Plugin.lookup_type_from_class(plugin.class)}, plugin_id: #{plugin.plugin_id}, class_name: #{plugin.class})"
    end
  end

  # Assign @root_agent to new root_agent
  # for https://github.com/fluent/fluentd/blob/fcef949ce40472547fde295ddd2cfe297e1eddd6/lib/fluent/plugin_helper/event_emitter.rb#L50
  old_agent, @root_agent = @root_agent, new_agent
  begin
    @root_agent.configure(conf)
  rescue
    # Roll back to the previous agent so the engine keeps its old configuration.
    @root_agent = old_agent
    raise
  end

  unless @suppress_config_dump
    $log.info :supervisor, "using configuration file: #{conf.to_s.rstrip}"
  end

  # supervisor doesn't handle actual data. so the following code is unnecessary.
  if supervisor
    old_agent.shutdown      # to close thread created in #configure
    return
  end

  # Worker path: stop the old agent, then bring the new one up.
  stop_phase(old_agent)

  $log.info 'restart fluentd worker', worker: worker_id
  start_phase(new_agent)
end
run() click to toggle source
# File lib/fluent/engine.rb, line 144
# Worker main loop: starts the root agent and the log event router, sleeps
# until #stop has set the stop flag, then runs the stop phase.
#
# Any exception raised while starting or waiting is logged and re-raised; in
# that case the stop phase is not run.
def run
  begin
    $log.info("starting fluentd worker", pid: Process.pid, ppid: Process.ppid, worker: worker_id)
    start

    @fluent_log_event_router.start

    $log.info("fluentd worker is now running", worker: worker_id)
    until @engine_stopped
      sleep(MAINLOOP_SLEEP_INTERVAL)
    end
    $log.info("fluentd worker is now stopping", worker: worker_id)

  rescue Exception => error
    # Log-and-rethrow only: nothing is swallowed here.
    $log.error("unexpected error", error: error)
    $log.error_backtrace
    raise
  end

  stop_phase(@root_agent)
end
run_configure(conf, dry_run: false) click to toggle source
# File lib/fluent/engine.rb, line 79
# Applies +conf+ via #configure, then warns about every section or parameter
# that +conf+ reports as never fetched.
#
# @param conf the configuration; must respond to #check_not_fetched
# @param dry_run [Boolean] when true and the engine is in supervisor mode,
#   warnings are logged with the :supervisor marker
def run_configure(conf, dry_run: false)
  configure(conf)

  conf.check_not_fetched do |key, element|
    parent_name, plugin_name = element.unused_in

    warning =
      if parent_name && plugin_name
        "section <#{element.name}> is not used in <#{parent_name}> of #{plugin_name} plugin"
      elsif parent_name
        "section <#{element.name}> is not used in <#{parent_name}>"
      elsif element.name == 'system' || (@without_source && element.name == 'source')
        # No warning for <system>, nor for <source> when @without_source is set.
        nil
      else
        "parameter '#{key}' in #{element.to_s.strip} is not used."
      end
    next unless warning

    # Choose the log marker; elements matching none of the cases are not reported.
    if dry_run && @supervisor_mode
      $log.warn(:supervisor, warning)
    elsif element.for_every_workers?
      $log.warn(:worker0, warning)
    elsif element.for_this_worker?
      $log.warn(warning)
    end
  end
end
stop() click to toggle source
# File lib/fluent/engine.rb, line 204
# Sets the stop flag that the wait loop in #run polls.
#
# @return [nil]
def stop
  @engine_stopped = true
  nil
end
worker_id() click to toggle source
# File lib/fluent/engine.rb, line 213
# Returns this process's worker id.
#
# In supervisor mode the id is always -1 and nothing is cached. Otherwise the
# id is read once from ENV and memoized.
#
# @return [Integer]
def worker_id
  return -1 if @supervisor_mode

  # if ENV doesn't have SERVERENGINE_WORKER_ID, it is a worker under --no-supervisor or in tests
  # so it's (almost) a single worker, worker_id=0
  @_worker_id ||= ENV.fetch('SERVERENGINE_WORKER_ID', 0).to_i
end

Private Instance Methods

start() click to toggle source
# File lib/fluent/engine.rb, line 247
# Starts the current root agent.
def start
  @root_agent.start
end
start_phase(root_agent) click to toggle source
# File lib/fluent/engine.rb, line 238
# Builds a fresh log event router for +root_agent+, enables log events when
# that router can emit them, and starts the agent.
#
# @param root_agent the agent the router is built for and that gets started
def start_phase(root_agent)
  @fluent_log_event_router = FluentLogEventRouter.build(root_agent)
  $log.enable_event(true) if @fluent_log_event_router.emittable?

  # Start the agent that was passed in (the one the router was just built
  # for), mirroring #stop_phase, instead of whatever @root_agent points at.
  # NOTE(review): unlike #run, the newly built router is not started here —
  # confirm whether that is intentional.
  root_agent.start
end
stop_phase(root_agent) click to toggle source
# File lib/fluent/engine.rb, line 227
# Shuts down +root_agent+ and stops the log event router.
#
# @param root_agent the agent to shut down; must respond to #shutdown
def stop_phase(root_agent)
  # Unless verbose log events are enabled, turn log events off and ask the
  # router for a graceful stop before the agent begins shutting down.
  unless @log_event_verbose
    $log.enable_event(false)
    @fluent_log_event_router.graceful_stop
  end
  $log.info 'shutting down fluentd worker', worker: worker_id
  root_agent.shutdown

  # The router itself is stopped only after the agent has shut down.
  @fluent_log_event_router.stop
end