class Fluent::Plugin::FileSingleBuffer

Constants

DEFAULT_CHUNK_LIMIT_SIZE
DEFAULT_TOTAL_LIMIT_SIZE
PATH_SUFFIX

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::Plugin::Buffer::new
# File lib/fluent/plugin/buf_file_single.rb, line 51
def initialize
  super

  @multi_workers_available = false
  @additional_resume_path = nil
  @variable_store = nil
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method Fluent::Plugin::Buffer#configure
# File lib/fluent/plugin/buf_file_single.rb, line 59
def configure(conf)
  super

  @variable_store = Fluent::VariableStore.fetch_or_build(:buf_file_single)

  if @chunk_format == :auto
    @chunk_format = owner.formatted_to_msgpack_binary? ? :msgpack : :text
  end

  @key_in_path = nil
  if owner.chunk_keys.empty?
    log.debug "use event tag for buffer key"
  else
    if owner.chunk_key_tag
      raise Fluent::ConfigError, "chunk keys must be tag or one field"
    elsif owner.chunk_keys.size > 1
      raise Fluent::ConfigError, "2 or more chunk keys is not allowed"
    else
      @key_in_path = owner.chunk_keys.first.to_sym
    end
  end

  multi_workers_configured = owner.system_config.workers > 1
  using_plugin_root_dir = false
  unless @path
    if root_dir = owner.plugin_root_dir
      @path = File.join(root_dir, 'buffer')
      using_plugin_root_dir = true # plugin_root_dir path contains worker id
    else
      raise Fluent::ConfigError, "buffer path is not configured. specify 'path' in <buffer>"
    end
  end

  specified_directory_exists = File.exist?(@path) && File.directory?(@path)
  unexisting_path_for_directory = !File.exist?(@path) && !@path.include?('.*')

  if specified_directory_exists || unexisting_path_for_directory # directory
    if using_plugin_root_dir || !multi_workers_configured
      @path = File.join(@path, "fsb.*#{PATH_SUFFIX}")
    else
      @path = File.join(@path, "worker#{fluentd_worker_id}", "fsb.*#{PATH_SUFFIX}")
      if fluentd_worker_id == 0
        # worker 0 always checks unflushed buffer chunks to be resumed (might be created while non-multi-worker configuration)
        @additional_resume_path = File.join(File.expand_path("../../", @path), "fsb.*#{PATH_SUFFIX}")
      end
    end
    @multi_workers_available = true
  else # specified path is file path
    if File.basename(@path).include?('.*.')
      new_path = File.join(File.dirname(@path), "fsb.*#{PATH_SUFFIX}")
      log.warn "file_single doesn't allow user specified 'prefix.*.suffix' style path. Use '#{new_path}' for file instead: #{@path}"
      @path = new_path
    elsif File.basename(@path).end_with?('.*')
      @path = @path + PATH_SUFFIX
    else
      # existing file will be ignored
      @path = @path + ".*#{PATH_SUFFIX}"
    end
    @multi_workers_available = false
  end

  type_of_owner = Plugin.lookup_type_from_class(@_owner.class)
  if @variable_store.has_key?(@path) && !called_in_test?
    type_using_this_path = @variable_store[@path]
    raise Fluent::ConfigError, "Other '#{type_using_this_path}' plugin already uses same buffer path: type = #{type_of_owner}, buffer path = #{@path}"
  end

  @variable_store[@path] = type_of_owner
  @dir_permission = if @dir_permission
                      @dir_permission.to_i(8)
                    else
                      system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION
                    end
end
generate_chunk(metadata) click to toggle source
# File lib/fluent/plugin/buf_file_single.rb, line 199
def generate_chunk(metadata)
  # FileChunk generates real path with unique_id
  perm = @file_permission || system_config.file_permission
  chunk = Fluent::Plugin::Buffer::FileSingleChunk.new(metadata, @path, :create, @key_in_path, perm: perm, compress: @compress)

  log.debug "Created new chunk", chunk_id: dump_unique_id_hex(chunk.unique_id), metadata: metadata

  chunk
end
handle_broken_files(path, mode, e) click to toggle source
# File lib/fluent/plugin/buf_file_single.rb, line 209
def handle_broken_files(path, mode, e)
  log.error "found broken chunk file during resume. Delete corresponding files:", path: path, mode: mode, err_msg: e.message
  # After support 'backup_dir' feature, these files are moved to backup_dir instead of unlink.
  File.unlink(path) rescue nil
end
multi_workers_ready?() click to toggle source

This method is called only when multi worker is configured

# File lib/fluent/plugin/buf_file_single.rb, line 135
def multi_workers_ready?
  unless @multi_workers_available
    log.error "file_single buffer with multi workers should be configured to use directory 'path', or system root_dir and plugin id"
  end
  @multi_workers_available
end
persistent?() click to toggle source
# File lib/fluent/plugin/buf_file_single.rb, line 156
def persistent?
  true
end
resume() click to toggle source
# File lib/fluent/plugin/buf_file_single.rb, line 160
def resume
  stage = {}
  queue = []

  patterns = [@path]
  patterns.unshift @additional_resume_path if @additional_resume_path
  Dir.glob(escaped_patterns(patterns)) do |path|
    next unless File.file?(path)

    log.debug { "restoring buffer file: path = #{path}" }

    m = new_metadata() # this metadata will be updated in FileSingleChunk.new
    mode = Fluent::Plugin::Buffer::FileSingleChunk.assume_chunk_state(path)
    if mode == :unknown
      log.debug "unknown state chunk found", path: path
      next
    end

    begin
      chunk = Fluent::Plugin::Buffer::FileSingleChunk.new(m, path, mode, @key_in_path, compress: @compress)
      chunk.restore_size(@chunk_format) if @calc_num_records
    rescue Fluent::Plugin::Buffer::FileSingleChunk::FileChunkError => e
      handle_broken_files(path, mode, e)
      next
    end

    case chunk.state
    when :staged
      stage[chunk.metadata] = chunk
    when :queued
      queue << chunk
    end
  end

  queue.sort_by!(&:modified_at)

  return stage, queue
end
start() click to toggle source
Calls superclass method Fluent::Plugin::Buffer#start
# File lib/fluent/plugin/buf_file_single.rb, line 142
def start
  FileUtils.mkdir_p(File.dirname(@path), mode: @dir_permission)

  super
end
stop() click to toggle source
Calls superclass method Fluent::PluginId#stop
# File lib/fluent/plugin/buf_file_single.rb, line 148
def stop
  if @variable_store
    @variable_store.delete(@path)
  end

  super
end

Private Instance Methods

escaped_patterns(patterns) click to toggle source
# File lib/fluent/plugin/buf_file_single.rb, line 217
def escaped_patterns(patterns)
  patterns.map { |pattern|
    # '{' '}' are special character in Dir.glob
    pattern.gsub(/[\{\}]/) { |c| "\\#{c}" }
  }
end