diff --git a/README.md b/README.md
index 09d7079..c2bb4e1 100644
--- a/README.md
+++ b/README.md
@@ -129,6 +129,30 @@ When a group reaches its limit and as long as it is not reset, a warning
message with the current log rate of the group is emitted repeatedly. This is
the delay between every repetition.
+#### ignore
+
+Default: `none`
+
+Define which records you want to ignore, you should specify key and regex to filter by.
+
+Example:
+
+```
+
+ key app.version
+ regex /^(2|3)$/
+
+```
+
+A dot indicates a key within a sub-object. As an example, in the following log,
+the group key resolve to "2":
+```
+{"level": "error", "msg": "plugin test", "app": { "version": "2" } }
+```
+
+Will not take into throttling bucket calculations records that has version 2 or 3,
+They will just pass-through.
+
## License
Apache License, Version 2.0
diff --git a/lib/fluent/plugin/filter_throttle.rb b/lib/fluent/plugin/filter_throttle.rb
index 16c74f5..bbd2253 100644
--- a/lib/fluent/plugin/filter_throttle.rb
+++ b/lib/fluent/plugin/filter_throttle.rb
@@ -34,6 +34,25 @@ class ThrottleFilter < Filter
This is the delay between every repetition.
DESC
config_param :group_warning_delay_s, :integer, :default => 10
+
+ desc <<~DESC
+ Defines records which should be excluded from the throttling counters
+ DESC
+ config_section :ignore, param_name: :ignores, multi: true do
+ desc "The field name to which the regular expression is applied"
+ config_param :key do |value|
+ value.split(".")
+ end
+ desc "The regular expression"
+ config_param :regex do |value|
+ if value.start_with?("/") && value.end_with?("/")
+ Regexp.compile(value[1..-2])
+ else
+ Regexp.compile(value)
+ end
+ end
+ end
+
Group = Struct.new(
:rate_count,
@@ -80,9 +99,18 @@ def shutdown
end
def filter(tag, time, record)
+ unless @ignores.empty?
+ @ignores.each { |ignore|
+ keysValue = extract_value_from_key_path(ignore.key, record)
+ if keysValue != nil && ignore.regex.match(keysValue.to_s)
+ return record
+ end
+ }
+ end
+
now = Time.now
rate_limit_exceeded = @group_drop_logs ? nil : record # return nil on rate_limit_exceeded to drop the record
- group = extract_group(record)
+ group = extract_value_from_key_paths(@group_key_paths, record)
counter = (@counters[group] ||= Group.new(0, now, 0, 0, now, nil))
counter.rate_count += 1
@@ -133,12 +161,16 @@ def filter(tag, time, record)
private
- def extract_group(record)
- @group_key_paths.map do |key_path|
- record.dig(*key_path) || record.dig(*key_path.map(&:to_sym))
+ def extract_value_from_key_paths(paths, record)
+ paths.map do |key_path|
+ extract_value_from_key_path(key_path, record)
end
end
+ def extract_value_from_key_path(path, record)
+ record.dig(*path) || record.dig(*path.map(&:to_sym))
+ end
+
def log_rate_limit_exceeded(now, group, counter)
emit = counter.last_warning == nil ? true \
: (now - counter.last_warning) >= @group_warning_delay_s
diff --git a/test/fluent/plugin/filter_throttle_test.rb b/test/fluent/plugin/filter_throttle_test.rb
index 188051f..099b86c 100644
--- a/test/fluent/plugin/filter_throttle_test.rb
+++ b/test/fluent/plugin/filter_throttle_test.rb
@@ -139,6 +139,93 @@ def create_driver(conf='')
assert_equal records_expected, driver.filtered_records.size
assert driver.logs.any? { |log| log.include?('rate exceeded') }
assert driver.logs.any? { |log| log.include?('rate back down') }
+
+ end
+
+ it 'does not throttle when log includes the key to ignore' do
+ driver = create_driver <<~CONF
+ group_key "group"
+ group_bucket_period_s 1
+ group_bucket_limit 15
+
+ key level
+ regex /^([Ii]nfo|[Ii]nformation|[Dd]ebug)$/
+
+ CONF
+
+ driver.run(default_tag: "test") do
+ driver.feed([[event_time, {"msg": "test lower cased i", "level": "info", "group": "a"}]] * 10)
+ driver.feed([[event_time, {"msg": "test capital I", "level": "Info", "group": "b"}]] * 10)
+ driver.feed([[event_time, {"msg": "test lower cased i", "level": "information", "group": "b"}]] * 10)
+ driver.feed([[event_time, {"msg": "test capital I", "level": "Information", "group": "b"}]] * 10)
+ driver.feed([[event_time, {"msg": "test", "level": "error", "group": "a"}]] * 20)
+ driver.feed([[event_time, {"msg": "test", "level": "error", "group": "b"}]] * 20)
+ end
+
+ assert_equal(70, driver.filtered_records.compact.length) # compact remove nils
+ end
+
+ it 'does not throttle when log includes the nested key to ignore' do
+ driver = create_driver <<~CONF
+ group_key "group"
+ group_bucket_period_s 1
+ group_bucket_limit 15
+
+ key app.version
+ regex /^(2|3)$/
+
+ CONF
+
+ driver.run(default_tag: "test") do
+ driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 2}, "group": "a"}]] * 10)
+ driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 3}, "group": "b"}]] * 10)
+ driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 4}, "group": "a"}]] * 20)
+ driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 5}, "group": "b"}]] * 20)
+ end
+
+ assert_equal(50, driver.filtered_records.compact.length) # compact remove nils
+ end
+
+ it 'does not throttle when nested key to ignore does not exists' do
+ driver = create_driver <<~CONF
+ group_key "group"
+ group_bucket_period_s 1
+ group_bucket_limit 15
+
+ key app.author
+ regex /^(john|doe)$/
+
+ CONF
+
+ driver.run(default_tag: "test") do
+ driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 2}, "group": "a"}]] * 10)
+ driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 3}, "group": "b"}]] * 10)
+ driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 4}, "group": "a"}]] * 20)
+ driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 5}, "group": "b"}]] * 20)
+ end
+
+ assert_equal(30, driver.filtered_records.compact.length) # compact remove nils
+ end
+
+ it 'does not throttle when key to ignore does not exists' do
+ driver = create_driver <<~CONF
+ group_key "group"
+ group_bucket_period_s 1
+ group_bucket_limit 15
+
+ key testKey
+ regex /^(test|test2)$/
+
+ CONF
+
+ driver.run(default_tag: "test") do
+ driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 2}, "group": "a"}]] * 10)
+ driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 3}, "group": "b"}]] * 10)
+ driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 4}, "group": "a"}]] * 20)
+ driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 5}, "group": "b"}]] * 20)
+ end
+
+ assert_equal(30, driver.filtered_records.compact.length) # compact remove nils
end
end