class Fluent::Plugin::Parser::TimeoutChecker
Public Class Methods
new(timeout)
click to toggle source
This implementation currently uses a mutex because a parser is typically used in input. If this becomes a performance issue under high concurrency, use concurrent-ruby's map instead.
# File lib/fluent/plugin/parser.rb, line 33
# Builds a checker in the "not started" state; no watcher thread exists until
# #start is called.
#
# @param timeout [Numeric] limit that #start compares against the difference
#   of two Time objects, i.e. a duration in seconds
def initialize(timeout)
  # Thread => Time at which that thread entered #execute.
  @map = {}
  # Set by #stop to make the watcher loop in #start exit.
  @flag = ServerEngine::BlockingFlag.new
  # Guards every read and write of @map.
  @mutex = Mutex.new
  @timeout = timeout
  # NOTE(review): not read by any method visible in this class; kept so that
  # anything relying on it elsewhere keeps working -- confirm whether needed.
  @timeout_checker = nil
  # Watcher thread, assigned by #start. Initialized explicitly so the
  # variable is always defined, even when #start has not run yet.
  @thread = nil
end
Public Instance Methods
execute() { || ... }
click to toggle source
# File lib/fluent/plugin/parser.rb, line 63
# Runs the given block while the calling thread is registered in @map, so the
# watcher started by #start can see how long the block has been running.
#
# @yield the work to be supervised
# @return [Object] whatever the block returns
def execute
  current = Thread.current
  @mutex.synchronize do
    @map.store(current, Time.now)
  end
  yield
ensure
  # Need clean up here because if next event is delayed, incorrect exception
  # will be raised in normal flow.
  @mutex.synchronize do
    @map.delete(current)
  end
end
start()
click to toggle source
# File lib/fluent/plugin/parser.rb, line 41
# Spawns the watcher thread and stores it in @thread.
#
# The watcher loops until @flag.wait_for_set(0.5) returns a truthy value
# (presumably: the flag was set, waiting at most 0.5 s -- confirm against
# ServerEngine::BlockingFlag). On every pass it raises UncatchableError in
# each registered thread whose entry is older than @timeout and drops that
# entry from @map.
#
# @return [Thread] the watcher thread
def start
  @thread = ::Thread.new do
    until @flag.wait_for_set(0.5)
      checked_at = Time.now
      @mutex.synchronize do
        # Walk a snapshot so entries can be deleted from @map while looping.
        @map.to_a.each do |target, started_at|
          next unless checked_at - started_at > @timeout

          target.raise UncatchableError, "parsing timed out"
          @map.delete(target)
        end
      end
    end
  end
end
stop()
click to toggle source
# File lib/fluent/plugin/parser.rb, line 58
# Signals the watcher loop to finish and waits for the watcher thread to exit.
#
# Safe to call when #start was never invoked: in that case @thread is nil and
# only the flag is set, instead of failing with NoMethodError on nil.
#
# @return [Thread, nil] the joined watcher thread, or nil if none was started
def stop
  @flag.set!
  @thread.join if @thread
end