diff --git a/README.md b/README.md index 09d7079..c2bb4e1 100644 --- a/README.md +++ b/README.md @@ -129,6 +129,30 @@ When a group reaches its limit and as long as it is not reset, a warning message with the current log rate of the group is emitted repeatedly. This is the delay between every repetition. +#### ignore + +Default: `none` + +Define which records you want to ignore, you should specify key and regex to filter by. + +Example: + +``` + + key app.version + regex /^(2|3)$/ + +``` + +A dot indicates a key within a sub-object. As an example, in the following log, +the group key resolve to "2": +``` +{"level": "error", "msg": "plugin test", "app": { "version": "2" } } +``` + +Will not take into throttling bucket calculations records that has version 2 or 3, +They will just pass-through. + ## License Apache License, Version 2.0 diff --git a/lib/fluent/plugin/filter_throttle.rb b/lib/fluent/plugin/filter_throttle.rb index 16c74f5..bbd2253 100644 --- a/lib/fluent/plugin/filter_throttle.rb +++ b/lib/fluent/plugin/filter_throttle.rb @@ -34,6 +34,25 @@ class ThrottleFilter < Filter This is the delay between every repetition. DESC config_param :group_warning_delay_s, :integer, :default => 10 + + desc <<~DESC + Defines records which should be excluded from the throttling counters + DESC + config_section :ignore, param_name: :ignores, multi: true do + desc "The field name to which the regular expression is applied" + config_param :key do |value| + value.split(".") + end + desc "The regular expression" + config_param :regex do |value| + if value.start_with?("/") && value.end_with?("/") + Regexp.compile(value[1..-2]) + else + Regexp.compile(value) + end + end + end + Group = Struct.new( :rate_count, @@ -80,9 +99,18 @@ def shutdown end def filter(tag, time, record) + unless @ignores.empty? + @ignores.each { |ignore| + keysValue = extract_value_from_key_path(ignore.key, record) + if keysValue != nil && ignore.regex.match(keysValue.to_s) + return record + end + } + end + now = Time.now rate_limit_exceeded = @group_drop_logs ? nil : record # return nil on rate_limit_exceeded to drop the record - group = extract_group(record) + group = extract_value_from_key_paths(@group_key_paths, record) counter = (@counters[group] ||= Group.new(0, now, 0, 0, now, nil)) counter.rate_count += 1 @@ -133,12 +161,16 @@ def filter(tag, time, record) private - def extract_group(record) - @group_key_paths.map do |key_path| - record.dig(*key_path) || record.dig(*key_path.map(&:to_sym)) + def extract_value_from_key_paths(paths, record) + paths.map do |key_path| + extract_value_from_key_path(key_path, record) end end + def extract_value_from_key_path(path, record) + record.dig(*path) || record.dig(*path.map(&:to_sym)) + end + def log_rate_limit_exceeded(now, group, counter) emit = counter.last_warning == nil ? true \ : (now - counter.last_warning) >= @group_warning_delay_s diff --git a/test/fluent/plugin/filter_throttle_test.rb b/test/fluent/plugin/filter_throttle_test.rb index 188051f..099b86c 100644 --- a/test/fluent/plugin/filter_throttle_test.rb +++ b/test/fluent/plugin/filter_throttle_test.rb @@ -139,6 +139,93 @@ def create_driver(conf='') assert_equal records_expected, driver.filtered_records.size assert driver.logs.any? { |log| log.include?('rate exceeded') } assert driver.logs.any? { |log| log.include?('rate back down') } + + end + + it 'does not throttle when log includes the key to ignore' do + driver = create_driver <<~CONF + group_key "group" + group_bucket_period_s 1 + group_bucket_limit 15 + + key level + regex /^([Ii]nfo|[Ii]nformation|[Dd]ebug)$/ + + CONF + + driver.run(default_tag: "test") do + driver.feed([[event_time, {"msg": "test lower cased i", "level": "info", "group": "a"}]] * 10) + driver.feed([[event_time, {"msg": "test capital I", "level": "Info", "group": "b"}]] * 10) + driver.feed([[event_time, {"msg": "test lower cased i", "level": "information", "group": "b"}]] * 10) + driver.feed([[event_time, {"msg": "test capital I", "level": "Information", "group": "b"}]] * 10) + driver.feed([[event_time, {"msg": "test", "level": "error", "group": "a"}]] * 20) + driver.feed([[event_time, {"msg": "test", "level": "error", "group": "b"}]] * 20) + end + + assert_equal(70, driver.filtered_records.compact.length) # compact remove nils + end + + it 'does not throttle when log includes the nested key to ignore' do + driver = create_driver <<~CONF + group_key "group" + group_bucket_period_s 1 + group_bucket_limit 15 + + key app.version + regex /^(2|3)$/ + + CONF + + driver.run(default_tag: "test") do + driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 2}, "group": "a"}]] * 10) + driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 3}, "group": "b"}]] * 10) + driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 4}, "group": "a"}]] * 20) + driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 5}, "group": "b"}]] * 20) + end + + assert_equal(50, driver.filtered_records.compact.length) # compact remove nils + end + + it 'does not throttle when nested key to ignore does not exists' do + driver = create_driver <<~CONF + group_key "group" + group_bucket_period_s 1 + group_bucket_limit 15 + + key app.author + regex /^(john|doe)$/ + + CONF + + driver.run(default_tag: "test") do + driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 2}, "group": "a"}]] * 10) + driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 3}, "group": "b"}]] * 10) + driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 4}, "group": "a"}]] * 20) + driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 5}, "group": "b"}]] * 20) + end + + assert_equal(30, driver.filtered_records.compact.length) # compact remove nils + end + + it 'does not throttle when key to ignore does not exists' do + driver = create_driver <<~CONF + group_key "group" + group_bucket_period_s 1 + group_bucket_limit 15 + + key testKey + regex /^(test|test2)$/ + + CONF + + driver.run(default_tag: "test") do + driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 2}, "group": "a"}]] * 10) + driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 3}, "group": "b"}]] * 10) + driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 4}, "group": "a"}]] * 20) + driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 5}, "group": "b"}]] * 20) + end + + assert_equal(30, driver.filtered_records.compact.length) # compact remove nils end end