module Fluent::PluginHelper::Inject

Public Class Methods

included(mod) click to toggle source
# File lib/fluent/plugin_helper/inject.rb, line 87
def self.included(mod)
  mod.include InjectParams
end
new() click to toggle source
Calls superclass method
# File lib/fluent/plugin_helper/inject.rb, line 91
def initialize
  super
  @_inject_enabled = false
  @_inject_hostname_key = nil
  @_inject_hostname = nil
  @_inject_worker_id_key = nil
  @_inject_worker_id = nil
  @_inject_tag_key = nil
  @_inject_time_key = nil
  @_inject_time_formatter = nil
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin_helper/inject.rb, line 103
def configure(conf)
  super

  if @inject_config
    @_inject_hostname_key = @inject_config.hostname_key
    if @_inject_hostname_key
      if self.respond_to?(:buffer_config)
        # Output plugin cannot use "hostname"(specified by @hostname_key),
        # injected by this plugin helper, in chunk keys.
        # This plugin helper works in `#format` (in many cases), but modified record
        # don't have any side effect in chunking of output plugin.
        if self.buffer_config.chunk_keys.include?(@_inject_hostname_key)
          log.error "Use filters to inject hostname to use it in buffer chunking."
          raise Fluent::ConfigError, "the key specified by 'hostname_key' in <inject> cannot be used in buffering chunk key."
        end
      end

      @_inject_hostname =  @inject_config.hostname
      unless @_inject_hostname
        @_inject_hostname = ::Socket.gethostname
        log.info "using hostname for specified field", host_key: @_inject_hostname_key, host_name: @_inject_hostname
      end
    end
    @_inject_worker_id_key = @inject_config.worker_id_key
    if @_inject_worker_id_key
      @_inject_worker_id = fluentd_worker_id # get id here, because #with_worker_config method may be used only for #configure in tests
    end
    @_inject_tag_key = @inject_config.tag_key
    @_inject_time_key = @inject_config.time_key
    if @_inject_time_key
      @_inject_time_formatter = case @inject_config.time_type
                                when :float then ->(time){ time.to_r.truncate(+6).to_f } # microsecond floating point value
                                when :unixtime_millis then ->(time) { time.respond_to?(:nsec) ? time.to_i * 1_000 + time.nsec / 1_000_000 : (time * 1_000).floor }
                                when :unixtime_micros then ->(time) { time.respond_to?(:nsec) ? time.to_i * 1_000_000 + time.nsec / 1_000 : (time * 1_000_000).floor }
                                when :unixtime_nanos then ->(time) { time.respond_to?(:nsec) ? time.to_i * 1_000_000_000 + time.nsec : (time * 1_000_000_000).floor }
                                when :unixtime then ->(time){ time.to_i }
                                else
                                  localtime = @inject_config.localtime && !@inject_config.utc
                                  Fluent::TimeFormatter.new(@inject_config.time_format, localtime, @inject_config.timezone)
                                end
    else
      if @inject_config.time_format
        log.warn "'time_format' specified without 'time_key', will be ignored"
      end
    end

    @_inject_enabled = @_inject_hostname_key || @_inject_worker_id_key || @_inject_tag_key || @_inject_time_key
  end
end
inject_values_to_event_stream(tag, es) click to toggle source
# File lib/fluent/plugin_helper/inject.rb, line 45
def inject_values_to_event_stream(tag, es)
  return es unless @_inject_enabled

  new_es = Fluent::MultiEventStream.new
  es.each do |time, record|
    r = record.dup
    if @_inject_hostname_key
      r[@_inject_hostname_key] = @_inject_hostname
    end
    if @_inject_worker_id_key
      r[@_inject_worker_id_key] = @_inject_worker_id
    end
    if @_inject_tag_key
      r[@_inject_tag_key] = tag
    end
    if @_inject_time_key
      r[@_inject_time_key] = @_inject_time_formatter.call(time)
    end
    new_es.add(time, r)
  end

  new_es
end
inject_values_to_record(tag, time, record) click to toggle source
# File lib/fluent/plugin_helper/inject.rb, line 25
def inject_values_to_record(tag, time, record)
  return record unless @_inject_enabled

  r = record.dup
  if @_inject_hostname_key
    r[@_inject_hostname_key] = @_inject_hostname
  end
  if @_inject_worker_id_key
    r[@_inject_worker_id_key] = @_inject_worker_id
  end
  if @_inject_tag_key
    r[@_inject_tag_key] = tag
  end
  if @_inject_time_key
    r[@_inject_time_key] = @_inject_time_formatter.call(time)
  end

  r
end