Skip to content

Commit c516f1f

Browse files
authored
factor out HTTP logic with ConnectStrategy abstraction (#6)
1 parent c053f8e commit c516f1f

15 files changed

+754
-406
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ This package's primary purpose is to support the [LaunchDarkly SDK for Python](h
1414
* Setting read timeouts, custom headers, and other HTTP request properties.
1515
* Specifying that connections should be retried under circumstances where the standard EventSource behavior would not retry them, such as if the server returns an HTTP error status.
1616

17-
This is a synchronous implementation which blocks the caller's thread when reading events or reconnecting.
17+
This is a synchronous implementation which blocks the caller's thread when reading events or reconnecting. By default, it uses `urllib3` to make HTTP requests, but it can be configured to read any input stream.
1818

1919
## Supported Python versions
2020

contract-tests/stream_entity.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,15 @@ def run(self):
3535
stream_url = self.options["streamUrl"]
3636
try:
3737
self.log.info('Opening stream from %s', stream_url)
38-
request = RequestParams(
38+
connect = ConnectStrategy.http(
3939
url=stream_url,
4040
headers=self.options.get("headers"),
4141
urllib3_request_options=None if self.options.get("readTimeoutMs") is None else {
4242
"timeout": urllib3.Timeout(read=millis_to_seconds(self.options.get("readTimeoutMs")))
4343
}
4444
)
4545
sse = SSEClient(
46-
request,
46+
connect,
4747
initial_retry_delay=millis_to_seconds(self.options.get("initialDelayMs")),
4848
last_event_id=self.options.get("lastEventId"),
4949
error_strategy=ErrorStrategy.from_lambda(lambda _:

ld_eventsource/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1 @@
1-
from ld_eventsource.request_params import RequestParams
21
from ld_eventsource.sse_client import *

ld_eventsource/config/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
1+
from .connect_strategy import ConnectStrategy, ConnectionClient, ConnectionResult
12
from .error_strategy import ErrorStrategy
23
from .retry_delay_strategy import RetryDelayStrategy
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
from __future__ import annotations
2+
from logging import Logger
3+
from typing import Callable, Iterator, Optional, Union
4+
from urllib3 import PoolManager
5+
6+
from ld_eventsource.http import _HttpClientImpl, _HttpConnectParams
7+
8+
9+
class ConnectStrategy:
10+
"""
11+
An abstraction for how :class:`.SSEClient` should obtain an input stream.
12+
13+
The default implementation is :meth:`http()`, which makes HTTP requests with ``urllib3``.
14+
Or, if you want to consume an input stream from some other source, you can create your own
15+
subclass of :class:`ConnectStrategy`.
16+
17+
Instances of this class should be immutable and should not contain any state that is specific
18+
to one active stream. The :class:`ConnectionClient` that they produce is stateful and belongs
19+
to a single :class:`.SSEClient`.
20+
"""
21+
22+
def create_client(self, logger: Logger) -> ConnectionClient:
23+
"""
24+
Creates a client instance.
25+
26+
This is called once when an :class:`.SSEClient` is created. The SSEClient returns the
27+
returned :class:`ConnectionClient` and uses it to perform all subsequent connection attempts.
28+
29+
:param logger: the logger being used by the SSEClient
30+
"""
31+
raise NotImplementedError("ConnectStrategy base class cannot be used by itself")
32+
33+
@staticmethod
34+
def http(
35+
url: str,
36+
headers: Optional[dict]=None,
37+
pool: Optional[PoolManager]=None,
38+
urllib3_request_options: Optional[dict]=None
39+
) -> ConnectStrategy:
40+
"""
41+
Creates the default HTTP implementation, specifying request parameters.
42+
43+
:param url: the stream URL
44+
:param headers: optional HTTP headers to add to the request
45+
:param pool: optional urllib3 ``PoolManager`` to provide an HTTP client
46+
:param urllib3_request_options: optional ``kwargs`` to add to the ``request`` call; these
47+
can include any parameters supported by ``urllib3``, such as ``timeout``
48+
"""
49+
return _HttpConnectStrategy(_HttpConnectParams(url, headers, pool, urllib3_request_options))
50+
51+
52+
class ConnectionClient:
53+
"""
54+
An object provided by :class:`.ConnectStrategy` that is retained by a single
55+
:class:`.SSEClient` to perform all connection attempts by that instance.
56+
57+
For the default HTTP implementation, this represents an HTTP connection pool.
58+
"""
59+
60+
def connect(self, last_event_id: Optional[str]) -> ConnectionResult:
61+
"""
62+
Attempts to connect to a stream. Raises an exception if unsuccessful.
63+
64+
:param last_event_id: the current value of :attr:`SSEClient.last_event_id`
65+
(should be sent to the server to support resuming an interrupted stream)
66+
:return: a :class:`ConnectionResult` representing the stream
67+
"""
68+
raise NotImplementedError("ConnectionClient base class cannot be used by itself")
69+
70+
def close(self):
71+
"""
72+
Does whatever is necessary to release resources when the SSEClient is closed.
73+
"""
74+
pass
75+
76+
def __enter__(self):
77+
return self
78+
79+
def __exit__(self, type, value, traceback):
80+
self.close()
81+
82+
83+
84+
class ConnectionResult:
85+
"""
86+
The return type of :meth:`ConnectionClient.connect()`.
87+
"""
88+
def __init__(
89+
self,
90+
stream: Iterator[bytes],
91+
closer: Optional[Callable]
92+
):
93+
self.__stream = stream
94+
self.__closer = closer
95+
96+
@property
97+
def stream(self) -> Iterator[bytes]:
98+
"""
99+
An iterator that returns chunks of data.
100+
"""
101+
return self.__stream
102+
103+
def close(self):
104+
"""
105+
Does whatever is necessary to release the connection.
106+
"""
107+
if self.__closer:
108+
self.__closer()
109+
self.__closer = None
110+
111+
def __enter__(self):
112+
return self
113+
114+
def __exit__(self, type, value, traceback):
115+
self.close()
116+
117+
118+
# _HttpConnectStrategy and _HttpConnectionClient are defined here rather than in http.py to avoid
119+
# a circular module reference.
120+
121+
class _HttpConnectStrategy(ConnectStrategy):
122+
def __init__(self, params: _HttpConnectParams):
123+
self.__params = params
124+
125+
def create_client(self, logger: Logger) -> ConnectionClient:
126+
return _HttpConnectionClient(self.__params, logger)
127+
128+
129+
class _HttpConnectionClient(ConnectionClient):
130+
def __init__(self, params: _HttpConnectParams, logger: Logger):
131+
self.__impl = _HttpClientImpl(params, logger)
132+
133+
def connect(self, last_event_id: Optional[str]) -> ConnectionResult:
134+
stream, closer = self.__impl.connect(last_event_id)
135+
return ConnectionResult(stream, closer)
136+
137+
def close(self):
138+
self.__impl.close()
139+
140+
141+
__all__ = ['ConnectStrategy', 'ConnectionClient', 'ConnectionResult']

ld_eventsource/http.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
from logging import Logger
2+
from typing import Callable, Iterator, Optional, Tuple
3+
from urllib3 import PoolManager
4+
from urllib3.exceptions import MaxRetryError
5+
from urllib3.util import Retry
6+
7+
from ld_eventsource.errors import HTTPContentTypeError, HTTPStatusError
8+
9+
_CHUNK_SIZE = 10000
10+
11+
12+
class _HttpConnectParams:
13+
def __init__(
14+
self,
15+
url: str,
16+
headers: Optional[dict]=None,
17+
pool: Optional[PoolManager]=None,
18+
urllib3_request_options: Optional[dict]=None
19+
):
20+
self.__url = url
21+
self.__headers = headers
22+
self.__pool = pool
23+
self.__urllib3_request_options = urllib3_request_options
24+
25+
@property
26+
def url(self) -> str:
27+
return self.__url
28+
29+
@property
30+
def headers(self) -> Optional[dict]:
31+
return self.__headers
32+
33+
@property
34+
def pool(self) -> Optional[PoolManager]:
35+
return self.__pool
36+
37+
@property
38+
def urllib3_request_options(self) -> Optional[dict]:
39+
return self.__urllib3_request_options
40+
41+
42+
class _HttpClientImpl:
43+
def __init__(self, params: _HttpConnectParams, logger: Logger):
44+
self.__params = params
45+
self.__pool = params.pool or PoolManager()
46+
self.__should_close_pool = params.pool is not None
47+
self.__logger = logger
48+
49+
def connect(self, last_event_id: Optional[str]) -> Tuple[Iterator[bytes], Callable]:
50+
self.__logger.info("Connecting to stream at %s" % self.__params.url)
51+
52+
headers = self.__params.headers.copy() if self.__params.headers else {}
53+
headers['Cache-Control'] = 'no-cache'
54+
headers['Accept'] = 'text/event-stream'
55+
56+
if last_event_id:
57+
headers['Last-Event-ID'] = last_event_id
58+
59+
request_options = self.__params.urllib3_request_options.copy() if self.__params.urllib3_request_options else {}
60+
request_options['headers'] = headers
61+
62+
try:
63+
resp = self.__pool.request(
64+
'GET',
65+
self.__params.url,
66+
preload_content=False,
67+
retries=Retry(total=None, read=0, connect=0, status=0, other=0, redirect=3),
68+
**request_options)
69+
except MaxRetryError as e:
70+
reason: Optional[Exception] = e.reason
71+
if reason is not None:
72+
raise reason # e.reason is the underlying I/O error
73+
if resp.status >= 400 or resp.status == 204:
74+
raise HTTPStatusError(resp.status)
75+
content_type = resp.headers.get('Content-Type', None)
76+
if content_type is None or not str(content_type).startswith("text/event-stream"):
77+
raise HTTPContentTypeError(content_type or '')
78+
79+
stream = resp.stream(_CHUNK_SIZE)
80+
return stream, resp.release_conn
81+
82+
def close(self):
83+
if self.__should_close_pool:
84+
self.__pool.clear()

ld_eventsource/request_params.py

Lines changed: 0 additions & 41 deletions
This file was deleted.

0 commit comments

Comments
 (0)