class Fluent::Plugin::ExecFilterOutput

Constants

COMPAT_EXTRACT_PARAMS
COMPAT_FORMAT_PARAMS
COMPAT_INJECT_PARAMS
COMPAT_PARSE_PARAMS
ExecutedProcess
KEYS_FOR_IN_AND_OUT
NEWLINE

Attributes

formatter[R]
parser[R]

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method Fluent::Compat::Output#configure
# File lib/fluent/plugin/out_exec_filter.rb, line 130
# Validates and normalizes the plugin configuration.
#
# Legacy flat parameters are first converted into their subsection form, and
# <inject>/<extract> sections that specify a time_format get time_type
# defaulted to 'string'. After the superclass has processed +conf+, the
# formatter and parser are created, the tag prefix strings are precomputed
# and child_respawn is translated into a numeric respawn budget.
#
# @param conf the plugin configuration element (must respond to #elements)
# @raise [Fluent::ConfigError] when neither 'tag' nor '<extract> tag_key' is
#   configured, or when child_respawn has an unrecognized value
def configure(conf)
  exec_filter_compat_parameters_convert!(conf)
  compat_parameters_convert(conf, :buffer)

  # A time_format without an explicit time_type implies string timestamps.
  %w[inject extract].each do |section_name|
    section = conf.elements(section_name).first
    next unless section && section.has_key?('time_format')
    section['time_type'] ||= 'string'
  end

  super

  if !@tag && (!@extract_config || !@extract_config.tag_key)
    raise Fluent::ConfigError, "'tag' or '<extract> tag_key </extract>' option is required on exec_filter output"
  end

  @formatter = formatter_create
  @parser = parser_create

  if @remove_prefix
    @removed_prefix_string = @remove_prefix + '.'
    @removed_length = @removed_prefix_string.length
  end
  if @add_prefix
    @added_prefix_string = @add_prefix + '.'
  end

  # 0 disables respawning, -1 means unlimited, any other value is a count.
  # \A and \z anchor the whole value; ^ and $ would only anchor a single line
  # and let a multi-line value such as "x\n5" slip through.
  @respawns = case @child_respawn
              when nil, 'none', '0'
                0
              when 'inf', '-1'
                -1
              when /\A\d+\z/
                @child_respawn.to_i
              else
                raise Fluent::ConfigError, "child_respawn option argument invalid: none(or 0), inf(or -1) or positive number"
              end

  @suppress_error_log_interval ||= 0
  @next_log_time = Time.now.to_i
end
exec_filter_compat_parameters_convert!(conf) click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 116
# Rewrites legacy flat parameters on +conf+ in place.
#
# For every entry of KEYS_FOR_IN_AND_OUT whose key is present in +conf+, the
# value is copied to each of the mapped keys. Afterwards the inject, format,
# parse and extract compat parameters are copied into their subsections, in
# that order.
#
# @param conf the configuration element to mutate
# @return the result of the final (extract) subsection copy
def exec_filter_compat_parameters_convert!(conf)
  KEYS_FOR_IN_AND_OUT.each_pair do |shared_key, target_keys|
    next unless conf.has_key?(shared_key)

    target_keys.each { |target_key| conf[target_key] = conf[shared_key] }
  end

  subsection_params = [
    ['inject', COMPAT_INJECT_PARAMS],
    ['format', COMPAT_FORMAT_PARAMS],
    ['parse', COMPAT_PARSE_PARAMS],
    ['extract', COMPAT_EXTRACT_PARAMS],
  ]
  # map + last keeps the return value equal to that of the final copy call.
  subsection_params.map do |section_name, compat_params|
    exec_filter_compat_parameters_copy_to_subsection!(conf, section_name, compat_params)
  end.last
end
exec_filter_compat_parameters_copy_to_subsection!(conf, subsection_name, params) click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 106
# Synthesizes a +subsection_name+ subsection on +conf+ from flat parameters.
#
# Nothing happens when +conf+ already has such a subsection, or when none of
# the keys of +params+ is present in +conf+. Otherwise a new
# Fluent::Config::Element is appended to conf.elements whose attributes are
# the present values, stored under the names +params+ maps them to.
#
# @param conf the configuration element to mutate
# @param subsection_name [String] name of the subsection to create
# @param params [Hash] maps a flat parameter name to its subsection parameter name
# @return nil when nothing was added, otherwise the result of appending to conf.elements
def exec_filter_compat_parameters_copy_to_subsection!(conf, subsection_name, params)
  already_configured = !conf.elements(subsection_name).empty?
  return if already_configured
  return if params.keys.none? { |flat_key| conf.has_key?(flat_key) }

  attributes = params.each_with_object({}) do |(flat_key, section_key), collected|
    collected[section_key] = conf[flat_key] if conf.has_key?(flat_key)
  end
  conf.elements << Fluent::Config::Element.new(subsection_name, '', attributes, [])
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 263
# Serializes one event with @formatter.
#
# The tag is first passed through tag_remove_prefix and the record through
# inject_values_to_record. When the formatter type is :text_per_line, the
# formatted text has one trailing line terminator removed (String#chomp) and
# NEWLINE appended; any other formatter's output is returned untouched.
#
# @param tag [String] event tag
# @param time event time
# @param record [Hash] event record
# @return the formatted event
def format(tag, time, record)
  stripped_tag = tag_remove_prefix(tag)
  enriched_record = inject_values_to_record(stripped_tag, time, record)

  line_oriented = @formatter.formatter_type == :text_per_line
  formatted = @formatter.format(stripped_tag, time, enriched_record)
  return formatted unless line_oriented

  formatted.chomp + NEWLINE
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 176
# Always returns true.
# NOTE(review): presumably this tells Fluentd the plugin may run under
# multiple workers — confirm against the plugin API documentation.
def multi_workers_ready?
  true
end
on_record(time, record) click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 304
# Emits one record through the router.
#
# The tag comes from extract_tag_from_record (prefixed with
# @added_prefix_string when @add_prefix is set) and falls back to @tag. A
# missing +time+ is taken from extract_time_from_record, or else
# Fluent::EventTime.now.
#
# Any StandardError is logged, at most once per @suppress_error_log_interval
# seconds when that interval is non-zero, and the record is handed to
# router.emit_error_event provided tag, time and record are all present.
#
# @param time event time, or nil to derive it from the record
# @param record [Hash] parsed record
def on_record(time, record)
  tag = extract_tag_from_record(record)
  if tag
    tag = @added_prefix_string + tag if @add_prefix
  else
    tag = @tag
  end
  time = time || extract_time_from_record(record) || Fluent::EventTime.now
  router.emit(tag, time, record)
rescue => e
  suppressed = @suppress_error_log_interval != 0 && Time.now.to_i <= @next_log_time
  unless suppressed
    log.error "exec_filter failed to emit", record: Yajl.dump(record), error: e
    log.error_backtrace e.backtrace
    @next_log_time = Time.now.to_i + @suppress_error_log_interval
  end
  return unless tag && time && record

  router.emit_error_event(tag, time, record, e)
end
run(io) click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 286
# Reads +io+ and feeds every parsed record to on_record.
#
# The IO is switched to binary (ASCII-8BIT) encoding, then consumed using the
# first strategy that applies to @parser:
# 1. parse_io, if the parser implements it;
# 2. parse_partial_data on chunks of up to @read_block_size bytes until EOF;
# 3. line by line (chomped) for :text_per_line parsers;
# 4. otherwise the whole content in a single parse call.
#
# @param io the readable IO to consume
def run(io)
  io.set_encoding(Encoding::ASCII_8BIT)
  emit = method(:on_record)

  if @parser.implement?(:parse_io)
    @parser.parse_io(io, &emit)
  elsif @parser.implement?(:parse_partial_data)
    @parser.parse_partial_data(io.readpartial(@read_block_size), &emit) until io.eof?
  elsif @parser.parser_type == :text_per_line
    io.each_line { |raw_line| @parser.parse(raw_line.chomp, &emit) }
  else
    @parser.parse(io.read, &emit)
  end
end
start() click to toggle source
Calls superclass method Fluent::Compat::Output#start
# File lib/fluent/plugin/out_exec_filter.rb, line 182
# Starts the plugin: after the superclass start, spawns @num_children child
# processes running @command and, when @respawns is non-zero, a monitor
# thread that re-executes children whose pid has been cleared.
def start
  super

  @children_mutex = Mutex.new
  @children = []
  # Round-robin cursor; #write advances it to pick the next child.
  @rr = 0

  # Passed to child_process_execute as on_exit_callback. Looks up the child
  # record matching the exited pid, warns unless the plugin is already
  # stopped, then closes both pipes (ignoring close errors) and clears
  # pid/readio/writeio under the child's mutex.
  exit_callback = ->(status){
    c = @children.select{|child| child.pid == status.pid }.first
    if c
      unless self.stopped?
        log.warn "child process exits with error code", code: status.to_i, status: status.exitstatus, signal: status.termsig
      end
      c.mutex.synchronize do
        (c.writeio && c.writeio.close) rescue nil
        (c.readio && c.readio.close) rescue nil
        c.pid = c.readio = c.writeio = nil
      end
    end
  }
  # Runs inside the child_process_execute block for child +index+: records
  # the pid and pipes on the child record, then consumes the child's output
  # via #run.
  # NOTE(review): c.respawns is reset to @respawns on every spawn, so the
  # decrement done by the monitor thread below is overwritten once the child
  # starts — confirm whether a finite child_respawn limit is meant to be
  # cumulative.
  child_process_callback = ->(index, readio, writeio){
    pid = child_process_id
    c = @children[index]
    writeio.sync = true
    c.mutex.synchronize do
      c.pid = pid
      c.respawns = @respawns
      c.readio = readio
      c.writeio = writeio
    end

    run(readio)
  }
  # Launches @command for slot +index+ under a per-slot process title.
  execute_child_process = ->(index){
    child_process_execute("out_exec_filter_child#{index}".to_sym, @command, on_exit_callback: exit_callback) do |readio, writeio|
      child_process_callback.call(index, readio, writeio)
    end
  }

  # Create one record per child slot and launch each child.
  @children_mutex.synchronize do
    @num_children.times do |i|
      @children << ExecutedProcess.new(Mutex.new, nil, 0, nil, nil)
      execute_child_process.call(i)
    end
  end

  if @respawns != 0
    # Every 0.2s, look for slots with no pid and a non-zero respawn budget;
    # a positive budget is decremented (a negative one is left as is) before
    # the child is executed again.
    thread_create(:out_exec_filter_respawn_monitor) do
      while thread_current_running?
        @children.each_with_index do |c, i|
          if c.mutex && c.mutex.synchronize{ c.pid.nil? && c.respawns != 0 }
            respawns = c.mutex.synchronize do
              c.respawns -= 1 if c.respawns > 0
              c.respawns
            end
            log.info "respawning child process", num: i, respawn_counter: respawns
            execute_child_process.call(i)
          end
        end
        sleep 0.2
      end
    end
  end
end
tag_remove_prefix(tag) click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 252
# Strips the configured prefix from +tag+.
#
# When @remove_prefix is set and +tag+ begins with @removed_prefix_string
# (the prefix followed by a dot), the remainder after that string is
# returned; a tag equal to @removed_prefix_string yields ''. Every other tag,
# including one equal to the bare prefix without the dot, is returned
# unchanged.
#
# @param tag [String] the event tag
# @return [String] the tag without the prefix
def tag_remove_prefix(tag)
  return tag unless @remove_prefix
  # A leading match covers both "prefix.rest" and the exact "prefix." tag.
  return tag unless tag[0, @removed_length] == @removed_prefix_string

  tag[@removed_length..-1] || ''
end
terminate() click to toggle source
Calls superclass method Fluent::Plugin::Output#terminate
# File lib/fluent/plugin/out_exec_filter.rb, line 247
# Resets @children to an empty array, dropping every tracked child record,
# then delegates to the superclass implementation.
def terminate
  @children = []
  super
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_exec_filter.rb, line 273
# Writes +chunk+ to the next healthy child process, chosen round-robin.
#
# Starting after the previously used slot (@rr), each child is tried at most
# once; a child is healthy when it has both a pid and a write IO. The chunk
# is written to the first healthy child found.
#
# @param chunk the buffer chunk; must respond to #write_to(io)
# @return [nil]
# @raise [RuntimeError] when there are no children at all or none is healthy
def write(chunk)
  child_count = @children.length
  # Without this guard the modulo below raises ZeroDivisionError when the
  # child list is empty (e.g. after #terminate).
  raise "no healthy child processes exist" if child_count.zero?

  child_count.times do
    slot = @rr = (@rr + 1) % child_count
    child = @children[slot]
    next unless child.pid

    writeio = child.writeio
    next unless writeio

    chunk.write_to(writeio)
    return
  end

  raise "no healthy child processes exist"
end