class Fluent::Plugin::ExecInput
Attributes
parser[R]
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
Fluent::Plugin::Input#configure
# File lib/fluent/plugin/in_exec.rb, line 51
#
# Applies the plugin configuration and builds the parser.
#
# Steps, in order:
# 1. Converts compat-style parameters for the :extract and :parser helpers.
# 2. For the first 'parse' and the first 'extract' subsection, defaults
#    'time_type' to 'string' whenever 'time_format' is present.
# 3. Delegates to the superclass +configure+.
# 4. Requires that either @tag or @extract_config.tag_key is set.
# 5. Stores the result of +parser_create+ in @parser.
#
# @param conf the configuration element passed to the plugin
# @raise [Fluent::ConfigError] when neither 'tag' nor 'tag_key' is given
def configure(conf)
  compat_parameters_convert(conf, :extract, :parser)

  %w[parse extract].each do |section_name|
    section = conf.elements(section_name).first
    next unless section

    # A time_format without an explicit time_type is treated as a string time.
    section['time_type'] ||= 'string' if section.has_key?('time_format')
  end

  super

  tag_source = @tag || (@extract_config && @extract_config.tag_key)
  unless tag_source
    raise Fluent::ConfigError, "'tag' or 'tag_key' option is required on exec input"
  end

  @parser = parser_create
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/in_exec.rb, line 69
#
# Always returns true.
# NOTE(review): presumably this tells Fluentd the plugin may run under
# multiple workers -- confirm against the plugin base class.
#
# @return [true]
def multi_workers_ready?
  true
end
on_record(time, record)
click to toggle source
# File lib/fluent/plugin/in_exec.rb, line 100
#
# Emits one parsed record to the router.
#
# The tag comes from the record (via +extract_tag_from_record+) and falls
# back to @tag. When +time+ is nil, the time is taken from the record (via
# +extract_time_from_record+) and finally from Fluent::EventTime.now.
#
# Any StandardError raised along the way is logged; the record is then sent
# to the error stream, but only if tag, time and record are all present.
#
# @param time the event time supplied by the parser, or nil
# @param record the parsed record
def on_record(time, record)
  tag = extract_tag_from_record(record) || @tag
  time ||= extract_time_from_record(record) || Fluent::EventTime.now

  router.emit(tag, time, record)
rescue => err
  log.error "exec failed to emit", tag: tag, record: Yajl.dump(record), error: err

  # tag/time may still be nil if the failure happened before they were resolved.
  if tag && time && record
    router.emit_error_event(tag, time, record, err)
  end
end
run(io)
click to toggle source
# File lib/fluent/plugin/in_exec.rb, line 83
#
# Reads the command output from +io+ and feeds it to @parser, passing every
# parsed (time, record) pair to +on_record+.
#
# The way the input is consumed depends on what the parser offers, checked
# in this order:
# * +parse_io+ implemented           -> the IO object is handed over as-is
# * +parse_partial_data+ implemented -> chunks of up to @read_block_size
#                                       are read until EOF
# * parser_type is :text_per_line    -> one chomped line at a time
# * anything else                    -> the whole output in a single call
#
# @param io the readable IO connected to the executed command
def run(io)
  emit = method(:on_record)

  if @parser.implement?(:parse_io)
    @parser.parse_io(io, &emit)
  elsif @parser.implement?(:parse_partial_data)
    @parser.parse_partial_data(io.readpartial(@read_block_size), &emit) until io.eof?
  elsif @parser.parser_type == :text_per_line
    io.each_line { |line| @parser.parse(line.chomp, &emit) }
  else
    @parser.parse(io.read, &emit)
  end
end
start()
click to toggle source
Calls superclass method
Fluent::Plugin::Base#start
# File lib/fluent/plugin/in_exec.rb, line 73
#
# Starts the plugin: calls the superclass +start+, then launches @command
# through +child_process_execute+ with +run+ as the output handler.
#
# When @run_interval is set, it is passed as the +interval:+ option;
# otherwise the command is launched with +immediate: true+.
def start
  super

  schedule = @run_interval ? { interval: @run_interval } : { immediate: true }
  child_process_execute(:exec_input, @command, **schedule, mode: [@connect_mode], &method(:run))
end