class Fluent::Plugin::UnixInput

TODO: This plugin will become a third-party plugin.

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::Compat::Input::new
# File lib/fluent/plugin/in_unix.rb, line 33
# Sets up the plugin instance. The listening socket does not exist yet, so
# @lsock starts out as nil; it is assigned later by #start.
def initialize
  super()

  @lsock = nil
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method Fluent::Plugin::Input#configure
# File lib/fluent/plugin/in_unix.rb, line 46
# Applies the configuration by delegating to the superclass implementation;
# this plugin adds no configuration handling of its own here.
#
# @param conf [Object] configuration passed through unchanged to the superclass
def configure(conf)
  super(conf)
end
convert_time(time) click to toggle source
# File lib/fluent/plugin/in_unix.rb, line 135
# Normalises a timestamp received from a client into a Fluent::EventTime.
#
# * nil or 0            -> the current time (Fluent::EventTime.now)
# * Fluent::EventTime   -> returned as is
# * anything else       -> passed to Time.at, then EventTime.from_time
#
# @param time [nil, Integer, Fluent::EventTime, Object] incoming timestamp
# @return [Fluent::EventTime]
def convert_time(time)
  # The nil/zero check stays first and keeps the Integer on the left-hand
  # side, so values are compared in the same order and direction as before.
  return Fluent::EventTime.now if time.nil? || 0 == time
  return time if time.is_a?(Fluent::EventTime)

  Fluent::EventTime.from_time(Time.at(time))
end
listen() click to toggle source
# File lib/fluent/plugin/in_unix.rb, line 66
# Creates the UNIX domain socket server bound to @path and returns it.
#
# A file already present at @path is removed first (with a warning), and the
# parent directory of @path is created when missing. When @backlog is set,
# it is applied to the new server via its #listen method.
#
# @return [Coolio::UNIXServer] the newly created server
def listen
  if File.exist?(@path)
    log.warn "Found existing '#{@path}'. Remove this file for in_unix plugin"
    File.unlink(@path)
  end
  FileUtils.mkdir_p(File.dirname(@path))

  log.info "listening fluent socket on #{@path}"
  Coolio::UNIXServer.new(@path, Handler, log, method(:on_message)).tap do |server|
    server.listen(@backlog) unless @backlog.nil?
  end
end
on_message(msg) click to toggle source

message Entry {

1: long time
2: object record

}

message Forward {

1: string tag
2: list<Entry> entries

}

message PackedForward {

1: string tag
2: raw entries  # msgpack stream of Entry

}

message Message {

1: string tag
2: long? time
3: object record

}

# File lib/fluent/plugin/in_unix.rb, line 99
# Dispatches one decoded message received on the socket to the router.
#
# The shape of +msg+ selects the handling mode:
# * [tag, String]        - PackedForward: msg[1] is wrapped in a
#                          Fluent::MessagePackEventStream
# * [tag, Array]         - Forward: every entry is a [time, record] pair
# * [tag, time, record]  - Message: a single event
#
# When @tag is set it takes precedence over the tag carried in the message.
# Events whose record is nil are dropped. Timestamps go through #convert_time.
#
# @param msg [Object] decoded message; anything other than an Array is logged
#   as broken and ignored
# @return [Object, nil] nil when nothing is emitted, otherwise the result of
#   the router call
def on_message(msg)
  unless msg.is_a?(Array)
    log.warn "incoming data is broken:", msg: msg
    return
  end

  tag = @tag || msg[0].to_s
  entries = msg[1]

  case entries
  when String
    # PackedForward
    router.emit_stream(tag, Fluent::MessagePackEventStream.new(entries))

  when Array
    # Forward
    es = Fluent::MultiEventStream.new
    entries.each do |entry|
      # Entries come from the peer and may be malformed. Without this guard a
      # nil entry raises NoMethodError (losing the whole batch) and an Integer
      # entry is indexed bitwise by Integer#[], emitting a bogus record.
      unless entry.is_a?(Array)
        log.warn "incoming entry is broken:", entry: entry
        next
      end

      record = entry[1]
      next if record.nil?

      es.add(convert_time(entry[0]), record)
    end
    router.emit_stream(tag, es)

  else
    # Message
    record = msg[2]
    return if record.nil?

    router.emit(tag, convert_time(msg[1]), record)
  end
end
shutdown() click to toggle source
Calls superclass method Fluent::Compat::Input#shutdown
# File lib/fluent/plugin/in_unix.rb, line 57
# Stops listening: when a server socket was created, it is detached from the
# event loop and closed; afterwards the superclass shutdown runs.
def shutdown
  listener = @lsock
  if listener
    event_loop_detach(listener)
    listener.close
  end

  super
end
start() click to toggle source
Calls superclass method Fluent::Compat::Input#start
# File lib/fluent/plugin/in_unix.rb, line 50
# Runs the superclass start, then opens the listening socket via #listen,
# keeps it in @lsock and attaches it to the event loop.
def start
  super

  server = listen
  @lsock = server
  event_loop_attach(server)
end