Skip to content

Commit 375bb8d

Browse files
bountx and Andrzej Pijanowski
authored
feat: retry with back-off logic for Redis (#528)
**Description:** This PR improves Redis resilience by adding a retry wrapper with a back-off mechanism, so that operations which would otherwise fail because of temporary connection errors or timeouts are retried instead. **PR Checklist:** - [x] Code is formatted and linted (run `pre-commit run --all-files`) - [x] Tests pass (run `make test`) - [ ] Documentation has been updated to reflect changes, if applicable - [x] Changes are added to the changelog --------- Co-authored-by: Andrzej Pijanowski <apijanowski@cloudferro.com>
1 parent 43fd5d8 commit 375bb8d

File tree

4 files changed

+189
-87
lines changed

4 files changed

+189
-87
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
99

1010
### Added
1111

12+
- Added retry with back-off logic for Redis-related functions. [#528](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/528)
1213
- Added nanosecond precision datetime filtering that ensures nanosecond precision support in filtering by datetime. This is configured via the `USE_DATETIME_NANOS` environment variable, while maintaining microseconds compatibility for datetime precision. [#529](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/529)
1314

1415
### Changed

stac_fastapi/core/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ dependencies = [
4545
"jsonschema~=4.0.0",
4646
"slowapi~=0.1.9",
4747
"redis==6.4.0",
48+
"retry==0.9.2",
4849
]
4950

5051
[project.urls]

stac_fastapi/core/stac_fastapi/core/redis_utils.py

Lines changed: 97 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -2,35 +2,39 @@
22

33
import json
44
import logging
5-
from typing import List, Optional, Tuple
5+
from functools import wraps
6+
from typing import Callable, List, Optional, Tuple, cast
67
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
78

89
from pydantic import Field, field_validator
910
from pydantic_settings import BaseSettings
1011
from redis import asyncio as aioredis
1112
from redis.asyncio.sentinel import Sentinel
13+
from redis.exceptions import ConnectionError as RedisConnectionError
14+
from redis.exceptions import TimeoutError as RedisTimeoutError
15+
from retry import retry # type: ignore
1216

1317
logger = logging.getLogger(__name__)
1418

1519

16-
class RedisSentinelSettings(BaseSettings):
17-
"""Configuration for connecting to Redis Sentinel."""
20+
class RedisCommonSettings(BaseSettings):
21+
"""Common configuration for Redis Sentinel and Redis Standalone."""
1822

19-
REDIS_SENTINEL_HOSTS: str = ""
20-
REDIS_SENTINEL_PORTS: str = "26379"
21-
REDIS_SENTINEL_MASTER_NAME: str = "master"
2223
REDIS_DB: int = 15
23-
2424
REDIS_MAX_CONNECTIONS: Optional[int] = None
2525
REDIS_RETRY_TIMEOUT: bool = True
2626
REDIS_DECODE_RESPONSES: bool = True
2727
REDIS_CLIENT_NAME: str = "stac-fastapi-app"
2828
REDIS_HEALTH_CHECK_INTERVAL: int = Field(default=30, gt=0)
2929
REDIS_SELF_LINK_TTL: int = 1800
3030

31+
REDIS_QUERY_RETRIES_NUM: int = Field(default=3, gt=0)
32+
REDIS_QUERY_INITIAL_DELAY: float = Field(default=1.0, gt=0)
33+
REDIS_QUERY_BACKOFF: float = Field(default=2.0, gt=1)
34+
3135
@field_validator("REDIS_DB")
3236
@classmethod
33-
def validate_db_sentinel(cls, v: int) -> int:
37+
def validate_db(cls, v: int) -> int:
3438
"""Validate REDIS_DB is not negative integer."""
3539
if v < 0:
3640
raise ValueError("REDIS_DB must be a positive integer")
@@ -46,12 +50,20 @@ def validate_max_connections(cls, v):
4650

4751
@field_validator("REDIS_SELF_LINK_TTL")
4852
@classmethod
49-
def validate_self_link_ttl_sentinel(cls, v: int) -> int:
50-
"""Validate REDIS_SELF_LINK_TTL is not a negative integer."""
53+
def validate_self_link_ttl(cls, v: int) -> int:
54+
"""Validate REDIS_SELF_LINK_TTL is negative."""
5155
if v < 0:
5256
raise ValueError("REDIS_SELF_LINK_TTL must be a positive integer")
5357
return v
5458

59+
60+
class RedisSentinelSettings(RedisCommonSettings):
61+
"""Configuration for connecting to Redis Sentinel."""
62+
63+
REDIS_SENTINEL_HOSTS: str = ""
64+
REDIS_SENTINEL_PORTS: str = "26379"
65+
REDIS_SENTINEL_MASTER_NAME: str = "master"
66+
5567
def get_sentinel_hosts(self) -> List[str]:
5668
"""Parse Redis Sentinel hosts from string to list."""
5769
if not self.REDIS_SENTINEL_HOSTS:
@@ -96,19 +108,11 @@ def get_sentinel_nodes(self) -> List[Tuple[str, int]]:
96108
return [(str(host), int(port)) for host, port in zip(hosts, ports)]
97109

98110

99-
class RedisSettings(BaseSettings):
111+
class RedisSettings(RedisCommonSettings):
100112
"""Configuration for connecting Redis."""
101113

102114
REDIS_HOST: str = ""
103115
REDIS_PORT: int = 6379
104-
REDIS_DB: int = 15
105-
106-
REDIS_MAX_CONNECTIONS: Optional[int] = None
107-
REDIS_RETRY_TIMEOUT: bool = True
108-
REDIS_DECODE_RESPONSES: bool = True
109-
REDIS_CLIENT_NAME: str = "stac-fastapi-app"
110-
REDIS_HEALTH_CHECK_INTERVAL: int = Field(default=30, gt=0)
111-
REDIS_SELF_LINK_TTL: int = 1800
112116

113117
@field_validator("REDIS_PORT")
114118
@classmethod
@@ -118,89 +122,93 @@ def validate_port_standalone(cls, v: int) -> int:
118122
raise ValueError("REDIS_PORT must be a positive integer")
119123
return v
120124

121-
@field_validator("REDIS_DB")
122-
@classmethod
123-
def validate_db_standalone(cls, v: int) -> int:
124-
"""Validate REDIS_DB is not a negative integer."""
125-
if v < 0:
126-
raise ValueError("REDIS_DB must be a positive integer")
127-
return v
128-
129-
@field_validator("REDIS_MAX_CONNECTIONS", mode="before")
130-
@classmethod
131-
def validate_max_connections(cls, v):
132-
"""Handle empty/None values for REDIS_MAX_CONNECTIONS."""
133-
if v in ["", "null", "Null", "NULL", "none", "None", "NONE", None]:
134-
return None
135-
return v
136-
137-
@field_validator("REDIS_SELF_LINK_TTL")
138-
@classmethod
139-
def validate_self_link_ttl_standalone(cls, v: int) -> int:
140-
"""Validate REDIS_SELF_LINK_TTL is negative."""
141-
if v < 0:
142-
raise ValueError("REDIS_SELF_LINK_TTL must be a positive integer")
143-
return v
144-
145125

146126
# Configure only one Redis configuration
147127
sentinel_settings = RedisSentinelSettings()
148-
standalone_settings = RedisSettings()
128+
settings: RedisCommonSettings = cast(
129+
RedisCommonSettings,
130+
sentinel_settings if sentinel_settings.REDIS_SENTINEL_HOSTS else RedisSettings(),
131+
)
132+
133+
134+
def redis_retry(func: Callable) -> Callable:
135+
"""Retry with back-off decorator for Redis connections."""
136+
137+
@wraps(func)
138+
@retry(
139+
exceptions=(RedisConnectionError, RedisTimeoutError),
140+
tries=settings.REDIS_QUERY_RETRIES_NUM,
141+
delay=settings.REDIS_QUERY_INITIAL_DELAY,
142+
backoff=settings.REDIS_QUERY_BACKOFF,
143+
logger=logger,
144+
)
145+
async def wrapper(*args, **kwargs):
146+
return await func(*args, **kwargs)
149147

148+
return wrapper
150149

151-
async def connect_redis() -> Optional[aioredis.Redis]:
150+
151+
@redis_retry
152+
async def _connect_redis_internal() -> Optional[aioredis.Redis]:
152153
"""Return a Redis connection Redis or Redis Sentinel."""
153-
try:
154-
if sentinel_settings.REDIS_SENTINEL_HOSTS:
155-
sentinel_nodes = sentinel_settings.get_sentinel_nodes()
156-
sentinel = Sentinel(
157-
sentinel_nodes,
158-
decode_responses=sentinel_settings.REDIS_DECODE_RESPONSES,
159-
)
154+
if sentinel_settings.REDIS_SENTINEL_HOSTS:
155+
sentinel_nodes = settings.get_sentinel_nodes()
156+
sentinel = Sentinel(
157+
sentinel_nodes,
158+
decode_responses=settings.REDIS_DECODE_RESPONSES,
159+
)
160160

161-
redis = sentinel.master_for(
162-
service_name=sentinel_settings.REDIS_SENTINEL_MASTER_NAME,
163-
db=sentinel_settings.REDIS_DB,
164-
decode_responses=sentinel_settings.REDIS_DECODE_RESPONSES,
165-
retry_on_timeout=sentinel_settings.REDIS_RETRY_TIMEOUT,
166-
client_name=sentinel_settings.REDIS_CLIENT_NAME,
167-
max_connections=sentinel_settings.REDIS_MAX_CONNECTIONS,
168-
health_check_interval=sentinel_settings.REDIS_HEALTH_CHECK_INTERVAL,
169-
)
170-
logger.info("Connected to Redis Sentinel")
171-
172-
elif standalone_settings.REDIS_HOST:
173-
pool = aioredis.ConnectionPool(
174-
host=standalone_settings.REDIS_HOST,
175-
port=standalone_settings.REDIS_PORT,
176-
db=standalone_settings.REDIS_DB,
177-
max_connections=standalone_settings.REDIS_MAX_CONNECTIONS,
178-
decode_responses=standalone_settings.REDIS_DECODE_RESPONSES,
179-
retry_on_timeout=standalone_settings.REDIS_RETRY_TIMEOUT,
180-
health_check_interval=standalone_settings.REDIS_HEALTH_CHECK_INTERVAL,
181-
)
182-
redis = aioredis.Redis(
183-
connection_pool=pool, client_name=standalone_settings.REDIS_CLIENT_NAME
184-
)
185-
logger.info("Connected to Redis")
186-
else:
187-
logger.warning("No Redis configuration found")
188-
return None
161+
redis = sentinel.master_for(
162+
service_name=settings.REDIS_SENTINEL_MASTER_NAME,
163+
db=settings.REDIS_DB,
164+
decode_responses=settings.REDIS_DECODE_RESPONSES,
165+
retry_on_timeout=settings.REDIS_RETRY_TIMEOUT,
166+
client_name=settings.REDIS_CLIENT_NAME,
167+
max_connections=settings.REDIS_MAX_CONNECTIONS,
168+
health_check_interval=settings.REDIS_HEALTH_CHECK_INTERVAL,
169+
)
170+
logger.info("Connected to Redis Sentinel")
171+
172+
elif settings.REDIS_HOST:
173+
pool = aioredis.ConnectionPool(
174+
host=settings.REDIS_HOST,
175+
port=settings.REDIS_PORT,
176+
db=settings.REDIS_DB,
177+
max_connections=settings.REDIS_MAX_CONNECTIONS,
178+
decode_responses=settings.REDIS_DECODE_RESPONSES,
179+
retry_on_timeout=settings.REDIS_RETRY_TIMEOUT,
180+
health_check_interval=settings.REDIS_HEALTH_CHECK_INTERVAL,
181+
)
182+
redis = aioredis.Redis(
183+
connection_pool=pool, client_name=settings.REDIS_CLIENT_NAME
184+
)
185+
logger.info("Connected to Redis")
186+
else:
187+
logger.warning("No Redis configuration found")
188+
return None
189+
190+
return redis
189191

190-
return redis
191192

193+
async def connect_redis() -> Optional[aioredis.Redis]:
194+
"""Handle Redis connection."""
195+
try:
196+
return await _connect_redis_internal()
197+
except (
198+
aioredis.ConnectionError,
199+
aioredis.TimeoutError,
200+
) as e:
201+
logger.error(f"Redis connection failed after retries: {e}")
192202
except aioredis.ConnectionError as e:
193203
logger.error(f"Redis connection error: {e}")
194204
return None
195205
except aioredis.AuthenticationError as e:
196206
logger.error(f"Redis authentication error: {e}")
197207
return None
198-
except aioredis.TimeoutError as e:
199-
logger.error(f"Redis timeout error: {e}")
200-
return None
201208
except Exception as e:
202209
logger.error(f"Failed to connect to Redis: {e}")
203210
return None
211+
return None
204212

205213

206214
def get_redis_key(url: str, token: str) -> str:
@@ -230,19 +238,21 @@ def build_url_with_token(base_url: str, token: str) -> str:
230238
)
231239

232240

241+
@redis_retry
233242
async def save_prev_link(
234243
redis: aioredis.Redis, next_url: str, current_url: str, next_token: str
235244
) -> None:
236245
"""Save the current page as the previous link for the next URL."""
237246
if next_url and next_token:
238247
if sentinel_settings.REDIS_SENTINEL_HOSTS:
239-
ttl_seconds = sentinel_settings.REDIS_SELF_LINK_TTL
240-
elif standalone_settings.REDIS_HOST:
241-
ttl_seconds = standalone_settings.REDIS_SELF_LINK_TTL
248+
ttl_seconds = settings.REDIS_SELF_LINK_TTL
249+
elif settings.REDIS_HOST:
250+
ttl_seconds = settings.REDIS_SELF_LINK_TTL
242251
key = get_redis_key(next_url, next_token)
243252
await redis.setex(key, ttl_seconds, current_url)
244253

245254

255+
@redis_retry
246256
async def get_prev_link(
247257
redis: aioredis.Redis, current_url: str, current_token: str
248258
) -> Optional[str]:

stac_fastapi/tests/redis/test_redis_utils.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import pytest
2+
from redis.exceptions import ConnectionError as RedisConnectionError
23

4+
import stac_fastapi.core.redis_utils as redis_utils
35
from stac_fastapi.core.redis_utils import connect_redis, get_prev_link, save_prev_link
46

57

@@ -46,3 +48,91 @@ async def test_redis_utils_functions():
4648
redis, "http://mywebsite.com/search", "non_existent_token"
4749
)
4850
assert non_existent is None
51+
52+
53+
@pytest.mark.asyncio
54+
async def test_redis_retry_retries_until_success(monkeypatch):
55+
monkeypatch.setattr(
56+
redis_utils.settings, "REDIS_QUERY_RETRIES_NUM", 3, raising=False
57+
)
58+
monkeypatch.setattr(
59+
redis_utils.settings, "REDIS_QUERY_INITIAL_DELAY", 0, raising=False
60+
)
61+
monkeypatch.setattr(redis_utils.settings, "REDIS_QUERY_BACKOFF", 2.0, raising=False)
62+
63+
captured_kwargs = {}
64+
65+
def fake_retry(**kwargs):
66+
captured_kwargs.update(kwargs)
67+
68+
def decorator(func):
69+
async def wrapped(*args, **inner_kwargs):
70+
attempts = 0
71+
while True:
72+
try:
73+
attempts += 1
74+
return await func(*args, **inner_kwargs)
75+
except kwargs["exceptions"] as exc:
76+
if attempts >= kwargs["tries"]:
77+
raise exc
78+
continue
79+
80+
return wrapped
81+
82+
return decorator
83+
84+
monkeypatch.setattr(redis_utils, "retry", fake_retry)
85+
86+
call_counter = {"count": 0}
87+
88+
@redis_utils.redis_retry
89+
async def flaky() -> str:
90+
call_counter["count"] += 1
91+
if call_counter["count"] < 3:
92+
raise RedisConnectionError("transient failure")
93+
return "success"
94+
95+
result = await flaky()
96+
97+
assert result == "success"
98+
assert call_counter["count"] == 3
99+
assert captured_kwargs["tries"] == redis_utils.settings.REDIS_QUERY_RETRIES_NUM
100+
assert captured_kwargs["delay"] == redis_utils.settings.REDIS_QUERY_INITIAL_DELAY
101+
assert captured_kwargs["backoff"] == redis_utils.settings.REDIS_QUERY_BACKOFF
102+
103+
104+
@pytest.mark.asyncio
105+
async def test_redis_retry_raises_after_exhaustion(monkeypatch):
106+
monkeypatch.setattr(
107+
redis_utils.settings, "REDIS_QUERY_RETRIES_NUM", 3, raising=False
108+
)
109+
monkeypatch.setattr(
110+
redis_utils.settings, "REDIS_QUERY_INITIAL_DELAY", 0, raising=False
111+
)
112+
monkeypatch.setattr(redis_utils.settings, "REDIS_QUERY_BACKOFF", 2.0, raising=False)
113+
114+
def fake_retry(**kwargs):
115+
def decorator(func):
116+
async def wrapped(*args, **inner_kwargs):
117+
attempts = 0
118+
while True:
119+
try:
120+
attempts += 1
121+
return await func(*args, **inner_kwargs)
122+
except kwargs["exceptions"] as exc:
123+
if attempts >= kwargs["tries"]:
124+
raise exc
125+
continue
126+
127+
return wrapped
128+
129+
return decorator
130+
131+
monkeypatch.setattr(redis_utils, "retry", fake_retry)
132+
133+
@redis_utils.redis_retry
134+
async def always_fail() -> str:
135+
raise RedisConnectionError("pernament failure")
136+
137+
with pytest.raises(RedisConnectionError):
138+
await always_fail()

0 commit comments

Comments
 (0)