class Fluent::EventRouter

EventRouter is responsible to route events to a collector.

It has a list of MatchPattern and Collector pairs:

+----------------+     +-----------------+
|  MatchPattern  |     |    Collector    |
+----------------+     +-----------------+
|   access.**  ---------> type forward   |
|     logs.**  ---------> type copy      |
|  archive.**  ---------> type s3        |
+----------------+     +-----------------+

EventRouter does:

1) receive an event at `#emit` methods 2) match the event's tag with the MatchPatterns 3) forward the event to the corresponding Collector

Collector is either of Output, Filter or other EventRouter.

Attributes

default_collector[RW]
emit_error_handler[RW]

Public Class Methods

new(default_collector, emit_error_handler) click to toggle source
# File lib/fluent/event_router.rb, line 45
def initialize(default_collector, emit_error_handler)
  @match_rules = []
  @match_cache = MatchCache.new
  @default_collector = default_collector
  @emit_error_handler = emit_error_handler
  @metric_callbacks = {}
  @caller_plugin_id = nil
end

Public Instance Methods

add_metric_callbacks(caller_plugin_id, callback) click to toggle source
# File lib/fluent/event_router.rb, line 88
def add_metric_callbacks(caller_plugin_id, callback)
  @metric_callbacks[caller_plugin_id] = callback
end
add_rule(pattern, collector) click to toggle source

called by Agent to add new match pattern and collector

# File lib/fluent/event_router.rb, line 84
def add_rule(pattern, collector)
  @match_rules << Rule.new(pattern, collector)
end
caller_plugin_id=(caller_plugin_id) click to toggle source
# File lib/fluent/event_router.rb, line 92
def caller_plugin_id=(caller_plugin_id)
  @caller_plugin_id = caller_plugin_id
end
emit(tag, time, record) click to toggle source
# File lib/fluent/event_router.rb, line 104
def emit(tag, time, record)
  unless record.nil?
    emit_stream(tag, OneEventStream.new(time, record))
  end
end
emit_array(tag, array) click to toggle source
# File lib/fluent/event_router.rb, line 110
def emit_array(tag, array)
  emit_stream(tag, ArrayEventStream.new(array))
end
emit_error_event(tag, time, record, error) click to toggle source
# File lib/fluent/event_router.rb, line 123
def emit_error_event(tag, time, record, error)
  @emit_error_handler.emit_error_event(tag, time, record, error)
end
emit_stream(tag, es) click to toggle source
# File lib/fluent/event_router.rb, line 114
def emit_stream(tag, es)
  match(tag).emit_events(tag, es)
  if callback = find_callback
    callback.call(es)
  end
rescue => e
  @emit_error_handler.handle_emits_error(tag, es, e)
end
find_callback() click to toggle source
# File lib/fluent/event_router.rb, line 96
def find_callback
  if @caller_plugin_id
    @metric_callbacks[@caller_plugin_id]
  else
    nil
  end
end
match(tag) click to toggle source
# File lib/fluent/event_router.rb, line 131
def match(tag)
  collector = @match_cache.get(tag) {
    find(tag) || @default_collector
  }
  collector
end
match?(tag) click to toggle source
# File lib/fluent/event_router.rb, line 127
def match?(tag)
  !!find(tag)
end
suppress_missing_match!() click to toggle source
# File lib/fluent/event_router.rb, line 77
def suppress_missing_match!
  if @default_collector.respond_to?(:suppress_missing_match!)
    @default_collector.suppress_missing_match!
  end
end

Private Instance Methods

find(tag) click to toggle source
# File lib/fluent/event_router.rb, line 269
def find(tag)
  pipeline = nil
  @match_rules.each_with_index { |rule, i|
    if rule.match?(tag)
      if rule.collector.is_a?(Plugin::Filter)
        pipeline ||= Pipeline.new
        pipeline.add_filter(rule.collector)
      else
        if pipeline
          pipeline.set_output(rule.collector)
        else
          # Use Output directly when filter is not matched
          pipeline = rule.collector
        end
        return pipeline
      end
    end
  }

  if pipeline
    # filter is matched but no match
    pipeline.set_output(@default_collector)
    pipeline
  else
    nil
  end
end