Skip to content

Commit 79d0142

Browse files
authored
Add support for compression (gzip and deflate) (#58)
* Add support for compression (gzip and deflate) * Use gzip as preferred compression encoding
1 parent 0d80776 commit 79d0142

File tree

2 files changed

+94
-3
lines changed

2 files changed

+94
-3
lines changed

lib/fluent/plugin/out_sumologic.rb

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,30 @@
22
require 'net/https'
33
require 'yajl'
44
require 'httpclient'
5+
require 'zlib'
6+
require 'stringio'
57

68
class SumologicConnection
79

810
attr_reader :http
911

10-
def initialize(endpoint, verify_ssl, connect_timeout, proxy_uri, disable_cookies, sumo_client)
12+
COMPRESS_DEFLATE = 'deflate'
13+
COMPRESS_GZIP = 'gzip'
14+
15+
def initialize(endpoint, verify_ssl, connect_timeout, proxy_uri, disable_cookies, sumo_client, compress_enabled, compress_encoding)
1116
@endpoint = endpoint
1217
@sumo_client = sumo_client
1318
create_http_client(verify_ssl, connect_timeout, proxy_uri, disable_cookies)
19+
@compress = compress_enabled
20+
@compress_encoding = (compress_encoding ||= COMPRESS_GZIP).downcase
21+
22+
unless [COMPRESS_DEFLATE, COMPRESS_GZIP].include? @compress_encoding
23+
raise "Invalid compression encoding #{@compress_encoding} must be gzip or deflate"
24+
end
1425
end
1526

1627
def publish(raw_data, source_host=nil, source_category=nil, source_name=nil, data_type, metric_data_type, collected_fields)
17-
response = http.post(@endpoint, raw_data, request_headers(source_host, source_category, source_name, data_type, metric_data_type, collected_fields))
28+
response = http.post(@endpoint, compress(raw_data), request_headers(source_host, source_category, source_name, data_type, metric_data_type, collected_fields))
1829
unless response.ok?
1930
raise RuntimeError, "Failed to send data to HTTP Source. #{response.code} - #{response.body}"
2031
end
@@ -27,6 +38,11 @@ def request_headers(source_host, source_category, source_name, data_type, metric
2738
'X-Sumo-Host' => source_host,
2839
'X-Sumo-Client' => @sumo_client,
2940
}
41+
42+
if @compress
43+
headers['Content-Encoding'] = @compress_encoding
44+
end
45+
3046
if data_type == 'metrics'
3147
case metric_data_format
3248
when 'graphite'
@@ -57,6 +73,29 @@ def create_http_client(verify_ssl, connect_timeout, proxy_uri, disable_cookies)
5773
@http.cookie_manager = nil
5874
end
5975
end
76+
77+
def compress(content)
78+
if @compress
79+
if @compress_encoding == COMPRESS_GZIP
80+
result = gzip(content)
81+
result.bytes.to_a.pack("c*")
82+
else
83+
Zlib::Deflate.deflate(content)
84+
end
85+
else
86+
content
87+
end
88+
end # def compress
89+
90+
def gzip(content)
91+
stream = StringIO.new("w")
92+
stream.set_encoding("ASCII")
93+
gz = Zlib::GzipWriter.new(stream)
94+
gz.mtime=1 # Ensure that for same content there is same output
95+
gz.write(content)
96+
gz.close
97+
stream.string.bytes.to_a.pack("c*")
98+
end # def gzip
6099
end
61100

62101
class Fluent::Plugin::Sumologic < Fluent::Plugin::Output
@@ -92,6 +131,10 @@ class Fluent::Plugin::Sumologic < Fluent::Plugin::Output
92131
config_param :custom_fields, :string, :default => nil
93132
desc 'Name of sumo client which is send as X-Sumo-Client header'
94133
config_param :sumo_client, :string, :default => 'fluentd-output'
134+
desc 'Compress payload'
135+
config_param :compress, :bool, :default => false
136+
desc 'Encoding method of compresssion (either gzip or deflate)'
137+
config_param :compress_encoding, :string, :default => SumologicConnection::COMPRESS_GZIP
95138

96139
config_section :buffer do
97140
config_set_default :@type, DEFAULT_BUFFER_TYPE
@@ -150,7 +193,9 @@ def configure(conf)
150193
conf['open_timeout'].to_i,
151194
conf['proxy_uri'],
152195
conf['disable_cookies'],
153-
conf['sumo_client']
196+
conf['sumo_client'],
197+
conf['compress'],
198+
conf['compress_encoding']
154199
)
155200
super
156201
end

test/plugin/test_out_sumologic.rb

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ def test_default_configure
7373
assert_equal instance.proxy_uri, nil
7474
assert_equal instance.disable_cookies, false
7575
assert_equal instance.sumo_client, 'fluentd-output'
76+
assert_equal instance.compress_encoding, 'gzip'
7677
end
7778

7879
def test_emit_text
@@ -570,4 +571,49 @@ def test_batching_different_fields
570571
times:1
571572
end
572573

574+
def test_emit_json_merge_timestamp_compress_deflate
575+
config = %{
576+
endpoint https://collectors.sumologic.com/v1/receivers/http/1234
577+
log_format json_merge
578+
source_category test
579+
source_host test
580+
source_name test
581+
compress true
582+
compress_encoding deflate
583+
584+
}
585+
driver = create_driver(config)
586+
time = event_time
587+
stub_request(:post, 'https://collectors.sumologic.com/v1/receivers/http/1234')
588+
driver.run do
589+
driver.feed("output.test", time, {'message' => '{"timestamp":123}'})
590+
end
591+
assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
592+
headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'test', 'X-Sumo-Name'=>'test', 'Content-Encoding'=>'deflate'},
593+
body: "\x78\x9c\xab\x56\x2a\xc9\xcc\x4d\x2d\x2e\x49\xcc\x2d\x50\xb2\x32\x34\x32\xae\x05\x00\x38\xb0\x05\xe1".force_encoding("ASCII-8BIT"),
594+
times:1
595+
end
596+
597+
def test_emit_json_merge_timestamp_compress_gzip
598+
config = %{
599+
endpoint https://collectors.sumologic.com/v1/receivers/http/1234
600+
log_format json_merge
601+
source_category test
602+
source_host test
603+
source_name test
604+
compress true
605+
606+
}
607+
driver = create_driver(config)
608+
time = event_time
609+
stub_request(:post, 'https://collectors.sumologic.com/v1/receivers/http/1234')
610+
driver.run do
611+
driver.feed("output.test", time, {'message' => '{"timestamp":1234}'})
612+
end
613+
assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
614+
headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'test', 'X-Sumo-Name'=>'test', 'Content-Encoding'=>'gzip'},
615+
body: "\x1f\x8b\x08\x00\x01\x00\x00\x00\x00\x03\xab\x56\x2a\xc9\xcc\x4d\x2d\x2e\x49\xcc\x2d\x50\xb2\x32\x34\x32\x36\xa9\x05\x00\xfe\x53\xbe\x14\x12\x00\x00\x00".force_encoding("ASCII-8BIT"),
616+
times:1
617+
end
618+
573619
end

0 commit comments

Comments
 (0)