class Fluent::Plugin::HTTPOutput
Public Class Methods
new()
click to toggle source
Calls superclass method
Fluent::Compat::Output::new
# File lib/fluent/plugin/out_http.rb, line 97 def initialize super @uri = nil @proxy_uri = nil @formatter = nil end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
Fluent::Compat::Output#configure
# File lib/fluent/plugin/out_http.rb, line 105 def configure(conf) super if @retryable_response_codes.nil? log.warn('Status code 503 is going to be removed from default `retryable_response_codes` from fluentd v2. Please add it by yourself if you wish') @retryable_response_codes = [503] end @http_opt = setup_http_option @proxy_uri = URI.parse(@proxy) if @proxy @formatter = formatter_create @content_type = setup_content_type unless @content_type if @json_array if @formatter_configs.first[:@type] != "json" raise Fluent::ConfigError, "json_array option could be used with json formatter only" end define_singleton_method(:format, method(:format_json_array)) end end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 134 def format(tag, time, record) @formatter.format(tag, time, record) end
format_json_array(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 138 def format_json_array(tag, time, record) @formatter.format(tag, time, record) << "," end
formatted_to_msgpack_binary?()
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 130 def formatted_to_msgpack_binary? @formatter_configs.first[:@type] == 'msgpack' end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 126 def multi_workers_ready? true end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 142 def write(chunk) uri = parse_endpoint(chunk) req = create_request(chunk, uri) log.debug { "#{@http_method.capitalize} data to #{uri.to_s} with chunk(#{dump_unique_id_hex(chunk.unique_id)})" } send_request(uri, req) end
Private Instance Methods
create_request(chunk, uri)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 232 def create_request(chunk, uri) req = case @http_method when :post Net::HTTP::Post.new(uri.request_uri) when :put Net::HTTP::Put.new(uri.request_uri) end if @auth req.basic_auth(@auth.username, @auth.password) end set_headers(req, chunk) req.body = @json_array ? "[#{chunk.read.chop}]" : chunk.read req end
parse_endpoint(chunk)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 213 def parse_endpoint(chunk) endpoint = extract_placeholders(@endpoint, chunk) URI.parse(endpoint) end
send_request(uri, req)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 247 def send_request(uri, req) res = if @proxy_uri Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt) { |http| http.request(req) } else Net::HTTP.start(uri.host, uri.port, @http_opt) { |http| http.request(req) } end if res.is_a?(Net::HTTPSuccess) log.debug { "#{res.code} #{res.message.rstrip}#{res.body.lstrip}" } else msg = "#{res.code} #{res.message.rstrip} #{res.body.lstrip}" if @retryable_response_codes.include?(res.code.to_i) raise RetryableResponse, msg end if @error_response_as_unrecoverable raise Fluent::UnrecoverableError, msg else log.error "got error response from '#{@http_method.capitalize} #{uri.to_s}' : #{msg}" end end end
set_headers(req, chunk)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 218 def set_headers(req, chunk) if @headers @headers.each do |k, v| req[k] = v end end if @headers_from_placeholders @headers_from_placeholders.each do |k, v| req[k] = extract_placeholders(v, chunk) end end req['Content-Type'] = @content_type end
setup_content_type()
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 153 def setup_content_type case @formatter_configs.first[:@type] when 'json' @json_array ? 'application/json' : 'application/x-ndjson' when 'csv' 'text/csv' when 'tsv', 'ltsv' 'text/tab-separated-values' when 'msgpack' 'application/x-msgpack' when 'out_file', 'single_value', 'stdout', 'hash' 'text/plain' else raise Fluent::ConfigError, "can't determine Content-Type from formatter type. Set content_type parameter explicitly" end end
setup_http_option()
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 170 def setup_http_option use_ssl = @endpoint.start_with?('https') opt = { open_timeout: @open_timeout, read_timeout: @read_timeout, ssl_timeout: @ssl_timeout, use_ssl: use_ssl } if use_ssl if @tls_ca_cert_path raise Fluent::ConfigError, "tls_ca_cert_path is wrong: #{@tls_ca_cert_path}" unless File.file?(@tls_ca_cert_path) opt[:ca_file] = @tls_ca_cert_path end if @tls_client_cert_path raise Fluent::ConfigError, "tls_client_cert_path is wrong: #{@tls_client_cert_path}" unless File.file?(@tls_client_cert_path) bundle = File.read(@tls_client_cert_path) bundle_certs = bundle.scan(/-----BEGIN CERTIFICATE-----(?:.|\n)+?-----END CERTIFICATE-----/) opt[:cert] = OpenSSL::X509::Certificate.new(bundle_certs[0]) intermediate_certs = bundle_certs[1..-1] if intermediate_certs opt[:extra_chain_cert] = intermediate_certs.map { |cert| OpenSSL::X509::Certificate.new(cert) } end end if @tls_private_key_path raise Fluent::ConfigError, "tls_private_key_path is wrong: #{@tls_private_key_path}" unless File.file?(@tls_private_key_path) opt[:key] = OpenSSL::PKey.read(File.read(@tls_private_key_path), @tls_private_key_passphrase) end opt[:verify_mode] = case @tls_verify_mode when :none OpenSSL::SSL::VERIFY_NONE when :peer OpenSSL::SSL::VERIFY_PEER end opt[:ciphers] = @tls_ciphers opt[:ssl_version] = @tls_version end opt end