@@ -8,6 +8,7 @@
 from pathlib import Path
 from typing import TYPE_CHECKING
 
+import torch
 from huggingface_hub import snapshot_download
 from transformers import AutoModelForMaskedLM, AutoTokenizer
 
@@ -26,6 +27,9 @@
 class OSNeuralSparseDocV3GTE(BaseEmbeddingModel):
     """OpenSearch Neural Sparse Encoding Doc v3 GTE model.
 
+    This model generates sparse embeddings for documents by using a masked language
+    model's logits to identify the most relevant tokens.
+
     HuggingFace URI: opensearch-project/opensearch-neural-sparse-encoding-doc-v3-gte
     """
 
@@ -40,8 +44,8 @@ def __init__(self, model_path: str | Path) -> None:
         super().__init__(model_path)
         self._model: PreTrainedModel | None = None
         self._tokenizer: DistilBertTokenizerFast | None = None
-        self._special_token_ids: list | None = None
-        self._id_to_token: list | None = None
+        self._special_token_ids: list[int] | None = None
+        self._device: torch.device = torch.device("cpu")
 
     def download(self) -> Path:
         """Download and prepare model, saving to self.model_path.
@@ -139,29 +143,205 @@ def load(self) -> None:
         if not self.model_path.exists():
             raise FileNotFoundError(f"Model not found at path: {self.model_path}")
 
-        # load local model and tokenizer
-        self._model = AutoModelForMaskedLM.from_pretrained(
+        # setup device (use CUDA if available, otherwise CPU)
+        self._device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
+
+        # load tokenizer
+        self._tokenizer = AutoTokenizer.from_pretrained(  # type: ignore[no-untyped-call]
             self.model_path,
-            trust_remote_code=True,
             local_files_only=True,
         )
-        self._tokenizer = AutoTokenizer.from_pretrained(  # type: ignore[no-untyped-call]
+
+        # load model as AutoModelForMaskedLM (required for sparse embeddings)
+        self._model = AutoModelForMaskedLM.from_pretrained(
             self.model_path,
+            trust_remote_code=True,
             local_files_only=True,
         )
+        self._model.to(self._device)  # type: ignore[arg-type]
+        self._model.eval()
 
-        # setup special tokens
+        # set special token IDs (following model card pattern)
+        # these will be zeroed out in the sparse vectors
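+        # (for BERT-style tokenizers these are typically [CLS], [SEP], [PAD],
+        # [UNK], and [MASK], though the exact set depends on the tokenizer)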
         self._special_token_ids = [
-            self._tokenizer.vocab[str(token)]
+            self._tokenizer.vocab[token]  # type: ignore[index]
             for token in self._tokenizer.special_tokens_map.values()
         ]
 
-        # setup id_to_token mapping
-        self._id_to_token = ["" for _ in range(self._tokenizer.vocab_size)]
-        for token, token_id in self._tokenizer.vocab.items():
-            self._id_to_token[token_id] = token
+        logger.info(
+            f"Model loaded successfully on {self._device}, "
+            f"{time.perf_counter() - start_time:.2f}s"
+        )
+
+    def create_embedding(self, embedding_input: EmbeddingInput) -> Embedding:
+        """Create sparse vector and decoded token weight embeddings for an input text.
+
+        Args:
+            embedding_input: EmbeddingInput object with a .text attribute
+        """
+        # generate the sparse embeddings
+        sparse_vector, decoded_tokens = self._encode_documents([embedding_input.text])[0]
+
+        # coerce sparse vector tensor into list[float]
+        sparse_vector_list = sparse_vector.cpu().numpy().tolist()
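+        # note: this list is vocab-length (tens of thousands of floats), and
+        # mostly zeros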
187+
188+ return Embedding (
189+ timdex_record_id = embedding_input .timdex_record_id ,
190+ run_id = embedding_input .run_id ,
191+ run_record_offset = embedding_input .run_record_offset ,
192+ model_uri = self .model_uri ,
193+ embedding_strategy = embedding_input .embedding_strategy ,
194+ embedding_vector = sparse_vector_list ,
195+ embedding_token_weights = decoded_tokens ,
196+ )
197+
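+    # minimal usage sketch (hypothetical paths and values):
+    #   model = OSNeuralSparseDocV3GTE(Path("path/to/model"))
+    #   model.load()
+    #   embedding = model.create_embedding(embedding_input)
+    #   embedding.embedding_token_weights  # {token: weight} dict for OpenSearch
+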
+    def _encode_documents(
+        self,
+        texts: list[str],
+    ) -> list[tuple[torch.Tensor, dict[str, float]]]:
+        """Encode documents into sparse vectors and decoded token weights.
+
+        This follows the pattern outlined on the HuggingFace model card for document
+        encoding.
+
+        This method accommodates MULTIPLE text inputs and returns a list of
+        embeddings, while the calling context of create_embedding() is a SINGULAR
+        input + output. Batch support is kept in the event we want something like a
+        create_multiple_embeddings() method in the future.
+
+        At a very high level, the following is performed:
+
+        1. We tokenize the input text into "features" using the model's tokenizer.
+
+        2. The features are fed to the model, returning model output logits. These
+        logits are "dense" in the sense that there are few zeros, but they are not
+        "dense vectors" (embeddings) in the sense that they meaningfully represent the
+        input document in geometric space; two logit tensors cannot be compared with
+        something like cosine similarity.
+
+        3. The logits are then converted into a sparse vector, which is a numeric
+        array of floats with the same number of values as the model's vocabulary. Each
+        value's position in the sparse array corresponds to a token id in the
+        vocabulary, and the value itself is the "weight" of that token in the input
+        text.
+
+        4. Lastly, we convert this sparse vector into a {token: weight} dictionary of
+        the actual token strings and their numerical weights. This dictionary may
+        contain tokens not present in the original text, but will be considerably
+        shorter than the model vocabulary length, given that all zero-weight tokens
+        are dropped. This is the final form that we will ultimately index into
+        OpenSearch.
+
+        Args:
+            texts: list of strings to create embeddings for
+        """
+        if self._model is None or self._tokenizer is None:
+            raise RuntimeError("Model not loaded. Call load() before create_embedding.")
+
+        # tokenize the input texts
+        features = self._tokenizer(
+            texts,
+            padding=True,
+            truncation=True,
+            return_tensors="pt",  # returns PyTorch tensors instead of Python lists
+            return_token_type_ids=False,
+        )
+
+        # move to CPU or GPU device, depending on what's available
+        features = {k: v.to(self._device) for k, v in features.items()}
+
+        # pass features to the model and receive model output logits as a tensor
+        with torch.no_grad():
+            output = self._model(**features)[0]
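+        # output is the raw logits tensor, expected shape
+        # (batch_size, seq_len, vocab_size)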
+
+        # generate sparse vectors from model logits tensor
+        sparse_vectors = self._get_sparse_vectors(features, output)
+
+        # decode sparse vectors to token-weight dictionaries
+        decoded = self._decode_sparse_vectors(sparse_vectors)
+
+        # return list of tuple(vector, decoded token weights) embedding results
+        return [(sparse_vectors[i], decoded[i]) for i in range(len(texts))]
+
+    def _get_sparse_vectors(
+        self, features: dict[str, torch.Tensor], output: torch.Tensor
+    ) -> torch.Tensor:
+        """Convert model logits output to sparse vectors.
+
+        This follows the HuggingFace model card exactly: https://huggingface.co/
+        opensearch-project/opensearch-neural-sparse-encoding-doc-v3-gte#usage-huggingface
+
+        This implements the get_sparse_vector function from the model card:
+        1. Max pooling with attention mask
+        2. log(1 + log(1 + relu())) transformation
+        3. Zero out special tokens
+
+        The end result is a sparse vector with the length of the model vocabulary,
+        where each position represents a token in the model vocabulary and each value
+        is that token's weight relative to the input text.
+
+        Args:
+            features: Tokenizer output with attention_mask
+            output: Model logits of shape (batch_size, seq_len, vocab_size)
+
+        Returns:
+            Sparse vectors of shape (batch_size, vocab_size)
+        """
+        # collapse sequence positions: take max logit for each vocab token across all
+        # positions (also masks out padding tokens)
+        values, _ = torch.max(output * features["attention_mask"].unsqueeze(-1), dim=1)
+
+        # compress values to create sparsity: ReLU removes negatives,
+        # double-log shrinks large values
+        values = torch.log(1 + torch.log(1 + torch.relu(values)))
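+        # illustrative arithmetic: a raw value of 10.0 maps to
+        # log(1 + log(1 + 10.0)) ≈ 1.22 and 100.0 to ≈ 1.73, so large logits are
+        # compressed into a narrow positive range while negatives become 0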
+
+        # remove special tokens like [CLS], [SEP], [PAD]
+        values[:, self._special_token_ids] = 0
+
+        return values
+
+    def _decode_sparse_vectors(
+        self, sparse_vectors: torch.Tensor
+    ) -> list[dict[str, float]]:
+        """Convert sparse vectors to token-weight dictionaries.
+
+        Handles both single vectors and batches, returning a list of dictionaries
+        mapping token strings to their weights.
+
+        Args:
+            sparse_vectors: Tensor of shape (batch_size, vocab_size) or (vocab_size,)
+
+        Returns:
+            List of dictionaries with token-weight pairs
+        """
+        if sparse_vectors.dim() == 1:
+            sparse_vectors = sparse_vectors.unsqueeze(0)
+
+        # move to CPU for processing
+        sparse_vectors_cpu = sparse_vectors.cpu()
+
+        results: list[dict[str, float]] = []
+        for vector in sparse_vectors_cpu:
+
+            # find non-zero indices and values
+            nonzero_indices = torch.nonzero(vector, as_tuple=False).squeeze(-1)
+
+            if nonzero_indices.numel() == 0:
+                results.append({})
+                continue
+
+            # get weights
+            weights = vector[nonzero_indices].tolist()
+
+            # convert indices to token strings
+            token_ids = nonzero_indices.tolist()
+            tokens = self._tokenizer.convert_ids_to_tokens(token_ids)  # type: ignore[union-attr]
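+            # note: tokens may be subword pieces (e.g. "##ing" in WordPiece
+            # vocabularies), not only whole words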
 
-        logger.info(f"Model loaded successfully, {time.perf_counter() - start_time}s")
+            # create token:weight dictionary
+            token_dict = {
+                token: weight
+                for token, weight in zip(tokens, weights, strict=True)
+                if token is not None
+            }
+            results.append(token_dict)
 
-    def create_embedding(self, input_record: EmbeddingInput) -> Embedding:
-        raise NotImplementedError
+        return results