class Fluent::RootAgent
Fluentd forms a tree structure to manage plugins:
    RootAgent
        |
        +------------+-------------+-------------+
        |            |             |             |
     <label>     <source>      <filter>      <match>
        |
        +----+----+
        |         |
     <filter>  <match>
Relation:
- RootAgent has many <label>, <source>, <filter> and <match>
- <label> has many <match> and <filter>
Next step: `fluentd/agent.rb`
Next step: `fluentd/label.rb`
Constants
- ERROR_LABEL
Attributes
inputs[R]
labels[R]
Public Class Methods
new(log:, system_config: SystemConfig.new)
click to toggle source
Calls superclass method
Fluent::Agent::new
# File lib/fluent/root_agent.rb, line 50
# Builds a root agent with no labels and no inputs, then applies the
# settings it reads from the system configuration.
#
# @param log the logger, forwarded to the superclass constructor
# @param system_config the object queried for `emit_error_log_interval`,
#   `without_source` and `enable_input_metrics`
def initialize(log:, system_config: SystemConfig.new)
  super(log: log)

  @labels = {}
  @inputs = []

  # Values that stay in effect when the system configuration leaves the
  # corresponding setting as nil.
  @suppress_emit_error_log_interval = 0
  @next_emit_error_log_time = nil
  @without_source = false
  @enable_input_metrics = false

  unless system_config.emit_error_log_interval.nil?
    suppress_interval(system_config.emit_error_log_interval)
  end

  unless system_config.without_source.nil?
    @without_source = system_config.without_source
  end

  # Store a strict true/false regardless of what the setting holds.
  @enable_input_metrics = system_config.enable_input_metrics ? true : false
end
Public Instance Methods
add_label(name)
click to toggle source
# File lib/fluent/root_agent.rb, line 329
# Creates a Label for +name+, attaches it to this root agent and registers it.
#
# The duplicate check runs before the Label is constructed, so a repeated
# section name fails without building an object that is thrown away.
#
# @param name the label name used as the key in @labels
# @return the newly registered Label
# @raise [ConfigError] if a label with the same name is already registered
def add_label(name)
  raise ConfigError, "Section <label #{name}> appears twice" if @labels[name]

  label = Label.new(name, log: log)
  label.root_agent = self
  @labels[name] = label
end
add_source(type, conf)
click to toggle source
# File lib/fluent/root_agent.rb, line 311
# Creates the input plugin for one <source> section, configures it and
# appends it to @inputs.
#
# @param type the plugin type handed to Plugin.new_input
# @param conf the <source> configuration element
# @return the configured input plugin
def add_source(type, conf)
  # The log call is tagged :default when conf.for_this_worker? is true,
  # and :worker0 otherwise.
  log_type = if conf.for_this_worker?
               :default
             else
               :worker0
             end
  log.info log_type, "adding source", type: type

  source_plugin = Plugin.new_input(type)
  # <source> emits events to the top-level event router (RootAgent#event_router).
  # Input#configure overwrites event_router to a label's event_router if it has `@label` parameter.
  # See also 'fluentd/plugin/input.rb'
  source_plugin.context_router = @event_router
  source_plugin.configure(conf)

  if @enable_input_metrics
    # Registered under the plugin's id; forwards the event stream to the plugin.
    metrics_hook = Proc.new { |es| source_plugin.metric_callback(es) }
    @event_router.add_metric_callbacks(source_plugin.plugin_id, metrics_hook)
  end

  @inputs << source_plugin
  source_plugin
end
configure(conf)
click to toggle source
Calls superclass method
Fluent::Agent#configure
# File lib/fluent/root_agent.rb, line 68
# Configures the root agent from the top-level configuration element.
#
# Steps, in order:
# 1. Validate every <worker> section, tag it (or its children) with the
#    target worker id(s), add it to +conf+ with `+=`, then drop the
#    <worker> elements themselves.
# 2. Register and configure every <label> section; the label whose name
#    equals ERROR_LABEL is routed through setup_error_label.
# 3. Call the superclass implementation.
# 4. Add every <source> section, unless @without_source is set.
#
# @param conf the root configuration element
# @raise [Fluent::ConfigError] for a missing, reversed, out-of-range or
#   colliding worker id, or a disallowed directive inside <worker>
# @raise [ConfigError] for a <label> without a name, a <label> named '@ROOT',
#   or a <source> without '@type'
def configure(conf)
  used_worker_ids = []
  available_worker_ids = (0..Fluent::Engine.system_config.workers - 1).to_a
  # initialize <worker> elements
  conf.elements(name: 'worker').each do |e|
    target_worker_id_str = e.arg
    if target_worker_id_str.empty?
      raise Fluent::ConfigError, "Missing worker id on <worker> directive"
    end

    # "N-M" selects a range of workers; anything else is treated as a single id.
    # NOTE(review): String#to_i is lenient ("abc".to_i == 0), so a non-numeric
    # argument is read as worker 0 rather than rejected - confirm this is intended.
    target_worker_ids = target_worker_id_str.split("-")
    if target_worker_ids.size == 2
      first_worker_id = target_worker_ids.first.to_i
      last_worker_id = target_worker_ids.last.to_i
      if first_worker_id > last_worker_id
        raise Fluent::ConfigError, "greater first_worker_id<#{first_worker_id}> than last_worker_id<#{last_worker_id}> specified by <worker> directive is not allowed. Available multi worker assign syntax is <smaller_worker_id>-<greater_worker_id>"
      end
      target_worker_ids = []
      first_worker_id.step(last_worker_id, 1) do |worker_id|
        target_worker_id = worker_id.to_i
        target_worker_ids << target_worker_id

        if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1)
          raise Fluent::ConfigError, "worker id #{target_worker_id} specified by <worker> directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}"
        end
        # available_worker_ids is only used to build the collision error message below.
        available_worker_ids.delete(target_worker_id) if available_worker_ids.include?(target_worker_id)
        if used_worker_ids.include?(target_worker_id)
          raise Fluent::ConfigError, "specified worker_id<#{worker_id}> collisions is detected on <worker> directive. Available worker id(s): #{available_worker_ids}"
        end
        used_worker_ids << target_worker_id

        # This check and the set_target_worker_ids call below run once per
        # worker id in the range; the last call carries the full id list.
        e.elements.each do |elem|
          unless ['source', 'match', 'filter', 'label'].include?(elem.name)
            raise Fluent::ConfigError, "<worker> section cannot have <#{elem.name}> directive"
          end
        end

        unless target_worker_ids.empty?
          e.set_target_worker_ids(target_worker_ids.uniq)
        end
      end
    else
      # NOTE(review): this single-id branch neither checks nor updates
      # used_worker_ids, so collisions are only detected between range-style
      # <worker> sections - confirm this is intended.
      target_worker_id = target_worker_id_str.to_i
      if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1)
        raise Fluent::ConfigError, "worker id #{target_worker_id} specified by <worker> directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}"
      end

      e.elements.each do |elem|
        unless ['source', 'match', 'filter', 'label'].include?(elem.name)
          raise Fluent::ConfigError, "<worker> section cannot have <#{elem.name}> directive"
        end
        elem.set_target_worker_id(target_worker_id)
      end
    end
    # Rebinds the local conf; presumably merges the <worker> section's
    # children into the root config - verify against the element's `+`.
    conf += e
  end
  conf.elements.delete_if{|e| e.name == 'worker'}

  error_label_config = nil

  # initialize <label> elements before configuring all plugins to avoid 'label not found' in input, filter and output.
  label_configs = {}
  conf.elements(name: 'label').each { |e|
    # Outside supervisor mode, labels assigned to another worker are skipped.
    if !Fluent::Engine.supervisor_mode && e.for_another_worker?
      next
    end
    name = e.arg
    raise ConfigError, "Missing symbol argument on <label> directive" if name.empty?
    raise ConfigError, "@ROOT for <label> is not permitted, reserved for getting root router" if name == '@ROOT'

    if name == ERROR_LABEL
      error_label_config = e
    else
      add_label(name)
      label_configs[name] = e
    end
  }
  # Call 'configure' here to avoid 'label not found'
  label_configs.each { |name, e| @labels[name].configure(e) }
  setup_error_label(error_label_config) if error_label_config

  # Bare `super` forwards the current value of conf, i.e. the one rebound above.
  super

  # initialize <source> elements
  if @without_source
    log.info :worker0, "'--without-source' is applied. Ignore <source> sections"
  else
    conf.elements(name: 'source').each { |e|
      # Outside supervisor mode, sources assigned to another worker are skipped.
      if !Fluent::Engine.supervisor_mode && e.for_another_worker?
        next
      end
      type = e['@type']
      raise ConfigError, "Missing '@type' parameter on <source> directive" unless type
      add_source(type, e)
    }
  end
end
emit_error_event(tag, time, record, error)
click to toggle source
# File lib/fluent/root_agent.rb, line 344
# Reports a single failed event: forwards it to @error_collector when one is
# set, otherwise writes the whole event, record included, to the log.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @param error the exception that was raised for this event
def emit_error_event(tag, time, record, error)
  trace = error.backtrace
  error_info = {
    error: error,
    location: (trace ? trace.first : nil),
    tag: tag,
    time: time,
  }

  unless @error_collector
    # Nobody else will see the record, so include it in the log entry.
    error_info[:record] = record
    return log.warn("dump an error event:", error_info)
  end

  # A record is not included in the logs because <@ERROR> handles it. This warn is for the notification
  log.warn "send an error event to @ERROR:", error_info
  @error_collector.emit(tag, time, record)
end
find_label(label_name)
click to toggle source
# File lib/fluent/root_agent.rb, line 336
# Looks up a registered label by name.
#
# @param label_name the key to look up in @labels
# @return the registered label
# @raise [ArgumentError] if no label is stored under +label_name+
def find_label(label_name)
  found = @labels[label_name]
  raise ArgumentError, "#{label_name} label not found" unless found

  found
end
flush!()
click to toggle source
# File lib/fluent/root_agent.rb, line 214
# Calls force_flush on every plugin that responds to it, one thread per
# plugin, and waits for all of those threads to finish.
#
# A StandardError raised by force_flush is logged and does not escape the thread.
def flush!
  log.info "flushing all buffer forcedly"

  workers = []
  lifecycle(desc: true) do |plugin|
    next unless plugin.respond_to?(:force_flush)

    workers << Thread.new do
      Thread.current.abort_on_exception = true
      begin
        plugin.force_flush
      rescue => e
        log.warn "unexpected error while flushing buffer", plugin: plugin.class, plugin_id: plugin.plugin_id, error: e
        log.warn_backtrace
      end
    end
  end

  workers.each(&:join)
end
handle_emits_error(tag, es, error)
click to toggle source
# File lib/fluent/root_agent.rb, line 356
# Handles a failure that affected a whole event stream.
#
# With an @error_collector the stream is forwarded to it. Without one the
# failure is logged, subject to the suppression interval, and +error+ is
# re-raised.
#
# @param tag the tag of the event stream
# @param es the event stream
# @param error the exception that was raised while emitting
# @raise +error+ itself when no @error_collector is set
def handle_emits_error(tag, es, error)
  trace = error.backtrace
  error_info = { error: error, location: (trace ? trace.first : nil), tag: tag }

  if @error_collector
    log.warn "send an error event stream to @ERROR:", error_info
    return @error_collector.emit_stream(tag, es)
  end

  now = Time.now.to_i
  # An interval of zero disables suppression; otherwise log only once the
  # previously scheduled time has passed.
  should_log = @suppress_emit_error_log_interval.zero? || now > @next_emit_error_log_time
  if should_log
    log.warn "emit transaction failed:", error_info
    log.warn_backtrace
    @next_emit_error_log_time = now + @suppress_emit_error_log_interval
  end

  raise error
end
lifecycle(desc: false, kind_callback: nil) { |plugin, display_kind| ... }
click to toggle source
# File lib/fluent/root_agent.rb, line 172
# Yields every plugin managed by this agent and its labels, group by group.
#
# Ascending order is: input, output_with_router, labels, filter, output.
# With +desc+ the group order is reversed, and so are the labels and the
# plugins inside each group.
#
# @param desc [Boolean] walk in descending order when true
# @param kind_callback an object responding to `call`, invoked after each
#   group (plugin kind or label) has been walked; nil to skip
# @yield [plugin, display_kind] display_kind is the plugin kind, with
#   :output_with_router reported as :output
def lifecycle(desc: false, kind_callback: nil)
  label_list = @labels.values
  sequence =
    if desc
      [:output, :filter, label_list.reverse, :output_with_router, :input]
    else
      [:input, :output_with_router, label_list, :filter, :output]
    end

  sequence.flatten.each do |entry|
    if entry.respond_to?(:lifecycle)
      # A label: delegate to its own lifecycle walk.
      entry.lifecycle(desc: desc) { |plugin, display_kind| yield plugin, display_kind }
    else
      # A plugin kind symbol: walk the plugins recorded for that kind.
      plugins = lifecycle_control_list[entry]
      plugins = plugins.reverse if desc
      display_kind = entry == :output_with_router ? :output : entry
      plugins.each { |plugin| yield plugin, display_kind }
    end

    kind_callback.call if kind_callback
  end
end
setup_error_label(e)
click to toggle source
# File lib/fluent/root_agent.rb, line 166
# Registers the label named by ERROR_LABEL, configures it with +e+ and keeps
# its event router in @error_collector.
#
# @param e the configuration element for the error label
# @return the error label's event router
def setup_error_label(e)
  @error_collector = add_label(ERROR_LABEL).tap { |label| label.configure(e) }.event_router
end
shutdown()
click to toggle source
# File lib/fluent/root_agent.rb, line 234
# Runs the plugin shutdown sequence over every plugin yielded by lifecycle:
# stop, before_shutdown + shutdown, after_shutdown, close, terminate.
#
# Each step is skipped for a plugin whose matching predicate (stopped?,
# shutdown?, ...) already returns true. Exceptions raised by a plugin are
# logged and do not stop the sequence.
def shutdown
  # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, after_shutdown, close, terminate for plugins
  # These method callers does `rescue Exception` to call methods of shutdown sequence as far as possible
  # if plugin methods does something like infinite recursive call, `exit`, unregistering signal handlers or others.
  # Plugins should be separated and be in sandbox to protect data in each plugins/buffers.

  # "Safe" steps are called one plugin after another on the current thread.
  lifecycle_safe_sequence = ->(method, checker) {
    lifecycle do |instance, kind|
      begin
        log.debug "calling #{method} on #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
        instance.__send__(method) unless instance.__send__(checker)
      rescue Exception => e
        log.warn "unexpected error while calling #{method} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e
        log.warn_backtrace
      end
    end
  }

  # "Unsafe" steps run in one thread per plugin; the threads of one plugin
  # group are joined (via kind_callback) before the next group starts.
  lifecycle_unsafe_sequence = ->(method, checker) {
    operation = case method
                when :shutdown then "shutting down"
                when :close then "closing"
                else
                  raise "BUG: unknown method name '#{method}'"
                end
    operation_threads = []
    # Invoked by lifecycle after each plugin group: wait for the group's threads.
    callback = ->(){
      operation_threads.each{|t| t.join }
      operation_threads.clear
    }
    lifecycle(kind_callback: callback) do |instance, kind|
      t = Thread.new do
        Thread.current.abort_on_exception = true
        begin
          if method == :shutdown
            # To avoid Input#shutdown and Output#before_shutdown mismatch problem, combine before_shutdown and shutdown call in one sequence.
            # The problem is in_tail flushes buffered multiline in shutdown but output's flush_at_shutdown is invoked in before_shutdown
            # NOTE(review): `operation` is a single local captured by every
            # thread of this sequence and reassigned inside them, so log text
            # of concurrently running threads may be mixed up - confirm this
            # is acceptable.
            operation = "preparing shutdown" # for logging
            log.debug "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
            begin
              instance.__send__(:before_shutdown) unless instance.__send__(:before_shutdown?)
            rescue Exception => e
              # A failing before_shutdown is logged; shutdown is still attempted below.
              log.warn "unexpected error while #{operation} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e
              log.warn_backtrace
            end
            operation = "shutting down"
            log.info "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
            instance.__send__(:shutdown) unless instance.__send__(:shutdown?)
          else
            log.debug "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
            instance.__send__(method) unless instance.__send__(checker)
          end
        rescue Exception => e
          log.warn "unexpected error while #{operation} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e
          log.warn_backtrace
        end
      end
      operation_threads << t
    end
  }

  lifecycle_safe_sequence.call(:stop, :stopped?)

  # before_shutdown does force_flush for output plugins: it should block, so it's unsafe operation
  lifecycle_unsafe_sequence.call(:shutdown, :shutdown?)

  lifecycle_safe_sequence.call(:after_shutdown, :after_shutdown?)

  lifecycle_unsafe_sequence.call(:close, :closed?)

  lifecycle_safe_sequence.call(:terminate, :terminated?)
end
start()
click to toggle source
# File lib/fluent/root_agent.rb, line 201
# Starts every plugin, walking them in descending lifecycle order and
# calling after_start on each plugin right after its start.
def start
  lifecycle(desc: true) do |plugin|
    plugin.start unless plugin.started?

    # Input#start sometimes emits lots of events with in_tail/`read_from_head true` case
    # and it causes deadlock for small buffer/queue output. To avoid such problem,
    # buffer related output threads should be run before `Input#start`.
    # This is why after_start should be called immediately after start call.
    # This depends on `desc: true` because calling plugin order of `desc: true` is
    # Output, Filter, Label, Output with Router, then Input.
    plugin.after_start unless plugin.after_started?
  end
end
suppress_interval(interval_time)
click to toggle source
# File lib/fluent/root_agent.rb, line 306
# Sets the interval used to suppress repeated "emit transaction failed" logs
# and resets the next allowed log time to the current time.
#
# @param interval_time the suppression interval; handle_emits_error adds it
#   to a Time#to_i value, i.e. whole seconds
# @return [Integer] the current epoch time stored as the next allowed log time
def suppress_interval(interval_time)
  current_epoch = Time.now.to_i
  @suppress_emit_error_log_interval = interval_time
  @next_emit_error_log_time = current_epoch
end