class Fluent::Plugin::TailInput::TailWatcher

Attributes

ino[R]
line_buffer_timer_flusher[R]
path[R]
pe[R]
unwatched[RW]
watchers[R]

Public Class Methods

new(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 757
def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics)
  @path = target_info.path
  @ino = target_info.ino
  @pe = pe || MemoryPositionEntry.new
  @read_from_head = read_from_head
  @follow_inodes = follow_inodes
  @update_watcher = update_watcher
  @log = log
  @rotate_handler = RotateHandler.new(log, &method(:on_rotate))
  @line_buffer_timer_flusher = line_buffer_timer_flusher
  @io_handler = nil
  @io_handler_build = io_handler_build
  @metrics = metrics
  @watchers = []
end

Public Instance Methods

close() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 795
def close
  if @io_handler
    @io_handler.close
    @io_handler = nil
  end
end
detach(shutdown_start_time = nil) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 787
def detach(shutdown_start_time = nil)
  if @io_handler
    @io_handler.ready_to_shutdown(shutdown_start_time)
    @io_handler.on_notify
  end
  @line_buffer_timer_flusher&.close(self)
end
eof?() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 802
def eof?
  @io_handler.nil? || @io_handler.eof?
end
io_handler() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 900
def io_handler
  @io_handler_build.call(self, @path)
end
on_notify() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 806
def on_notify
  begin
    stat = Fluent::FileWrapper.stat(@path)
  rescue Errno::ENOENT, Errno::EACCES
    # moved or deleted
    stat = nil
  end

  @rotate_handler.on_notify(stat) if @rotate_handler
  @line_buffer_timer_flusher.on_notify(self) if @line_buffer_timer_flusher
  @io_handler.on_notify if @io_handler
end
on_rotate(stat) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 819
def on_rotate(stat)
  if @io_handler.nil?
    if stat
      # first time
      fsize = stat.size
      inode = stat.ino

      last_inode = @pe.read_inode
      if inode == last_inode
        # rotated file has the same inode number with the last file.
        # assuming following situation:
        #   a) file was once renamed and backed, or
        #   b) symlink or hardlink to the same file is recreated
        # in either case of a and b, seek to the saved position
        #   c) file was once renamed, truncated and then backed
        # in this case, consider it truncated
        @pe.update(inode, 0) if fsize < @pe.read_pos
      elsif last_inode != 0
        # this is FilePositionEntry and fluentd once started.
        # read data from the head of the rotated file.
        # logs never duplicate because this file is a rotated new file.
        @pe.update(inode, 0)
      else
        # this is MemoryPositionEntry or this is the first time fluentd started.
        # seek to the end of the any files.
        # logs may duplicate without this seek because it's not sure the file is
        # existent file or rotated new file.
        pos = @read_from_head ? 0 : fsize
        @pe.update(inode, pos)
      end
      @io_handler = io_handler
    else
      @io_handler = NullIOHandler.new
    end
  else
    watcher_needs_update = false

    if stat
      inode = stat.ino
      if inode == @pe.read_inode # truncated
        @pe.update_pos(0)
        @io_handler.close
      elsif !@io_handler.opened? # There is no previous file. Reuse TailWatcher
        @pe.update(inode, 0)
      else # file is rotated and new file found
        watcher_needs_update = true
        # Handle the old log file before renewing TailWatcher [fluentd#1055]
        @io_handler.on_notify
      end
    else # file is rotated and new file not found
      # Clear RotateHandler to avoid duplicated file watch in same path.
      @rotate_handler = nil
      watcher_needs_update = true
    end

    if watcher_needs_update
      if @follow_inodes
        # No need to update a watcher if stat is nil (file not present), because moving to inodes will create
        # new watcher, and old watcher will be closed by stop_watcher in refresh_watchers method
        # don't want to swap state because we need latest read offset in pos file even after rotate_wait
        if stat
          target_info = TargetInfo.new(@path, stat.ino)
          @update_watcher.call(target_info, @pe)
        end
      else
        # Permit to handle if stat is nil (file not present).
        # If a file is mv-ed and a new file is created during
        # calling `#refresh_watchers`s, and `#refresh_watchers` won't run `#start_watchers`
        # and `#stop_watchers()` for the path because `target_paths_hash`
        # always contains the path.
        target_info = TargetInfo.new(@path, stat ? stat.ino : nil)
        @update_watcher.call(target_info, swap_state(@pe))
      end
    else
      @log.info "detected rotation of #{@path}"
      @io_handler = io_handler
    end
    @metrics.rotated.inc
  end
end
register_watcher(watcher) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 783
def register_watcher(watcher)
  @watchers << watcher
end
swap_state(pe) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 904
def swap_state(pe)
  # Use MemoryPositionEntry for rotated file temporary
  mpe = MemoryPositionEntry.new
  mpe.update(pe.read_inode, pe.read_pos)
  @pe = mpe
  pe # This pe will be updated in on_rotate after TailWatcher is initialized
end
tag() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 779
def tag
  @parsed_tag ||= @path.tr('/', '.').gsub(/\.+/, '.').gsub(/^\./, '')
end