1- import asyncio
2-
3- from ....env import LOG , ProfileConfig
1+ from .... connectors import Session
2+ from .... env import LOG , ProfileConfig , CONFIG
3+ from ....utils import get_blob_str , get_encoded_tokens
44from ....models .blob import Blob
5- from ....models .utils import Promise
5+ from ....models .utils import Promise , CODE
66from ....models .response import IdsData , ChatModalResponse
77from ...profile import add_user_profiles , update_user_profiles , delete_user_profiles
88from ...event import append_user_event
1515from .entry_summary import entry_summary
1616
1717
18+ def truncate_chat_blobs (
19+ blob_ids : list [str ], blobs : list [Blob ], max_token_size : int
20+ ) -> tuple [list [str ], list [Blob ]]:
21+ results_ids = []
22+ results = []
23+ total_token_size = 0
24+ for b , bid in zip (blobs [::- 1 ], blob_ids [::- 1 ]):
25+ ts = len (get_encoded_tokens (get_blob_str (b )))
26+ total_token_size += ts
27+ if total_token_size <= max_token_size :
28+ results .append (b )
29+ results_ids .append (bid )
30+ else :
31+ break
32+ return results_ids [::- 1 ], results [::- 1 ]
33+
34+
1835async def process_blobs (
1936 user_id : str , project_id : str , blob_ids : list [str ], blobs : list [Blob ]
2037) -> Promise [ChatModalResponse ]:
2138 # 1. Extract patch profiles
39+ blob_ids , blobs = truncate_chat_blobs (
40+ blob_ids , blobs , CONFIG .max_chat_blob_buffer_process_token_size
41+ )
42+ if len (blobs ) == 0 :
43+ return Promise .reject (
44+ CODE .SERVER_PARSE_ERROR , "No blobs to process after truncating"
45+ )
2246 p = await entry_summary (user_id , project_id , blobs )
2347 if not p .ok ():
2448 return p
@@ -45,16 +69,6 @@ async def process_blobs(
4569 delta_profile_data = [
4670 p for p in (profile_options ["add" ] + profile_options ["update_delta" ])
4771 ]
48- p = await handle_session_event (
49- user_id ,
50- project_id ,
51- user_memo_str ,
52- delta_profile_data ,
53- extracted_data ["config" ],
54- )
55- if not p .ok ():
56- return p
57- eid = p .data ()
5872
5973 # 3. Check if we need to organize profiles
6074 p = await organize_profiles (
@@ -74,7 +88,17 @@ async def process_blobs(
7488 if not p .ok ():
7589 LOG .error (f"Failed to re-summary profiles: { p .msg ()} " )
7690
77- # DB commit
91+ # FIXME using one session for all operations
92+ p = await handle_session_event (
93+ user_id ,
94+ project_id ,
95+ user_memo_str ,
96+ delta_profile_data ,
97+ extracted_data ["config" ],
98+ )
99+ if not p .ok ():
100+ return p
101+ eid = p .data ()
78102 p = await exe_user_profile_add (user_id , project_id , profile_options )
79103 if not p .ok ():
80104 return p
@@ -158,7 +182,7 @@ async def exe_user_profile_update(
158182
159183async def exe_user_profile_delete (
160184 user_id : str , project_id : str , profile_options : MergeAddResult
161- ) -> Promise [None ]:
185+ ) -> Promise [IdsData ]:
162186 if not len (profile_options ["delete" ]):
163187 return Promise .resolve (IdsData (ids = []))
164188 LOG .info (f"Deleting { len (profile_options ['delete' ])} profiles for user { user_id } " )
0 commit comments