class Fluent::Plugin::SyslogInput
Constants
- DEFAULT_PARSER
- FACILITY_MAP
- SEVERITY_MAP
- SYSLOG_REGEXP
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
Fluent::Plugin::Input#configure
# File lib/fluent/plugin/in_syslog.rb, line 119 def configure(conf) compat_parameters_convert(conf, :parser) super if conf.has_key?('priority_key') log.warn "priority_key is deprecated. Use severity_key instead" end @use_default = false @parser = parser_create @parser_parse_priority = @parser.respond_to?(:with_priority) && @parser.with_priority if @include_source_host if @source_address_key raise Fluent::ConfigError, "specify either source_address_key or include_source_host" end @source_address_key = @source_host_key end if @source_hostname_key if @resolve_hostname.nil? @resolve_hostname = true elsif !@resolve_hostname # user specifies "false" in config raise Fluent::ConfigError, "resolve_hostname must be true with source_hostname_key" end end @_event_loop_run_timeout = @blocking_timeout protocol = @protocol_type || @transport_config.protocol if @send_keepalive_packet && protocol == :udp raise Fluent::ConfigError, "send_keepalive_packet is available for tcp/tls" end end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/in_syslog.rb, line 155 def multi_workers_ready? true end
start()
click to toggle source
Calls superclass method
Fluent::Compat::Input#start
# File lib/fluent/plugin/in_syslog.rb, line 159 def start super log.info "listening syslog socket on #{@bind}:#{@port} with #{@protocol_type || @transport_config.protocol}" case @protocol_type || @transport_config.protocol when :udp then start_udp_server when :tcp then start_tcp_server when :tls then start_tcp_server(tls: true) else raise "BUG: invalid transport value: #{@protocol_type || @transport_config.protocol}" end end
start_tcp_server(tls: false)
click to toggle source
# File lib/fluent/plugin/in_syslog.rb, line 178 def start_tcp_server(tls: false) octet_count_frame = @frame_type == :octet_count delimiter = octet_count_frame ? " " : @delimiter delimiter_size = delimiter.size server_create_connection( tls ? :in_syslog_tls_server : :in_syslog_tcp_server, @port, bind: @bind, resolve_name: @resolve_hostname, send_keepalive_packet: @send_keepalive_packet ) do |conn| conn.data do |data| buffer = conn.buffer buffer << data pos = 0 if octet_count_frame while idx = buffer.index(delimiter, pos) num = Integer(buffer[pos..idx]) msg = buffer[idx + delimiter_size, num] if msg.size != num break end pos = idx + delimiter_size + num message_handler(msg, conn) end else while idx = buffer.index(delimiter, pos) msg = buffer[pos...idx] pos = idx + delimiter_size message_handler(msg, conn) end end buffer.slice!(0, pos) if pos > 0 end end end
start_udp_server()
click to toggle source
# File lib/fluent/plugin/in_syslog.rb, line 172 def start_udp_server server_create_udp(:in_syslog_udp_server, @port, bind: @bind, max_bytes: @message_length_limit, resolve_name: @resolve_hostname) do |data, sock| message_handler(data.chomp, sock) end end
Private Instance Methods
emit(tag, time, record)
click to toggle source
# File lib/fluent/plugin/in_syslog.rb, line 270 def emit(tag, time, record) router.emit(tag, time, record) rescue => e log.error "syslog failed to emit", error: e, tag: tag, record: Yajl.dump(record) end
emit_unmatched(data, sock)
click to toggle source
# File lib/fluent/plugin/in_syslog.rb, line 218 def emit_unmatched(data, sock) record = {"unmatched_line" => data} record[@source_address_key] = sock.remote_addr if @source_address_key record[@source_hostname_key] = sock.remote_host if @source_hostname_key emit("#{@tag}.unmatched", Fluent::EventTime.now, record) end
message_handler(data, sock)
click to toggle source
# File lib/fluent/plugin/in_syslog.rb, line 225 def message_handler(data, sock) pri = nil text = data unless @parser_parse_priority m = SYSLOG_REGEXP.match(data) unless m if @emit_unmatched_lines emit_unmatched(data, sock) end log.warn "invalid syslog message: #{data.dump}" return end pri = m[1].to_i text = m[2] end @parser.parse(text) do |time, record| unless time && record if @emit_unmatched_lines emit_unmatched(data, sock) end log.warn "failed to parse message", data: data return end pri ||= record.delete('pri') facility = FACILITY_MAP[pri >> 3] severity = SEVERITY_MAP[pri & 0b111] record[@severity_key] = severity if @severity_key record[@facility_key] = facility if @facility_key record[@source_address_key] = sock.remote_addr if @source_address_key record[@source_hostname_key] = sock.remote_host if @source_hostname_key tag = "#{@tag}.#{facility}.#{severity}" emit(tag, time, record) end rescue => e if @emit_unmatched_lines emit_unmatched(data, sock) end log.error "invalid input", data: data, error: e log.error_backtrace end