+import csv
import os
import struct
import subprocess
import time
+
import datasets
-import csv
-import struct
import numpy as np

-csv.field_size_limit(1024 * 1024 * 40)
+csv.field_size_limit(1024 * 1024 * 40)
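# csv's default field-size limit is 128 KB; raising it to 40 MB presumably keeps very long wiki text fields from raising an error when the CSV is read back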

# TODO
# - ugh -- need to fix shuffle to keep pages (all adjacent rows with same wiki_id) together

"""

-ID_PREFIX = '20231101.en_'
+ID_PREFIX = "20231101.en_"

DIMENSIONS = 1024

# total 15_562_116_893 chars in text
# total 851_864_089 chars in title

-TOTAL_DOC_COUNT = 41_488_110 # + 1 header in the initial CSV
+TOTAL_DOC_COUNT = 41_488_110 # + 1 header in the initial CSV


TOTAL_DOC_COUNT = 5_854_887

-LANG = 'en'
+LANG = "en"
+

def run(command):
-    print(f'RUN: {command}')
+    print(f"RUN: {command}")
    t0 = time.time()
    subprocess.run(command, shell=True, check=True)
-    print(f'took {time.time() - t0:.1f} sec')
+    print(f"took {time.time() - t0:.1f} sec")

-def main():

+def main():
    # where we will download and write our shuffled vectors, before splitting into queries and docs
-    csv_source_file = '/b3/cohere-wikipedia-v3.csv'
-    vec_source_file = '/b3/cohere-wikipedia-v3.vec'
-
+    csv_source_file = "/b3/cohere-wikipedia-v3.csv"
+    vec_source_file = "/b3/cohere-wikipedia-v3.vec"
+
    if False:
        docs = datasets.load_dataset("Cohere/wikipedia-2023-11-embed-multilingual-v3", LANG, split="train", streaming=True)
-        #print(f'columns: {docs.column_names}')
+        # print(f'columns: {docs.column_names}')

        features = docs.features
-        print(f'\nfeatures:')
+        print("\nfeatures:")
        for feature_name, feature_details in features.items():
-            print(f'{feature_name}: {feature_details}')
-        print('\n')
-
+            print(f"{feature_name}: {feature_details}")
+        print("\n")
+
    if False:
        for feature_name, feature_details in features.items():
            print(f"Feature Name: {feature_name}")
            print(f"Feature Type: {type(feature_details).__name__}")
-            if hasattr(feature_details, 'dtype'):
+            if hasattr(feature_details, "dtype"):
                print(f"Data Type: {feature_details.dtype}")

            # Check for List type (also handles older Sequence type if needed)
            if isinstance(feature_details, datasets.Array2D):
-                print(f"- Column: '{column_name}' is a fixed-size Array.")
+                print(f"- Column: '{feature_name}' is a fixed-size Array.")
                # The shape attribute will provide the dimensions
                print(f"  - Shape: {feature_details.shape}, Dtype: {feature_details.dtype}")

            elif isinstance(feature_details, (datasets.List, datasets.Sequence)):
                inner_type = feature_details.feature
                # Check if the inner type is a Value and get its dtype
                if isinstance(inner_type, datasets.Value):
-                    print(f"  - Type: List (inner dtype: {inner_type.dtype})")
+                    print(f"  - Type: List (inner dtype: {inner_type.dtype})")
                else:
-                    # Handle nested structures within the list (e.g., List of dicts)
-                    print(f"  - Type: List (inner type details: {inner_type})")
+                    # Handle nested structures within the list (e.g., List of dicts)
+                    print(f"  - Type: List (inner type details: {inner_type})")

            # Check for simple Value type (like float32, int32, string)
            elif isinstance(feature_details, datasets.Value):
-                if feature_details.dtype in ['float32', 'float']:
-                    print(f"  - Type: Value (dtype: {feature_details.dtype})")
+                if feature_details.dtype in ["float32", "float"]:
+                    print(f"  - Type: Value (dtype: {feature_details.dtype})")
                else:
-                    print(f"  - Type: Value (dtype: {feature_details.dtype})")
+                    print(f"  - Type: Value (dtype: {feature_details.dtype})")

            else:
                # Handle other types like ClassLabel, Array, etc.
                print(f"  - Type: Other (details: {type(feature_details).__name__})")

            # You can add more specific checks for nested features or specific types if needed
            print("-" * 20)
-
+
    if False:
        row_count = 0
        dimensions = 1024
-        headers = ['id', 'title', 'text', 'url']
+        headers = ["id", "title", "text", "url"]
        start_time_sec = time.time()
        # print('%s' % dir(docs['emb']))

@@ -128,75 +129,74 @@ def main():

        total_doc_count = 0
        cur_wiki_id = None
-
-        with open(csv_source_file, 'w') as meta_out, open(vec_source_file, 'wb') as vec_out:
+
+        with open(csv_source_file, "w") as meta_out, open(vec_source_file, "wb") as vec_out:
            meta_csv_out = csv.writer(meta_out)
            meta_csv_out.writerow(headers)
            for doc in docs:
-                meta_csv_out.writerow([doc['_id'], doc['title'], doc['text']])
-                total_text_chars += len(doc['text'])
-                total_title_chars += len(doc['title'])
-                wiki_id, paragraph_id = split_id(doc['_id'], row_count)
+                meta_csv_out.writerow([doc["_id"], doc["title"], doc["text"]])
+                total_text_chars += len(doc["text"])
+                total_title_chars += len(doc["title"])
+                wiki_id, paragraph_id = split_id(doc["_id"], row_count)
                if wiki_id != cur_wiki_id:
                    total_doc_count += 1
                    cur_wiki_id = wiki_id
-
-                emb = np.array(doc['emb'], dtype=np.float32)
-
+
+                emb = np.array(doc["emb"], dtype=np.float32)
+
                if len(emb) != DIMENSIONS:
-                    raise RuntimeError(f'planned on {DIMENSIONS} dims but corpus is {len(emb)}!')
+                    raise RuntimeError(f"planned on {DIMENSIONS} dims but corpus is {len(emb)}!")
                # print(f'{type(emb)}')
                emb.tofile(vec_out)
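                # tofile() appends the raw float32 bytes in native byte order (little-endian on
                # typical hardware), so the .vec file is a headerless stream of DIMENSIONS * 4 bytes per row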
                row_count += 1
                if row_count % 10000 == 0:
-                    print(f'{time.time() - start_time_sec:6.1f} sec: {row_count} ({total_doc_count} wiki docs)... {vec_out.tell()} and {meta_out.tell()}')
+                    print(f"{time.time() - start_time_sec:6.1f} sec: {row_count} ({total_doc_count} wiki docs)... {vec_out.tell()} and {meta_out.tell()}")
+
+        print(f"Done initial download!\n{row_count=} {total_doc_count=} {total_text_chars=} {total_title_chars=}")
+        print(f"{csv_source_file} is {os.path.getsize(csv_source_file) / 1024 / 1024 / 1024:.2f} GB")
+        print(f"{vec_source_file} is {os.path.getsize(vec_source_file) / 1024 / 1024 / 1024:.2f} GB")

-        print(f'Done initial download!\n{row_count=} {total_doc_count=} {total_text_chars=} {total_title_chars=}')
-        print(f'{csv_source_file} is {os.path.getsize(csv_source_file)/1024/1024/1024:.2f} GB')
-        print(f'{vec_source_file} is {os.path.getsize(vec_source_file)/1024/1024/1024:.2f} GB')
-
        os.chmod(csv_source_file, 0o444)
        os.chmod(vec_source_file, 0o444)
-
-        if count != TOTAL_DOC_COUNT:
-            raise RuntimeError(f'expected {TOTAL_DOC_COUNT=} but saw {row_count=}')
+
+        if row_count != TOTAL_DOC_COUNT:
+            raise RuntimeError(f"expected {TOTAL_DOC_COUNT=} but saw {row_count=}")

    if False:
+        print("strip csv header")
+        run(f"sed '1d' {csv_source_file} > {csv_source_file}.noheader")

-        print(f'strip csv header')
-        run(f'sed \'1d\' {csv_source_file} > {csv_source_file}.noheader')
-
-        print(f'now insert line numbers')
-        run(f'nl -v 0 -ba {csv_source_file}.noheader > {csv_source_file}.num')
+        print("now insert line numbers")
+        run(f"nl -v 0 -ba {csv_source_file}.noheader > {csv_source_file}.num")

-        print(f'now shuffle')
-        run(f'shuf {csv_source_file}.num > {csv_source_file}.num.shuf')
+        print("now shuffle")
+        run(f"shuf {csv_source_file}.num > {csv_source_file}.num.shuf")

        # this is the actual output meta CSV, post shuffle
-        print(f'now remove line numbers')
-        run(f'cut -f 2- {csv_source_file}.num.shuf > {csv_source_file}.final')
+        print("now remove line numbers")
+        run(f"cut -f 2- {csv_source_file}.num.shuf > {csv_source_file}.final")

-        print(f'now sort to get reverse mapping')
-        run(f'nl -v 0 -ba {csv_source_file}.num.shuf | sort -nk2 > {csv_source_file}.mapping')
+        print("now sort to get reverse mapping")
+        run(f"nl -v 0 -ba {csv_source_file}.num.shuf | sort -nk2 > {csv_source_file}.mapping")

-        print(f'now cut to just the one new-position column')
-        run(f'cut -f1 {csv_source_file}.mapping > {csv_source_file}.only_mapping')
+        print("now cut to just the one new-position column")
+        run(f"cut -f1 {csv_source_file}.mapping > {csv_source_file}.only_mapping")
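        # how the reverse mapping works: `nl -v 0 -ba` first tagged every header-less row with its
        # original 0-based line number; numbering the shuffled file again and sorting numerically on
        # that original number (column 2) yields one line per original row whose first column is the
        # row's new position -- that single column is .only_mapping, which shuffle_vec_file.py
        # presumably applies to permute the fixed-width vectors the same way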

        # this is the actual output vectors, post same shuffle -- this took FOREVER (took 42733.5 sec)
-        print(f'now shuffle vectors to match')
-        run(f'python3 -u src/python/shuffle_vec_file.py {vec_source_file} {vec_source_file}.shuffled {DIMENSIONS} {csv_source_file}.only_mapping')
+        print("now shuffle vectors to match")
+        run(f"python3 -u src/python/shuffle_vec_file.py {vec_source_file} {vec_source_file}.shuffled {DIMENSIONS} {csv_source_file}.only_mapping")

-        with open(f'{vec_source_file}.shuffled', 'rb') as f:
+        with open(f"{vec_source_file}.shuffled", "rb") as f:
            # sanity check: print first 10 vectors
            for i in range(10):
                b = f.read(DIMENSIONS * 4)
-                one_vec = struct.unpack(f'<{DIMENSIONS}f', b)
-                print(f'vec {i} is length {len(one_vec)}')
+                one_vec = struct.unpack(f"<{DIMENSIONS}f", b)
+                print(f"vec {i} is length {len(one_vec)}")
                sumsq = 0
                for i, v in enumerate(one_vec):
-                    print(f'{i:4d}: {v:g}')
-                    sumsq += v * v
-                print(f'sumsq={sumsq}')
+                    print(f"{i:4d}: {v:g}")
+                    sumsq += v * v
+                print(f"sumsq={sumsq}")
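                # sumsq is the squared L2 norm; if these embeddings are unit-normalized it should land
                # near 1.0, a cheap signal that the shuffle did not scramble byte offsets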

    if True:
        # split into docs/queries -- files are now shuffled so we can safely take first
@@ -206,45 +206,43 @@ def main():
        # 1.5 M queries, the balance (TOTAL_DOC_COUNT - 1.5 M) = 39_988_110
        query_count = 1_500_000
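        # the .final CSV and the .shuffled vectors were permuted identically, so reading both strictly
        # in order keeps csv row i paired with vector i while the first 1.5 M rows become queries and
        # the remainder become docs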

-        with open(csv_source_file + '.final', 'r') as meta_in, open(vec_source_file + '.shuffled', 'rb') as vec_in:
-
+        with open(csv_source_file + ".final") as meta_in, open(vec_source_file + ".shuffled", "rb") as vec_in:
            meta_csv_in = csv.reader(meta_in)

            # first, queries:
-            copy_meta_and_vec(csv_source_file, vec_source_file, meta_csv_in, vec_in, 'queries', query_count)
+            copy_meta_and_vec(csv_source_file, vec_source_file, meta_csv_in, vec_in, "queries", query_count)

            # then docs:
-            copy_meta_and_vec(csv_source_file, vec_source_file, meta_csv_in, vec_in, 'docs', TOTAL_DOC_COUNT - query_count)
+            copy_meta_and_vec(csv_source_file, vec_source_file, meta_csv_in, vec_in, "docs", TOTAL_DOC_COUNT - query_count)

            if meta_in.tell() != os.path.getsize(csv_source_file):
-                raise RuntimeError(f'did not consume all metadata file rows? {meta_in.tell()} vs {os.path.getsize(csv_source_file)}')
+                raise RuntimeError(f"did not consume all metadata file rows? {meta_in.tell()} vs {os.path.getsize(csv_source_file)}")
            if vec_in.tell() != os.path.getsize(vec_source_file):
-                raise RuntimeError(f'did not consume all vector file vectors? {vec_in.tell()} vs {os.path.getsize(vec_source_file)}')
+                raise RuntimeError(f"did not consume all vector file vectors? {vec_in.tell()} vs {os.path.getsize(vec_source_file)}")
+

def split_id(id, line_num):
-    id = row[0]
    if not id.startswith(ID_PREFIX):
-        raise RuntimeError(f'all wiki_id should start with {ID_PREFIX=} but saw {id} at row {line_num}')
-    tup = id[len(ID_PREFIX):].split('_')
+        raise RuntimeError(f"all wiki_id should start with {ID_PREFIX=} but saw {id} at row {line_num}")
+    tup = id[len(ID_PREFIX) :].split("_")
    if len(tup) != 2:
-        raise RuntimeError(f'all wiki_id should start have form wiki-id_paragraph-id but saw {id[len(ID_PREFIX):]} at row {line_num}')
+        raise RuntimeError(f"all wiki_id should start have form wiki-id_paragraph-id but saw {id[len(ID_PREFIX) :]} at row {line_num}")
    # TODO: should we further valdiate \d+ for each? coalesced correctly ("see once" each wiki_id)
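    # e.g. an _id like "20231101.en_12345_7" (values illustrative) splits into ["12345", "7"]:
    # the wiki page id and the paragraph index within that page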
    return tup
-
-def copy_meta_and_vec(csv_dest_file, vec_dest_file, meta_csv_in, vec_in, name, doc_copy_count):

+
+def copy_meta_and_vec(csv_dest_file, vec_dest_file, meta_csv_in, vec_in, name, doc_copy_count):
    vector_size_bytes = DIMENSIONS * 4
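    # every vector is a fixed-size record: DIMENSIONS (1024) float32 values * 4 bytes = 4096 bytes,
    # so rows can be copied as raw chunks without any parsing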
-
+
    print(f'\nnow create subset "{name}" with {doc_copy_count} docs')
-
-    # also remove common prefix from id (20231101.en_), and split the remainder into wiki_id and paragraph_id
-    subset_csv_dest_file = csv_dest_file.replace('.csv', f'.{name}.csv')
-    subset_vec_dest_file = vec_dest_file.replace('.vec', f'.{name}.vec')

-    new_headers = ('wiki_id', 'paragraph_id', 'title', 'text', 'url')
+    # also remove common prefix from id (20231101.en_), and split the remainder into wiki_id and paragraph_id
+    subset_csv_dest_file = csv_dest_file.replace(".csv", f".{name}.csv")
+    subset_vec_dest_file = vec_dest_file.replace(".vec", f".{name}.vec")

-    with open(subset_csv_dest_file, 'w') as meta_out, open(subset_vec_dest_file, 'wb') as vec_out:
+    new_headers = ("wiki_id", "paragraph_id", "title", "text", "url")

+    with open(subset_csv_dest_file, "w") as meta_out, open(subset_vec_dest_file, "wb") as vec_out:
        meta_csv_out = csv.writer(meta_out)
        meta_csv_out.writerow(new_headers)

@@ -257,10 +255,13 @@ def copy_meta_and_vec(csv_dest_file, vec_dest_file, meta_csv_in, vec_in, name, d
            meta_csv_out.writerow([wiki_id, paragraph_id] + row[1:])
            vec = vec_in.read(vector_size_bytes)
            if len(vec) != vector_size_bytes:
-                raise RuntimeError(f'failed to read expected {vector_size_bytes=}')
+                raise RuntimeError(f"failed to read expected {vector_size_bytes=}")
            vec_out.write(vec)

-    print(f'done!\nmeta file {subset_csv_dest_file} is {os.path.getsize(subset_csv_dest_file)/1024./1024.} MB\nvec file {subset_vec_dest_file} is {os.path.getsize(subset_vec_dest_file)/1024./1024.} MB')
+    print(
+        f"done!\nmeta file {subset_csv_dest_file} is {os.path.getsize(subset_csv_dest_file) / 1024.0 / 1024.0} MB\nvec file {subset_vec_dest_file} is {os.path.getsize(subset_vec_dest_file) / 1024.0 / 1024.0} MB"
+    )
+

-if __name__ == '__main__':
+if __name__ == "__main__":
    main()