class Fluent::Plugin::SecondaryFileOutput
Constants
- PLACEHOLDER_REGEX
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
Fluent::Compat::Output#configure
# File lib/fluent/plugin/out_secondary_file.rb, line 36 def configure(conf) super unless @as_secondary raise Fluent::ConfigError, "This plugin can only be used in the <secondary> section" end if @basename.include?("/") raise Fluent::ConfigError, "basename should not include `/`" end @path_without_suffix = File.join(@directory, @basename) validate_compatible_with_primary_buffer!(@path_without_suffix) @suffix = case @compress when :text "" when :gzip ".gz" end test_path = @path_without_suffix unless Fluent::FileUtil.writable_p?(test_path) raise Fluent::ConfigError, "out_secondary_file: `#{@directory}` should be writable" end @dir_perm = system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION @file_perm = system_config.file_permission || Fluent::DEFAULT_FILE_PERMISSION end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_secondary_file.rb, line 66 def multi_workers_ready? ### TODO: add hack to synchronize for multi workers true end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_secondary_file.rb, line 71 def write(chunk) path_without_suffix = extract_placeholders(@path_without_suffix, chunk) path = generate_path(path_without_suffix) FileUtils.mkdir_p File.dirname(path), mode: @dir_perm case @compress when :text File.open(path, "ab", @file_perm) {|f| f.flock(File::LOCK_EX) chunk.write_to(f) } when :gzip File.open(path, "ab", @file_perm) {|f| f.flock(File::LOCK_EX) gz = Zlib::GzipWriter.new(f) chunk.write_to(gz) gz.close } end path end
Private Instance Methods
generate_path(path_without_suffix)
click to toggle source
# File lib/fluent/plugin/out_secondary_file.rb, line 118 def generate_path(path_without_suffix) if @append "#{path_without_suffix}#{@suffix}" else i = 0 loop do path = "#{path_without_suffix}.#{i}#{@suffix}" return path unless File.exist?(path) i += 1 end end end
has_time_format?(str)
click to toggle source
# File lib/fluent/plugin/out_secondary_file.rb, line 114 def has_time_format?(str) str != Time.now.strftime(str) end
validate_compatible_with_primary_buffer!(path_without_suffix)
click to toggle source
# File lib/fluent/plugin/out_secondary_file.rb, line 96 def validate_compatible_with_primary_buffer!(path_without_suffix) placeholders = path_without_suffix.scan(PLACEHOLDER_REGEX).flat_map(&:first) # to trim suffix [\d+] if !@chunk_key_time && has_time_format?(path_without_suffix) raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder, remove time formats, like `%Y%m%d`, from basename or directory" end if !@chunk_key_tag && (ph = placeholders.find { |placeholder| placeholder.match(/tag(\[\d+\])?/) }) raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder #{ph}, remove tag placeholder, like `${tag}`, from basename or directory" end vars = placeholders.reject { |placeholder| placeholder.match(/tag(\[\d+\])?/) || (placeholder == 'chunk_id') } if ph = vars.find { |v| !@chunk_keys.include?(v) } raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder #{ph}, remove variable placeholder, like `${varname}`, from basename or directory" end end