Skip to content

Commit 0d36549

Browse files
authored
replace RetryFilter with ErrorStrategy and improve the retry logic (#4)
1 parent c9c67bb commit 0d36549

15 files changed

+498
-426
lines changed

contract-tests/stream_entity.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ def run(self):
4444
request,
4545
initial_retry_delay=millis_to_seconds(self.options.get("initialDelayMs")),
4646
last_event_id=self.options.get("lastEventId"),
47-
retry_filter=lambda _: RetryFilterResult(not self.closed),
48-
logger=self.log,
49-
defer_connect=True
47+
error_strategy=ErrorStrategy.from_lambda(lambda _:
48+
(ErrorStrategy.FAIL if self.closed else ErrorStrategy.CONTINUE, None)),
49+
logger=self.log
5050
)
5151
self.sse = sse
5252
for item in sse.all:

ld_eventsource/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1+
from ld_eventsource.actions import *
12
from ld_eventsource.errors import *
2-
from ld_eventsource.event import *
3+
from ld_eventsource.error_strategy import *
34
from ld_eventsource.request_params import *
45
from ld_eventsource.retry_delay_strategy import *
5-
from ld_eventsource.retry_filter import *
66
from ld_eventsource.sse_client import *

ld_eventsource/event.py renamed to ld_eventsource/actions.py

Lines changed: 14 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22
from typing import Optional
33

44

5-
class Event:
5+
class Action:
6+
pass
7+
8+
9+
class Event(Action):
610
"""
711
An event received by :class:`ld_eventsource.SSEClient`.
812
@@ -63,7 +67,7 @@ def __repr__(self):
6367
)
6468

6569

66-
class Comment:
70+
class Comment(Action):
6771
"""
6872
A comment received by :class:`ld_eventsource.SSEClient`.
6973
@@ -89,7 +93,7 @@ def __repr__(self):
8993
return ":" + self._comment
9094

9195

92-
class Start:
96+
class Start(Action):
9397
"""
9498
Indicates that :class:`ld_eventsource.SSEClient` has successfully connected to a stream.
9599
@@ -101,20 +105,20 @@ class Start:
101105
pass
102106

103107

104-
class Fault:
108+
class Fault(Action):
105109
"""
106110
Indicates that :class:`ld_eventsource.SSEClient` encountered an error or end of stream.
107111
108112
Instances of this class are only available from :prop:`ld_eventsource.SSEClient.all`.
109-
They indicate either 1. a problem that happened after an initial successful connection
110-
was made, or 2. a problem with the initial connection, if you passed `True` for
111-
the `defer_connect` parameter to the :class:`ld_eventsource.SSEClient` constructor.
113+
114+
If you receive a Fault, the SSEClient is now in an inactive state since either a
115+
connection attempt has failed or an existing connection has been closed. The SSEClient
116+
will attempt to reconnect if you either call :func:`start()` or simply continue reading
117+
events after this point.
112118
"""
113119

114-
def __init__(self, error: Optional[Exception], will_retry: bool, retry_delay: float):
120+
def __init__(self, error: Optional[Exception]):
115121
self.__error = error
116-
self.__will_retry = will_retry
117-
self.__retry_delay = retry_delay
118122

119123
@property
120124
def error(self) -> Optional[Exception]:
@@ -124,19 +128,3 @@ def error(self) -> Optional[Exception]:
124128
in an orderly way after sending an EOF chunk as defined by chunked transfer encoding.
125129
"""
126130
return self.__error
127-
128-
@property
129-
def will_retry(self) -> bool:
130-
"""
131-
True if the :class:`ld_eventsource.SSEClient` will try to reconnect. This depends on
132-
the nature of the error and whether a custom `retry_filter` was specified.
133-
"""
134-
return self.__will_retry
135-
136-
@property
137-
def retry_delay(self) -> float:
138-
"""
139-
The time, in seconds, that the :class:`ld_eventsource.SSEClient` will wait before
140-
trying to reconnect. If :prop:`will_retry` was False, then this value is undefined.
141-
"""
142-
return self.__retry_delay

ld_eventsource/error_strategy.py

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
from __future__ import annotations
2+
import time
3+
from typing import Callable, Optional, Tuple
4+
5+
6+
class ErrorStrategy:
7+
"""Base class of strategies for determining how SSEClient should handle a stream error or the
8+
end of a stream.
9+
10+
The parameter that SSEClient passes to :func:`apply()` is either ``None`` if the server ended
11+
the stream normally, or an exception. If it is an exception, it could be an I/O exception
12+
(failure to connect, broken connection, etc.), or one of the error types defined in this
13+
package such as :class:`ldeventsource.HTTPStatusError`.
14+
15+
The two options for the result are:
16+
17+
- :const:`FAIL`: This means that SSEClient should throw an exception to the caller-- or, in
18+
the case of a stream ending without an error, it should simply stop iterating through events.
19+
- :const:`CONTINUE`: This means that you intend to keep reading events, so SSEClient should
20+
transparently retry the connection. If you are reading from :prop:`ld_eventsource.SSEClient.all`,
21+
you will also receive a :class:`ld_eventsource.Fault` describing the error.
22+
23+
With either option, it is still always possible to explicitly reconnect the stream by calling
24+
:func:`ld_eventsource.SSEClient.start()` again, or simply by trying to read from
25+
:prop:`ld_eventsource.SSEClient.events` or :prop:`ld_eventsource.SSEClient.all` again.
26+
27+
Subclasses should be immutable. To implement strategies that behave differently on consecutive
28+
retries, the strategy should return a new instance of its own class as the second return value
29+
from ``apply``, rather than modifying the state of the existing instance. This makes it easy
30+
for SSEClient to reset to the original error-handling state when appropriate by simply reusing
31+
the original instance.
32+
"""
33+
34+
FAIL = True
35+
CONTINUE = False
36+
37+
def apply(self, exception: Optional[Exception]) -> Tuple[bool, ErrorStrategy]:
38+
"""Applies the strategy to determine what to do after a failure.
39+
40+
:param exception: an I/O error, or one of the exception types defined by this package
41+
(such as :class:`ldeventsource.HTTPStatusError`), or None if the stream simply ended
42+
:return: a tuple where the first element is either :const:`FAIL` to raise an exception
43+
or :const:`CONTINUE` to continue, and the second element is the strategy object to
44+
use next time (which could be ``self``)
45+
"""
46+
raise NotImplementedError("ErrorStrategy base class cannot be used by itself")
47+
48+
@staticmethod
49+
def always_fail() -> ErrorStrategy:
50+
"""
51+
Specifies that SSEClient should always treat an error as a stream failure. This is the
52+
default behavior if you do not configure another.
53+
"""
54+
return _LambdaErrorStrategy(lambda e: (ErrorStrategy.FAIL, None))
55+
56+
@staticmethod
57+
def always_continue() -> ErrorStrategy:
58+
"""
59+
Specifies that SSEClient should never raise an exception, but should transparently retry
60+
or, if :prop:`ld_eventsource.SSEClient.all` is being used, return the error as a
61+
:class:`ld_eventsource.Fault`.
62+
63+
Be aware that using this mode could cause connection attempts to block indefinitely if
64+
the server is unavailable.
65+
"""
66+
return _LambdaErrorStrategy(lambda e: (ErrorStrategy.CONTINUE, None))
67+
68+
@staticmethod
69+
def continue_with_max_attempts(max_attempts: int) -> ErrorStrategy:
70+
"""
71+
Specifies that SSEClient should automatically retry after an error for up to this
72+
number of consecutive attempts, but should fail after that point.
73+
74+
:param max_attempts: the maximum number of consecutive retries
75+
"""
76+
return _MaxAttemptsErrorStrategy(max_attempts, 0)
77+
78+
@staticmethod
79+
def continue_with_time_limit(max_time: float) -> ErrorStrategy:
80+
"""
81+
Specifies that SSEClient should automatically retry after a failure and can retry
82+
repeatedly until this amount of time has elapsed, but should fail after that point.
83+
84+
:param max_time: the time limit, in seconds
85+
"""
86+
return _TimeLimitErrorStrategy(max_time, 0)
87+
88+
@staticmethod
89+
def from_lambda(fn: Callable[[Optional[Exception]], Tuple[bool, Optional[ErrorStrategy]]]) -> ErrorStrategy:
90+
"""
91+
Convenience method for creating an ErrorStrategy whose ``apply`` method is equivalent to
92+
the given lambda.
93+
94+
The one difference is that the second return value is an ``Optional[ErrorStrategy]`` which
95+
can be None to mean "no change", since the lambda cannot reference the strategy's ``self``.
96+
"""
97+
return _LambdaErrorStrategy(fn)
98+
99+
100+
class _LambdaErrorStrategy(ErrorStrategy):
101+
def __init__(self, fn: Callable[[Optional[Exception]], Tuple[bool, Optional[ErrorStrategy]]]):
102+
self.__fn = fn
103+
104+
def apply(self, exception: Optional[Exception]) -> Tuple[bool, ErrorStrategy]:
105+
should_raise, maybe_next = self.__fn(exception)
106+
return (should_raise, maybe_next or self)
107+
108+
class _MaxAttemptsErrorStrategy(ErrorStrategy):
109+
def __init__(self, max_attempts: int, counter: int):
110+
self.__max_attempts = max_attempts
111+
self.__counter = counter
112+
113+
def apply(self, exception: Optional[Exception]) -> Tuple[bool, ErrorStrategy]:
114+
if self.__counter >= self.__max_attempts:
115+
return (ErrorStrategy.FAIL, self)
116+
return (ErrorStrategy.CONTINUE, _MaxAttemptsErrorStrategy(self.__max_attempts, self.__counter + 1))
117+
118+
class _TimeLimitErrorStrategy(ErrorStrategy):
119+
def __init__(self, max_time: float, start_time: float):
120+
self.__max_time = max_time
121+
self.__start_time = start_time
122+
123+
def apply(self, exception: Optional[Exception]) -> Tuple[bool, ErrorStrategy]:
124+
if self.__start_time == 0:
125+
return (ErrorStrategy.CONTINUE, _TimeLimitErrorStrategy(self.__max_time, time.time()))
126+
if (time.time() - self.__start_time) < self.__max_time:
127+
return (ErrorStrategy.CONTINUE, self)
128+
return (ErrorStrategy.FAIL, self)

ld_eventsource/reader.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from ld_eventsource.event import Comment, Event
1+
from ld_eventsource.actions import Comment, Event
22

33
from typing import Callable, Iterable, Optional
44

ld_eventsource/request_params.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,6 @@ class RequestParams:
77
88
When calling the :class:`ld_eventsource.SSEClient` constructor, you can pass a
99
`RequestParams` instance for the first parameter instead of a simple URL string.
10-
Also, if you have specified a custom `retry_filter`, the filter can set
11-
:prop:`ld_eventsource.retry.RetryFilterResult.request_params` to change the
12-
parameters for the next request on a retry.
1310
"""
1411
def __init__(
1512
self,

0 commit comments

Comments
 (0)