class Fluent::Plugin::Buffer::Chunk
Attributes
metadata[R]
state[R]
unique_id[R]
Public Class Methods
new(metadata, compress: :text)
click to toggle source
TODO: CompressedPackedMessage of forward protocol?
Calls superclass method
# File lib/fluent/plugin/buffer/chunk.rb, line 51 def initialize(metadata, compress: :text) super() @unique_id = generate_unique_id @metadata = metadata # state: unstaged/staged/queued/closed @state = :unstaged @size = 0 @created_at = Fluent::Clock.real_now @modified_at = Fluent::Clock.real_now extend Decompressable if compress == :gzip end
Public Instance Methods
append(data, **kwargs)
click to toggle source
data is array of formatted record string
# File lib/fluent/plugin/buffer/chunk.rb, line 87 def append(data, **kwargs) raise ArgumentError, '`compress: gzip` can be used for Compressable module' if kwargs[:compress] == :gzip adding = ''.b data.each do |d| adding << d.b end concat(adding, data.size) end
bytesize()
click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 109 def bytesize raise NotImplementedError, "Implement this method in child class" end
close()
click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 157 def close @state = :closed self end
closed?()
click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 138 def closed? @state == :closed end
commit()
click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 101 def commit raise NotImplementedError, "Implement this method in child class" end
concat(bulk, records)
click to toggle source
for event streams which is packed or zipped (and we want not to unpack/uncompress)
# File lib/fluent/plugin/buffer/chunk.rb, line 97 def concat(bulk, records) raise NotImplementedError, "Implement this method in child class" end
created_at()
click to toggle source
for compatibility
# File lib/fluent/plugin/buffer/chunk.rb, line 77 def created_at @created_at_object ||= Time.at(@created_at) end
empty?()
click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 118 def empty? size == 0 end
enqueued!()
click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 152 def enqueued! @state = :queued self end
modified_at()
click to toggle source
for compatibility
# File lib/fluent/plugin/buffer/chunk.rb, line 82 def modified_at @modified_at_object ||= Time.at(@modified_at) end
open(**kwargs, &block)
click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 172 def open(**kwargs, &block) raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip raise NotImplementedError, "Implement this method in child class" end
purge()
click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 162 def purge @state = :closed self end
queued?()
click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 134 def queued? @state == :queued end
raw_create_at()
click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 68 def raw_create_at @created_at end
raw_modified_at()
click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 72 def raw_modified_at @modified_at end
read(**kwargs)
click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 167 def read(**kwargs) raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip raise NotImplementedError, "Implement this method in child class" end
rollback()
click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 105 def rollback raise NotImplementedError, "Implement this method in child class" end
size()
click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 113 def size raise NotImplementedError, "Implement this method in child class" end
Also aliased as: length
staged!()
click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 142 def staged! @state = :staged self end
staged?()
click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 130 def staged? @state == :staged end
unstaged!()
click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 147 def unstaged! @state = :unstaged self end
unstaged?()
click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 126 def unstaged? @state == :unstaged end
writable?()
click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 122 def writable? @state == :staged || @state == :unstaged end
write_to(io, **kwargs)
click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 177 def write_to(io, **kwargs) raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip open do |i| IO.copy_stream(i, io) end end