Skip to content

Commit d2019ce

Browse files
committed
Fix UTF-8 encoding corruption of binary vector data in search results
Adds preserve_bytes and binary_fields parameters to search methods to prevent UTF-8 decoding from corrupting VECTOR field embeddings and other binary data.

The Result class was inappropriately applying UTF-8 decoding to all field values, including binary vector embeddings. This corrupted FLOAT32 vector data and made valkey-py unsuitable for vector search applications.

Changes:
- Add preserve_bytes parameter to search() methods (default: False for backward compatibility)
- Add binary_fields parameter for selective field preservation
- Implement to_string_or_bytes() utility for conditional binary preservation
- Update Result class to handle binary preservation during field processing
- Add comprehensive tests for binary preservation functionality

The fix maintains full backward compatibility while enabling proper vector search support when preserve_bytes=True is specified.

Fixes vector search corruption where binary embeddings were being decoded as UTF-8 strings with 'ignore' error handling, which silently dropped bytes and corrupted the vector data.
1 parent 50b9e73 commit d2019ce

File tree

4 files changed

+241
-15
lines changed

4 files changed

+241
-15
lines changed
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
"""
2+
Binary preservation tests for search results.
3+
4+
These tests are in a separate file because the main search test suite (test_search.py)
5+
has compatibility issues with the current Valkey search module version. Most existing
6+
search tests fail due to unsupported field types and parameters (e.g., TEXT fields,
7+
SKIPINITIALSCAN, etc.).
8+
9+
Our binary preservation functionality works correctly with the current search module
10+
using direct FT.CREATE commands and KNN vector queries, so we maintain these tests
11+
separately to ensure the feature remains properly tested while the broader search
12+
test compatibility issues are resolved.
13+
"""
14+
15+
import struct
16+
17+
import pytest
18+
import valkey
19+
20+
from .conftest import _get_client, is_resp2_connection, skip_ifmodversion_lt
21+
22+
23+
@pytest.mark.valkeymod
@skip_ifmodversion_lt("1.0.0", "search")
def test_vector_binary_preservation_default_behavior(request):
    """Test that default behavior still corrupts binary data (backward compatibility).

    Without ``preserve_bytes`` the vector field is expected to come back as a
    lossy UTF-8 decode of the stored bytes. The result assertions only run on
    RESP2 connections.
    """
    client = _get_client(valkey.Valkey, request, decode_responses=False)

    # Create index with vector field using direct command
    client.execute_command(
        "FT.CREATE", "test_idx", "SCHEMA",
        "embedding", "VECTOR", "FLAT", "6", "TYPE", "FLOAT32", "DIM", "3",
        "DISTANCE_METRIC", "COSINE",
    )

    # Drop the index even when an assertion fails, so a failure here does not
    # make every later FT.CREATE of "test_idx" fail as well.
    try:
        # Create vector data as bytes (simulating embeddings)
        vec1 = [0.1, 0.2, 0.3]
        vec1_bytes = struct.pack("3f", *vec1)

        # Store document with vector
        client.hset("doc:1", mapping={"embedding": vec1_bytes})

        # Search without preserve_bytes (default behavior) using KNN query
        results = client.ft("test_idx").search(
            "*=>[KNN 1 @embedding $vec]", {"vec": vec1_bytes}
        )

        if is_resp2_connection(client):
            doc = results.docs[0]
            # Default behavior should decode bytes to string (corrupting binary data)
            assert isinstance(doc.embedding, str)
            # A str never equals a bytes object, so comparing the two directly
            # proves nothing. Check for the lossy decode instead, and confirm
            # that the original bytes cannot be recovered from it.
            assert doc.embedding == vec1_bytes.decode("utf-8", "ignore")
            assert doc.embedding.encode("utf-8") != vec1_bytes
    finally:
        client.execute_command("FT.DROPINDEX", "test_idx")
55+
56+
57+
@pytest.mark.valkeymod
@skip_ifmodversion_lt("1.0.0", "search")
def test_vector_binary_preservation_enabled(request):
    """Test that preserve_bytes=True preserves binary vector data.

    The result assertions only run on RESP2 connections.
    """
    client = _get_client(valkey.Valkey, request, decode_responses=False)

    # Create index with vector field using direct command
    client.execute_command(
        "FT.CREATE", "test_idx", "SCHEMA",
        "embedding", "VECTOR", "FLAT", "6", "TYPE", "FLOAT32", "DIM", "3",
        "DISTANCE_METRIC", "COSINE",
    )

    # Drop the index even when an assertion fails, so a failure here does not
    # make every later FT.CREATE of "test_idx" fail as well.
    try:
        # Create vector data as bytes (simulating embeddings)
        vec1 = [0.1, 0.2, 0.3]
        vec1_bytes = struct.pack("3f", *vec1)

        # Store document with vector
        client.hset("doc:1", mapping={"embedding": vec1_bytes})

        # Search with preserve_bytes=True using KNN query
        results = client.ft("test_idx").search(
            "*=>[KNN 1 @embedding $vec]", {"vec": vec1_bytes}, preserve_bytes=True
        )

        if is_resp2_connection(client):
            doc = results.docs[0]
            # With preserve_bytes=True, binary data should be preserved
            assert isinstance(doc.embedding, bytes)
            assert doc.embedding == vec1_bytes
    finally:
        client.execute_command("FT.DROPINDEX", "test_idx")
89+
90+
91+
@pytest.mark.valkeymod
@skip_ifmodversion_lt("1.0.0", "search")
def test_multiple_field_types_and_vectors(request):
    """Test binary preservation with multiple field types and vector dimensions.

    Three documents are stored and a KNN 3 query is issued with both vector
    fields listed in ``binary_fields``; the TAG fields must still be returned
    as ``str``. The result assertions only run on RESP2 connections.
    """
    client = _get_client(valkey.Valkey, request, decode_responses=False)

    # Create index with diverse field types and different vector dimensions
    client.execute_command(
        "FT.CREATE", "test_idx", "SCHEMA",
        "title", "TAG",
        "price", "NUMERIC",
        "embedding_3d", "VECTOR", "FLAT", "6", "TYPE", "FLOAT32", "DIM", "3",
        "DISTANCE_METRIC", "COSINE",
        "embedding_4d", "VECTOR", "FLAT", "6", "TYPE", "FLOAT32", "DIM", "4",
        "DISTANCE_METRIC", "L2",
        "binary_data", "TAG",
    )

    # Drop the index even when an assertion fails, so a failure here does not
    # make every later FT.CREATE of "test_idx" fail as well.
    try:
        # Create test data with different vector dimensions
        vec_3d = [0.1, 0.2, 0.3]
        vec_3d_bytes = struct.pack("3f", *vec_3d)
        vec_4d = [0.4, 0.5, 0.6, 0.7]
        vec_4d_bytes = struct.pack("4f", *vec_4d)

        # Store multiple documents
        for i in range(3):
            client.hset(
                f"doc:{i + 1}",
                mapping={
                    "title": f"item_{i + 1}",
                    "price": 10.0 + i,
                    "embedding_3d": vec_3d_bytes,
                    "embedding_4d": vec_4d_bytes,
                    "binary_data": b"binary_content",
                },
            )

        # Test with multiple results (KNN 3 instead of KNN 1)
        results = client.ft("test_idx").search(
            "*=>[KNN 3 @embedding_3d $vec]",
            {"vec": vec_3d_bytes},
            preserve_bytes=True,
            binary_fields=["embedding_3d", "embedding_4d"],
        )

        if is_resp2_connection(client):
            assert len(results.docs) == 3
            for doc in results.docs:
                # Vector fields should be preserved as bytes
                assert isinstance(doc.embedding_3d, bytes)
                assert doc.embedding_3d == vec_3d_bytes
                assert isinstance(doc.embedding_4d, bytes)
                assert doc.embedding_4d == vec_4d_bytes
                # Non-binary fields should be strings
                assert isinstance(doc.title, str)
                assert isinstance(doc.binary_data, str)
    finally:
        client.execute_command("FT.DROPINDEX", "test_idx")
146+
147+
148+
@pytest.mark.valkeymod
@skip_ifmodversion_lt("1.0.0", "search")
def test_binary_fields_selective_preservation(request):
    """Test that binary_fields parameter selectively preserves specific fields.

    Only ``embedding1`` is listed in ``binary_fields``, so ``embedding2`` and
    ``binary_tag`` must be decoded to ``str``. The result assertions only run
    on RESP2 connections.
    """
    client = _get_client(valkey.Valkey, request, decode_responses=False)

    # Create index with vector and tag fields using direct command
    client.execute_command(
        "FT.CREATE", "test_idx", "SCHEMA",
        "embedding1", "VECTOR", "FLAT", "6", "TYPE", "FLOAT32", "DIM", "3",
        "DISTANCE_METRIC", "COSINE",
        "embedding2", "VECTOR", "FLAT", "6", "TYPE", "FLOAT32", "DIM", "3",
        "DISTANCE_METRIC", "COSINE",
        "binary_tag", "TAG",
    )

    # Drop the index even when an assertion fails, so a failure here does not
    # make every later FT.CREATE of "test_idx" fail as well.
    try:
        # Create vector data as bytes
        vec1 = [0.1, 0.2, 0.3]
        vec1_bytes = struct.pack("3f", *vec1)
        vec2 = [0.4, 0.5, 0.6]
        vec2_bytes = struct.pack("3f", *vec2)

        # Store document with vectors and tag
        client.hset(
            "doc:1",
            mapping={
                "embedding1": vec1_bytes,
                "embedding2": vec2_bytes,
                "binary_tag": b"test_tag",
            },
        )

        # Search with selective binary preservation (only embedding1) using KNN query
        results = client.ft("test_idx").search(
            "*=>[KNN 1 @embedding1 $vec]",
            {"vec": vec1_bytes},
            preserve_bytes=True,
            binary_fields=["embedding1"],
        )

        if is_resp2_connection(client):
            doc = results.docs[0]
            assert isinstance(doc.embedding1, bytes)
            assert doc.embedding1 == vec1_bytes
            assert isinstance(doc.embedding2, str)
            assert isinstance(doc.binary_tag, str)
    finally:
        client.execute_command("FT.DROPINDEX", "test_idx")

valkey/commands/search/_util.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,15 @@ def to_string(s):
55
return s.decode("utf-8", "ignore")
66
else:
77
return s # Not a string we care about
8+
9+
10+
def to_string_or_bytes(s, preserve_bytes=False, binary_fields=None, field_name=None):
    """
    Convert a value to ``str``, or keep it as ``bytes`` when asked to.

    - **s**: the value to convert. ``str`` values and values that are neither
      ``str`` nor ``bytes`` are returned unchanged.
    - **preserve_bytes**: if True, ``bytes`` values may be returned as-is
      instead of being decoded.
    - **binary_fields**: names of the fields to keep as ``bytes`` when
      ``preserve_bytes`` is True. If None, every ``bytes`` value is kept.
      A single ``str`` is treated as one field name.
    - **field_name**: name of the field ``s`` belongs to; it is only looked
      up in ``binary_fields``.

    ``bytes`` values that are not preserved are decoded as UTF-8 with
    ``"ignore"`` error handling, so undecodable bytes are silently dropped.
    """
    if not isinstance(s, bytes):
        return s  # Already a str, or not a string we care about

    if isinstance(binary_fields, str):
        # ``name in "some string"`` is a substring test, which would match
        # unrelated field names; treat a lone string as a single field name.
        binary_fields = (binary_fields,)

    if preserve_bytes and (binary_fields is None or field_name in binary_fields):
        return s  # Keep as bytes

    return s.decode("utf-8", "ignore")

valkey/commands/search/commands.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ def _parse_search(self, res, **kwargs):
8080
duration=kwargs["duration"],
8181
has_payload=kwargs["query"]._with_payloads,
8282
with_scores=kwargs["query"]._with_scores,
83+
preserve_bytes=kwargs.get("preserve_bytes", False),
84+
binary_fields=kwargs.get("binary_fields", None),
8385
)
8486

8587
def _parse_aggregate(self, res, **kwargs):
@@ -96,6 +98,8 @@ def _parse_profile(self, res, **kwargs):
9698
duration=kwargs["duration"],
9799
has_payload=query._with_payloads,
98100
with_scores=query._with_scores,
101+
preserve_bytes=kwargs.get("preserve_bytes", False),
102+
binary_fields=kwargs.get("binary_fields", None),
99103
)
100104

101105
return result, parse_to_dict(res[1])
@@ -484,6 +488,8 @@ def search(
484488
self,
485489
query: Union[str, Query],
486490
query_params: Union[Dict[str, Union[str, int, float, bytes]], None] = None,
491+
preserve_bytes: bool = False,
492+
binary_fields: Optional[List[str]] = None,
487493
):
488494
"""
489495
Search the index for a given query, and return a result of documents
@@ -493,6 +499,11 @@ def search(
493499
- **query**: the search query. Either a text for simple queries with
494500
default parameters, or a Query object for complex queries.
495501
See RediSearch's documentation on query format
502+
- **preserve_bytes**: If True, preserve binary field values as bytes
503+
instead of converting to UTF-8 strings
504+
- **binary_fields**: List of field names to preserve as bytes when
505+
preserve_bytes=True. If None, all binary fields
506+
are preserved
496507
497508
For more information see `FT.SEARCH <https://valkey.io/commands/ft.search>`_.
498509
""" # noqa
@@ -504,7 +515,8 @@ def search(
504515
return res
505516

506517
return self._parse_results(
507-
SEARCH_CMD, res, query=query, duration=(time.time() - st) * 1000.0
518+
SEARCH_CMD, res, query=query, duration=(time.time() - st) * 1000.0,
519+
preserve_bytes=preserve_bytes, binary_fields=binary_fields
508520
)
509521

510522
def explain(
@@ -911,6 +923,8 @@ async def search(
911923
self,
912924
query: Union[str, Query],
913925
query_params: Dict[str, Union[str, int, float]] = None,
926+
preserve_bytes: bool = False,
927+
binary_fields: Optional[List[str]] = None,
914928
):
915929
"""
916930
Search the index for a given query, and return a result of documents
@@ -920,6 +934,11 @@ async def search(
920934
- **query**: the search query. Either a text for simple queries with
921935
default parameters, or a Query object for complex queries.
922936
See RediSearch's documentation on query format
937+
- **preserve_bytes**: If True, preserve binary field values as bytes
938+
instead of converting to UTF-8 strings
939+
- **binary_fields**: List of field names to preserve as bytes when
940+
preserve_bytes=True. If None, all binary fields
941+
are preserved
923942
924943
For more information see `FT.SEARCH <https://valkey.io/commands/ft.search>`_.
925944
""" # noqa
@@ -931,7 +950,8 @@ async def search(
931950
return res
932951

933952
return self._parse_results(
934-
SEARCH_CMD, res, query=query, duration=(time.time() - st) * 1000.0
953+
SEARCH_CMD, res, query=query, duration=(time.time() - st) * 1000.0,
954+
preserve_bytes=preserve_bytes, binary_fields=binary_fields
935955
)
936956

937957
async def aggregate(

valkey/commands/search/result.py

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from ._util import to_string
1+
from ._util import to_string, to_string_or_bytes
22
from .document import Document
33

44

@@ -9,7 +9,8 @@ class Result:
99
"""
1010

1111
def __init__(
12-
self, res, hascontent, duration=0, has_payload=False, with_scores=False
12+
self, res, hascontent, duration=0, has_payload=False, with_scores=False,
13+
preserve_bytes=False, binary_fields=None
1314
):
1415
"""
1516
- **snippets**: An optional dictionary of the form
@@ -39,18 +40,19 @@ def __init__(
3940

4041
fields = {}
4142
if hascontent and res[i + fields_offset] is not None:
42-
fields = (
43-
dict(
44-
dict(
45-
zip(
46-
map(to_string, res[i + fields_offset][::2]),
47-
map(to_string, res[i + fields_offset][1::2]),
48-
)
49-
)
43+
field_names = list(map(to_string, res[i + fields_offset][::2]))
44+
field_values = res[i + fields_offset][1::2]
45+
46+
# Process field values with binary preservation
47+
processed_values = []
48+
for field_name, field_value in zip(field_names, field_values):
49+
processed_value = to_string_or_bytes(
50+
field_value, preserve_bytes, binary_fields, field_name
5051
)
51-
if hascontent
52-
else {}
53-
)
52+
processed_values.append(processed_value)
53+
54+
fields = dict(zip(field_names, processed_values))
55+
5456
try:
5557
del fields["id"]
5658
except KeyError:

0 commit comments

Comments
 (0)