From 1793ef1940bb5f62f11fda3ddc8aaac1d55b3779 Mon Sep 17 00:00:00 2001 From: Shai Katz Date: Mon, 18 Feb 2019 13:40:56 +0200 Subject: [PATCH 1/8] add ignore fields config --- lib/fluent/plugin/filter_throttle.rb | 26 ++++++++++++++++++++++ test/fluent/plugin/filter_throttle_test.rb | 24 ++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/lib/fluent/plugin/filter_throttle.rb b/lib/fluent/plugin/filter_throttle.rb index 16c74f5..d95be9d 100644 --- a/lib/fluent/plugin/filter_throttle.rb +++ b/lib/fluent/plugin/filter_throttle.rb @@ -34,6 +34,23 @@ class ThrottleFilter < Filter This is the delay between every repetition. DESC config_param :group_warning_delay_s, :integer, :default => 10 + + desc <<~DESC + Defines records which should be excluded from the throttling counters + DESC + config_section :ignore, param_name: :ignores, multi: true do + desc "The field name to which the regular expression is applied" + config_param :key, :string + desc "The regular expression" + config_param :regex do |value| + if value.start_with?("/") && value.end_with?("/") + Regexp.compile(value[1..-2]) + else + Regexp.compile(value) + end + end + end + Group = Struct.new( :rate_count, @@ -80,6 +97,15 @@ def shutdown end def filter(tag, time, record) + unless @ignores.empty? + @ignores.each { |ignore| + target_key = ignore.key + if record.include?(target_key.to_sym) && ignore.regex.match(record[target_key.to_sym]) + return record + end + } + end + now = Time.now rate_limit_exceeded = @group_drop_logs ? nil : record # return nil on rate_limit_exceeded to drop the record group = extract_group(record) diff --git a/test/fluent/plugin/filter_throttle_test.rb b/test/fluent/plugin/filter_throttle_test.rb index 188051f..ad83a81 100644 --- a/test/fluent/plugin/filter_throttle_test.rb +++ b/test/fluent/plugin/filter_throttle_test.rb @@ -139,6 +139,30 @@ def create_driver(conf='') assert_equal records_expected, driver.filtered_records.size assert driver.logs.any? { |log| log.include?('rate exceeded') } assert driver.logs.any? { |log| log.include?('rate back down') } + + end + + it 'does not throttle when log includes the key to ignore' do + driver = create_driver <<~CONF + group_key "group" + group_bucket_period_s 1 + group_bucket_limit 15 + + key level + regex /^([Ii]nfo|[Ii]nformation|[Dd]ebug)$/ + + CONF + + driver.run(default_tag: "test") do + driver.feed([[event_time, {"msg": "test lower cased i", "level": "info", "group": "a"}]] * 10) + driver.feed([[event_time, {"msg": "test capital I", "level": "Info", "group": "b"}]] * 10) + driver.feed([[event_time, {"msg": "test lower cased i", "level": "information", "group": "b"}]] * 10) + driver.feed([[event_time, {"msg": "test capital I", "level": "Information", "group": "b"}]] * 10) + driver.feed([[event_time, {"msg": "test", "level": "error", "group": "a"}]] * 10) + driver.feed([[event_time, {"msg": "test", "level": "error", "group": "b"}]] * 10) + end + + assert_equal(60, driver.filtered_records.compact.length) # compact remove nils end end From 54347d917138d924670a5345f05aa3de72e1c245 Mon Sep 17 00:00:00 2001 From: Shai Katz Date: Mon, 18 Feb 2019 13:59:57 +0200 Subject: [PATCH 2/8] fix test --- test/fluent/plugin/filter_throttle_test.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/fluent/plugin/filter_throttle_test.rb b/test/fluent/plugin/filter_throttle_test.rb index ad83a81..858edab 100644 --- a/test/fluent/plugin/filter_throttle_test.rb +++ b/test/fluent/plugin/filter_throttle_test.rb @@ -158,11 +158,11 @@ def create_driver(conf='') driver.feed([[event_time, {"msg": "test capital I", "level": "Info", "group": "b"}]] * 10) driver.feed([[event_time, {"msg": "test lower cased i", "level": "information", "group": "b"}]] * 10) driver.feed([[event_time, {"msg": "test capital I", "level": "Information", "group": "b"}]] * 10) - driver.feed([[event_time, {"msg": "test", "level": "error", "group": "a"}]] * 10) - driver.feed([[event_time, {"msg": "test", "level": "error", "group": "b"}]] * 10) + driver.feed([[event_time, {"msg": "test", "level": "error", "group": "a"}]] * 20) + driver.feed([[event_time, {"msg": "test", "level": "error", "group": "b"}]] * 20) end - assert_equal(60, driver.filtered_records.compact.length) # compact remove nils + assert_equal(70, driver.filtered_records.compact.length) # compact remove nils end end From 5fb8e47729875db126d600d058a9efb949cfe8d2 Mon Sep 17 00:00:00 2001 From: Shai Katz Date: Mon, 18 Feb 2019 14:29:54 +0200 Subject: [PATCH 3/8] add readme --- README.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/README.md b/README.md index 09d7079..e40e627 100644 --- a/README.md +++ b/README.md @@ -129,6 +129,24 @@ When a group reaches its limit and as long as it is not reset, a warning message with the current log rate of the group is emitted repeatedly. This is the delay between every repetition. +#### ignore + +Default: `none` + +Define which records you want to ignore, you should specify key and regex to filter by. + +Example: + +``` + + key level + regex /^(info|debug)$/ + +``` + +Will not take into throttling bucket calculations records that has level info or debug, +They will just pass-through. + ## License Apache License, Version 2.0 From ad431ac19f2fbe9f9554049e304d6d215d961e90 Mon Sep 17 00:00:00 2001 From: Shai Katz Date: Mon, 18 Feb 2019 17:20:28 +0200 Subject: [PATCH 4/8] handle cases when record keys are not symbols --- lib/fluent/plugin/filter_throttle.rb | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/filter_throttle.rb b/lib/fluent/plugin/filter_throttle.rb index d95be9d..324ecf1 100644 --- a/lib/fluent/plugin/filter_throttle.rb +++ b/lib/fluent/plugin/filter_throttle.rb @@ -100,8 +100,10 @@ def filter(tag, time, record) unless @ignores.empty? @ignores.each { |ignore| target_key = ignore.key - if record.include?(target_key.to_sym) && ignore.regex.match(record[target_key.to_sym]) - return record + if record.include?(target_key.to_sym) || record.include?(target_key) + if ignore.regex.match(record[target_key]) || ignore.regex.match(record[target_key.to_sym]) + return record + end end } end From 501a1dea254dacb0d6cb64c66bea726b2b3cf536 Mon Sep 17 00:00:00 2001 From: Shai Katz Date: Tue, 19 Feb 2019 13:05:57 +0200 Subject: [PATCH 5/8] enable key path for ignore key --- lib/fluent/plugin/filter_throttle.rb | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/lib/fluent/plugin/filter_throttle.rb b/lib/fluent/plugin/filter_throttle.rb index 324ecf1..0b38eb4 100644 --- a/lib/fluent/plugin/filter_throttle.rb +++ b/lib/fluent/plugin/filter_throttle.rb @@ -40,7 +40,9 @@ class ThrottleFilter < Filter DESC config_section :ignore, param_name: :ignores, multi: true do desc "The field name to which the regular expression is applied" - config_param :key, :string + config_param :key do |value| + value.split(".") + end desc "The regular expression" config_param :regex do |value| if value.start_with?("/") && value.end_with?("/") @@ -99,18 +101,16 @@ def shutdown def filter(tag, time, record) unless @ignores.empty? @ignores.each { |ignore| - target_key = ignore.key - if record.include?(target_key.to_sym) || record.include?(target_key) - if ignore.regex.match(record[target_key]) || ignore.regex.match(record[target_key.to_sym]) + keysValue = extract_value_from_key_path(ignore.key, record) + if keysValue != nil && ignore.regex.match(keysValue) return record - end end } end now = Time.now rate_limit_exceeded = @group_drop_logs ? nil : record # return nil on rate_limit_exceeded to drop the record - group = extract_group(record) + group = extract_value_from_key_paths(@group_key_paths, record) counter = (@counters[group] ||= Group.new(0, now, 0, 0, now, nil)) counter.rate_count += 1 @@ -161,12 +161,16 @@ def filter(tag, time, record) private - def extract_group(record) - @group_key_paths.map do |key_path| - record.dig(*key_path) || record.dig(*key_path.map(&:to_sym)) + def extract_value_from_key_paths(paths, record) + paths.map do |key_path| + extract_value_from_key_path(key_path, record) end end + def extract_value_from_key_path(path, record) + record.dig(*path) || record.dig(*path.map(&:to_sym)) + end + def log_rate_limit_exceeded(now, group, counter) emit = counter.last_warning == nil ? true \ : (now - counter.last_warning) >= @group_warning_delay_s From 92d1c2d76153e5003d9a060f002d4657735afdcd Mon Sep 17 00:00:00 2001 From: Shai Katz Date: Tue, 19 Feb 2019 13:11:36 +0200 Subject: [PATCH 6/8] add readme --- README.md | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index e40e627..c2bb4e1 100644 --- a/README.md +++ b/README.md @@ -139,12 +139,18 @@ Example: ``` - key level - regex /^(info|debug)$/ + key app.version + regex /^(2|3)$/ ``` -Will not take into throttling bucket calculations records that has level info or debug, +A dot indicates a key within a sub-object. As an example, in the following log, +the group key resolve to "2": +``` +{"level": "error", "msg": "plugin test", "app": { "version": "2" } } +``` + +Will not take into throttling bucket calculations records that has version 2 or 3, They will just pass-through. ## License From bb1ffc7f503e5464de2cfe2b2af47d3549f92e93 Mon Sep 17 00:00:00 2001 From: Shai Katz Date: Tue, 19 Feb 2019 13:26:32 +0200 Subject: [PATCH 7/8] yay it works --- lib/fluent/plugin/filter_throttle.rb | 2 +- test/fluent/plugin/filter_throttle_test.rb | 21 +++++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin/filter_throttle.rb b/lib/fluent/plugin/filter_throttle.rb index 0b38eb4..bbd2253 100644 --- a/lib/fluent/plugin/filter_throttle.rb +++ b/lib/fluent/plugin/filter_throttle.rb @@ -102,7 +102,7 @@ def filter(tag, time, record) unless @ignores.empty? @ignores.each { |ignore| keysValue = extract_value_from_key_path(ignore.key, record) - if keysValue != nil && ignore.regex.match(keysValue) + if keysValue != nil && ignore.regex.match(keysValue.to_s) return record end } diff --git a/test/fluent/plugin/filter_throttle_test.rb b/test/fluent/plugin/filter_throttle_test.rb index 858edab..7d9d841 100644 --- a/test/fluent/plugin/filter_throttle_test.rb +++ b/test/fluent/plugin/filter_throttle_test.rb @@ -164,6 +164,27 @@ def create_driver(conf='') assert_equal(70, driver.filtered_records.compact.length) # compact remove nils end + + it 'does not throttle when log includes the nested key to ignore' do + driver = create_driver <<~CONF + group_key "group" + group_bucket_period_s 1 + group_bucket_limit 15 + + key app.version + regex /^(2|3)$/ + + CONF + + driver.run(default_tag: "test") do + driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 2}, "group": "a"}]] * 10) + driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 3}, "group": "b"}]] * 10) + driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 4}, "group": "a"}]] * 20) + driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 5}, "group": "b"}]] * 20) + end + + assert_equal(50, driver.filtered_records.compact.length) # compact remove nils + end end describe 'logging' do From 76e311796462ba750e7d3b03edb6d62594140efc Mon Sep 17 00:00:00 2001 From: Shai Katz Date: Tue, 19 Feb 2019 13:37:39 +0200 Subject: [PATCH 8/8] add missing keys test --- test/fluent/plugin/filter_throttle_test.rb | 42 ++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/test/fluent/plugin/filter_throttle_test.rb b/test/fluent/plugin/filter_throttle_test.rb index 7d9d841..099b86c 100644 --- a/test/fluent/plugin/filter_throttle_test.rb +++ b/test/fluent/plugin/filter_throttle_test.rb @@ -185,6 +185,48 @@ def create_driver(conf='') assert_equal(50, driver.filtered_records.compact.length) # compact remove nils end + + it 'does not throttle when nested key to ignore does not exists' do + driver = create_driver <<~CONF + group_key "group" + group_bucket_period_s 1 + group_bucket_limit 15 + + key app.author + regex /^(john|doe)$/ + + CONF + + driver.run(default_tag: "test") do + driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 2}, "group": "a"}]] * 10) + driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 3}, "group": "b"}]] * 10) + driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 4}, "group": "a"}]] * 20) + driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 5}, "group": "b"}]] * 20) + end + + assert_equal(30, driver.filtered_records.compact.length) # compact remove nils + end + + it 'does not throttle when key to ignore does not exists' do + driver = create_driver <<~CONF + group_key "group" + group_bucket_period_s 1 + group_bucket_limit 15 + + key testKey + regex /^(test|test2)$/ + + CONF + + driver.run(default_tag: "test") do + driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 2}, "group": "a"}]] * 10) + driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 3}, "group": "b"}]] * 10) + driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 4}, "group": "a"}]] * 20) + driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 5}, "group": "b"}]] * 20) + end + + assert_equal(30, driver.filtered_records.compact.length) # compact remove nils + end end describe 'logging' do