From 262f865e4add96bd89fddbbda593b5a2834c5ece Mon Sep 17 00:00:00 2001
From: Fabrice Baumann
Date: Fri, 1 Sep 2017 18:56:38 -0400
Subject: [PATCH] Add new compression plugin

Add new ZST compression plugin
Add test for the new ZST compression plugin
---
 README.md                              |  1 +
 lib/fluent/plugin/s3_compressor_zst.rb | 44 ++++++++++++++++++++
 lib/fluent/plugin/s3_extractor_zst.rb  | 59 +++++++++++++++++++++++++++
 test/test_in_s3.rb                     |  3 +-
 4 files changed, 106 insertions(+), 1 deletion(-)
 create mode 100644 lib/fluent/plugin/s3_compressor_zst.rb
 create mode 100644 lib/fluent/plugin/s3_extractor_zst.rb

diff --git a/README.md b/README.md
index 163a33bb..44856532 100644
--- a/README.md
+++ b/README.md
@@ -218,6 +218,7 @@ archive format on S3. You can use several format:
 * text
 * lzo (Need lzop command)
 * lzma2 (Need xz command)
+* zst (Need zstd command https://github.com/facebook/zstd)
 * gzip_command (Need gzip command)
     * This compressor uses an external gzip command, hence would result in
       utilizing CPU cores well compared with `gzip`
diff --git a/lib/fluent/plugin/s3_compressor_zst.rb b/lib/fluent/plugin/s3_compressor_zst.rb
new file mode 100644
index 00000000..fe856475
--- /dev/null
+++ b/lib/fluent/plugin/s3_compressor_zst.rb
@@ -0,0 +1,44 @@
+module Fluent::Plugin
+  class S3Output
+    # Compressor registered under the name 'zst'; it shells out to the
+    # external `zstd` command to compress each chunk.
+    class ZSTCompressor < Compressor
+      S3Output.register_compressor('zst', self)
+
+      # Options interpolated into the zstd command line.
+      config_param :command_parameter, :string, default: '-qq -f -7'
+
+      # Calls check_command('zstd') once the parent configuration is done.
+      def configure(conf)
+        super
+        check_command('zstd')
+      end
+
+      # Extension reported for this format.
+      def ext
+        'zst'.freeze
+      end
+
+      # Content type reported for this format.
+      def content_type
+        'application/x-zst'.freeze
+      end
+
+      # Writes the chunk to a temporary file and runs zstd on it, with the
+      # output directed to tmp.path.
+      def compress(chunk, tmp)
+        w = Tempfile.new("chunk-tmp")
+        w.binmode
+        chunk.write_to(w)
+        w.close
+
+        # We don't check the return code because we can't recover zstd failure.
+        system "zstd #{@command_parameter} -o #{tmp.path} #{w.path}"
+      ensure
+        # Best-effort cleanup of the temporary input file.
+        w.close rescue nil
+        w.unlink rescue nil
+      end
+    end
+  end
+end
diff --git a/lib/fluent/plugin/s3_extractor_zst.rb b/lib/fluent/plugin/s3_extractor_zst.rb
new file mode 100644
index 00000000..a60875fb
--- /dev/null
+++ b/lib/fluent/plugin/s3_extractor_zst.rb
@@ -0,0 +1,59 @@
+module Fluent::Plugin
+  class S3Input
+    # Extractor registered under the name 'zst'; it shells out to the
+    # external `zstd` command to decompress fetched objects.
+    class ZSTExtractor < Extractor
+      S3Input.register_extractor('zst', self)
+
+      # Options interpolated into the zstd command line.
+      # NOTE(review): the default has no -c; confirm zstd writes the
+      # decompressed data to stdout when invoked this way.
+      config_param :command_parameter, :string, default: '-qq -d -f'
+
+      # Calls check_command('zstd') once the parent configuration is done.
+      def configure(conf)
+        super
+        check_command('zstd')
+      end
+
+      # Extension reported for this format.
+      def ext
+        'zst'.freeze
+      end
+
+      # Content type reported for this format.
+      def content_type
+        'application/x-zst'.freeze
+      end
+
+      # Runs zstd on the given IO and returns what the command wrote to
+      # stdout. An IO without a path is first spooled to a temporary file.
+      # Raises RuntimeError when zstd exits unsuccessfully.
+      def extract(io)
+        temp = nil
+        # respond_to? needs the method name as a symbol.
+        path = if io.respond_to?(:path)
+                 io.path
+               else
+                 temp = Tempfile.new("zst-temp")
+                 temp.binmode
+                 temp.write(io.read)
+                 temp.close
+                 temp.path
+               end
+
+        # capture2 returns [stdout, Process::Status]; the status object is
+        # always truthy, so success has to be queried explicitly.
+        stdout, status = Open3.capture2("zstd #{@command_parameter} #{path}")
+        if status.success?
+          stdout
+        else
+          raise "Failed to extract #{path} with zstd command."
+        end
+      ensure
+        # Remove the spooled copy, if one was created.
+        temp.close! if temp
+      end
+    end
+  end
+end
diff --git a/test/test_in_s3.rb b/test/test_in_s3.rb
index f7deb4ec..091d5839 100644
--- a/test/test_in_s3.rb
+++ b/test/test_in_s3.rb
@@ -89,7 +89,8 @@ def test_unknown_store_as
        "text" => ["text", "txt", "text/plain"],
        "gzip_command" => ["gzip_command", "gz", "application/x-gzip"],
        "lzo" => ["lzo", "lzo", "application/x-lzop"],
-       "lzma2" => ["lzma2", "xz", "application/x-xz"])
+       "lzma2" => ["lzma2", "xz", "application/x-xz"],
+       "zst" => ["zst", "zst", "application/x-zst"])
   def test_extractor(data)
     store_type, ext, content_type = data
     config = CONFIG + "\nstore_as #{store_type}\n"