Skip to content

Commit 800497f

Browse files
authored
feat(records_per_request): add records_per_request to limit number of records per request (#78)
Signed-off-by: Dominik Rosiek <drosiek@sumologic.com>
1 parent 417b123 commit 800497f

File tree

3 files changed

+140
-42
lines changed

3 files changed

+140
-42
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ Configuration options for fluent.conf are:
4444
* `retry_max_interval` - Maximum interval to wait between sending tries (default is `5m`)
4545
* `retry_timeout` - Time after which the data is going to be dropped (default is `72h`) (`0s` means that there is no timeout)
4646
* `retry_max_times` - Maximum number of retries (default is `0`) (`0` means that there is no max retry times, retries will happen forever)
47+
* `max_request_size` - Maximum request size (before applying compression). Default is `0k` which means no limit
4748

4849
__NOTE:__ <sup>*</sup> [Placeholders](https://docs.fluentd.org/v1.0/articles/buffer-section#placeholders) are supported
4950

lib/fluent/plugin/out_sumologic.rb

Lines changed: 73 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,8 @@ class Fluent::Plugin::Sumologic < Fluent::Plugin::Output
164164
config_param :retry_min_interval, :time, :default => 1 # 1s
165165
config_param :retry_max_interval, :time, :default => 5*60 # 5m
166166

167+
config_param :max_request_size, :size, :default => 0
168+
167169
# https://help.sumologic.com/Manage/Fields
168170
desc 'Fields string (eg "cluster=payment, service=credit_card") which is going to be added to every log record.'
169171
config_param :custom_fields, :string, :default => nil
@@ -252,6 +254,10 @@ def configure(conf)
252254
conf['compress_encoding'],
253255
log,
254256
)
257+
258+
if !conf['max_request_size'].nil? && conf['max_request_size'].to_i <= 0
259+
conf['max_request_size'] = '0'
260+
end
255261
super
256262
end
257263

@@ -405,50 +411,75 @@ def write(chunk)
405411
fields = [fields,@custom_fields].compact.join(",")
406412
end
407413

408-
retries = 0
409-
start_time = Time.now
410-
sleep_time = @retry_min_interval
411-
412-
while true
413-
common_log_part = "#{@data_type} records with source category '#{source_category}', source host '#{source_host}', source name '#{source_name}', chunk #{chunk_id}, try #{retries}"
414-
begin
415-
@log.debug { "Sending #{messages.count}; #{common_log_part}" }
416-
417-
@sumo_conn.publish(
418-
messages.join("\n"),
419-
source_host =source_host,
420-
source_category =source_category,
421-
source_name =source_name,
422-
data_type =@data_type,
423-
metric_data_format =@metric_data_format,
424-
collected_fields =fields,
425-
dimensions =@custom_dimensions
426-
)
427-
break
428-
rescue => e
429-
if !@use_internal_retry
430-
raise e
414+
if @max_request_size <= 0
415+
messages_to_send = [messages]
416+
else
417+
messages_to_send = []
418+
current_message = []
419+
current_length = 0
420+
messages.each do |message|
421+
current_message.push message
422+
current_length += message.length
423+
424+
if current_length > @max_request_size
425+
messages_to_send.push(current_message)
426+
current_message = []
427+
current_length = 0
431428
end
432-
# increment retries
433-
retries = retries + 1
434-
435-
log.warn "error while sending request to sumo: #{e}; #{common_log_part}"
436-
log.warn_backtrace e.backtrace
437-
438-
# drop data if
439-
# - we reached out the @retry_max_times retries
440-
# - or we exceeded @retry_timeout
441-
if (retries >= @retry_max_times && @retry_max_times > 0) || (Time.now > start_time + @retry_timeout && @retry_timeout > 0)
442-
log.warn "dropping records; #{common_log_part}"
429+
current_length += 1 # this is for newline
430+
end
431+
if current_message.length > 0
432+
messages_to_send.push(current_message)
433+
end
434+
end
435+
436+
messages_to_send.each_with_index do |message, i|
437+
retries = 0
438+
start_time = Time.now
439+
sleep_time = @retry_min_interval
440+
441+
while true
442+
common_log_part = "#{@data_type} records with source category '#{source_category}', source host '#{source_host}', source name '#{source_name}', chunk #{chunk_id}, try #{retries}, batch #{i}"
443+
444+
begin
445+
@log.debug { "Sending #{message.count}; #{common_log_part}" }
446+
447+
@sumo_conn.publish(
448+
message.join("\n"),
449+
source_host =source_host,
450+
source_category =source_category,
451+
source_name =source_name,
452+
data_type =@data_type,
453+
metric_data_format =@metric_data_format,
454+
collected_fields =fields,
455+
dimensions =@custom_dimensions
456+
)
443457
break
444-
end
445-
446-
log.info "going to retry to send data at #{Time.now + sleep_time}; #{common_log_part}"
447-
sleep sleep_time
448-
449-
sleep_time = sleep_time * 2
450-
if sleep_time > @retry_max_interval
451-
sleep_time = @retry_max_interval
458+
rescue => e
459+
if !@use_internal_retry
460+
raise e
461+
end
462+
# increment retries
463+
retries += 1
464+
465+
log.warn "error while sending request to sumo: #{e}; #{common_log_part}"
466+
log.warn_backtrace e.backtrace
467+
468+
# drop data if
469+
# - we reached out the @retry_max_times retries
470+
# - or we exceeded @retry_timeout
471+
if (retries >= @retry_max_times && @retry_max_times > 0) || (Time.now > start_time + @retry_timeout && @retry_timeout > 0)
472+
log.warn "dropping records; #{common_log_part}"
473+
break
474+
end
475+
476+
log.info "going to retry to send data at #{Time.now + sleep_time}; #{common_log_part}"
477+
sleep sleep_time
478+
479+
sleep_time *= 2
480+
if sleep_time > @retry_max_interval
481+
sleep_time = @retry_max_interval
482+
end
452483
end
453484
end
454485
end

test/plugin/test_out_sumologic.rb

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -912,4 +912,70 @@ def test_skip_retry
912912
assert_equal("Failed to send data to HTTP Source. 500 - ", exception.message)
913913
end
914914

915+
def test_split_negative_or_zero
916+
endpoint = "https://collectors.sumologic.com/v1/receivers/http/1234"
917+
918+
configs = [
919+
%{
920+
endpoint #{endpoint}
921+
max_request_size -5
922+
},
923+
%{
924+
endpoint #{endpoint}
925+
max_request_size 0
926+
}
927+
]
928+
929+
time = event_time
930+
931+
configs.each do |config|
932+
WebMock.reset_executed_requests!
933+
driver = create_driver(config)
934+
stub_request(:post, endpoint).to_return(status: 200, headers: {content_type: 'application/json'})
935+
936+
driver.run do
937+
driver.feed("test", time, {"message": "test"})
938+
driver.feed("test", time, {"message": "test"})
939+
driver.feed("test", time, {"message": "test"})
940+
end
941+
942+
assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
943+
body: /\A{"timestamp":\d+.,"message":"test"}\n{"timestamp":\d+.,"message":"test"}\n{"timestamp":\d+.,"message":"test"}\z/,
944+
times:1
945+
end
946+
end
947+
948+
def test_split
949+
endpoint = "https://collectors.sumologic.com/v1/receivers/http/1234"
950+
951+
config = %{
952+
endpoint #{endpoint}
953+
max_request_size 80
954+
}
955+
956+
time = event_time
957+
958+
WebMock.reset_executed_requests!
959+
driver = create_driver(config)
960+
stub_request(:post, endpoint).to_return(status: 200, headers: {content_type: 'application/json'})
961+
962+
driver.run do
963+
driver.feed("test", time, {"message": "test1"})
964+
driver.feed("test", time, {"message": "test2"})
965+
driver.feed("test", time, {"message": "test3"})
966+
driver.feed("test", time, {"message": "test4"})
967+
driver.feed("test", time, {"message": "test5"})
968+
end
969+
970+
assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
971+
body: /\A{"timestamp":\d+.,"message":"test1"}\n{"timestamp":\d+.,"message":"test2"}\z/,
972+
times:1
973+
assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
974+
body: /\A{"timestamp":\d+.,"message":"test3"}\n{"timestamp":\d+.,"message":"test4"}\z/,
975+
times:1
976+
assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
977+
body: /\A{"timestamp":\d+.,"message":"test5"}\z/,
978+
times:1
979+
end
980+
915981
end

0 commit comments

Comments
 (0)