class Fluent::Test::Driver::Output
Public Class Methods
new(klass, opts: {}, &block)
click to toggle source
Calls superclass method
Fluent::Test::Driver::EventFeeder::new
# File lib/fluent/test/driver/output.rb, line 29 def initialize(klass, opts: {}, &block) super raise ArgumentError, "plugin is not an instance of Fluent::Plugin::Output" unless @instance.is_a? Fluent::Plugin::Output @flush_buffer_at_cleanup = nil @wait_flush_completion = nil @force_flush_retry = nil @format_hook = nil @format_results = [] end
Public Instance Methods
flush()
click to toggle source
# File lib/fluent/test/driver/output.rb, line 63 def flush @instance.force_flush wait_flush_completion if @wait_flush_completion end
formatted()
click to toggle source
# File lib/fluent/test/driver/output.rb, line 59 def formatted @format_results end
instance_hook_after_started()
click to toggle source
Calls superclass method
Fluent::Test::Driver::Base#instance_hook_after_started
# File lib/fluent/test/driver/output.rb, line 83 def instance_hook_after_started super # it's decided after #start whether output plugin instances use @custom_format or not. if @instance.instance_eval{ @custom_format } @format_hook = format_hook = ->(result){ @format_results << result } m = Module.new do define_method(:format) do |tag, time, record| result = super(tag, time, record) format_hook.call(result) result end end @instance.singleton_class.prepend m end end
run(flush: true, wait_flush_completion: true, force_flush_retry: false, **kwargs, &block)
click to toggle source
Calls superclass method
Fluent::Test::Driver::EventFeeder#run
# File lib/fluent/test/driver/output.rb, line 39 def run(flush: true, wait_flush_completion: true, force_flush_retry: false, **kwargs, &block) @flush_buffer_at_cleanup = flush @wait_flush_completion = wait_flush_completion @force_flush_retry = force_flush_retry super(**kwargs, &block) end
run_actual(**kwargs, &block)
click to toggle source
Calls superclass method
Fluent::Test::Driver::Base#run_actual
# File lib/fluent/test/driver/output.rb, line 46 def run_actual(**kwargs, &block) if @force_flush_retry @instance.retry_for_error_chunk = true end val = super(**kwargs, &block) if @flush_buffer_at_cleanup self.flush end val ensure @instance.retry_for_error_chunk = false end
wait_flush_completion()
click to toggle source
# File lib/fluent/test/driver/output.rb, line 68 def wait_flush_completion buffer_queue = ->(){ @instance.buffer && @instance.buffer.queue.size > 0 } dequeued_chunks = ->(){ @instance.dequeued_chunks_mutex && @instance.dequeued_chunks && @instance.dequeued_chunks_mutex.synchronize{ @instance.dequeued_chunks.size > 0 } } Timeout.timeout(10) do while buffer_queue.call || dequeued_chunks.call sleep 0.1 end end end