class Fluent::Plugin::Parser::TimeoutChecker

Public Class Methods

new(timeout) click to toggle source

This implementation now uses mutex because parser is typically used in input. If this has a performance issue under high concurreny, use concurrent-ruby's map instead.

# File lib/fluent/plugin/parser.rb, line 33
def initialize(timeout)
  @map = {}
  @flag = ServerEngine::BlockingFlag.new
  @mutex = Mutex.new
  @timeout = timeout
  @timeout_checker = nil
end

Public Instance Methods

execute() { || ... } click to toggle source
# File lib/fluent/plugin/parser.rb, line 63
def execute
  th = Thread.current
  @mutex.synchronize { @map[th] = Time.now }
  yield
ensure
  # Need clean up here because if next event is delayed, incorrect exception will be raised in normal flow.
  @mutex.synchronize { @map.delete(th) }
end
start() click to toggle source
# File lib/fluent/plugin/parser.rb, line 41
def start
  @thread = ::Thread.new {
    until @flag.wait_for_set(0.5)
      now = Time.now
      @mutex.synchronize {
        @map.keys.each { |th|
          time = @map[th]
          if now - time > @timeout
            th.raise UncatchableError, "parsing timed out"
            @map.delete(th)
          end
        }
      }
    end
  }
end
stop() click to toggle source
# File lib/fluent/plugin/parser.rb, line 58
def stop
  @flag.set!
  @thread.join
end