From 23b93f8fa13e8ec0585cf27c517a761e08d98ff0 Mon Sep 17 00:00:00 2001
From: "yuan.wang"
Date: Fri, 7 Nov 2025 17:58:01 +0800
Subject: [PATCH 1/4] add dedup strategy between pref and textual memory

---
 src/memos/api/routers/server_router.py        | 66 ++++++++++-----
 src/memos/mem_cube/navie.py                   | 60 +++----------
 src/memos/memories/textual/item.py            |  1 +
 .../textual/prefer_text_memory/adder.py       | 84 ++++++++++++++++++-
 .../textual/prefer_text_memory/factory.py     |  4 +-
 src/memos/templates/prefer_complete_prompt.py | 68 +++++++++++++++
 6 files changed, 210 insertions(+), 73 deletions(-)

diff --git a/src/memos/api/routers/server_router.py b/src/memos/api/routers/server_router.py
index 8df383bfb..12865fd65 100644
--- a/src/memos/api/routers/server_router.py
+++ b/src/memos/api/routers/server_router.py
@@ -55,6 +55,8 @@
     ExtractorFactory,
     RetrieverFactory,
 )
+from memos.memories.textual.simple_preference import SimplePreferenceTextMemory
+from memos.memories.textual.simple_tree import SimpleTreeTextMemory
 from memos.memories.textual.tree_text_memory.organize.manager import MemoryManager
 from memos.memories.textual.tree_text_memory.retrieve.internet_retriever_factory import (
     InternetRetrieverFactory,
@@ -195,18 +197,43 @@ def init_server():
         internet_retriever = InternetRetrieverFactory.from_config(
             internet_retriever_config, embedder=embedder
         )
+
+    # Initialize memory manager
+    memory_manager = MemoryManager(
+        graph_db,
+        embedder,
+        llm,
+        memory_size=_get_default_memory_size(default_cube_config),
+        is_reorganize=getattr(default_cube_config.text_mem.config, "reorganize", False),
+    )
+
+    # Initialize text memory
+    text_mem = SimpleTreeTextMemory(
+        llm=llm,
+        embedder=embedder,
+        mem_reader=mem_reader,
+        graph_db=graph_db,
+        reranker=reranker,
+        memory_manager=memory_manager,
+        config=default_cube_config.text_mem.config,
+        internet_retriever=internet_retriever,
+    )
+
     pref_extractor = ExtractorFactory.from_config(
         config_factory=pref_extractor_config,
         llm_provider=llm,
         embedder=embedder,
         vector_db=vector_db,
     )
+
     pref_adder = AdderFactory.from_config(
         config_factory=pref_adder_config,
         llm_provider=llm,
         embedder=embedder,
         vector_db=vector_db,
+        text_mem=text_mem,
     )
+
     pref_retriever = RetrieverFactory.from_config(
         config_factory=pref_retriever_config,
         llm_provider=llm,
@@ -215,33 +242,29 @@ def init_server():
         vector_db=vector_db,
     )

-    # Initialize memory manager
-    memory_manager = MemoryManager(
-        graph_db,
-        embedder,
-        llm,
-        memory_size=_get_default_memory_size(default_cube_config),
-        is_reorganize=getattr(default_cube_config.text_mem.config, "reorganize", False),
+    # Initialize preference memory
+    pref_mem = SimplePreferenceTextMemory(
+        extractor_llm=llm,
+        vector_db=vector_db,
+        embedder=embedder,
+        reranker=reranker,
+        extractor=pref_extractor,
+        adder=pref_adder,
+        retriever=pref_retriever,
     )
+
     mos_server = MOSServer(
         mem_reader=mem_reader,
         llm=llm,
         online_bot=False,
     )
+
+    # Create MemCube with pre-initialized memory instances
     naive_mem_cube = NaiveMemCube(
-        llm=llm,
-        embedder=embedder,
-        mem_reader=mem_reader,
-        graph_db=graph_db,
-        reranker=reranker,
-        internet_retriever=internet_retriever,
-        memory_manager=memory_manager,
-        default_cube_config=default_cube_config,
-        vector_db=vector_db,
-        pref_extractor=pref_extractor,
-        pref_adder=pref_adder,
-        pref_retriever=pref_retriever,
+        text_mem=text_mem,
+        pref_mem=pref_mem,
+        act_mem=None,
+        para_mem=None,
     )

     # Initialize Scheduler
@@ -279,6 +302,8 @@ def init_server():
         pref_extractor,
         pref_adder,
         pref_retriever,
+        text_mem,
+        pref_mem,
     )


@@ -300,6 +325,8 @@ def init_server():
     pref_extractor,
     pref_adder,
     pref_retriever,
+    text_mem,
+    pref_mem,
 ) = init_server()


@@ -601,6 +628,7 @@ def _process_pref_mem() -> list[dict[str, str]]:
                 info={
                     "user_id": add_req.user_id,
                     "session_id": target_session_id,
+                    "mem_cube_id": add_req.mem_cube_id,
                 },
             )
             pref_ids_local: list[str] = naive_mem_cube.pref_mem.add(pref_memories_local)
diff --git a/src/memos/mem_cube/navie.py b/src/memos/mem_cube/navie.py
index ba9f136b7..ecfd949b7 100644
--- a/src/memos/mem_cube/navie.py
+++ b/src/memos/mem_cube/navie.py
@@ -2,26 +2,13 @@

 from typing import Literal

-from memos.configs.mem_cube import GeneralMemCubeConfig
 from memos.configs.utils import get_json_file_model_schema
-from memos.embedders.base import BaseEmbedder
 from memos.exceptions import ConfigurationError, MemCubeError
-from memos.graph_dbs.base import BaseGraphDB
-from memos.llms.base import BaseLLM
 from memos.log import get_logger
 from memos.mem_cube.base import BaseMemCube
-from memos.mem_reader.base import BaseMemReader
 from memos.memories.activation.base import BaseActMemory
 from memos.memories.parametric.base import BaseParaMemory
 from memos.memories.textual.base import BaseTextMemory
-from memos.memories.textual.prefer_text_memory.adder import BaseAdder
-from memos.memories.textual.prefer_text_memory.extractor import BaseExtractor
-from memos.memories.textual.prefer_text_memory.retrievers import BaseRetriever
-from memos.memories.textual.simple_preference import SimplePreferenceTextMemory
-from memos.memories.textual.simple_tree import SimpleTreeTextMemory
-from memos.memories.textual.tree_text_memory.organize.manager import MemoryManager
-from memos.reranker.base import BaseReranker
-from memos.vec_dbs.base import BaseVecDB

 logger = get_logger(__name__)
@@ -32,51 +19,26 @@ class NaiveMemCube(BaseMemCube):

     def __init__(
         self,
-        llm: BaseLLM,
-        embedder: BaseEmbedder,
-        mem_reader: BaseMemReader,
-        graph_db: BaseGraphDB,
-        reranker: BaseReranker,
-        memory_manager: MemoryManager,
-        default_cube_config: GeneralMemCubeConfig,
-        vector_db: BaseVecDB,
-        internet_retriever: None = None,
-        pref_extractor: BaseExtractor | None = None,
-        pref_adder: BaseAdder | None = None,
-        pref_retriever: BaseRetriever | None = None,
+        text_mem: BaseTextMemory | None = None,
+        pref_mem: BaseTextMemory | None = None,
+        act_mem: BaseActMemory | None = None,
+        para_mem: BaseParaMemory | None = None,
     ):
-        """Initialize the MemCube with a configuration."""
-        self._text_mem: BaseTextMemory | None = SimpleTreeTextMemory(
-            llm,
-            embedder,
-            mem_reader,
-            graph_db,
-            reranker,
-            memory_manager,
-            default_cube_config.text_mem.config,
-            internet_retriever,
-        )
-        self._act_mem: BaseActMemory | None = None
-        self._para_mem: BaseParaMemory | None = None
-        self._pref_mem: BaseTextMemory | None = SimplePreferenceTextMemory(
-            extractor_llm=llm,
-            vector_db=vector_db,
-            embedder=embedder,
-            reranker=reranker,
-            extractor=pref_extractor,
-            adder=pref_adder,
-            retriever=pref_retriever,
-        )
+        """Initialize the MemCube with memory instances."""
+        self._text_mem: BaseTextMemory | None = text_mem
+        self._act_mem: BaseActMemory | None = act_mem
+        self._para_mem: BaseParaMemory | None = para_mem
+        self._pref_mem: BaseTextMemory | None = pref_mem

     def load(
-        self, dir: str, memory_types: list[Literal["text_mem", "act_mem", "para_mem"]] | None = None
+        self, dir: str, memory_types: list[Literal["text_mem", "act_mem", "para_mem", "pref_mem"]] | None = None
     ) -> None:
         """Load memories.

         Args:
             dir (str): The directory containing the memory files.
             memory_types (list[str], optional): List of memory types to load.
                 If None, loads all available memory types.
-                Options: ["text_mem", "act_mem", "para_mem"]
+                Options: ["text_mem", "act_mem", "para_mem", "pref_mem"]
         """
         loaded_schema = get_json_file_model_schema(os.path.join(dir, self.config.config_filename))
         if loaded_schema != self.config.model_schema:
diff --git a/src/memos/memories/textual/item.py b/src/memos/memories/textual/item.py
index 2c23ae193..e7595443d 100644
--- a/src/memos/memories/textual/item.py
+++ b/src/memos/memories/textual/item.py
@@ -198,6 +198,7 @@ class PreferenceTextualMemoryMetadata(TextualMemoryMetadata):
     embedding: list[float] | None = Field(default=None, description="Vector of the dialog.")
     preference: str | None = Field(default=None, description="Preference.")
     created_at: str | None = Field(default=None, description="Timestamp of the dialog.")
+    mem_cube_id: str | None = Field(default=None, description="ID of the MemCube.")


 class TextualMemoryItem(BaseModel):
diff --git a/src/memos/memories/textual/prefer_text_memory/adder.py b/src/memos/memories/textual/prefer_text_memory/adder.py
index a78601e86..3cd3c1bd2 100644
--- a/src/memos/memories/textual/prefer_text_memory/adder.py
+++ b/src/memos/memories/textual/prefer_text_memory/adder.py
@@ -13,6 +13,7 @@
     NAIVE_JUDGE_UPDATE_OR_ADD_PROMPT,
     NAIVE_JUDGE_UPDATE_OR_ADD_PROMPT_FINE,
     NAIVE_JUDGE_UPDATE_OR_ADD_PROMPT_OP_TRACE,
+    NAIVE_JUDGE_DUP_WITH_TEXT_MEM_PROMPT,
 )
 from memos.vec_dbs.item import MilvusVecDBItem
@@ -24,7 +25,7 @@ class BaseAdder(ABC):
     """Abstract base class for adders."""

     @abstractmethod
-    def __init__(self, llm_provider=None, embedder=None, vector_db=None):
+    def __init__(self, llm_provider=None, embedder=None, vector_db=None, text_mem=None):
         """Initialize the adder."""

     @abstractmethod
@@ -41,12 +42,13 @@ def add(self, memories: list[TextualMemoryItem | dict[str, Any]], *args, **kwarg
 class NaiveAdder(BaseAdder):
     """Naive adder."""

-    def __init__(self, llm_provider=None, embedder=None, vector_db=None):
+    def __init__(self, llm_provider=None, embedder=None, vector_db=None, text_mem=None):
         """Initialize the naive adder."""
-        super().__init__(llm_provider, embedder, vector_db)
+        super().__init__(llm_provider, embedder, vector_db, text_mem)
         self.llm_provider = llm_provider
         self.embedder = embedder
         self.vector_db = vector_db
+        self.text_mem = text_mem

     def _judge_update_or_add_fast(self, old_msg: str, new_msg: str) -> bool:
         """Judge if the new message expresses the same core content as the old message."""
@@ -81,6 +83,47 @@ def _judge_update_or_add_fine(self, new_mem: str, retrieved_mems: str) -> dict[s
             logger.error(f"Error in judge_update_or_add_fine: {e}")
             return None

+    def _judge_dup_with_text_mem(self, new_pref: MilvusVecDBItem) -> bool:
+        """Judge whether a single new preference already exists in the textual memory."""
+        if new_pref.payload["preference_type"] != "explicit_preference":
+            return False
+        text_recalls = self.text_mem.search(
+            query=new_pref.memory,
+            top_k=5,
+            info={
+                "user_id": new_pref.payload["user_id"],
+                "session_id": new_pref.payload["session_id"],
+            },
+            mode="fast",
+            search_filter={"session_id": new_pref.payload["session_id"]},
+            user_name=new_pref.payload["mem_cube_id"],
+        )
+
+        text_mem_recalls = [
+            {"id": text_recall.id, "memory": text_recall.memory}
+            for text_recall in text_recalls
+        ]
+
+        if not text_mem_recalls:
+            return False
+
+        new_preference = {"id": new_pref.id, "memory": new_pref.payload["preference"]}
+
+        prompt = NAIVE_JUDGE_DUP_WITH_TEXT_MEM_PROMPT.replace(
+            "{new_preference}", json.dumps(new_preference)
+        ).replace(
+            "{retrieved_memories}", json.dumps(text_mem_recalls)
+        )
+        try:
+            response = self.llm_provider.generate([{"role": "user", "content": prompt}])
+            response = response.strip().replace("```json", "").replace("```", "").strip()
+            result = json.loads(response)
+            exists = result.get("exists", False)
+            return exists
+        except Exception as e:
+            logger.error(f"Error in judge_dup_with_text_mem: {e}")
+            return False
+
     def _judge_update_or_add_trace_op(
         self, new_mems: str, retrieved_mems: str
     ) -> dict[str, Any] | None:
@@ -98,6 +141,30 @@ def _judge_update_or_add_trace_op(
             logger.error(f"Error in judge_update_or_add_trace_op: {e}")
             return None

+    def _dedup_explicit_pref_by_textual(self, new_prefs: list[MilvusVecDBItem]) -> list[MilvusVecDBItem]:
+        """Deduplicate explicit preferences by textual memory."""
+        if os.getenv("PREF_DEDUP_EXP_BY_TEXTUAL", "false").lower() != "true":
+            return new_prefs
+        dedup_prefs = []
+        with ContextThreadPoolExecutor(max_workers=max(1, min(len(new_prefs), 5))) as executor:
+            future_to_idx = {
+                executor.submit(self._judge_dup_with_text_mem, new_pref): idx
+                for idx, new_pref in enumerate(new_prefs)
+            }
+            is_dup_flags = [False] * len(new_prefs)
+            for future in as_completed(future_to_idx):
+                idx = future_to_idx[future]
+                try:
+                    is_dup_flags[idx] = future.result()
+                except Exception as e:
+                    logger.error(
+                        f"Error in _judge_dup_with_text_mem for pref {new_prefs[idx].id}: {e}"
+                    )
+                    is_dup_flags[idx] = False
+
+        dedup_prefs = [pref for idx, pref in enumerate(new_prefs) if not is_dup_flags[idx]]
+        return dedup_prefs
+
     def _update_memory_op_trace(
         self,
         new_memories: list[TextualMemoryItem],
@@ -143,6 +210,11 @@ def _update_memory_op_trace(
             retrieved_mems=json.dumps(retrieved_mem_inputs) if retrieved_mem_inputs else "",
         )
         if not rsp:
+            dedup_rsp = self._dedup_explicit_pref_by_textual(new_vec_db_items)
+            if not dedup_rsp:
+                return []
+            else:
+                new_vec_db_items = dedup_rsp
             with ContextThreadPoolExecutor(max_workers=min(len(new_vec_db_items), 5)) as executor:
                 futures = {
                     executor.submit(self.vector_db.add, collection_name, [db_item]): db_item
@@ -245,6 +317,9 @@ def _update_memory_fine(
             self.vector_db.update(collection_name, rsp["id"], update_vec_db_item)
             return rsp["id"]
         else:
+            dedup_rsp = self._dedup_explicit_pref_by_textual([vec_db_item])
+            if not dedup_rsp:
+                return ""
             self.vector_db.add(collection_name, [vec_db_item])
             return vec_db_item.id
@@ -272,6 +347,9 @@ def _update_memory_fast(
             old_msg_str = recall.memory
             new_msg_str = new_memory.memory
             is_same = self._judge_update_or_add_fast(old_msg=old_msg_str, new_msg=new_msg_str)
+            dedup_rsp = self._dedup_explicit_pref_by_textual([vec_db_item])
+            if not dedup_rsp:
+                return ""
             if is_same:
                 vec_db_item.id = recall.id
                 self.vector_db.update(collection_name, recall.id, vec_db_item)
diff --git a/src/memos/memories/textual/prefer_text_memory/factory.py b/src/memos/memories/textual/prefer_text_memory/factory.py
index 22182261a..2dc953b49 100644
--- a/src/memos/memories/textual/prefer_text_memory/factory.py
+++ b/src/memos/memories/textual/prefer_text_memory/factory.py
@@ -19,14 +19,14 @@ class AdderFactory(BaseAdder):

     @classmethod
     def from_config(
-        cls, config_factory: AdderConfigFactory, llm_provider=None, embedder=None, vector_db=None
+        cls, config_factory: AdderConfigFactory, llm_provider=None, embedder=None, vector_db=None, text_mem=None
     ) -> BaseAdder:
         """Create a Adder instance from a configuration factory."""
         backend = config_factory.backend
         if backend not in cls.backend_to_class:
             raise ValueError(f"Invalid backend: {backend}")
         adder_class = cls.backend_to_class[backend]
-        return adder_class(llm_provider=llm_provider, embedder=embedder, vector_db=vector_db)
+        return adder_class(llm_provider=llm_provider, embedder=embedder, vector_db=vector_db, text_mem=text_mem)


 class ExtractorFactory(BaseExtractor):
diff --git a/src/memos/templates/prefer_complete_prompt.py b/src/memos/templates/prefer_complete_prompt.py
index 9e0274cba..3a468b943 100644
--- a/src/memos/templates/prefer_complete_prompt.py
+++ b/src/memos/templates/prefer_complete_prompt.py
@@ -132,6 +132,74 @@
 """


+NAIVE_JUDGE_DUP_WITH_TEXT_MEM_PROMPT = """
+You are a content comparison expert. Your task is to determine whether a new preference already exists in the retrieved text memories.
+
+**Task:** Check whether the new preference's content/topic/intent is already present in any of the retrieved text memories.
+
+**Input Structure:**
+- New preference: A single object with "id" and "memory" fields
+- Retrieved memories: Array of objects, each with "id" and "memory" fields
+
+**Judgment Criteria:**
+- If the core content, topic, or intent of the new preference is **already covered** by any retrieved memory, set "exists" to true.
+- Consider both semantic similarity and topic overlap - even if the wording differs, if the meaning is the same, it counts as existing.
+- If the new preference introduces **new information, a different topic, or unique content** not found in the retrieved memories, set "exists" to false.
+- Focus on the substantive content rather than minor phrasing differences.
+
+**Output Format (JSON):**
+```json
+{
+  "new_preference_id": "ID of the new preference being evaluated",
+  "exists": true/false,
+  "reasoning": "Brief explanation of your judgment, citing which retrieved memory contains similar content (if exists=true) or why it is new content (if exists=false)",
+  "matched_memory_id": "If exists=true, the id of the matching retrieved memory; otherwise null"
+}
+```
+**New Preference:**
+{new_preference}
+
+**Retrieved Text Memories (array):**
+{retrieved_memories}
+
+Output only the JSON response, no additional text.
+"""
+
+
+NAIVE_JUDGE_DUP_WITH_TEXT_MEM_PROMPT_ZH = """
+你是一个内容比较专家。你的任务是判断新的偏好信息是否已经存在于召回的文本记忆中。
+
+**任务:** 检查新偏好的内容/主题/意图是否已经在任何召回的文本记忆中存在。
+
+**输入结构:**
+- 新偏好:单个对象,包含"id"和"memory"字段
+- 召回记忆:对象数组,每个对象包含"id"和"memory"字段
+
+**判断标准:**
+- 如果新偏好的核心内容、主题或意图**已经被覆盖**在任何召回的记忆中,将"exists"设为true。
+- 考虑语义相似性和主题重叠 - 即使措辞不同,如果含义相同,也算作已存在。
+- 如果新偏好引入了**新信息、不同主题或独特内容**,且在召回记忆中未找到,将"exists"设为false。
+- 关注实质性内容,而非细微的表达差异。
+
+**输出格式(JSON):**
+```json
+{
+  "new_preference_id": "正在评估的新偏好ID",
+  "exists": true/false,
+  "reasoning": "简要说明你的判断理由,引用包含相似内容的召回记忆(如果exists=true)或说明为什么是新内容(如果exists=false)",
+  "matched_memory_id": "如果exists=true,指出匹配的召回记忆id;否则为null"
+}
+```
+**新偏好:**
+{new_preference}
+
+**召回的文本记忆(数组):**
+{retrieved_memories}
+
+只输出JSON响应,不要输出其他任何文本。
+"""
+
+
 NAIVE_JUDGE_UPDATE_OR_ADD_PROMPT = """
 You are a content comparison expert. Now you are given old and new information, each containing a question, answer topic name and topic description. Please judge whether these two information express the **same question or core content**, regardless of expression differences, details or example differences.
 The judgment criteria are as follows:

From c13c0eea517c2b2f8afc470b501ad3d551d3a864 Mon Sep 17 00:00:00 2001
From: "yuan.wang"
Date: Fri, 7 Nov 2025 18:03:00 +0800
Subject: [PATCH 2/4] make pre_commit

---
 evaluation/.env-example                            |  1 -
 src/memos/mem_cube/navie.py                        |  4 +++-
 .../memories/textual/prefer_text_memory/adder.py   | 13 ++++++-------
 .../memories/textual/prefer_text_memory/factory.py | 11 +++++++++--
 4 files changed, 18 insertions(+), 11 deletions(-)

diff --git a/evaluation/.env-example b/evaluation/.env-example
index 5381532c2..bab6f679e 100644
--- a/evaluation/.env-example
+++ b/evaluation/.env-example
@@ -21,4 +21,3 @@ MEMU_API_KEY="mu_xxx"
 SUPERMEMORY_API_KEY="sm_xxx"
 MEMOBASE_API_KEY="xxx"
 MEMOBASE_PROJECT_URL="http://***.***.***.***:8019"
-
diff --git a/src/memos/mem_cube/navie.py b/src/memos/mem_cube/navie.py
index ecfd949b7..3afa78bab 100644
--- a/src/memos/mem_cube/navie.py
+++ b/src/memos/mem_cube/navie.py
@@ -31,7 +31,9 @@ def __init__(
         self._pref_mem: BaseTextMemory | None = pref_mem

     def load(
-        self, dir: str, memory_types: list[Literal["text_mem", "act_mem", "para_mem", "pref_mem"]] | None = None
+        self,
+        dir: str,
+        memory_types: list[Literal["text_mem", "act_mem", "para_mem", "pref_mem"]] | None = None,
     ) -> None:
         """Load memories.

         Args:
diff --git a/src/memos/memories/textual/prefer_text_memory/adder.py b/src/memos/memories/textual/prefer_text_memory/adder.py
index 3cd3c1bd2..8efed902f 100644
--- a/src/memos/memories/textual/prefer_text_memory/adder.py
+++ b/src/memos/memories/textual/prefer_text_memory/adder.py
@@ -10,10 +10,10 @@
 from memos.log import get_logger
 from memos.memories.textual.item import TextualMemoryItem
 from memos.templates.prefer_complete_prompt import (
+    NAIVE_JUDGE_DUP_WITH_TEXT_MEM_PROMPT,
     NAIVE_JUDGE_UPDATE_OR_ADD_PROMPT,
     NAIVE_JUDGE_UPDATE_OR_ADD_PROMPT_FINE,
     NAIVE_JUDGE_UPDATE_OR_ADD_PROMPT_OP_TRACE,
-    NAIVE_JUDGE_DUP_WITH_TEXT_MEM_PROMPT,
 )
 from memos.vec_dbs.item import MilvusVecDBItem

@@ -100,8 +100,7 @@ def _judge_dup_with_text_mem(self, new_pref: MilvusVecDBItem) -> bool:
         )

         text_mem_recalls = [
-            {"id": text_recall.id, "memory": text_recall.memory}
-            for text_recall in text_recalls
+            {"id": text_recall.id, "memory": text_recall.memory} for text_recall in text_recalls
         ]

         if not text_mem_recalls:
@@ -111,9 +110,7 @@ def _judge_dup_with_text_mem(self, new_pref: MilvusVecDBItem) -> bool:

         prompt = NAIVE_JUDGE_DUP_WITH_TEXT_MEM_PROMPT.replace(
             "{new_preference}", json.dumps(new_preference)
-        ).replace(
-            "{retrieved_memories}", json.dumps(text_mem_recalls)
-        )
+        ).replace("{retrieved_memories}", json.dumps(text_mem_recalls))
         try:
             response = self.llm_provider.generate([{"role": "user", "content": prompt}])
             response = response.strip().replace("```json", "").replace("```", "").strip()
@@ -142,7 +139,9 @@ def _judge_update_or_add_trace_op(
             logger.error(f"Error in judge_update_or_add_trace_op: {e}")
             return None

-    def _dedup_explicit_pref_by_textual(self, new_prefs: list[MilvusVecDBItem]) -> list[MilvusVecDBItem]:
+    def _dedup_explicit_pref_by_textual(
+        self, new_prefs: list[MilvusVecDBItem]
+    ) -> list[MilvusVecDBItem]:
         """Deduplicate explicit preferences by textual memory."""
         if os.getenv("PREF_DEDUP_EXP_BY_TEXTUAL", "false").lower() != "true":
             return new_prefs
diff --git a/src/memos/memories/textual/prefer_text_memory/factory.py b/src/memos/memories/textual/prefer_text_memory/factory.py
index 2dc953b49..3c96b7dac 100644
--- a/src/memos/memories/textual/prefer_text_memory/factory.py
+++ b/src/memos/memories/textual/prefer_text_memory/factory.py
@@ -19,14 +19,21 @@ class AdderFactory(BaseAdder):

     @classmethod
     def from_config(
-        cls, config_factory: AdderConfigFactory, llm_provider=None, embedder=None, vector_db=None, text_mem=None
+        cls,
+        config_factory: AdderConfigFactory,
+        llm_provider=None,
+        embedder=None,
+        vector_db=None,
+        text_mem=None,
     ) -> BaseAdder:
         """Create a Adder instance from a configuration factory."""
         backend = config_factory.backend
         if backend not in cls.backend_to_class:
             raise ValueError(f"Invalid backend: {backend}")
         adder_class = cls.backend_to_class[backend]
-        return adder_class(llm_provider=llm_provider, embedder=embedder, vector_db=vector_db, text_mem=text_mem)
+        return adder_class(
+            llm_provider=llm_provider, embedder=embedder, vector_db=vector_db, text_mem=text_mem
+        )


 class ExtractorFactory(BaseExtractor):

From 1135a262c5edf2294ad81c9a4423de06112fb32b Mon Sep 17 00:00:00 2001
From: "yuan.wang"
Date: Mon, 10 Nov 2025 15:25:59 +0800
Subject: [PATCH 3/4] add try/except logic in server router, add dedup logic in
 explicit pref

---
 src/memos/api/routers/server_router.py       | 62 ++++++++++++--------
 .../textual/prefer_text_memory/adder.py      | 22 ++++---
 .../textual/prefer_text_memory/retrievers.py |  8 +++
 src/memos/vec_dbs/milvus.py                  | 45 +++++++-------
 4 files changed, 83 insertions(+), 54 deletions(-)

diff --git a/src/memos/api/routers/server_router.py b/src/memos/api/routers/server_router.py
index 12865fd65..82eafe5f2 100644
--- a/src/memos/api/routers/server_router.py
+++ b/src/memos/api/routers/server_router.py
@@ -388,36 +388,46 @@ def search_memories(search_req: APISearchRequest):
     search_mode = search_req.mode

     def _search_text():
-        if search_mode == SearchMode.FAST:
-            formatted_memories = fast_search_memories(
-                search_req=search_req, user_context=user_context
-            )
-        elif search_mode == SearchMode.FINE:
-            formatted_memories = fine_search_memories(
-                search_req=search_req, user_context=user_context
-            )
-        elif search_mode == SearchMode.MIXTURE:
-            formatted_memories = mix_search_memories(
-                search_req=search_req, user_context=user_context
-            )
-        else:
-            logger.error(f"Unsupported search mode: {search_mode}")
-            raise HTTPException(status_code=400, detail=f"Unsupported search mode: {search_mode}")
-        return formatted_memories
+        try:
+            if search_mode == SearchMode.FAST:
+                formatted_memories = fast_search_memories(
+                    search_req=search_req, user_context=user_context
+                )
+            elif search_mode == SearchMode.FINE:
+                formatted_memories = fine_search_memories(
+                    search_req=search_req, user_context=user_context
+                )
+            elif search_mode == SearchMode.MIXTURE:
+                formatted_memories = mix_search_memories(
+                    search_req=search_req, user_context=user_context
+                )
+            else:
+                logger.error(f"Unsupported search mode: {search_mode}")
+                raise HTTPException(status_code=400, detail=f"Unsupported search mode: {search_mode}")
+            return formatted_memories
+        except HTTPException:
+            raise
+        except Exception as e:
+            logger.error("Error in _search_text: %s; traceback: %s", e, traceback.format_exc())
+            return []

     def _search_pref():
         if os.getenv("ENABLE_PREFERENCE_MEMORY", "false").lower() != "true":
             return []
-        results = naive_mem_cube.pref_mem.search(
-            query=search_req.query,
-            top_k=search_req.pref_top_k,
-            info={
-                "user_id": search_req.user_id,
-                "session_id": search_req.session_id,
-                "chat_history": search_req.chat_history,
-            },
-        )
-        return [_format_memory_item(data) for data in results]
+        try:
+            results = naive_mem_cube.pref_mem.search(
+                query=search_req.query,
+                top_k=search_req.pref_top_k,
+                info={
+                    "user_id": search_req.user_id,
+                    "session_id": search_req.session_id,
+                    "chat_history": search_req.chat_history,
+                },
+            )
+            return [_format_memory_item(data) for data in results]
+        except Exception as e:
+            logger.error("Error in _search_pref: %s; traceback: %s", e, traceback.format_exc())
+            return []

     with ContextThreadPoolExecutor(max_workers=2) as executor:
         text_future = executor.submit(_search_text)
diff --git a/src/memos/memories/textual/prefer_text_memory/adder.py b/src/memos/memories/textual/prefer_text_memory/adder.py
index 8efed902f..3e7c581ab 100644
--- a/src/memos/memories/textual/prefer_text_memory/adder.py
+++ b/src/memos/memories/textual/prefer_text_memory/adder.py
@@ -109,8 +109,10 @@ def _judge_dup_with_text_mem(self, new_pref: MilvusVecDBItem) -> bool:
         new_preference = {"id": new_pref.id, "memory": new_pref.payload["preference"]}

         prompt = NAIVE_JUDGE_DUP_WITH_TEXT_MEM_PROMPT.replace(
-            "{new_preference}", json.dumps(new_preference)
-        ).replace("{retrieved_memories}", json.dumps(text_mem_recalls))
+            "{new_preference}", json.dumps(new_preference, ensure_ascii=False)
+        ).replace(
+            "{retrieved_memories}", json.dumps(text_mem_recalls, ensure_ascii=False)
+        )
         try:
             response = self.llm_provider.generate([{"role": "user", "content": prompt}])
             response = response.strip().replace("```json", "").replace("```", "").strip()
@@ -142,7 +144,7 @@ def _dedup_explicit_pref_by_textual(
         self, new_prefs: list[MilvusVecDBItem]
     ) -> list[MilvusVecDBItem]:
         """Deduplicate explicit preferences by textual memory."""
-        if os.getenv("PREF_DEDUP_EXP_BY_TEXTUAL", "false").lower() != "true":
+        if os.getenv("DEDUP_PREF_EXP_BY_TEXTUAL", "false").lower() != "true" or not self.text_mem:
             return new_prefs
         dedup_prefs = []
         with ContextThreadPoolExecutor(max_workers=max(1, min(len(new_prefs), 5))) as executor:
@@ -205,8 +207,11 @@ def _update_memory_op_trace(
         ]

         rsp = self._judge_update_or_add_trace_op(
-            new_mems=json.dumps(new_mem_inputs),
-            retrieved_mems=json.dumps(retrieved_mem_inputs) if retrieved_mem_inputs else "",
+            new_mems=json.dumps(new_mem_inputs, ensure_ascii=False),
+            retrieved_mems=
+            json.dumps(retrieved_mem_inputs, ensure_ascii=False)
+            if retrieved_mem_inputs
+            else "",
         )
         if not rsp:
             dedup_rsp = self._dedup_explicit_pref_by_textual(new_vec_db_items)
@@ -293,8 +298,11 @@ def _update_memory_fine(
             if mem.payload.get("preference", None)
         ]
         rsp = self._judge_update_or_add_fine(
-            new_mem=json.dumps(new_mem_input),
-            retrieved_mems=json.dumps(retrieved_mem_inputs) if retrieved_mem_inputs else "",
+            new_mem=json.dumps(new_mem_input, ensure_ascii=False),
+            retrieved_mems=
+            json.dumps(retrieved_mem_inputs, ensure_ascii=False)
+            if retrieved_mem_inputs
+            else "",
         )
         need_update = rsp.get("need_update", False) if rsp else False
         need_update = (
diff --git a/src/memos/memories/textual/prefer_text_memory/retrievers.py b/src/memos/memories/textual/prefer_text_memory/retrievers.py
index 0074c3f1c..c2e0a442a 100644
--- a/src/memos/memories/textual/prefer_text_memory/retrievers.py
+++ b/src/memos/memories/textual/prefer_text_memory/retrievers.py
@@ -119,6 +119,9 @@ def retrieve(
             if pref.payload.get("preference", None)
         ]

+        # store explicit ids and scores for use after reranking
+        explicit_id_scores = {item.id: item.score for item in explicit_prefs}
+
         reranker_map = {
             "naive": self._naive_reranker,
             "original_text": self._original_text_reranker,
@@ -131,4 +134,9 @@ def retrieve(
             query=query, prefs_mem=implicit_prefs_mem, prefs=implicit_prefs, top_k=top_k
         )

+        # filter explicit mem by score bigger than threshold
+        explicit_prefs_mem = [
+            item for item in explicit_prefs_mem if explicit_id_scores.get(item.id, 0) >= 0.2
+        ]
+
         return explicit_prefs_mem + implicit_prefs_mem
diff --git a/src/memos/vec_dbs/milvus.py b/src/memos/vec_dbs/milvus.py
index e50c8ce18..eafee2633 100644
--- a/src/memos/vec_dbs/milvus.py
+++ b/src/memos/vec_dbs/milvus.py
@@ -236,29 +236,32 @@ def search(
             "sparse": self._sparse_search,
             "hybrid": self._hybrid_search,
         }
+        try:
+            results = search_func_map[search_type](
+                collection_name=collection_name,
+                query_vector=query_vector,
+                query=query,
+                top_k=top_k,
+                filter=expr,
+            )

-        results = search_func_map[search_type](
-            collection_name=collection_name,
-            query_vector=query_vector,
-            query=query,
-            top_k=top_k,
-            filter=expr,
-        )
-
-        items = []
-        for hit in results[0]:
-            entity = hit.get("entity", {})
-
-            items.append(
-                MilvusVecDBItem(
-                    id=str(entity.get("id")),
-                    memory=entity.get("memory"),
-                    original_text=entity.get("original_text"),
-                    vector=entity.get("vector"),
-                    payload=entity.get("payload", {}),
-                    score=1 - float(hit["distance"]),
+            items = []
+            for hit in results[0]:
+                entity = hit.get("entity", {})
+
+                items.append(
+                    MilvusVecDBItem(
+                        id=str(entity.get("id")),
+                        memory=entity.get("memory"),
+                        original_text=entity.get("original_text"),
+                        vector=entity.get("vector"),
+                        payload=entity.get("payload", {}),
+                        score=1 - float(hit["distance"]),
+                    )
                 )
-            )
+        except Exception as e:
+            logger.error("Error in _%s_search: %s", search_type, e)
+            return []

         logger.info(f"Milvus search completed with {len(items)} results.")
         return items

From 1c254f6cf609939b23f64b3a8ee85cea0736affb Mon Sep 17 00:00:00 2001
From: "yuan.wang"
Date: Mon, 10 Nov 2025 15:29:57 +0800
Subject: [PATCH 4/4] fix bug in make pre_commit

---
 src/memos/api/routers/server_router.py        |  4 +++-
 .../textual/prefer_text_memory/adder.py       | 18 +++++-------------
 .../textual/prefer_text_memory/retrievers.py  |  2 +-
 3 files changed, 11 insertions(+), 13 deletions(-)

diff --git a/src/memos/api/routers/server_router.py b/src/memos/api/routers/server_router.py
index 82eafe5f2..b426c2965 100644
--- a/src/memos/api/routers/server_router.py
+++ b/src/memos/api/routers/server_router.py
@@ -403,7 +403,9 @@ def _search_text():
                 )
             else:
                 logger.error(f"Unsupported search mode: {search_mode}")
-                raise HTTPException(status_code=400, detail=f"Unsupported search mode: {search_mode}")
+                raise HTTPException(
+                    status_code=400, detail=f"Unsupported search mode: {search_mode}"
+                )
             return formatted_memories
         except HTTPException:
             raise
diff --git a/src/memos/memories/textual/prefer_text_memory/adder.py b/src/memos/memories/textual/prefer_text_memory/adder.py
index 3e7c581ab..5e58d23a5 100644
--- a/src/memos/memories/textual/prefer_text_memory/adder.py
+++ b/src/memos/memories/textual/prefer_text_memory/adder.py
@@ -110,9 +110,7 @@ def _judge_dup_with_text_mem(self, new_pref: MilvusVecDBItem) -> bool:

         prompt = NAIVE_JUDGE_DUP_WITH_TEXT_MEM_PROMPT.replace(
             "{new_preference}", json.dumps(new_preference, ensure_ascii=False)
-        ).replace(
-            "{retrieved_memories}", json.dumps(text_mem_recalls, ensure_ascii=False)
-        )
+        ).replace("{retrieved_memories}", json.dumps(text_mem_recalls, ensure_ascii=False))
         try:
             response = self.llm_provider.generate([{"role": "user", "content": prompt}])
             response = response.strip().replace("```json", "").replace("```", "").strip()
@@ -208,10 +206,9 @@ def _update_memory_op_trace(

         rsp = self._judge_update_or_add_trace_op(
             new_mems=json.dumps(new_mem_inputs, ensure_ascii=False),
-            retrieved_mems=
-            json.dumps(retrieved_mem_inputs, ensure_ascii=False)
-            if retrieved_mem_inputs
-            else "",
+            retrieved_mems=json.dumps(retrieved_mem_inputs, ensure_ascii=False)
+            if retrieved_mem_inputs
+            else "",
         )
         if not rsp:
             dedup_rsp = self._dedup_explicit_pref_by_textual(new_vec_db_items)
@@ -299,10 +296,9 @@ def _update_memory_fine(
         ]
         rsp = self._judge_update_or_add_fine(
             new_mem=json.dumps(new_mem_input, ensure_ascii=False),
-            retrieved_mems=
-            json.dumps(retrieved_mem_inputs, ensure_ascii=False)
-            if retrieved_mem_inputs
-            else "",
+            retrieved_mems=json.dumps(retrieved_mem_inputs, ensure_ascii=False)
+            if retrieved_mem_inputs
+            else "",
         )
         need_update = rsp.get("need_update", False) if rsp else False
         need_update = (
diff --git a/src/memos/memories/textual/prefer_text_memory/retrievers.py b/src/memos/memories/textual/prefer_text_memory/retrievers.py
index c2e0a442a..9f0d1ab32 100644
--- a/src/memos/memories/textual/prefer_text_memory/retrievers.py
+++ b/src/memos/memories/textual/prefer_text_memory/retrievers.py
@@ -134,7 +134,7 @@ def retrieve(
             query=query, prefs_mem=implicit_prefs_mem, prefs=implicit_prefs, top_k=top_k
         )

-        # filter explicit mem by score bigger than threshold
+        # keep only explicit memories whose score is above the threshold
         explicit_prefs_mem = [
             item for item in explicit_prefs_mem if explicit_id_scores.get(item.id, 0) >= 0.2
         ]
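
Note on the new dedup path: `_dedup_explicit_pref_by_textual` is opt-in via the `DEDUP_PREF_EXP_BY_TEXTUAL` env flag (renamed from `PREF_DEDUP_EXP_BY_TEXTUAL` in PATCH 3/4), and `_judge_dup_with_text_mem` parses a JSON verdict from the LLM judge. The following is a minimal standalone sketch of that decision flow, not part of the patch; it assumes only a hypothetical `llm` client exposing a `generate(messages) -> str` method, where the real code uses the configured `llm_provider` and `MilvusVecDBItem` inputs.

import json
import os


def is_duplicate(llm, prompt_template: str, new_preference: dict, recalls: list[dict]) -> bool:
    """Return True when the LLM judge says the preference already exists in text memory."""
    if os.getenv("DEDUP_PREF_EXP_BY_TEXTUAL", "false").lower() != "true":
        return False  # dedup disabled: never drop a preference
    if not recalls:
        return False  # nothing was retrieved, so nothing to be a duplicate of
    # Fill the placeholders the same way the adder does (ensure_ascii=False keeps CJK readable).
    prompt = prompt_template.replace(
        "{new_preference}", json.dumps(new_preference, ensure_ascii=False)
    ).replace("{retrieved_memories}", json.dumps(recalls, ensure_ascii=False))
    response = llm.generate([{"role": "user", "content": prompt}])
    # The judge may wrap its JSON in a Markdown code fence; strip the markers before parsing.
    response = response.strip().replace("```json", "").replace("```", "").strip()
    try:
        return bool(json.loads(response).get("exists", False))
    except (json.JSONDecodeError, AttributeError):
        return False  # fail open: keep the preference when the verdict is unparsable

With the flag unset the helper is a no-op, mirroring `_dedup_explicit_pref_by_textual`'s early return, and an unparsable verdict fails open so a preference is never dropped by accident; `ENABLE_PREFERENCE_MEMORY=true` is additionally required for the preference search path in `server_router.py`.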