class Fluent::Plugin::TailInput
Constants
- Entry
- MetricsInfo
- RESERVED_CHARS
- TargetInfo
Attributes
Public Class Methods
Calls superclass method Fluent::Plugin::Input::new
# File lib/fluent/plugin/in_tail.rb, line 53
def initialize
  super
  @paths = []
  @tails = {}
  @pf_file = nil
  @pf = nil
  @ignore_list = []
  @shutdown_start_time = nil
  @metrics = nil
  @startup = true
end
Public Instance Methods
Calls superclass method Fluent::Plugin::Base#close
# File lib/fluent/plugin/in_tail.rb, line 277
def close
  super
  # close file handles after all threads stopped (in #close of thread plugin helper)
  close_watcher_handles
end
# File lib/fluent/plugin/in_tail.rb, line 480
def close_watcher_handles
  @tails.keys.each do |target_info|
    tw = @tails.delete(target_info)
    if tw
      tw.close
    end
  end
end
Calls superclass method Fluent::Plugin::Input#configure
# File lib/fluent/plugin/in_tail.rb, line 125
def configure(conf)
  @variable_store = Fluent::VariableStore.fetch_or_build(:in_tail)
  compat_parameters_convert(conf, :parser)
  parser_config = conf.elements('parse').first
  unless parser_config
    raise Fluent::ConfigError, "<parse> section is required."
  end

  (1..Fluent::Plugin::MultilineParser::FORMAT_MAX_NUM).each do |n|
    parser_config["format#{n}"] = conf["format#{n}"] if conf["format#{n}"]
  end

  parser_config['unmatched_lines'] = conf['emit_unmatched_lines']

  super

  if !@enable_watch_timer && !@enable_stat_watcher
    raise Fluent::ConfigError, "either of enable_watch_timer or enable_stat_watcher must be true"
  end

  if RESERVED_CHARS.include?(@path_delimiter)
    rc = RESERVED_CHARS.join(', ')
    raise Fluent::ConfigError, "#{rc} are reserved words: #{@path_delimiter}"
  end

  @paths = @path.split(@path_delimiter).map(&:strip).uniq
  if @paths.empty?
    raise Fluent::ConfigError, "tail: 'path' parameter is required on tail input"
  end
  if @path_timezone
    Fluent::Timezone.validate!(@path_timezone)
    @path_formatters = @paths.map{|path| [path, Fluent::Timezone.formatter(@path_timezone, path)]}.to_h
    @exclude_path_formatters = @exclude_path.map{|path| [path, Fluent::Timezone.formatter(@path_timezone, path)]}.to_h
  end

  # TODO: Use plugin_root_dir and storage plugin to store positions if available
  if @pos_file
    if @variable_store.key?(@pos_file) && !called_in_test?
      plugin_id_using_this_path = @variable_store[@pos_file]
      raise Fluent::ConfigError, "Other 'in_tail' plugin already use same pos_file path: plugin_id = #{plugin_id_using_this_path}, pos_file path = #{@pos_file}"
    end
    @variable_store[@pos_file] = self.plugin_id
  else
    if @follow_inodes
      raise Fluent::ConfigError, "Can't follow inodes without pos_file configuration parameter"
    end
    $log.warn "'pos_file PATH' parameter is not set to a 'tail' source."
    $log.warn "this parameter is highly recommended to save the position to resume tailing."
  end

  configure_tag
  configure_encoding

  @multiline_mode = parser_config["@type"] =~ /multiline/
  @receive_handler = if @multiline_mode
                       method(:parse_multilines)
                     else
                       method(:parse_singleline)
                     end
  @file_perm = system_config.file_permission || Fluent::DEFAULT_FILE_PERMISSION
  @dir_perm = system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION
  # parser is already created by parser helper
  @parser = parser_create(usage: parser_config['usage'] || @parser_configs.first.usage)
  @capability = Fluent::Capability.new(:current_process)

  if @read_bytes_limit_per_second > 0
    if !@enable_watch_timer
      raise Fluent::ConfigError, "Need to enable watch timer when using log throttling feature"
    end

    min_bytes = TailWatcher::IOHandler::BYTES_TO_READ
    if @read_bytes_limit_per_second < min_bytes
      log.warn "Should specify greater equal than #{min_bytes}. Use #{min_bytes} for read_bytes_limit_per_second"
      @read_bytes_limit_per_second = min_bytes
    end
  end

  opened_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_opened_total", help_text: "Total number of opened files")
  closed_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_closed_total", help_text: "Total number of closed files")
  rotated_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_rotated_total", help_text: "Total number of rotated files")
  @metrics = MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics)
end
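For reference, a minimal sketch of a configuration that passes the validation above; the concrete path, pos_file, and tag values are placeholders, not part of in_tail itself.

<source>
  @type tail
  path /var/log/app/app.log              # required; multiple paths are separated by path_delimiter (default ",")
  pos_file /var/log/td-agent/app.log.pos # strongly recommended, and required when follow_inodes is enabled
  tag app.log
  <parse>                                # a <parse> section is mandatory
    @type none
  </parse>
</source>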
# File lib/fluent/plugin/in_tail.rb, line 216
def configure_encoding
  unless @encoding
    if @from_encoding
      raise Fluent::ConfigError, "tail: 'from_encoding' parameter must be specified with 'encoding' parameter."
    end
  end

  @encoding = parse_encoding_param(@encoding) if @encoding
  @from_encoding = parse_encoding_param(@from_encoding) if @from_encoding
  if @encoding && (@encoding == @from_encoding)
    log.warn "'encoding' and 'from_encoding' are same encoding. No effect"
  end
end
# File lib/fluent/plugin/in_tail.rb, line 205
def configure_tag
  if @tag.index('*')
    @tag_prefix, @tag_suffix = @tag.split('*')
    @tag_prefix ||= ''
    @tag_suffix ||= ''
  else
    @tag_prefix = nil
    @tag_suffix = nil
  end
end
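A rough Ruby sketch of the prefix/suffix split above. The per-file tag shown ("var.log.app.log") is only an assumed example of what TailWatcher derives from the path; the joining itself happens later, in receive_lines and flush_buffer.

# Illustrative only: '*' in the configured tag is replaced by a per-file tag.
tag = 'app.*'
tag_prefix, tag_suffix = tag.split('*')
tag_prefix ||= ''   # => "app."
tag_suffix ||= ''   # => "" (split drops the empty trailing part)

watcher_tag = 'var.log.app.log'                      # assumed per-file tag for "/var/log/app.log"
emitted_tag = tag_prefix + watcher_tag + tag_suffix  # => "app.var.log.app.log"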
# File lib/fluent/plugin/in_tail.rb, line 422
def construct_watcher(target_info)
  pe = nil
  if @pf
    pe = @pf[target_info]
    if @read_from_head && pe.read_inode.zero?
      begin
        pe.update(Fluent::FileWrapper.stat(target_info.path).ino, 0)
      rescue Errno::ENOENT, Errno::EACCES
        $log.warn "stat() for #{target_info.path} failed. Continuing without tailing it."
      end
    end
  end

  begin
    tw = setup_watcher(target_info, pe)
  rescue WatcherSetupError => e
    log.warn "Skip #{target_info.path} because unexpected setup error happens: #{e}"
    return
  end

  begin
    target_info = TargetInfo.new(target_info.path, Fluent::FileWrapper.stat(target_info.path).ino)
    @tails.delete(target_info)
    @tails[target_info] = tw
    tw.on_notify
  rescue Errno::ENOENT, Errno::EACCES => e
    $log.warn "stat() for #{target_info.path} failed with #{e.class.name}. Drop tail watcher for now."
    # explicitly detach and unwatch watcher `tw`.
    tw.unwatched = true
    detach_watcher(tw, target_info.ino, false)
  end
end
# File lib/fluent/plugin/in_tail.rb, line 627
def convert_line_to_event(line, es, tail_watcher)
  begin
    line.chomp!  # remove \n
    @parser.parse(line) { |time, record|
      if time && record
        record[@path_key] ||= tail_watcher.path unless @path_key.nil?
        es.add(time, record)
      else
        if @emit_unmatched_lines
          record = {'unmatched_line' => line}
          record[@path_key] ||= tail_watcher.path unless @path_key.nil?
          es.add(Fluent::EventTime.now, record)
        end
        log.warn "pattern not matched: #{line.inspect}"
      end
    }
  rescue => e
    log.warn 'invalid line found', file: tail_watcher.path, line: line, error: e.to_s
    log.debug_backtrace(e.backtrace)
  end
end
TailWatcher#close is called by another thread at the shutdown phase. That causes a 'can't modify string; temporarily locked' error in IOHandler, so the close_io argument was added to avoid the problem. At shutdown, IOHandler's IO is released automatically after it is detached from the event loop.
# File lib/fluent/plugin/in_tail.rb, line 531
def detach_watcher(tw, ino, close_io = true)
  tw.watchers.each do |watcher|
    event_loop_detach(watcher)
  end
  tw.detach(@shutdown_start_time)
  tw.close if close_io

  if tw.unwatched && @pf
    target_info = TargetInfo.new(tw.path, ino)
    @pf.unwatch(target_info)
  end
end
# File lib/fluent/plugin/in_tail.rb, line 545
def detach_watcher_after_rotate_wait(tw, ino)
  # Call event_loop_attach/event_loop_detach is high-cost for short-live object.
  # If this has a problem with large number of files, use @_event_loop directly instead of timer_execute.
  if @open_on_every_update
    # Detach now because it's already closed, waiting it doesn't make sense.
    detach_watcher(tw, ino)
  elsif @read_bytes_limit_per_second < 0
    # throttling isn't enabled, just wait @rotate_wait
    timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do
      detach_watcher(tw, ino)
    end
  else
    # When the throttling feature is enabled, it might not reach EOF yet.
    # Should ensure to read all contents before closing it, with keeping throttling.
    start_time_to_wait = Fluent::Clock.now
    timer = timer_execute(:in_tail_close_watcher, 1, repeat: true) do
      elapsed = Fluent::Clock.now - start_time_to_wait
      if tw.eof? && elapsed >= @rotate_wait
        timer.detach
        detach_watcher(tw, ino)
      end
    end
  end
end
# File lib/fluent/plugin/in_tail.rb, line 355
def existence_path
  hash = {}
  @tails.each_key {|target_info|
    if @follow_inodes
      hash[target_info.ino] = target_info
    else
      hash[target_info.path] = target_info
    end
  }
  hash
end
# File lib/fluent/plugin/in_tail.rb, line 288
def expand_paths
  date = Fluent::EventTime.now
  paths = []
  @paths.each { |path|
    path = if @path_timezone
             @path_formatters[path].call(date)
           else
             date.to_time.strftime(path)
           end

    if path.include?('*')
      paths += Dir.glob(path).select { |p|
        begin
          is_file = !File.directory?(p)
          if (File.readable?(p) || have_read_capability?) && is_file
            if @limit_recently_modified && File.mtime(p) < (date.to_time - @limit_recently_modified)
              false
            else
              true
            end
          else
            if is_file
              unless @ignore_list.include?(p)
                log.warn "#{p} unreadable. It is excluded and would be examined next time."
                @ignore_list << p if @ignore_repeated_permission_error
              end
            end
            false
          end
        rescue Errno::ENOENT, Errno::EACCES
          log.debug("#{p} is missing after refresh file list")
          false
        end
      }
    else
      # When file is not created yet, Dir.glob returns an empty array. So just add when path is static.
      paths << path
    end
  }
  excluded = @exclude_path.map { |path|
    path = if @path_timezone
             @exclude_path_formatters[path].call(date)
           else
             date.to_time.strftime(path)
           end

    path.include?('*') ? Dir.glob(path) : path
  }.flatten.uniq
  # filter out non existing files, so in case pattern is without '*' we don't do unnecessary work
  hash = {}
  (paths - excluded).select { |path| FileTest.exist?(path) }.each { |path|
    # Even we just checked for existence, there is a race condition here as
    # of which stat() might fail with ENOENT. See #3224.
    begin
      target_info = TargetInfo.new(path, Fluent::FileWrapper.stat(path).ino)
      if @follow_inodes
        hash[target_info.ino] = target_info
      else
        hash[target_info.path] = target_info
      end
    rescue Errno::ENOENT, Errno::EACCES => e
      $log.warn "expand_paths: stat() for #{path} failed with #{e.class.name}. Skip file."
    end
  }
  hash
end
# File lib/fluent/plugin/in_tail.rb, line 570
def flush_buffer(tw, buf)
  buf.chomp!
  @parser.parse(buf) { |time, record|
    if time && record
      tag = if @tag_prefix || @tag_suffix
              @tag_prefix + tw.tag + @tag_suffix
            else
              @tag
            end
      record[@path_key] ||= tw.path unless @path_key.nil?
      router.emit(tag, time, record)
    else
      if @emit_unmatched_lines
        record = { 'unmatched_line' => buf }
        record[@path_key] ||= tw.path unless @path_key.nil?
        tag = if @tag_prefix || @tag_suffix
                @tag_prefix + tw.tag + @tag_suffix
              else
                @tag
              end
        router.emit(tag, Fluent::EventTime.now, record)
      end
      log.warn "got incomplete line at shutdown from #{tw.path}: #{buf.inspect}"
    end
  }
end
# File lib/fluent/plugin/in_tail.rb, line 283
def have_read_capability?
  @capability.have_capability?(:effective, :dac_read_search) ||
    @capability.have_capability?(:effective, :dac_override)
end
# File lib/fluent/plugin/in_tail.rb, line 230
def parse_encoding_param(encoding_name)
  begin
    Encoding.find(encoding_name) if encoding_name
  rescue ArgumentError => e
    raise Fluent::ConfigError, e.message
  end
end
No need to check whether line_buffer_timer_flusher is nil, since line_buffer_timer_flusher should always exist when this method is called.
# File lib/fluent/plugin/in_tail.rb, line 658
def parse_multilines(lines, tail_watcher)
  lb = tail_watcher.line_buffer_timer_flusher.line_buffer
  es = Fluent::MultiEventStream.new
  if @parser.has_firstline?
    tail_watcher.line_buffer_timer_flusher.reset_timer
    lines.each { |line|
      if @parser.firstline?(line)
        if lb
          convert_line_to_event(lb, es, tail_watcher)
        end
        lb = line
      else
        if lb.nil?
          if @emit_unmatched_lines
            convert_line_to_event(line, es, tail_watcher)
          end
          log.warn "got incomplete line before first line from #{tail_watcher.path}: #{line.inspect}"
        else
          lb << line
        end
      end
    }
  else
    lb ||= ''
    lines.each do |line|
      lb << line
      @parser.parse(lb) { |time, record|
        if time && record
          convert_line_to_event(lb, es, tail_watcher)
          lb = ''
        end
      }
    end
  end
  tail_watcher.line_buffer_timer_flusher.line_buffer = lb
  es
end
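The firstline branch above is taken when the parser is a multiline parser (configure sets @multiline_mode when the <parse> @type matches /multiline/). A hedged configuration sketch; the patterns and time format are placeholders:

<parse>
  @type multiline
  format_firstline /^\d{4}-\d{2}-\d{2}/
  format1 /^(?<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (?<message>.*)/
  time_format %Y-%m-%d %H:%M:%S
</parse>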
# File lib/fluent/plugin/in_tail.rb, line 649
def parse_singleline(lines, tail_watcher)
  es = Fluent::MultiEventStream.new
  lines.each { |line|
    convert_line_to_event(line, es, tail_watcher)
  }
  es
end
@return true if no error, or an unrecoverable error, happens in the emit action; false if a BufferOverflowError was raised
# File lib/fluent/plugin/in_tail.rb, line 598
def receive_lines(lines, tail_watcher)
  lines = lines.reject do |line|
    skip_line = @max_line_size ? line.bytesize > @max_line_size : false
    if skip_line
      log.warn "received line length is longer than #{@max_line_size}"
      log.debug "skipped line: #{line.chomp}"
    end
    skip_line
  end
  es = @receive_handler.call(lines, tail_watcher)
  unless es.empty?
    tag = if @tag_prefix || @tag_suffix
            @tag_prefix + tail_watcher.tag + @tag_suffix
          else
            @tag
          end
    begin
      router.emit_stream(tag, es)
    rescue Fluent::Plugin::Buffer::BufferOverflowError
      return false
    rescue
      # ignore non BufferQueueLimitError errors because in_tail can't recover. Engine shows logs and backtraces.
      return true
    end
  end

  return true
end
in_tail with a '*' path does not check rotated-file equality at the refresh phase, so you should not use a '*' path when your logs are rotated by another tool: it will cause log duplication after the watched files are updated. In such a case, keep the rotated logs in a separate directory and specify two paths in the path parameter, e.g. path /path/to/dir/*,/path/to/rotated_logs/target_file, as sketched below.
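A configuration sketch of that setup; the concrete paths, pos_file, and tag are placeholders:

<source>
  @type tail
  # actively written files and rotated files live in separate directories,
  # so the '*' glob never matches a rotated copy of a file that is already being tailed
  path /path/to/dir/*,/path/to/rotated_logs/target_file
  pos_file /var/log/td-agent/target_file.pos
  tag app.logs
  <parse>
    @type none
  </parse>
</source>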
# File lib/fluent/plugin/in_tail.rb, line 372
def refresh_watchers
  target_paths_hash = expand_paths
  existence_paths_hash = existence_path

  log.debug {
    target_paths_str = target_paths_hash.collect { |key, target_info| target_info.path }.join(",")
    existence_paths_str = existence_paths_hash.collect { |key, target_info| target_info.path }.join(",")
    "tailing paths: target = #{target_paths_str} | existing = #{existence_paths_str}"
  }

  unwatched_hash = existence_paths_hash.reject {|key, value| target_paths_hash.key?(key)}
  added_hash = target_paths_hash.reject {|key, value| existence_paths_hash.key?(key)}

  stop_watchers(unwatched_hash, immediate: false, unwatched: true) unless unwatched_hash.empty?
  start_watchers(added_hash) unless added_hash.empty?
  @startup = false if @startup
end
# File lib/fluent/plugin/in_tail.rb, line 390
def setup_watcher(target_info, pe)
  line_buffer_timer_flusher = @multiline_mode ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
  read_from_head = !@startup || @read_from_head
  tw = TailWatcher.new(target_info, pe, log, read_from_head, @follow_inodes, method(:update_watcher), line_buffer_timer_flusher, method(:io_handler), @metrics)

  if @enable_watch_timer
    tt = TimerTrigger.new(1, log) { tw.on_notify }
    tw.register_watcher(tt)
  end

  if @enable_stat_watcher
    tt = StatWatcher.new(target_info.path, log) { tw.on_notify }
    tw.register_watcher(tt)
  end

  tw.watchers.each do |watcher|
    event_loop_attach(watcher)
  end

  tw
rescue => e
  if tw
    tw.watchers.each do |watcher|
      event_loop_detach(watcher)
    end

    tw.detach(@shutdown_start_time)
    tw.close
  end
  raise e
end
Calls superclass method Fluent::Plugin::Base#shutdown
# File lib/fluent/plugin/in_tail.rb, line 268
def shutdown
  @shutdown_start_time = Fluent::Clock.now
  # during shutdown phase, don't close io. It should be done in close after all threads are stopped. See close.
  stop_watchers(existence_path, immediate: true, remove_watcher: false)
  @pf_file.close if @pf_file

  super
end
Calls superclass method Fluent::Plugin::Base#start
# File lib/fluent/plugin/in_tail.rb, line 238
def start
  super

  if @pos_file
    pos_file_dir = File.dirname(@pos_file)
    FileUtils.mkdir_p(pos_file_dir, mode: @dir_perm) unless Dir.exist?(pos_file_dir)
    @pf_file = File.open(@pos_file, File::RDWR|File::CREAT|File::BINARY, @file_perm)
    @pf_file.sync = true
    @pf = PositionFile.load(@pf_file, @follow_inodes, expand_paths, logger: log)

    if @pos_file_compaction_interval
      timer_execute(:in_tail_refresh_compact_pos_file, @pos_file_compaction_interval) do
        log.info('Clean up the pos file')
        @pf.try_compact
      end
    end
  end

  refresh_watchers unless @skip_refresh_on_startup
  timer_execute(:in_tail_refresh_watchers, @refresh_interval, &method(:refresh_watchers))
end
# File lib/fluent/plugin/in_tail.rb, line 455
def start_watchers(targets_info)
  targets_info.each_value {|target_info|
    construct_watcher(target_info)
    break if before_shutdown?
  }
end
Calls superclass method Fluent::Plugin::Input#statistics
# File lib/fluent/plugin/in_tail.rb, line 696
def statistics
  stats = super

  stats = {
    'input' => stats["input"].merge({
      'opened_file_count' => @metrics.opened.get,
      'closed_file_count' => @metrics.closed.get,
      'rotated_file_count' => @metrics.rotated.get,
    })
  }

  stats
end
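The merged hash returned above has roughly the following shape; the counts are made-up values, and the remaining 'input' keys come from the superclass statistics.

# Illustrative return value of #statistics (values are hypothetical):
{
  'input' => {
    # ... keys inherited from Fluent::Plugin::Input#statistics ...
    'opened_file_count'  => 3,  # @metrics.opened.get
    'closed_file_count'  => 1,  # @metrics.closed.get
    'rotated_file_count' => 2,  # @metrics.rotated.get
  }
}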
Calls superclass method Fluent::PluginId#stop
# File lib/fluent/plugin/in_tail.rb, line 260
def stop
  if @variable_store
    @variable_store.delete(@pos_file)
  end

  super
end
# File lib/fluent/plugin/in_tail.rb, line 462
def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watcher: true)
  targets_info.each_value { |target_info|
    if remove_watcher
      tw = @tails.delete(target_info)
    else
      tw = @tails[target_info]
    end
    if tw
      tw.unwatched = unwatched
      if immediate
        detach_watcher(tw, target_info.ino, false)
      else
        detach_watcher_after_rotate_wait(tw, target_info.ino)
      end
    end
  }
end
refresh_watchers calls @tails.keys, so for safety we don't use a stop_watcher -> start_watcher sequence here.
# File lib/fluent/plugin/in_tail.rb, line 490
def update_watcher(target_info, pe)
  log.info("detected rotation of #{target_info.path}; waiting #{@rotate_wait} seconds")

  if @pf
    pe_inode = pe.read_inode
    target_info_from_position_entry = TargetInfo.new(target_info.path, pe_inode)
    unless pe_inode == @pf[target_info_from_position_entry].read_inode
      log.debug "Skip update_watcher because watcher has been already updated by other inotify event"
      return
    end
  end

  rotated_target_info = TargetInfo.new(target_info.path, pe.read_inode)
  rotated_tw = @tails[rotated_target_info]
  new_target_info = target_info.dup

  if @follow_inodes
    new_position_entry = @pf[target_info]

    if new_position_entry.read_inode == 0
      # When follow_inodes is true, it's not cleaned up by refresh_watcher.
      # So it should be unwatched here explicitly.
      rotated_tw.unwatched = true
      # Make sure to delete old key, it has a different ino while the hash key is same.
      @tails.delete(rotated_target_info)
      @tails[new_target_info] = setup_watcher(new_target_info, new_position_entry)
      @tails[new_target_info].on_notify
    end
  else
    # Make sure to delete old key, it has a different ino while the hash key is same.
    @tails.delete(rotated_target_info)
    @tails[new_target_info] = setup_watcher(new_target_info, pe)
    @tails[new_target_info].on_notify
  end
  detach_watcher_after_rotate_wait(rotated_tw, pe.read_inode) if rotated_tw
end
Private Instance Methods
# File lib/fluent/plugin/in_tail.rb, line 711
def io_handler(watcher, path)
  TailWatcher::IOHandler.new(
    watcher,
    path: path,
    log: log,
    read_lines_limit: @read_lines_limit,
    read_bytes_limit_per_second: @read_bytes_limit_per_second,
    open_on_every_update: @open_on_every_update,
    from_encoding: @from_encoding,
    encoding: @encoding,
    metrics: @metrics,
    &method(:receive_lines)
  )
end