Skip to content

Commit 9480ed3

Browse files
committed
Attempting to fix cluster issues
1 parent 7f2e1fc commit 9480ed3

File tree

6 files changed

+65
-41
lines changed

6 files changed

+65
-41
lines changed

.gitignore

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,10 +216,17 @@ pyrightconfig.json
216216
[Ll]ocal
217217
pyvenv.cfg
218218
pip-selfcheck.json
219+
env
220+
venv
221+
.venv
219222

220223
libs/redis/docs/.Trash*
221224
.python-version
222225
.idea/*
223226
.vscode/settings.json
224227
.python-version
225228
tests/data
229+
.git
230+
.cursor
231+
.junie
232+
.undodir

redisvl/extensions/cache/base.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,14 @@ async def aclear(self) -> None:
216216
await client.delete(*keys)
217217
if cursor_int == 0: # Redis returns 0 when scan is complete
218218
break
219-
cursor = cursor_int # Update cursor for next iteration
219+
# Cluster returns a dict of cursor values. We need to stop if these all
220+
# come back as 0.
221+
elif isinstance(cursor_int, Mapping):
222+
cursor_values = list(cursor_int.values())
223+
if all(v == 0 for v in cursor_values):
224+
break
225+
else:
226+
cursor = cursor_int # Update cursor for next iteration
220227

221228
def disconnect(self) -> None:
222229
"""Disconnect from Redis."""

redisvl/extensions/cache/embeddings/embeddings.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ def __init__(
4747
name=name,
4848
ttl=ttl,
4949
redis_client=redis_client,
50+
async_redis_client=async_redis_client,
5051
redis_url=redis_url,
5152
connection_kwargs=connection_kwargs,
5253
)

tests/cluster-compose.yml

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -132,28 +132,28 @@ services:
132132
NODES="redis-node-1:7001 redis-node-2:7002 redis-node-3:7003 redis-node-4:7004 redis-node-5:7005 redis-node-6:7006"
133133
134134
echo "Force resetting all nodes before cluster creation..."
135-
for NODE_ADDR_PORT in $NODES; do
136-
NODE_HOST=$(echo $NODE_ADDR_PORT | cut -d':' -f1)
137-
NODE_PORT=$(echo $NODE_ADDR_PORT | cut -d':' -f2)
138-
echo "Resetting node $NODE_HOST:$NODE_PORT"
135+
for NODE_ADDR_PORT in $$NODES; do
136+
NODE_HOST=$$(echo $$NODE_ADDR_PORT | cut -d':' -f1)
137+
NODE_PORT=$$(echo $$NODE_ADDR_PORT | cut -d':' -f2)
138+
echo "Resetting node $$NODE_HOST:$$NODE_PORT"
139139
140140
# Wait for node to be responsive
141141
retry_count=0
142142
max_retries=10
143-
until redis-cli -h $NODE_HOST -p $NODE_PORT ping 2>/dev/null | grep -q PONG; do
144-
retry_count=$((retry_count+1))
145-
if [ "$retry_count" -gt "$max_retries" ]; then
146-
echo "Error: Node $NODE_HOST:$NODE_PORT did not respond after $max_retries retries."
143+
until redis-cli -h $$NODE_HOST -p $$NODE_PORT ping 2>/dev/null | grep -q PONG; do
144+
retry_count=$$((retry_count+1))
145+
if [ "$$retry_count" -gt "$$max_retries" ]; then
146+
echo "Error: Node $$NODE_HOST:$$NODE_PORT did not respond after $$max_retries retries."
147147
exit 1 # Exit if a node is unresponsive
148148
fi
149-
echo "Waiting for $NODE_HOST:$NODE_PORT to respond (attempt $retry_count/$max_retries)..."
149+
echo "Waiting for $$NODE_HOST:$$NODE_PORT to respond (attempt $$retry_count/$$max_retries)..."
150150
sleep 3 # Increased sleep between pings
151151
done
152152
153-
echo "Flushing and hard resetting $NODE_HOST:$NODE_PORT"
154-
redis-cli -h $NODE_HOST -p $NODE_PORT FLUSHALL || echo "Warning: FLUSHALL failed on $NODE_HOST:$NODE_PORT, attempting to continue..."
153+
echo "Flushing and hard resetting $$NODE_HOST:$$NODE_PORT"
154+
redis-cli -h $$NODE_HOST -p $$NODE_PORT FLUSHALL || echo "Warning: FLUSHALL failed on $$NODE_HOST:$$NODE_PORT, attempting to continue..."
155155
# Use CLUSTER RESET HARD
156-
redis-cli -h $NODE_HOST -p $NODE_PORT CLUSTER RESET HARD || echo "Warning: CLUSTER RESET HARD failed on $NODE_HOST:$NODE_PORT, attempting to continue..."
156+
redis-cli -h $$NODE_HOST -p $$NODE_PORT CLUSTER RESET HARD || echo "Warning: CLUSTER RESET HARD failed on $$NODE_HOST:$$NODE_PORT, attempting to continue..."
157157
done
158158
echo "Node reset complete."
159159
sleep 5 # Give a moment for resets to settle
@@ -162,29 +162,29 @@ services:
162162
ATTEMPT=1
163163
CLUSTER_CREATED=false
164164
165-
while [ $ATTEMPT -le $MAX_ATTEMPTS ]; do
166-
echo "Attempting to create Redis cluster (Attempt $ATTEMPT/$MAX_ATTEMPTS)..."
167-
output=$(echo yes | redis-cli --cluster create \
168-
$NODES \
165+
while [ $$ATTEMPT -le $$MAX_ATTEMPTS ]; do
166+
echo "Attempting to create Redis cluster (Attempt $$ATTEMPT/$$MAX_ATTEMPTS)..."
167+
output=$$(echo yes | redis-cli --cluster create \
168+
$$NODES \
169169
--cluster-replicas 1 2>&1)
170170
171-
if echo "$output" | grep -q "\[OK\] All 16384 slots covered."; then
171+
if echo "$$output" | grep -q "\[OK\] All 16384 slots covered."; then
172172
echo "Cluster created successfully."
173173
CLUSTER_CREATED=true
174174
break
175175
else
176-
echo "Failed to create cluster on attempt $ATTEMPT."
177-
echo "Output from redis-cli: $output"
178-
if [ $ATTEMPT -lt $MAX_ATTEMPTS ]; then
176+
echo "Failed to create cluster on attempt $$ATTEMPT."
177+
echo "Output from redis-cli: $$output"
178+
if [ $$ATTEMPT -lt $$MAX_ATTEMPTS ]; then
179179
echo "Retrying in 10 seconds..."
180180
sleep 10
181181
fi
182182
fi
183-
ATTEMPT=$((ATTEMPT + 1))
183+
ATTEMPT=$$((ATTEMPT + 1))
184184
done
185185
186-
if [ "$CLUSTER_CREATED" = "false" ]; then
187-
echo "Failed to create cluster after $MAX_ATTEMPTS attempts. Exiting."
186+
if [ "$$CLUSTER_CREATED" = "false" ]; then
187+
echo "Failed to create cluster after $$MAX_ATTEMPTS attempts. Exiting."
188188
exit 1
189189
fi
190190

tests/conftest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,9 @@ def redis_cluster_container(worker_id):
122122
capture_output=True,
123123
text=True,
124124
)
125-
logger.info("Docker Compose logs:\n", logs_result.stdout)
125+
logger.info("Docker Compose logs:\n%s", logs_result.stdout)
126126
if logs_result.stderr:
127-
logger.error("Docker Compose logs stderr:\n", logs_result.stderr)
127+
logger.error("Docker Compose logs stderr:\n%s", logs_result.stderr)
128128
except Exception as log_e:
129129
logger.error(f"Failed to get Docker Compose logs: {repr(log_e)}")
130130

tests/integration/test_redis_cluster_support.py

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -83,29 +83,38 @@ async def test_async_search_index_client(redis_cluster_url):
8383
# Test with AsyncRedis client
8484
cluster_client = AsyncRedisCluster.from_url(redis_cluster_url)
8585
index = AsyncSearchIndex(schema=schema, redis_client=cluster_client)
86-
await index.create(overwrite=True)
87-
await index.load([{"name": "async_test", "age": 25}])
88-
results = await index.query(TextQuery("async_test", "name"))
89-
assert results[0]["name"] == "async_test"
90-
await index.delete(drop=True)
86+
try:
87+
await index.create(overwrite=True)
88+
await index.load([{"name": "async_test", "age": 25}])
89+
results = await index.query(TextQuery("async_test", "name"))
90+
assert results[0]["name"] == "async_test"
91+
await index.delete(drop=True)
92+
finally:
93+
# Manually close the cluster client to prevent connection leaks
94+
await cluster_client.aclose()
9195

9296

93-
def test_embeddings_cache_cluster_async(redis_cluster_url):
97+
@pytest.mark.asyncio
98+
async def test_embeddings_cache_cluster_async(redis_cluster_url):
9499
"""Test that EmbeddingsCache correctly handles AsyncRedisCluster clients."""
95100
cluster_client = RedisConnectionFactory.get_async_redis_cluster_connection(
96101
redis_cluster_url
97102
)
98103
cache = EmbeddingsCache(async_redis_client=cluster_client)
99104

100-
cache.set(
101-
text="hey",
102-
model_name="test",
103-
embedding=[1, 2, 3],
104-
)
105-
result = cache.get("hey", "test")
106-
assert result is not None
107-
assert result["embedding"] == [1, 2, 3]
108-
cache.clear()
105+
try:
106+
await cache.aset(
107+
text="hey",
108+
model_name="test",
109+
embedding=[1, 2, 3],
110+
)
111+
result = await cache.aget("hey", "test")
112+
assert result is not None
113+
assert result["embedding"] == [1, 2, 3]
114+
await cache.aclear()
115+
finally:
116+
# Manually close the cluster client to prevent connection leaks
117+
await cluster_client.aclose()
109118

110119

111120
def test_embeddings_cache_cluster_sync(redis_cluster_url):

0 commit comments

Comments
 (0)