Commit 26c48eb

knn: add some newlines missing from logging; Cohere v3: downloader/processing tool mostly works, but fails to keep the paragraphs of one wiki_id together
1 parent 5c9d3ff commit 26c48eb

2 files changed: +225, -57 lines

src/main/knn/KnnGraphTester.java

Lines changed: 8 additions & 8 deletions
@@ -628,13 +628,13 @@ private void run(String... args) throws Exception {
       s = Files.readString(path);
     } catch (NoSuchFileException nsfe) {
       // ok -- back compat: old index that didn't write indexing time
-      log("WARNING: index did not record index time msec in %s", path);
+      log("WARNING: index did not record index time msec in %s\n", path);
       s = null;
     }
     if (s != null) {
       log("retrieving previously saved indexing time in %s\n", path);
       reindexTimeMsec = Integer.parseInt(s);
-      log("read previously saved indexing time: %d msec", reindexTimeMsec);
+      log("read previously saved indexing time: %d msec\n", reindexTimeMsec);
     } else {
       reindexTimeMsec = -1;
     }
@@ -649,13 +649,13 @@ private void run(String... args) throws Exception {
       s = Files.readString(path);
     } catch (NoSuchFileException nsfe) {
       // ok -- back compat: old index that didn't write force merge time
-      log("WARNING: index did not record force-merge-time seconds in %s", path);
+      log("WARNING: index did not record force-merge-time seconds in %s\n", path);
       s = null;
     }
     if (s != null) {
       log("retrieving previously saved force merge time in %s\n", path);
       forceMergeTimeSec = Double.parseDouble(s);
-      log("previously saved force merge time: %g sec", forceMergeTimeSec);
+      log("previously saved force merge time: %g sec\n", forceMergeTimeSec);
     } else {
       forceMergeTimeSec = -1;
     }
@@ -714,7 +714,7 @@ private void printIndexStatistics(Path indexPath, String field) throws IOExcepti
     log("index has %d segments: %s\n", indexNumSegments, ((StandardDirectoryReader) reader).getSegmentInfos());
 
     if (indexNumSegments == 1 && numSearchThread > 1) {
-      log("WARNING: intra-query concurrency requested (-numSearchThread=%d) but index has only one segment so there will be no concurrency!", numSearchThread);
+      log("WARNING: intra-query concurrency requested (-numSearchThread=%d) but index has only one segment so there will be no concurrency!\n", numSearchThread);
     }
 
     long indexSizeOnDiskBytes = 0;
@@ -1436,7 +1436,7 @@ private static void runTasksWithProgress(List<Callable<Void>> tasks, AtomicInteg
 
     // oddly, at least on beast3 (128 cores), exact NN is much slower with all (including hyperthread'd) cores:
     int poolThreadCount = Math.max(1, coreCount/2);
-    log.log("using %d threads to compute exact NN", poolThreadCount);
+    log.log("using %d threads to compute exact NN\n", poolThreadCount);
     ForkJoinPool pool = new ForkJoinPool(poolThreadCount);
 
     int taskCount = tasks.size();
@@ -1624,10 +1624,10 @@ public Void call() {
           result[queryOrd] = knn.KnnTesterUtils.getResultIds(topDocs, searcher.storedFields());
           completedCount.incrementAndGet();
         } catch (IOException e) {
-          log("Exception " + e);
+          log("Exception " + e + "\n");
           throw new RuntimeException(e);
         } catch (Throwable t) {
-          log("Throwable " + t);
+          log("Throwable " + t + "\n");
           throw t;
         }
         return null;

src/python/load_cohere_v3.py

Lines changed: 217 additions & 49 deletions
@@ -1,33 +1,61 @@
+import os
 import struct
 import subprocess
 import time
-from datasets import load_dataset
+import datasets
 import csv
 import struct
 import numpy as np
 
+csv.field_size_limit(1024*1024*40)
+
 # TODO
+# - ugh -- need to fix shuffle to keep pages (all adjacent rows with same wiki_id) together
 # - finish/publish these new v3 vec sources, send email
 # - babysit nightly, then add annot
-# - run nightly knn
-# - KnnIndexer should support two commit points in one index -- not-force-merged, force-merged
-# - create auto-committer Lucene "plugin" -- wraps Directory and MergeScheduler?
-#   - open upstream issue
-#   - could be used here to have many commit points in-between not-force-merged and force-merged, to
-#     make it easier to see effect of N segment on perf/recall ROC
-# - is corpus natively stored/downloaded as float32 or maybe float64?
-# - build IO profiler directory wrapper
-# - build off-heap version mapping
-# - upgrade to v3
+# - test knn
+# - install for nightly knn, babysit
 # - what is "split"
 # - write meta file for parent/child join
 # - write title/id/other-features
 # - shuffle
 #   - then make separate query + index files
-# - derive dims from the model
+
+"""
+
+This tool downloads all metadata + vectors from
+https://huggingface.co/datasets/Cohere/wikipedia-2023-11-embed-multilingual-v3
+(Cohere v3 Wikipedia embeddings).
+
+We download just lang="en" and split="train".
+
+This is 41_488_110 rows, but each row is one paragraph from a wiki
+page and (in general) multiple paragraphs per page. There are
+5_854_887 unique pages, so ~7.1 paragraphs per page on average.
+
+Vectors are 1024 dimensions (up from 768 in Cohere v2), float32, and
+seem to be unit-sphere normalized (at least the first 10 vectors are).
+
+Unlike the v2 Cohere Wikipedia vectors
+(https://huggingface.co/datasets/Cohere/wikipedia-22-12), these docs
+do not have a stated sort order. Still, we shuffle them to remove any
+possible hidden compass bias (see
+https://github.com/mikemccand/luceneutil/issues/494).
+
+"""
+
+ID_PREFIX = '20231101.en_'
 
 DIMENSIONS = 1024
 
+# total 15_562_116_893 chars in text
+# total 851_864_089 chars in title
+
+TOTAL_DOC_COUNT = 41_488_110  # + 1 header in the initial CSV
+
+# unique wiki pages (not paragraph rows):
+TOTAL_PAGE_COUNT = 5_854_887
+
 LANG = 'en'
 
 def run(command):
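As a quick cross-check of the numbers quoted in the docstring above (41_488_110 paragraph rows, 5_854_887 unique pages, 1024 float32 dims), here is a small back-of-the-envelope sketch of what those constants imply for the raw .vec file size and the paragraphs-per-page ratio. This is editorial orientation only, not part of the tool, which prints the real on-disk sizes after the download step:

# editorial sketch: rough sizes implied by the constants above
ROW_COUNT = 41_488_110    # paragraph rows (TOTAL_DOC_COUNT)
PAGE_COUNT = 5_854_887    # unique wiki pages
DIMENSIONS = 1024
FLOAT32_BYTES = 4

vec_bytes = ROW_COUNT * DIMENSIONS * FLOAT32_BYTES
print(f'expected .vec size: {vec_bytes / 1024**3:.1f} GB')   # ~158.3 GB
print(f'~{ROW_COUNT / PAGE_COUNT:.1f} paragraphs per page')  # ~7.1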
@@ -37,62 +65,202 @@ def run(command):
   print(f' took {time.time() - t0:.1f} sec')
 
 def main():
+
+  # where we will download and write our shuffled vectors, before splitting into queries and docs
   csv_source_file = '/b3/cohere-wikipedia-v3.csv'
   vec_source_file = '/b3/cohere-wikipedia-v3.vec'
 
-  if True:
-    docs = load_dataset("Cohere/wikipedia-2023-11-embed-multilingual-v3", LANG, split="train", streaming=True)
-    print(f'columns: {docs.column_names}')
-    print(f'features: {docs.column_names}')
+  if False:
+    docs = datasets.load_dataset("Cohere/wikipedia-2023-11-embed-multilingual-v3", LANG, split="train", streaming=True)
+    #print(f'columns: {docs.column_names}')
+
+    features = docs.features
+    print(f'\nfeatures:')
+    for feature_name, feature_details in features.items():
+      print(f' {feature_name}: {feature_details}')
+    print('\n')
+
+    if False:
+      for feature_name, feature_details in features.items():
+        print(f" Feature Name: {feature_name}")
+        print(f" Feature Type: {type(feature_details).__name__}")
+        if hasattr(feature_details, 'dtype'):
+          print(f" Data Type: {feature_details.dtype}")
+
+        # Check for List type (also handles older Sequence type if needed)
+        if isinstance(feature_details, datasets.Array2D):
+          print(f"- Column: '{feature_name}' is a fixed-size Array.")
+          # The shape attribute will provide the dimensions
+          print(f" - Shape: {feature_details.shape}, Dtype: {feature_details.dtype}")
+
+        elif isinstance(feature_details, (datasets.List, datasets.Sequence)):
+          inner_type = feature_details.feature
+          # Check if the inner type is a Value and get its dtype
+          if isinstance(inner_type, datasets.Value):
+            print(f" - Type: List (inner dtype: {inner_type.dtype})")
+          else:
+            # Handle nested structures within the list (e.g., List of dicts)
+            print(f" - Type: List (inner type details: {inner_type})")
+
+        # Check for simple Value type (like float32, int32, string)
+        elif isinstance(feature_details, datasets.Value):
+          if feature_details.dtype in ['float32', 'float']:
+            print(f" - Type: Value (dtype: {feature_details.dtype})")
+          else:
+            print(f" - Type: Value (dtype: {feature_details.dtype})")
+
+        else:
+          # Handle other types like ClassLabel, Array, etc.
+          print(f" - Type: Other (details: {type(feature_details).__name__})")
 
+        # You can add more specific checks for nested features or specific types if needed
+        print("-" * 20)
+
   if False:
-    count = 0
+    row_count = 0
     dimensions = 1024
     headers = ['id', 'title', 'text', 'url']
     start_time_sec = time.time()
     # print('%s' % dir(docs['emb']))
-    # should be 41488110 rows + 1 header
+
+    total_text_chars = 0
+    total_title_chars = 0
+
+    total_doc_count = 0
+    cur_wiki_id = None
+
     with open(csv_source_file, 'w') as meta_out, open(vec_source_file, 'wb') as vec_out:
       meta_csv_out = csv.writer(meta_out)
       meta_csv_out.writerow(headers)
       for doc in docs:
         meta_csv_out.writerow([doc['_id'], doc['title'], doc['text']])
+        total_text_chars += len(doc['text'])
+        total_title_chars += len(doc['title'])
+        wiki_id, paragraph_id = split_id(doc['_id'], row_count)
+        if wiki_id != cur_wiki_id:
+          total_doc_count += 1
+          cur_wiki_id = wiki_id
+
         emb = np.array(doc['emb'], dtype=np.float32)
+
         if len(emb) != DIMENSIONS:
           raise RuntimeError(f'planned on {DIMENSIONS} dims but corpus is {len(emb)}!')
         # print(f'{type(emb)}')
         emb.tofile(vec_out)
-        count += 1
-        if count % 10000 == 0:
-          print(f'{time.time() - start_time_sec:6.1f} sec: {count}... {vec_out.tell()} and {meta_out.tell()}')
-
-  print(f'now insert line numbers')
-  run(f'nl -v 0 -ba {csv_source_file} > {csv_source_file}.num')
-
-  print(f'now shuffle')
-  run(f'shuf {csv_source_file}.num > {csv_source_file}.num.shuf')
-
-  print(f'now remove line numbers')
-  run(f'cut -f 2- {csv_source_file}.num.shuf > {csv_source_file}.final')
-
-  print(f'now sort to get reverse mapping')
-  run(f'nl -v 0 -ba {csv_source_file}.num.shuf | sort -nk2 > {csv_source_file}.mapping')
-
-  print(f'now cut to just the one new-position column')
-  run(f'cut -f1 {csv_source_file}.mapping > {csv_source_file}.only_mapping')
-
-  print(f'now shuffle vectors matching')
-  run(f'python3 -u src/python/shuffle_vec_file.py {vec_source_file} {vec_source_file}.shuffled {DIMENSIONS} {csv_source_file}.only_mapping')
-
-  with open(f'{vec_source_file}.shuffled', 'rb') as f:
-    b = f.read(DIMENSIONS * 4)
-    one_vec = struct.unpack(f'<{DIMENSIONS}f', b)
-    print(f'vec is length {len(one_vec)}')
-    sumsq = 0
-    for i, v in enumerate(one_vec):
-      print(f' {i:4d}: {v:g}')
-      sumsq += v*v
-    print(f' sumsq={sumsq}')
+        row_count += 1
+        if row_count % 10000 == 0:
+          print(f'{time.time() - start_time_sec:6.1f} sec: {row_count} ({total_doc_count} wiki docs)... {vec_out.tell()} and {meta_out.tell()}')
+
+    print(f'Done initial download!\n {row_count=} {total_doc_count=} {total_text_chars=} {total_title_chars=}')
+    print(f'{csv_source_file} is {os.path.getsize(csv_source_file)/1024/1024/1024:.2f} GB')
+    print(f'{vec_source_file} is {os.path.getsize(vec_source_file)/1024/1024/1024:.2f} GB')
+
+    os.chmod(csv_source_file, 0o444)
+    os.chmod(vec_source_file, 0o444)
+
+    if row_count != TOTAL_DOC_COUNT:
+      raise RuntimeError(f'expected {TOTAL_DOC_COUNT=} but saw {row_count=}')
+
+  if False:
+
+    print(f'strip csv header')
+    run(f'sed \'1d\' {csv_source_file} > {csv_source_file}.noheader')
+
+    print(f'now insert line numbers')
+    run(f'nl -v 0 -ba {csv_source_file}.noheader > {csv_source_file}.num')
+
+    print(f'now shuffle')
+    run(f'shuf {csv_source_file}.num > {csv_source_file}.num.shuf')
+
+    # this is the actual output meta CSV, post shuffle
+    print(f'now remove line numbers')
+    run(f'cut -f 2- {csv_source_file}.num.shuf > {csv_source_file}.final')
+
+    print(f'now sort to get reverse mapping')
+    run(f'nl -v 0 -ba {csv_source_file}.num.shuf | sort -nk2 > {csv_source_file}.mapping')
+
+    print(f'now cut to just the one new-position column')
+    run(f'cut -f1 {csv_source_file}.mapping > {csv_source_file}.only_mapping')
+
+    # this is the actual output vectors, post same shuffle -- this took FOREVER (took 42733.5 sec)
+    print(f'now shuffle vectors to match')
+    run(f'python3 -u src/python/shuffle_vec_file.py {vec_source_file} {vec_source_file}.shuffled {DIMENSIONS} {csv_source_file}.only_mapping')
+
+    with open(f'{vec_source_file}.shuffled', 'rb') as f:
+      # sanity check: print first 10 vectors
+      for i in range(10):
+        b = f.read(DIMENSIONS * 4)
+        one_vec = struct.unpack(f'<{DIMENSIONS}f', b)
+        print(f'vec {i} is length {len(one_vec)}')
+        sumsq = 0
+        for i, v in enumerate(one_vec):
+          print(f' {i:4d}: {v:g}')
+          sumsq += v*v
+        print(f' sumsq={sumsq}')
+
+  if True:
+    # split into docs/queries -- files are now shuffled so we can safely take first
+    # N as queries and remainder as docs and we are pulling from the same well stirred
+    # chicken soup (hmm, but gravity ... analogy doesn't fully work)
+
+    # 1.5 M queries, the balance (TOTAL_DOC_COUNT - 1.5 M) = 39_988_110
+    query_count = 1_500_000
+
+    with open(csv_source_file + '.final', 'r') as meta_in, open(vec_source_file + '.shuffled', 'rb') as vec_in:
+
+      meta_csv_in = csv.reader(meta_in)
+
+      # first, queries:
+      copy_meta_and_vec(csv_source_file, vec_source_file, meta_csv_in, vec_in, 'queries', query_count)
+
+      # then docs:
+      copy_meta_and_vec(csv_source_file, vec_source_file, meta_csv_in, vec_in, 'docs', TOTAL_DOC_COUNT - query_count)
+
+      if meta_in.tell() != os.path.getsize(csv_source_file + '.final'):
+        raise RuntimeError(f'did not consume all metadata file rows? {meta_in.tell()} vs {os.path.getsize(csv_source_file + ".final")}')
+      if vec_in.tell() != os.path.getsize(vec_source_file + '.shuffled'):
+        raise RuntimeError(f'did not consume all vector file vectors? {vec_in.tell()} vs {os.path.getsize(vec_source_file + ".shuffled")}')
+
+def split_id(id, line_num):
+  if not id.startswith(ID_PREFIX):
+    raise RuntimeError(f'all wiki_id should start with {ID_PREFIX=} but saw {id} at row {line_num}')
+  tup = id[len(ID_PREFIX):].split('_')
+  if len(tup) != 2:
+    raise RuntimeError(f'all wiki_id should have form wiki-id_paragraph-id but saw {id[len(ID_PREFIX):]} at row {line_num}')
+  # TODO: should we further validate \d+ for each? coalesced correctly ("see once" each wiki_id)
+  return tup
+
+def copy_meta_and_vec(csv_dest_file, vec_dest_file, meta_csv_in, vec_in, name, doc_copy_count):
+
+  vector_size_bytes = DIMENSIONS * 4
+
+  print(f'\nnow create subset "{name}" with {doc_copy_count} docs')
+
+  # also remove common prefix from id (20231101.en_), and split the remainder into wiki_id and paragraph_id
+  subset_csv_dest_file = csv_dest_file.replace('.csv', f'.{name}.csv')
+  subset_vec_dest_file = vec_dest_file.replace('.vec', f'.{name}.vec')
+
+  new_headers = ('wiki_id', 'paragraph_id', 'title', 'text', 'url')
+
+  with open(subset_csv_dest_file, 'w') as meta_out, open(subset_vec_dest_file, 'wb') as vec_out:
+
+    meta_csv_out = csv.writer(meta_out)
+    meta_csv_out.writerow(new_headers)
+
+    for i in range(doc_copy_count):
+      # id, title, text, url
+      row = next(meta_csv_in)
+
+      wiki_id, paragraph_id = split_id(row[0], meta_csv_in.line_num)
+
+      meta_csv_out.writerow([wiki_id, paragraph_id] + row[1:])
+      vec = vec_in.read(vector_size_bytes)
+      if len(vec) != vector_size_bytes:
+        raise RuntimeError(f'failed to read expected {vector_size_bytes=}')
+      vec_out.write(vec)
+
+  print(f'done!\n meta file {subset_csv_dest_file} is {os.path.getsize(subset_csv_dest_file)/1024./1024.} MB\n vec file {subset_vec_dest_file} is {os.path.getsize(subset_vec_dest_file)/1024./1024.} MB')
 
 if __name__ == '__main__':
   main()
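The commit message and the new TODO both flag that the row-level shuf scatters the paragraphs of one wiki_id across the shuffled output. One way that could be addressed is a page-level shuffle: group row indices by wiki_id, shuffle the groups, and flatten them back into a permutation that keeps each page's paragraphs adjacent. The sketch below is purely editorial; the helper name grouped_permutation, the fixed seed, and the hand-off to the mapping file are assumptions, not part of this commit:

import csv
import itertools
import random

ID_PREFIX = '20231101.en_'
csv.field_size_limit(1024*1024*40)

def grouped_permutation(csv_path, seed=17):
  # Hypothetical helper: build one group of row indices per wiki_id, assuming
  # the downloaded CSV (id, title, text, url with a header row) lists each
  # page's paragraphs adjacently, as the streamed dataset does.
  groups = []
  cur_wiki_id = None
  with open(csv_path, newline='') as f:
    reader = csv.reader(f)
    next(reader)  # skip header row
    for row_index, row in enumerate(reader):
      wiki_id = row[0][len(ID_PREFIX):].split('_')[0]
      if wiki_id != cur_wiki_id:
        groups.append([])
        cur_wiki_id = wiki_id
      groups[-1].append(row_index)
  # shuffle page order only; rows within a page stay adjacent and in order
  random.Random(seed).shuffle(groups)
  return list(itertools.chain.from_iterable(groups))

The resulting permutation could then drive both the metadata reorder and the vector reorder (for example as the mapping handed to shuffle_vec_file.py), so the downstream queries/docs split would still see whole pages.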
