class Fluent::Plugin::HTTPOutput

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::Compat::Output::new
# File lib/fluent/plugin/out_http.rb, line 97
# Creates the plugin instance and clears the state that is filled in later.
#
# @proxy_uri and @formatter are assigned by #configure; @uri is only reset
# here and is not assigned anywhere else in the code visible in this file.
def initialize
  super

  @uri = @proxy_uri = @formatter = nil
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method Fluent::Compat::Output#configure
# File lib/fluent/plugin/out_http.rb, line 105
# Finishes configuration: applies defaults, builds the HTTP options, the
# optional proxy URI, the formatter and the Content-Type, and switches the
# instance to JSON-array formatting when json_array is enabled.
#
# @param conf the plugin configuration; forwarded implicitly to the superclass
# @raise [Fluent::ConfigError] when json_array is enabled but the first
#   formatter section is not of type "json"
def configure(conf)
  super

  # No explicit list was configured: keep retrying on 503 for now, and warn
  # that this default is scheduled for removal.
  if @retryable_response_codes.nil?
    @retryable_response_codes = [503]
    log.warn('Status code 503 is going to be removed from default `retryable_response_codes` from fluentd v2. Please add it by yourself if you wish')
  end

  @http_opt = setup_http_option
  if @proxy
    @proxy_uri = URI.parse(@proxy)
  end
  @formatter = formatter_create
  # Derive the Content-Type from the formatter only when none was given.
  @content_type ||= setup_content_type

  return unless @json_array

  unless @formatter_configs.first[:@type] == "json"
    raise Fluent::ConfigError, "json_array option could be used with json formatter only"
  end
  # Replace this instance's #format with the comma-appending variant.
  define_singleton_method(:format, method(:format_json_array))
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 134
# Serializes one event by delegating to the configured formatter.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return whatever @formatter.format returns for the event
def format(tag, time, record)
  formatter = @formatter
  formatter.format(tag, time, record)
end
format_json_array(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 138
# Serializes one event like #format, then appends a "," to the result.
#
# The formatter's return value is extended in place (String#<<). The trailing
# comma of the last entry is removed again in #create_request.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return the formatted entry followed by ","
def format_json_array(tag, time, record)
  entry = @formatter.format(tag, time, record)
  entry << ","
end
formatted_to_msgpack_binary?() click to toggle source
# File lib/fluent/plugin/out_http.rb, line 130
# Reports whether the first formatter section is of type 'msgpack'.
#
# @return [Boolean] true when the first formatter's @type equals 'msgpack'
def formatted_to_msgpack_binary?
  formatter_type = @formatter_configs.first[:@type]
  formatter_type == 'msgpack'
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_http.rb, line 126
# Always returns true.
# NOTE(review): presumably signals to Fluentd that this output may run under
# multiple workers — confirm against the output plugin API.
def multi_workers_ready?
  true
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 142
# Sends one buffer chunk: resolves the endpoint for the chunk, builds the
# request, logs the attempt at debug level and performs the request.
#
# @param chunk the buffer chunk to deliver
# @return whatever #send_request returns
def write(chunk)
  target = parse_endpoint(chunk)
  request = create_request(chunk, target)

  log.debug do
    chunk_id = dump_unique_id_hex(chunk.unique_id)
    "#{@http_method.capitalize} data to #{target} with chunk(#{chunk_id})"
  end

  send_request(target, request)
end

Private Instance Methods

create_request(chunk, uri) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 232
# Builds the Net::HTTP request object for one chunk.
#
# @param chunk the buffer chunk whose content becomes the request body
# @param uri the target URI; its request_uri is used as the request path
# @return the prepared Net::HTTP::Post or Net::HTTP::Put request
def create_request(chunk, uri)
  request_class = case @http_method
                  when :post then Net::HTTP::Post
                  when :put then Net::HTTP::Put
                  end
  req = request_class&.new(uri.request_uri)

  req.basic_auth(@auth.username, @auth.password) if @auth
  set_headers(req, chunk)

  payload = chunk.read
  # In json_array mode every entry ends with "," (see #format_json_array), so
  # the last character is dropped before wrapping the entries in brackets.
  req.body = @json_array ? "[#{payload.chop}]" : payload
  req
end
parse_endpoint(chunk) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 213
# Expands the placeholders in @endpoint for the given chunk and parses the
# result as a URI.
#
# @param chunk the buffer chunk passed to extract_placeholders
# @return [URI] the parsed endpoint
def parse_endpoint(chunk)
  URI.parse(extract_placeholders(@endpoint, chunk))
end
send_request(uri, req) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 247
# Performs the HTTP request (through the proxy when one is configured) and
# turns the response into a log entry or an exception.
#
# @param uri the target URI; host and port are used for the connection
# @param req the Net::HTTP request to send
# @raise [RetryableResponse] when the status code is listed in
#   @retryable_response_codes
# @raise [Fluent::UnrecoverableError] for any other non-success response when
#   @error_response_as_unrecoverable is set
def send_request(uri, req)
  res = if @proxy_uri
          Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt) { |http|
            http.request(req)
          }
        else
          Net::HTTP.start(uri.host, uri.port, @http_opt) { |http|
            http.request(req)
          }
        end

  # Net::HTTP returns a nil body for responses that carry none (e.g. 204, 304)
  # and a nil message when the status line has no reason phrase. to_s keeps
  # the real HTTP outcome from being masked by a NoMethodError.
  reason = res.message.to_s.rstrip

  if res.is_a?(Net::HTTPSuccess)
    log.debug { "#{res.code} #{reason}#{res.body.to_s.lstrip}" }
  else
    msg = "#{res.code} #{reason} #{res.body.to_s.lstrip}"

    if @retryable_response_codes.include?(res.code.to_i)
      raise RetryableResponse, msg
    end

    if @error_response_as_unrecoverable
      raise Fluent::UnrecoverableError, msg
    else
      # Non-retryable and not configured as fatal: report it and carry on.
      log.error "got error response from '#{@http_method.capitalize} #{uri.to_s}' : #{msg}"
    end
  end
end
set_headers(req, chunk) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 218
# Applies the configured headers to the request.
#
# Static headers are set first, then headers whose values contain
# placeholders (expanded per chunk), and finally Content-Type, which
# therefore wins over a Content-Type supplied through either header map.
#
# @param req the request object; headers are assigned through req[name] = value
# @param chunk the buffer chunk used to expand placeholders
def set_headers(req, chunk)
  (@headers || {}).each { |name, value| req[name] = value }

  (@headers_from_placeholders || {}).each do |name, template|
    req[name] = extract_placeholders(template, chunk)
  end

  req['Content-Type'] = @content_type
end
setup_content_type() click to toggle source
# File lib/fluent/plugin/out_http.rb, line 153
# Picks the default Content-Type from the type of the first formatter section.
#
# @return [String] the MIME type matching the formatter
# @raise [Fluent::ConfigError] when the formatter type has no known mapping
def setup_content_type
  formatter_type = @formatter_configs.first[:@type]

  case formatter_type
  when 'json' then @json_array ? 'application/json' : 'application/x-ndjson'
  when 'csv' then 'text/csv'
  when 'tsv', 'ltsv' then 'text/tab-separated-values'
  when 'msgpack' then 'application/x-msgpack'
  when 'out_file', 'single_value', 'stdout', 'hash' then 'text/plain'
  else
    raise Fluent::ConfigError, "can't determine Content-Type from formatter type. Set content_type parameter explicitly"
  end
end
setup_http_option() click to toggle source
# File lib/fluent/plugin/out_http.rb, line 170
# Builds the option hash that is later passed to Net::HTTP.start.
#
# Timeouts and :use_ssl are always present. The TLS settings (CA file, client
# certificate chain, private key, verify mode, ciphers, version) are added
# only when @endpoint starts with 'https'.
#
# @return [Hash] the Net::HTTP options
# @raise [Fluent::ConfigError] when a configured TLS path is not a regular
#   file, or when tls_client_cert_path contains no PEM certificate
def setup_http_option
  use_ssl = @endpoint.start_with?('https')
  opt = {
    open_timeout: @open_timeout,
    read_timeout: @read_timeout,
    ssl_timeout: @ssl_timeout,
    use_ssl: use_ssl
  }

  if use_ssl
    if @tls_ca_cert_path
      raise Fluent::ConfigError, "tls_ca_cert_path is wrong: #{@tls_ca_cert_path}" unless File.file?(@tls_ca_cert_path)
      opt[:ca_file] = @tls_ca_cert_path
    end
    if @tls_client_cert_path
      raise Fluent::ConfigError, "tls_client_cert_path is wrong: #{@tls_client_cert_path}" unless File.file?(@tls_client_cert_path)

      bundle = File.read(@tls_client_cert_path)
      bundle_certs = bundle.scan(/-----BEGIN CERTIFICATE-----(?:.|\n)+?-----END CERTIFICATE-----/)
      # A file without any PEM certificate block would otherwise hand nil to
      # OpenSSL and fail with an unrelated low-level error.
      if bundle_certs.empty?
        raise Fluent::ConfigError, "tls_client_cert_path does not contain a certificate: #{@tls_client_cert_path}"
      end

      # The first certificate is the client certificate; any further ones are
      # sent along as the intermediate chain (an empty list when there are none).
      client_cert, *intermediate_certs = bundle_certs
      opt[:cert] = OpenSSL::X509::Certificate.new(client_cert)
      opt[:extra_chain_cert] = intermediate_certs.map { |cert| OpenSSL::X509::Certificate.new(cert) }
    end
    if @tls_private_key_path
      raise Fluent::ConfigError, "tls_private_key_path is wrong: #{@tls_private_key_path}" unless File.file?(@tls_private_key_path)
      opt[:key] = OpenSSL::PKey.read(File.read(@tls_private_key_path), @tls_private_key_passphrase)
    end
    opt[:verify_mode] = case @tls_verify_mode
                        when :none
                          OpenSSL::SSL::VERIFY_NONE
                        when :peer
                          OpenSSL::SSL::VERIFY_PEER
                        end
    opt[:ciphers] = @tls_ciphers
    opt[:ssl_version] = @tls_version
  end

  opt
end