Skip to content

Commit fd401b5

Browse files
authored
fix: Fix SSE delay reset handling (#39)
The default retry policy has exponential backoff with jitter. This retry strategy is reset after some maximum retry interval. Previously, once this reset interval occurred, it would continue to reset every future attempt, effectively disabling the exponential backoff algorithm. This should no longer be the case, as we now reset the baseline used for determine when to reset the strategy each time it is replaced with the base strategy.
1 parent db9f0a2 commit fd401b5

File tree

2 files changed

+64
-4
lines changed

2 files changed

+64
-4
lines changed

ld_eventsource/sse_client.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ def __init__(
111111

112112
self.__connection_client = connect.create_client(logger)
113113
self.__connection_result: Optional[ConnectionResult] = None
114-
self.__connected_time: float = 0
114+
self._retry_reset_baseline: float = 0
115115
self.__disconnected_time: float = 0
116116

117117
self.__closed = False
@@ -248,10 +248,17 @@ def next_retry_delay(self) -> float:
248248
return self.__next_retry_delay
249249

250250
def _compute_next_retry_delay(self):
251-
if self.__retry_delay_reset_threshold > 0 and self.__connected_time != 0:
252-
connection_duration = time.time() - self.__connected_time
251+
# If the __retry_reset_baseline is 0, then we haven't successfully connected yet.
252+
#
253+
# In those situations, we don't want to reset the retry delay strategy;
254+
# it should continue to double until the retry maximum, and then hold
255+
# steady (- jitter).
256+
if self.__retry_delay_reset_threshold > 0 and self._retry_reset_baseline != 0:
257+
now = time.time()
258+
connection_duration = now - self._retry_reset_baseline
253259
if connection_duration >= self.__retry_delay_reset_threshold:
254260
self.__current_retry_delay_strategy = self.__base_retry_delay_strategy
261+
self._retry_reset_baseline = now
255262
self.__next_retry_delay, self.__current_retry_delay_strategy = (
256263
self.__current_retry_delay_strategy.apply(self.__base_retry_delay)
257264
)
@@ -287,7 +294,7 @@ def _try_start(self, can_return_fault: bool) -> Optional[Fault]:
287294
# If can_return_fault is false, it means the caller explicitly called start(), in
288295
# which case there's no way to return a Fault so we just keep retrying transparently.
289296
continue
290-
self.__connected_time = time.time()
297+
self._retry_reset_baseline = time.time()
291298
self.__current_error_strategy = self.__base_error_strategy
292299
self.__interrupted = False
293300
return None

ld_eventsource/testing/test_sse_client_retry.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from time import sleep
2+
13
from ld_eventsource import *
24
from ld_eventsource.actions import *
35
from ld_eventsource.config import *
@@ -101,6 +103,57 @@ def test_all_iterator_continues_after_retry():
101103
assert client.next_retry_delay == initial_delay * 2
102104

103105

106+
def test_retry_delay_gets_reset_after_threshold():
107+
initial_delay = 0.005
108+
retry_delay_reset_threshold = 0.1
109+
mock = MockConnectStrategy(
110+
RespondWithData("data: data1\n\n"),
111+
RejectConnection(HTTPStatusError(503)),
112+
)
113+
with SSEClient(
114+
connect=mock,
115+
error_strategy=ErrorStrategy.always_continue(),
116+
initial_retry_delay=initial_delay,
117+
retry_delay_reset_threshold=retry_delay_reset_threshold,
118+
retry_delay_strategy=RetryDelayStrategy.default(jitter_multiplier=None),
119+
) as client:
120+
assert client._retry_reset_baseline == 0
121+
all = client.all
122+
123+
# Establish a successful connection
124+
item1 = next(all)
125+
assert isinstance(item1, Start)
126+
assert client._retry_reset_baseline != 0
127+
128+
item2 = next(all)
129+
assert isinstance(item2, Event)
130+
assert item2.data == 'data1'
131+
132+
# Stream is dropped and then fails to re-connect, resulting in backoff.
133+
item3 = next(all)
134+
assert isinstance(item3, Fault)
135+
assert client.next_retry_delay == initial_delay
136+
137+
item4 = next(all)
138+
assert isinstance(item4, Fault)
139+
assert client.next_retry_delay == initial_delay * 2
140+
141+
# Sleeping the threshold should reset the retry thresholds
142+
sleep(retry_delay_reset_threshold)
143+
144+
# Which we see it does here
145+
item5 = next(all)
146+
assert isinstance(item5, Fault)
147+
assert client.next_retry_delay == initial_delay
148+
149+
# And if we don't sleep long enough, it doesn't get reset.
150+
sleep(retry_delay_reset_threshold / 2)
151+
152+
item6 = next(all)
153+
assert isinstance(item6, Fault)
154+
assert client.next_retry_delay == initial_delay * 2
155+
156+
104157
def test_can_interrupt_and_restart_stream():
105158
initial_delay = 0.005
106159
mock = MockConnectStrategy(

0 commit comments

Comments
 (0)