Skip to content

Commit 7a6cc87

Browse files
authored
Backpressure handling (#2185)
1 parent 0874ca8 commit 7a6cc87

12 files changed

+341
-19
lines changed

CHANGELOG.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,26 @@
1+
## Unreleased
2+
3+
### Features
4+
5+
- Add opt-in backpressure handling for transactions [#2185](https://github.com/getsentry/sentry-ruby/pull/2185)
6+
7+
The SDK can now dynamically downsample transactions to reduce backpressure in high
8+
throughput systems. It starts a new `BackpressureMonitor` thread to perform some health checks
9+
which decide whether to downsample (halving the sample rate each time) in 10-second intervals until the system
10+
is healthy again.
11+
12+
To enable this behavior, use:
13+
14+
```ruby
15+
Sentry.init do |config|
16+
# ...
17+
config.traces_sample_rate = 1.0
18+
config.enable_backpressure_handling = true
19+
end
20+
```
21+
22+
If your system serves heavy load, please let us know how this feature works for you!
23+
124
## 5.15.2
225

326
### Bug Fixes

sentry-ruby/lib/sentry-ruby.rb

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
require "sentry/hub"
2222
require "sentry/background_worker"
2323
require "sentry/session_flusher"
24+
require "sentry/backpressure_monitor"
2425
require "sentry/cron/monitor_check_ins"
2526

2627
[
@@ -72,6 +73,10 @@ def exception_locals_tp
7273
# @return [SessionFlusher, nil]
7374
attr_reader :session_flusher
7475

76+
# @!attribute [r] backpressure_monitor
77+
# @return [BackpressureMonitor, nil]
78+
attr_reader :backpressure_monitor
79+
7580
##### Patch Registration #####
7681

7782
# @!visibility private
@@ -217,17 +222,9 @@ def init(&block)
217222
Thread.current.thread_variable_set(THREAD_LOCAL, hub)
218223
@main_hub = hub
219224
@background_worker = Sentry::BackgroundWorker.new(config)
220-
221-
@session_flusher = if config.auto_session_tracking
222-
Sentry::SessionFlusher.new(config, client)
223-
else
224-
nil
225-
end
226-
227-
if config.include_local_variables
228-
exception_locals_tp.enable
229-
end
230-
225+
@session_flusher = config.auto_session_tracking ? Sentry::SessionFlusher.new(config, client) : nil
226+
@backpressure_monitor = config.enable_backpressure_handling ? Sentry::BackpressureMonitor.new(config, client) : nil
227+
exception_locals_tp.enable if config.include_local_variables
231228
at_exit { close }
232229
end
233230

@@ -246,6 +243,11 @@ def close
246243
@session_flusher = nil
247244
end
248245

246+
if @backpressure_monitor
247+
@backpressure_monitor.kill
248+
@backpressure_monitor = nil
249+
end
250+
249251
if configuration&.include_local_variables
250252
exception_locals_tp.disable
251253
end

sentry-ruby/lib/sentry/background_worker.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ def shutdown
6565
@shutdown_callback&.call
6666
end
6767

68+
# Reports whether the executor has no room left for more work.
#
# Only a Concurrent::ThreadPoolExecutor is ever considered full; for any
# other kind of executor this returns false without querying capacity.
#
# @return [Boolean]
def full?
  return false unless @executor.is_a?(Concurrent::ThreadPoolExecutor)

  @executor.remaining_capacity == 0
end
72+
6873
private
6974

7075
def _perform(&block)
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# frozen_string_literal: true
2+
3+
module Sentry
  # Periodically checks SDK health on a background thread and maintains a
  # downsample factor that callers can read.
  #
  # The thread is not started by the constructor; it is created lazily the
  # first time {#healthy?} or {#downsample_factor} is called. On every
  # iteration it sleeps for the configured interval and then calls {#run}.
  class BackpressureMonitor
    include LoggingHelper

    DEFAULT_INTERVAL = 10
    MAX_DOWNSAMPLE_FACTOR = 10

    # @param configuration [Object] only its +logger+ is read here
    # @param client [Object] its +transport+ is queried by {#check_health}
    # @param interval [Numeric] seconds slept between two health checks
    def initialize(configuration, client, interval: DEFAULT_INTERVAL)
      @client = client
      @logger = configuration.logger
      @interval = interval

      # Health state: starts out healthy with no downsampling applied.
      @healthy = true
      @downsample_factor = 0

      # Thread bookkeeping: nothing is running yet.
      @exited = false
      @thread = nil
    end

    # Result of the most recent health check (true until a check has run).
    # Makes sure the monitor thread exists before answering.
    #
    # @return [Boolean]
    def healthy?
      ensure_thread
      @healthy
    end

    # Current downsample factor, between 0 and MAX_DOWNSAMPLE_FACTOR.
    # Makes sure the monitor thread exists before answering.
    #
    # @return [Integer]
    def downsample_factor
      ensure_thread
      @downsample_factor
    end

    # One monitoring iteration: refresh the health flag, then adjust the
    # downsample factor based on it.
    def run
      check_health
      set_downsample_factor
    end

    # Marks the monitor unhealthy when the transport reports any active rate
    # limit or the background worker (if there is one) reports being full.
    #
    # @return [Boolean] the new health flag
    def check_health
      rate_limited = @client.transport.any_rate_limited?
      # The worker is only consulted when the transport is not rate limited.
      @healthy = !rate_limited && !Sentry.background_worker&.full?
    end

    # Resets the factor to 0 when healthy; otherwise raises it by one, never
    # going above MAX_DOWNSAMPLE_FACTOR.
    def set_downsample_factor
      unless @healthy
        @downsample_factor = [@downsample_factor + 1, MAX_DOWNSAMPLE_FACTOR].min
        return log_debug("[BackpressureMonitor] health check negative, downsampling with a factor of #{@downsample_factor}")
      end

      # Only announce the recovery if downsampling was actually in effect.
      if @downsample_factor.positive?
        log_debug("[BackpressureMonitor] health check positive, reverting to normal sampling")
      end
      @downsample_factor = 0
    end

    # Stops the monitor thread (if one was started) and prevents
    # ensure_thread from ever starting another one.
    def kill
      log_debug("[BackpressureMonitor] killing monitor")

      @exited = true
      @thread.kill if @thread
    end

    private

    # Starts the monitor thread unless the monitor has exited or a live
    # thread already exists. If the thread cannot be created, the failure is
    # logged and the monitor is marked as exited so no retry happens.
    def ensure_thread
      return if @exited || @thread&.alive?

      @thread = Thread.new do
        loop do
          sleep(@interval)
          run
        end
      end
    rescue ThreadError
      log_debug("[BackpressureMonitor] Thread creation failed")
      @exited = true
    end
  end
end

sentry-ruby/lib/sentry/configuration.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,12 @@ def capture_exception_frame_locals=(value)
258258
# @return [Boolean]
259259
attr_accessor :auto_session_tracking
260260

261+
# Whether to downsample transactions automatically because of backpressure.
262+
# Starts a new monitor thread to check health of the SDK every 10 seconds.
263+
# Default is false
264+
# @return [Boolean]
265+
attr_accessor :enable_backpressure_handling
266+
261267
# Allowlist of outgoing request targets to which sentry-trace and baggage headers are attached.
262268
# Default is all (/.*/)
263269
# @return [Array<String, Regexp>]
@@ -358,6 +364,7 @@ def initialize
358364
self.skip_rake_integration = false
359365
self.send_client_reports = true
360366
self.auto_session_tracking = true
367+
self.enable_backpressure_handling = false
361368
self.trusted_proxies = []
362369
self.dsn = ENV['SENTRY_DSN']
363370
self.spotlight = false

sentry-ruby/lib/sentry/transaction.rb

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,12 @@ def set_initial_sample_decision(sampling_context:)
218218
if sample_rate == true
219219
@sampled = true
220220
else
221-
@sampled = Random.rand < sample_rate
221+
if Sentry.backpressure_monitor
222+
factor = Sentry.backpressure_monitor.downsample_factor
223+
@effective_sample_rate /= 2**factor
224+
end
225+
226+
@sampled = Random.rand < @effective_sample_rate
222227
end
223228

224229
if @sampled
@@ -257,7 +262,9 @@ def finish(hub: nil, end_timestamp: nil)
257262
event = hub.current_client.event_from_transaction(self)
258263
hub.capture_event(event)
259264
else
260-
hub.current_client.transport.record_lost_event(:sample_rate, 'transaction')
265+
is_backpressure = Sentry.backpressure_monitor&.downsample_factor&.positive?
266+
reason = is_backpressure ? :backpressure : :sample_rate
267+
hub.current_client.transport.record_lost_event(reason, 'transaction')
261268
end
262269
end
263270

sentry-ruby/lib/sentry/transport.rb

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ class Transport
1919
:sample_rate,
2020
:before_send,
2121
:event_processor,
22-
:insufficient_data
22+
:insufficient_data,
23+
:backpressure
2324
]
2425

2526
include LoggingHelper
@@ -119,6 +120,10 @@ def is_rate_limited?(item_type)
119120
!!delay && delay > Time.now
120121
end
121122

123+
# Reports whether at least one entry in @rate_limits is still in effect,
# i.e. its stored time lies in the future. Entries whose value is nil are
# ignored.
#
# @return [Boolean]
def any_rate_limited?
  # Read the clock once so every entry is compared against the same instant
  # instead of calling Time.now again for each value.
  now = Time.now
  @rate_limits.values.any? { |limited_until| limited_until && limited_until > now }
end
126+
122127
def envelope_from_event(event)
123128
# Convert to hash
124129
event_payload = event.to_hash

sentry-ruby/spec/sentry/background_worker_spec.rb

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,4 +93,24 @@
9393
expect(string_io.string).to match(/Shutting down background worker/)
9494
end
9595
end
96+
97+
describe "#full?" do
  # With zero worker threads the worker is expected to report not full.
  it "returns false if not a thread pool" do
    configuration.background_worker_threads = 0

    expect(described_class.new(configuration).full?).to be(false)
  end

  it "returns true if thread pool and full" do
    configuration.background_worker_threads = 1
    configuration.background_worker_max_queue = 1
    bg_worker = described_class.new(configuration)
    expect(bg_worker.full?).to be(false)

    # Presumably the first job occupies the single thread and the second one
    # takes the only queue slot, leaving no capacity.
    2.times do
      bg_worker.perform { sleep 0.1 }
    end
    expect(bg_worker.full?).to be(true)

    # Wait for the submitted jobs to finish, then capacity should be back.
    sleep 0.2
    expect(bg_worker.full?).to be(false)
  end
end
96116
end
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
require 'spec_helper'
2+
3+
RSpec.describe Sentry::BackpressureMonitor do
  subject { described_class.new(configuration, client) }

  let(:string_io) { StringIO.new }
  let(:configuration) { Sentry.configuration }
  let(:client) { Sentry.get_current_client }
  let(:transport) { client.transport }
  let(:background_worker) { Sentry.background_worker }

  # Every example runs against an SDK set up with backpressure handling on
  # and a logger that writes into string_io, so log output can be asserted.
  before do
    perform_basic_setup do |config|
      config.enable_backpressure_handling = true
      config.logger = Logger.new(string_io)
    end
  end

  # Reads the monitor's internal thread (not memoized on purpose, so it
  # always reflects the current value).
  def monitor_thread
    subject.instance_variable_get(:@thread)
  end

  describe '#healthy?' do
    it 'returns true by default' do
      expect(subject.healthy?).to be(true)
    end

    it 'returns false when unhealthy' do
      expect(transport).to receive(:any_rate_limited?).and_return(true)
      subject.run
      expect(subject.healthy?).to be(false)
    end

    it 'spawns new thread' do
      expect { subject.healthy? }.to change { Thread.list.count }.by(1)
      expect(monitor_thread).to be_a(Thread)
    end

    it 'spawns only one thread' do
      expect { subject.healthy? }.to change { Thread.list.count }.by(1)

      # Force the existing thread to report itself alive for the second call.
      expect(monitor_thread).to receive(:alive?).and_return(true)
      expect { subject.healthy? }.not_to change { Thread.list.count }
    end

    context 'when thread creation fails' do
      before do
        expect(Thread).to receive(:new).and_raise(ThreadError)
      end

      it 'does not create new thread' do
        expect { subject.healthy? }.not_to change { Thread.list.count }
      end

      it 'returns true (the default)' do
        expect(subject.healthy?).to be(true)
      end

      it 'logs error' do
        subject.healthy?
        expect(string_io.string).to match(/\[BackpressureMonitor\] Thread creation failed/)
      end
    end

    context 'when killed' do
      before { subject.kill }

      it 'returns true (the default)' do
        expect(subject.healthy?).to be(true)
      end

      it 'does not create new thread' do
        expect(Thread).not_to receive(:new)
        expect { subject.healthy? }.not_to change { Thread.list.count }
      end
    end
  end

  # thread behavior is tested above in healthy?
  describe '#downsample_factor' do
    it 'returns 0 by default' do
      expect(subject.downsample_factor).to eq(0)
    end

    it 'increases when unhealthy upto limit' do
      allow(transport).to receive(:any_rate_limited?).and_return(true)

      # Each unhealthy run bumps the factor by one until it reaches 10 ...
      1.upto(10) do |expected_factor|
        subject.run
        expect(subject.downsample_factor).to eq(expected_factor)
      end

      # ... after which further unhealthy runs leave it at 10.
      2.times do
        subject.run
        expect(subject.downsample_factor).to eq(10)
      end
    end
  end

  describe '#run' do
    it 'logs behavior' do
      allow(background_worker).to receive(:full?).and_return(true)
      subject.run
      expect(string_io.string).to match(/\[BackpressureMonitor\] health check negative, downsampling with a factor of 1/)

      allow(background_worker).to receive(:full?).and_return(false)
      subject.run
      expect(string_io.string).to match(/\[BackpressureMonitor\] health check positive, reverting to normal sampling/)
    end
  end

  describe '#kill' do
    it 'kills the thread and logs a message' do
      # Calling healthy? first makes sure a thread exists to be killed.
      subject.healthy?
      expect(monitor_thread).to receive(:kill)
      subject.kill
      expect(string_io.string).to match(/\[BackpressureMonitor\] killing monitor/)
    end
  end
end

0 commit comments

Comments
 (0)