class Fluent::Plugin::HttpInput

Constants

EMPTY_GIF_IMAGE
EVENT_RECORD_PARAMETER
RESPONSE_200
RESPONSE_204
RESPONSE_IMG
RES_400_STATUS
RES_500_STATUS
RES_TEXT_HEADER

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::Compat::Input::new
# File lib/fluent/plugin/in_http.rb, line 94
# Creates the plugin with every piece of request-handling state set to nil.
# The real values are assigned later: #configure fills in the parsers, the
# format name and the time key; #start fills in the keepalive manager and the
# float time parser.
def initialize
  super

  # Keepalive manager, plus the active parser's format name and time key.
  @km = @format_name = @parser_time_key = nil

  # Parsers and time settings used when the parser type is 'in_http'.
  @parser_msgpack = @parser_json = nil
  @default_time_parser = @default_keep_time_key = nil
  @float_time_parser = nil

  # Parser used for any other parser type (a configured <parse> section).
  @custom_parser = nil
end

Public Instance Methods

close() click to toggle source
Calls superclass method Fluent::Plugin::Base#close
# File lib/fluent/plugin/in_http.rb, line 190
# Closes the plugin: calls server_wait_until_stop first and only afterwards
# hands over to the superclass #close.
# NOTE(review): server_wait_until_stop is defined outside this view;
# presumably it blocks until the HTTP listener has stopped — confirm in the
# server plugin helper.
def close
  server_wait_until_stop
  super
end
configure(conf) click to toggle source
Calls superclass method Fluent::Plugin::Input#configure
# File lib/fluent/plugin/in_http.rb, line 112
# Validates the CORS settings and prepares request-body parsing.
#
# Depending on the '@type' of the first parser config, one of two
# implementations is installed on this instance as #parse_params:
# * 'in_http'  -> #parse_params_default (msgpack / json / ndjson parameters)
# * any other  -> #parse_params_with_parser (the configured parser)
#
# @param conf the configuration handed to compat_parameters_convert and,
#   through the bare super call, to the superclass
# @raise [Fluent::ConfigError] if cors_allow_credentials is set while
#   cors_allow_origins is nil or contains '*'
def configure(conf)
  compat_parameters_convert(conf, :parser)

  super

  if @cors_allow_credentials && (@cors_allow_origins.nil? || @cors_allow_origins.include?('*'))
    raise Fluent::ConfigError, "Cannot enable cors_allow_credentials without specific origins"
  end

  parser_type = @parser_configs.first['@type']
  impl_name =
    if parser_type == 'in_http'
      # The built-in parsers have their time key and current-time estimation
      # switched off; both built-in parse paths hand back a nil time.
      @parser_msgpack = parser_create(usage: 'parser_in_http_msgpack', type: 'msgpack').tap do |parser|
        parser.time_key = nil
        parser.estimate_current_event = false
      end
      @parser_json = parser_create(usage: 'parser_in_http_json', type: 'json').tap do |parser|
        parser.time_key = nil
        parser.estimate_current_event = false
      end

      # This parser instance is only consulted for its time-related settings.
      default_parser = parser_create(usage: '')
      @format_name = 'default'
      @parser_time_key = default_parser.time_key
      @default_time_parser = default_parser.get_time_parser
      @default_keep_time_key = default_parser.keep_time_key
      :parse_params_default
    else
      @custom_parser = parser_create
      @format_name = parser_type
      @parser_time_key = @custom_parser.time_key
      :parse_params_with_parser
    end

  # Bind the chosen implementation as #parse_params on this instance only.
  chosen = method(impl_name)
  singleton_class.module_eval { define_method(:parse_params, chosen) }
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/in_http.rb, line 172
# Always returns true.
# NOTE(review): presumably queried by the Fluentd core to decide whether this
# plugin may run under multiple workers — the caller is outside this view,
# confirm against the plugin base class.
def multi_workers_ready?
  true
end
on_request(path_info, params) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 202
# Turns one HTTP request into event(s) and emits them through the router.
#
# The tag is path_info without its first character, with '/' separators
# replaced by '.'. The record comes from #parse_params; an Array result is
# treated as a batch and emitted as a single MultiEventStream.
#
# Event time, in order of precedence:
# 1. the 'time' request parameter (converted with to_f; a value of zero
#    means "now"),
# 2. batch only: the custom parser's parse_time / convert_values,
# 3. single record only: the time returned by #parse_params,
# 4. #convert_time_field on the record.
#
# @param path_info [String] request path, e.g. "/app/log"
# @param params request parameters, read with [] and passed on to
#   #parse_params and #add_params_to_record
# @return RESPONSE_IMG, RESPONSE_204 or RESPONSE_200 on success or when the
#   parsed record is nil; a [status, header, body] array with
#   RES_400_STATUS when parsing/preparation raises, or RES_500_STATUS when
#   emitting raises
def on_request(path_info, params)
  # Every non-error outcome answers with the same response.
  ok_response =
    if @respond_with_empty_img
      RESPONSE_IMG
    elsif @use_204_response
      RESPONSE_204
    else
      RESPONSE_200
    end

  # Converts the raw 'time' parameter; evaluated once per record so that a
  # zero value yields a fresh "now" for each event.
  time_from_param = lambda do |raw|
    seconds = raw.to_f
    seconds.zero? ? Fluent::EventTime.now : @float_time_parser.parse(seconds)
  end

  stream = nil
  begin
    tag = path_info[1..-1].split('/').join('.') # drop the leading '/'
    record_time, record = parse_params(params)

    # Skip nil record
    if record.nil?
      log.debug { "incoming event is invalid: path=#{path_info} params=#{params.to_json}" }
      return ok_response
    end

    if record.is_a?(Array)
      # Support batched requests
      stream = Fluent::MultiEventStream.new
      record.each do |entry|
        add_params_to_record(entry, params)

        if (time_param = params['time'])
          entry_time = time_from_param.call(time_param)
        elsif @custom_parser
          parsed_time = @custom_parser.parse_time(entry)
          entry_time, entry = @custom_parser.convert_values(parsed_time, entry)
        else
          entry_time = convert_time_field(entry)
        end

        stream.add(entry_time, entry)
      end
    else
      add_params_to_record(record, params)

      time =
        if (time_param = params['time'])
          time_from_param.call(time_param)
        elsif record_time.nil?
          convert_time_field(record)
        else
          record_time
        end
    end
  rescue => e
    log.error("failed to process request", error: e) if @dump_error_log
    return [RES_400_STATUS, RES_TEXT_HEADER, "400 Bad Request\n#{e}\n"]
  end

  # TODO server error
  begin
    if stream
      router.emit_stream(tag, stream)
    else
      router.emit(tag, time, record)
    end
  rescue => e
    log.error("failed to emit data", error: e) if @dump_error_log
    return [RES_500_STATUS, RES_TEXT_HEADER, "500 Internal Server Error\n#{e}\n"]
  end

  ok_response
end
start() click to toggle source
Calls superclass method Fluent::Compat::Input#start
# File lib/fluent/plugin/in_http.rb, line 176
# Starts the plugin: applies the configured blocking timeout to the event
# loop, runs the superclass #start, then attaches the keepalive manager and
# opens the HTTP listener whose connections are handed to #on_server_connect.
def start
  @_event_loop_run_timeout = @blocking_timeout

  super

  log.debug "listening http", bind: @bind, port: @port

  # Build the parser for the 'time' request parameter BEFORE the listener is
  # created. #on_request dereferences @float_time_parser whenever a request
  # carries 'time'; assigning it only after server_create_connection would
  # leave a window in which an early request could see nil and be answered
  # with 400.
  # NOTE(review): this assumes connections can be accepted as soon as
  # server_create_connection returns — confirm against the server helper.
  @float_time_parser = Fluent::NumericTimeParser.new(:float)

  @km = KeepaliveManager.new(@keepalive_timeout)
  event_loop_attach(@km)

  server_create_connection(:in_http, @port, bind: @bind, backlog: @backlog, &method(:on_server_connect))
end

Private Instance Methods

add_params_to_record(record, params) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 341
# Copies selected request parameters into the record, in place.
#
# * @add_http_headers -> every params entry whose key starts with "HTTP_"
# * @add_query_params -> every params entry whose key starts with "QUERY_"
# * @add_remote_addr  -> params['REMOTE_ADDR'] under the key 'REMOTE_ADDR'
#
# Header entries are copied in a first pass and query entries in a second
# one, so header keys are written to the record before query keys.
#
# @param record the record to extend (mutated)
# @param params the request parameters (only read)
# @return the copied REMOTE_ADDR value when @add_remote_addr is set, else nil
def add_params_to_record(record, params)
  wanted_prefixes = []
  wanted_prefixes << "HTTP_".freeze if @add_http_headers
  wanted_prefixes << "QUERY_".freeze if @add_query_params

  wanted_prefixes.each do |prefix|
    params.each_pair do |key, value|
      record[key] = value if key.start_with?(prefix)
    end
  end

  record['REMOTE_ADDR'] = params['REMOTE_ADDR'] if @add_remote_addr
end
convert_time_field(record) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 363
# Derives the event time from the record's time field (@parser_time_key).
#
# The field is left in the record when @default_keep_time_key is set and
# removed from it otherwise. A missing (nil/false) value yields the current
# time. A present value is parsed with @default_time_parser when there is
# one, and is otherwise passed to Time.at.
#
# @param record the record to read the time field from (may be mutated)
# @return the event time for the record
def convert_time_field(record)
  raw_time =
    if @default_keep_time_key
      record[@parser_time_key]
    else
      record.delete(@parser_time_key)
    end

  return Fluent::EventTime.now unless raw_time
  return @default_time_parser.parse(raw_time) if @default_time_parser

  Fluent::EventTime.from_time(Time.at(raw_time))
end
on_server_connect(conn) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 289
# Wires a newly accepted connection to a fresh Handler.
#
# The handler receives the connection, the keepalive manager, #on_request as
# its request callback, and the body-size, format, logging and CORS /
# query-parameter settings. The connection's :data, :write_complete and
# :close events are then forwarded to the handler's on_read,
# on_write_complete and on_close.
#
# @param conn the connection object; must respond to #on(event, &block)
def on_server_connect(conn)
  request_handler = Handler.new(
    conn, @km, method(:on_request),
    @body_size_limit, @format_name, log,
    @cors_allow_origins, @cors_allow_credentials,
    @add_query_params
  )

  conn.on(:data) { |chunk| request_handler.on_read(chunk) }
  conn.on(:write_complete) { |_| request_handler.on_write_complete }
  conn.on(:close) { |_| request_handler.on_close }
end
parse_params_default(params) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 308
# Parses the request body for the built-in format.
#
# Exactly one parameter is used, checked in this order:
# * 'msgpack' -> parsed with @parser_msgpack
# * 'json'    -> parsed with @parser_json
# * 'ndjson'  -> split on LF / CRLF, each line parsed with @parser_json
#
# For 'msgpack' and 'json' the method returns as soon as the parser yields
# its first record. The time yielded by the parser is always discarded.
#
# @param params the request parameters
# @return [Array] [nil, record], or [nil, records] for 'ndjson'
# @raise [RuntimeError] when none of the three parameters is present
def parse_params_default(params)
  if (packed = params['msgpack'])
    @parser_msgpack.parse(packed) { |_time, record| return nil, record }
  elsif (document = params['json'])
    @parser_json.parse(document) { |_time, record| return nil, record }
  elsif (lines = params['ndjson'])
    records = lines.split(/\r?\n/).each_with_object([]) do |line, collected|
      @parser_json.parse(line) { |_time, record| collected.push(record) }
    end
    return nil, records
  else
    raise "'json', 'ndjson' or 'msgpack' parameter is required"
  end
end
parse_params_with_parser(params) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 330
# Parses the request body with the configured custom parser.
#
# The body is read from params[EVENT_RECORD_PARAMETER] and the method
# returns as soon as the parser yields its first result.
#
# @param params the request parameters
# @return [Array] [time, record] as yielded by @custom_parser
# @raise [RuntimeError] when the parameter is missing, or when the parser
#   yields a nil record
def parse_params_with_parser(params)
  content = params[EVENT_RECORD_PARAMETER]
  raise "'#{EVENT_RECORD_PARAMETER}' parameter is required" unless content

  @custom_parser.parse(content) do |time, record|
    raise "Received event is not #{@format_name}: #{content}" if record.nil?
    return time, record
  end
end