Commit 417b123
feat: add exponential backoff for sending data (#76)

* feat: add exponential backoff for sending data
* tests(retry): add tests for retrying mechanism
  Signed-off-by: Dominik Rosiek <drosiek@sumologic.com>
* feat(retries): remove retry_forever
  Signed-off-by: Dominik Rosiek <drosiek@sumologic.com>
* feat(retries): ensure backward compatibility
  Signed-off-by: Dominik Rosiek <drosiek@sumologic.com>
* docs(retry): extend logging messages
  Signed-off-by: Dominik Rosiek <drosiek@sumologic.com>
* docs(retries): update README
  Signed-off-by: Dominik Rosiek <drosiek@sumologic.com>
* Update lib/fluent/plugin/out_sumologic.rb
* fix: fix setting maximum sleep_time
  Signed-off-by: Dominik Rosiek <drosiek@sumologic.com>
* feat: rename skip_retry to use_internal_retry
  Signed-off-by: Dominik Rosiek <drosiek@sumologic.com>
* Update README.md
  Co-authored-by: Andrzej Stencel <astencel@sumologic.com>
* Update README.md
  Co-authored-by: Andrzej Stencel <astencel@sumologic.com>

Co-authored-by: Andrzej Stencel <astencel@sumologic.com>

1 parent bb538b0 commit 417b123

3 files changed: 156 additions, 12 deletions

README.md

Lines changed: 14 additions & 0 deletions
@@ -38,6 +38,12 @@ Configuration options for fluent.conf are:
 * `compress_encoding` - Compression encoding format, either `gzip` or `deflate` (default `gzip`)
 * `custom_fields` - Comma-separated key=value list of fields to apply to every log. [more information](https://help.sumologic.com/Manage/Fields#http-source-fields)
 * `custom_dimensions` - Comma-separated key=value list of dimensions to apply to every metric. [more information](https://help.sumologic.com/03Send-Data/Sources/02Sources-for-Hosted-Collectors/HTTP-Source/Upload-Metrics-to-an-HTTP-Source#supported-http-headers)
+* `use_internal_retry` - Enable the custom retry mechanism. It is `false` by default for backward compatibility, but
+we recommend enabling it and configuring the following parameters (`retry_min_interval`, `retry_max_interval`, `retry_timeout`, `retry_max_times`)
+* `retry_min_interval` - Minimum interval to wait between sending tries (default is `1s`)
+* `retry_max_interval` - Maximum interval to wait between sending tries (default is `5m`)
+* `retry_timeout` - Time after which the data is going to be dropped (default is `72h`) (`0s` means that there is no timeout)
+* `retry_max_times` - Maximum number of retries (default is `0`) (`0` means that there is no limit and retries will happen forever)
 
 __NOTE:__ <sup>*</sup> [Placeholders](https://docs.fluentd.org/v1.0/articles/buffer-section#placeholders) are supported
 
@@ -137,6 +143,14 @@ Example
 }
 ```
 
+## Retry Mechanism
+
+`retry_min_interval`, `retry_max_interval`, `retry_timeout`, `retry_max_times` are not the [buffer retries parameters][buffer_retries].
+For technical reasons, this plugin implements its own exponential back-off retry mechanism.
+It is disabled by default, but we recommend enabling it by setting `use_internal_retry` to `true`.
+
+[buffer_retries]: https://docs.fluentd.org/configuration/buffer-section#retries-parameters
+
 ### TLS 1.2 Requirement
 
 Sumo Logic only accepts connections from clients using TLS version 1.2 or greater. To utilize the content of this repo, ensure that it's running in an execution environment that is configured to use TLS 1.2 or greater.
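
The effect of `retry_min_interval` and `retry_max_interval` can be sketched in a few lines of Ruby. This is an illustration only: `backoff_schedule` is a hypothetical helper that does not exist in the plugin. It assumes, as the code change in this commit does, that the wait starts at the minimum interval, doubles after each failed attempt, and is capped at the maximum interval.

```ruby
# Hypothetical helper (not part of the plugin): returns the waits, in seconds,
# that would precede each retry when the wait doubles up to a cap.
def backoff_schedule(min_interval, max_interval, retries)
  sleep_time = min_interval
  Array.new(retries) do
    current = sleep_time
    # Double the wait for the next retry, but never exceed the cap.
    sleep_time = [sleep_time * 2, max_interval].min
    current
  end
end

# README defaults: retry_min_interval 1s, retry_max_interval 5m (300s).
puts backoff_schedule(1, 300, 11).inspect
# prints [1, 2, 4, 8, 16, 32, 64, 128, 256, 300, 300]
```

With the defaults, the wait reaches the five-minute cap after nine doublings. Note that a minimum interval of `0s`, as used in the tests of this commit, never grows, because doubling zero stays zero.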

lib/fluent/plugin/out_sumologic.rb

Lines changed: 55 additions & 12 deletions
@@ -157,6 +157,13 @@ class Fluent::Plugin::Sumologic < Fluent::Plugin::Output
   config_param :timestamp_key, :string, :default => 'timestamp'
   config_param :proxy_uri, :string, :default => nil
   config_param :disable_cookies, :bool, :default => false
+
+  config_param :use_internal_retry, :bool, :default => false
+  config_param :retry_timeout, :time, :default => 72 * 3600 # 72h
+  config_param :retry_max_times, :integer, :default => 0
+  config_param :retry_min_interval, :time, :default => 1 # 1s
+  config_param :retry_max_interval, :time, :default => 5*60 # 5m
+
   # https://help.sumologic.com/Manage/Fields
   desc 'Fields string (eg "cluster=payment, service=credit_card") which is going to be added to every log record.'
   config_param :custom_fields, :string, :default => nil
@@ -385,6 +392,7 @@ def write(chunk)
 
     end
 
+    chunk_id = "##{chunk.dump_unique_id_hex(chunk.unique_id)}"
     # Push logs to sumo
     messages_list.each do |key, messages|
       source_name, source_category, source_host, fields = key[:source_name], key[:source_category],
@@ -397,18 +405,53 @@ def write(chunk)
         fields = [fields,@custom_fields].compact.join(",")
       end
 
-      @log.debug { "Sending #{messages.count} #{@data_type} records with source category '#{source_category}', source host '#{source_host}', source name '#{source_name}'." }
-
-      @sumo_conn.publish(
-          messages.join("\n"),
-          source_host =source_host,
-          source_category =source_category,
-          source_name =source_name,
-          data_type =@data_type,
-          metric_data_format =@metric_data_format,
-          collected_fields =fields,
-          dimensions =@custom_dimensions
-      )
+      retries = 0
+      start_time = Time.now
+      sleep_time = @retry_min_interval
+
+      while true
+        common_log_part = "#{@data_type} records with source category '#{source_category}', source host '#{source_host}', source name '#{source_name}', chunk #{chunk_id}, try #{retries}"
+        begin
+          @log.debug { "Sending #{messages.count}; #{common_log_part}" }
+
+          @sumo_conn.publish(
+              messages.join("\n"),
+              source_host =source_host,
+              source_category =source_category,
+              source_name =source_name,
+              data_type =@data_type,
+              metric_data_format =@metric_data_format,
+              collected_fields =fields,
+              dimensions =@custom_dimensions
+          )
+          break
+        rescue => e
+          if !@use_internal_retry
+            raise e
+          end
+          # increment retries
+          retries = retries + 1
+
+          log.warn "error while sending request to sumo: #{e}; #{common_log_part}"
+          log.warn_backtrace e.backtrace
+
+          # drop data if
+          # - we reached out the @retry_max_times retries
+          # - or we exceeded @retry_timeout
+          if (retries >= @retry_max_times && @retry_max_times > 0) || (Time.now > start_time + @retry_timeout && @retry_timeout > 0)
+            log.warn "dropping records; #{common_log_part}"
+            break
+          end
+
+          log.info "going to retry to send data at #{Time.now + sleep_time}; #{common_log_part}"
+          sleep sleep_time
+
+          sleep_time = sleep_time * 2
+          if sleep_time > @retry_max_interval
+            sleep_time = @retry_max_interval
+          end
+        end
+      end
     end
 
   end
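
The control flow of the new loop can be exercised without Fluentd by pulling it into a standalone function. The following is a minimal sketch, not the plugin's code: `send_with_retry`, `sleeper`, and `clock` are hypothetical names introduced here so that sleeping and time can be faked. It mirrors the drop conditions in the diff, where `0` disables the respective limit.

```ruby
# Hypothetical standalone version of the retry loop from the diff.
# `sleeper` and `clock` are injectable only so the sketch can run instantly.
def send_with_retry(max_times:, timeout:, min_interval:, max_interval:,
                    sleeper: ->(seconds) { sleep(seconds) },
                    clock: -> { Time.now })
  attempts = 0
  start_time = clock.call
  sleep_time = min_interval

  loop do
    begin
      attempts += 1
      yield
      return { delivered: true, attempts: attempts }
    rescue StandardError
      # A limit of 0 means "no limit", as in the diff.
      out_of_tries = max_times > 0 && attempts >= max_times
      timed_out = timeout > 0 && clock.call > start_time + timeout
      return { delivered: false, attempts: attempts } if out_of_tries || timed_out

      sleeper.call(sleep_time)
      sleep_time = [sleep_time * 2, max_interval].min
    end
  end
end

# Two failures followed by a success, with sleeps recorded instead of performed.
statuses = [500, 500, 200]
waits = []
result = send_with_retry(max_times: 3, timeout: 0, min_interval: 1, max_interval: 300,
                         sleeper: ->(seconds) { waits << seconds }) do
  status = statuses.shift
  raise "Failed to send data to HTTP Source. #{status}" unless status == 200
end
puts result.inspect
puts waits.inspect
```

One consequence of counting this way, visible in both the diff and the sketch: the limit applies to the total number of send attempts, so `retry_max_times 3` allows at most three requests, including the first one. This agrees with `test_resend_failed` in this commit, which expects exactly 15 requests for `retry_max_times 15`.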

test/plugin/test_out_sumologic.rb

Lines changed: 87 additions & 0 deletions
@@ -825,4 +825,91 @@ def test_warning_response_from_receiver
     end
   end
 
+  def test_resend
+    endpoint = "https://collectors.sumologic.com/v1/receivers/http/1234"
+    config = %{
+      endpoint #{endpoint}
+      retry_min_interval 0s
+      retry_max_times 3
+      use_internal_retry true
+    }
+    time = event_time
+
+    driver = create_driver(config)
+    stub_request(:post, endpoint).to_return(
+      {status: 500, headers: {content_type: 'application/json'}},
+      {status: 200, headers: {content_type: 'application/json'}}
+    )
+    driver.run do
+      driver.feed("test", time, {"message": "test"})
+    end
+    assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
+      body: /\A{"timestamp":\d+.,"message":"test"}\z/,
+      times:2
+  end
+
+  def test_resend_failed
+    endpoint = "https://collectors.sumologic.com/v1/receivers/http/1234"
+    config = %{
+      endpoint #{endpoint}
+      retry_min_interval 0s
+      retry_max_times 15
+      use_internal_retry true
+    }
+    time = event_time
+
+    driver = create_driver(config)
+    stub_request(:post, endpoint).to_return(
+      status: 500, headers: {content_type: 'application/json'}
+    )
+    driver.run do
+      driver.feed("test", time, {"message": "test"})
+    end
+    assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
+      body: /\A{"timestamp":\d+.,"message":"test"}\z/,
+      times:15
+  end
+
+  def test_resend_forever
+    endpoint = "https://collectors.sumologic.com/v1/receivers/http/1234"
+    config = %{
+      endpoint #{endpoint}
+      retry_min_interval 0s
+      retry_max_times 0
+      retry_timeout 0s
+      use_internal_retry true
+    }
+    time = event_time
+
+    driver = create_driver(config)
+    stub_request(:post, endpoint).to_return(
+      *[{status: 500, headers: {content_type: 'application/json'}}]*123,
+      {status: 200, headers: {content_type: 'application/json'}}
+    )
+    driver.run do
+      driver.feed("test", time, {"message": "test"})
+    end
+    assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
+      body: /\A{"timestamp":\d+.,"message":"test"}\z/,
+      times:124
+  end
+
+  def test_skip_retry
+    endpoint = "https://collectors.sumologic.com/v1/receivers/http/1234"
+    config = %{
+      endpoint #{endpoint}
+    }
+    time = event_time
+
+    driver = create_driver(config)
+    stub_request(:post, endpoint).to_return(status: 500, headers: {content_type: 'application/json'})
+
+    exception = assert_raise(RuntimeError) {
+      driver.run do
+        driver.feed("test", time, {"message": "test"})
+      end
+    }
+    assert_equal("Failed to send data to HTTP Source. 500 - ", exception.message)
+  end
+
 end
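
The expression `*[{...}]*123, {...}` in `test_resend_forever` combines `Array#*` (repetition) with the splat operator to pass 123 failing responses followed by one successful response. The sketch below shows the same idiom in isolation. `ResponseQueue` is a hypothetical stand-in written for this illustration; it is not WebMock's implementation, and it only assumes that canned responses are replayed in order.

```ruby
# Hypothetical stand-in for a stub that replays canned responses in order.
# Once only one response is left, it keeps returning that one.
class ResponseQueue
  def initialize(*responses)
    @responses = responses
  end

  def next_response
    @responses.size > 1 ? @responses.shift : @responses.first
  end
end

# [hash] * 123 builds an array of 123 entries; the leading * spreads them
# into separate arguments, followed by the final successful response.
queue = ResponseQueue.new(*[{ status: 500 }] * 123, { status: 200 })

attempts = 0
loop do
  attempts += 1
  break if queue.next_response[:status] == 200
end
puts attempts
# prints 124
```

The count of 124 matches the `times:124` assertion in the test: 123 failed requests plus the one that succeeds.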
