module Fluent::Plugin::ElasticsearchIndexLifecycleManagement

Constants

ILM_DEFAULT_POLICY_PATH

Public Instance Methods

create_ilm_policy(policy_id, ilm_policy = default_policy_payload, overwrite = false) click to toggle source
# File lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb, line 31
def create_ilm_policy(policy_id, ilm_policy = default_policy_payload, overwrite = false)
  if overwrite || !ilm_policy_exists?(policy_id)
    ilm_policy_put(policy_id, ilm_policy)
  end
end
default_policy_payload() click to toggle source
# File lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb, line 77
def default_policy_payload
  default_policy_path = File.join(__dir__, ILM_DEFAULT_POLICY_PATH)
  Yajl.load(::IO.read(default_policy_path))
end
get_ilm_policy() click to toggle source
# File lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb, line 47
def get_ilm_policy
  if Gem::Version.new(TRANSPORT_CLASS::VERSION) < Gem::Version.new("8.0.0")
    client.ilm.get_policy
  else
    client.enrich.get_policy
  end
end
ilm_policy_exists?(policy_id) click to toggle source
# File lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb, line 55
def ilm_policy_exists?(policy_id)
  begin
    if Gem::Version.new(TRANSPORT_CLASS::VERSION) < Gem::Version.new("8.0.0")
      client.ilm.get_policy(policy_id: policy_id)
    else
      client.enrich.get_policy(name: policy_id)
    end
    true
  rescue
    false
  end
end
ilm_policy_put(policy_id, policy) click to toggle source
# File lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb, line 68
def ilm_policy_put(policy_id, policy)
  log.info("Installing ILM policy: #{policy}")
  if Gem::Version.new(TRANSPORT_CLASS::VERSION) < Gem::Version.new("8.0.0")
    client.ilm.put_policy(policy_id: policy_id, body: policy)
  else
    client.enrich.put_policy(name: policy_id, body: policy)
  end
end
setup_ilm(enable_ilm, policy_id, ilm_policy = default_policy_payload, overwrite = false) click to toggle source
# File lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb, line 6
def setup_ilm(enable_ilm, policy_id, ilm_policy = default_policy_payload, overwrite = false)
  return unless enable_ilm

  create_ilm_policy(policy_id, ilm_policy, overwrite)
end
verify_ilm_working() click to toggle source
# File lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb, line 12
def verify_ilm_working
  # Check the Elasticsearch instance for ILM readiness - this means that the version has to be a non-OSS release, with ILM feature
  # available and enabled.
  begin
    xpack = xpack_info
    if xpack.nil?
      raise Fluent::ConfigError, "xpack endpoint does not work"
    end
    features = xpack["features"]
    ilm = features.nil? ? nil : features["ilm"]
    raise Fluent::ConfigError, "Index Lifecycle management is enabled in Fluentd, but not installed on your Elasticsearch" if features.nil? || ilm.nil?
    raise Fluent::ConfigError, "Index Lifecycle management is enabled in Fluentd, but not available in your Elasticsearch" unless ilm['available']
    raise Fluent::ConfigError, "Index Lifecycle management is enabled in Fluentd, but not enabled in your Elasticsearch" unless ilm['enabled']

  rescue ::TRANSPORT_CLASS::Transport::Error => e
    raise Fluent::ConfigError, "Index Lifecycle management is enabled in Fluentd, but not installed on your Elasticsearch", error: e
  end
end
xpack_info() click to toggle source
# File lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb, line 37
def xpack_info
  begin
    client.xpack.info
  rescue NoMethodError
    raise RuntimeError, "elasticsearch-xpack gem is not installed."
  rescue
    nil
  end
end