66import threading
77import time
88import traceback
9+ from collections import deque
910from datetime import datetime
1011from typing import Any , Dict , List , Optional , Union
1112
3637
3738complete_message = {"role" : "server" , "type" : "status" , "content" : "complete" }
3839
40+ os .environ ["INTERPRETER_REQUIRE_ACKNOWLEDGE" ] = "True"
41+
3942
4043class AsyncInterpreter (OpenInterpreter ):
4144 def __init__ (self , * args , ** kwargs ):
@@ -44,6 +47,7 @@ def __init__(self, *args, **kwargs):
4447 self .respond_thread = None
4548 self .stop_event = threading .Event ()
4649 self .output_queue = None
50+ self .unsent_messages = deque ()
4751 self .id = os .getenv ("INTERPRETER_ID" , datetime .now ().timestamp ())
4852 self .print = True # Will print output
4953
@@ -441,91 +445,101 @@ async def receive_input():
441445 async def send_output ():
442446 while True :
443447 try :
444- output = await async_interpreter .output ()
445- # print("Attempting to send the following output:", output)
448+ # First, try to send any unsent messages
449+ while async_interpreter .unsent_messages :
450+ output = async_interpreter .unsent_messages [0 ]
451+ try :
452+ await send_message (output )
453+ async_interpreter .unsent_messages .popleft ()
454+ except Exception :
455+ # If we can't send, break and try again later
456+ break
446457
447- id = shortuuid .uuid ()
458+ # If we've sent all unsent messages, get a new output
459+ if not async_interpreter .unsent_messages :
460+ output = await async_interpreter .output ()
461+ await send_message (output )
448462
449- for attempt in range (100 ):
450- try :
451- if isinstance (output , bytes ):
452- await websocket .send_bytes (output )
453- else :
454- if async_interpreter .require_acknowledge :
455- output ["id" ] = id
456-
457- await websocket .send_text (json .dumps (output ))
458-
459- if async_interpreter .require_acknowledge :
460- acknowledged = False
461- for _ in range (1000 ):
462- # print(async_interpreter.acknowledged_outputs)
463- if (
464- id
465- in async_interpreter .acknowledged_outputs
466- ):
467- async_interpreter .acknowledged_outputs .remove (
468- id
469- )
470- acknowledged = True
471- break
472- await asyncio .sleep (0.0001 )
473-
474- if acknowledged :
475- break
476- else :
477- raise Exception (
478- "Acknowledgement not received."
479- )
480- else :
481- break
482-
483- except Exception as e :
484- print (
485- "Failed to send output on attempt number:" ,
486- attempt + 1 ,
487- ". Output was:" ,
488- output ,
489- )
490- print ("Error:" , str (e ))
491- await asyncio .sleep (0.05 )
492- else :
493- raise Exception (
494- "Failed to send after 100 attempts. Output was:" ,
495- str (output ),
496- )
497463 except Exception as e :
498464 error = traceback .format_exc () + "\n " + str (e )
499465 error_message = {
500466 "role" : "server" ,
501467 "type" : "error" ,
502- "content" : traceback . format_exc () + " \n " + str ( e ) ,
468+ "content" : error ,
503469 }
504- await websocket . send_text ( json . dumps (error_message ) )
505- await websocket . send_text ( json . dumps (complete_message ) )
506- print ("\n \n --- SENT ERROR: ---\n \n " )
470+ async_interpreter . unsent_messages . append (error_message )
471+ async_interpreter . unsent_messages . append (complete_message )
472+ print ("\n \n --- ERROR (will be sent when possible) : ---\n \n " )
507473 print (error )
508- print ("\n \n --- (ERROR ABOVE WAS SENT) ---\n \n " )
474+ print (
475+ "\n \n --- (ERROR ABOVE WILL BE SENT WHEN POSSIBLE) ---\n \n "
476+ )
477+
478+ async def send_message (output ):
479+ if isinstance (output , dict ) and "id" in output :
480+ id = output ["id" ]
481+ else :
482+ id = shortuuid .uuid ()
483+ if (
484+ isinstance (output , dict )
485+ and async_interpreter .require_acknowledge
486+ ):
487+ output ["id" ] = id
488+
489+ for attempt in range (100 ):
490+ if websocket .client_state == 3 : # 3 represents 'CLOSED' state
491+ break
492+ try :
493+ if isinstance (output , bytes ):
494+ await websocket .send_bytes (output )
495+ else :
496+ if async_interpreter .require_acknowledge :
497+ output ["id" ] = id
498+ await websocket .send_text (json .dumps (output ))
499+
500+ if async_interpreter .require_acknowledge :
501+ acknowledged = False
502+ for _ in range (1000 ):
503+ if id in async_interpreter .acknowledged_outputs :
504+ async_interpreter .acknowledged_outputs .remove (id )
505+ acknowledged = True
506+ break
507+ await asyncio .sleep (0.0001 )
508+
509+ if acknowledged :
510+ return
511+ else :
512+ raise Exception ("Acknowledgement not received." )
513+ else :
514+ return
515+
516+ except Exception as e :
517+ print (
518+ f"Failed to send output on attempt number: { attempt + 1 } . Output was: { output } "
519+ )
520+ print (f"Error: { str (e )} " )
521+ await asyncio .sleep (0.05 )
522+
523+ # If we've reached this point, we've failed to send after 100 attempts
524+ async_interpreter .unsent_messages .append (output )
525+ print (
526+ f"Added message to unsent_messages queue after failed attempts: { output } "
527+ )
509528
510529 await asyncio .gather (receive_input (), send_output ())
530+
511531 except Exception as e :
512- try :
513- error = traceback .format_exc () + "\n " + str (e )
514- error_message = {
515- "role" : "server" ,
516- "type" : "error" ,
517- "content" : traceback .format_exc () + "\n " + str (e ),
518- }
519- await websocket .send_text (json .dumps (error_message ))
520- await websocket .send_text (json .dumps (complete_message ))
521- print ("\n \n --- SENT ERROR: ---\n \n " )
522- print (error )
523- print ("\n \n --- (ERROR ABOVE WAS SENT) ---\n \n " )
524- except :
525- # If we can't send it, that's fine.
526- pass
527- finally :
528- await websocket .close ()
532+ error = traceback .format_exc () + "\n " + str (e )
533+ error_message = {
534+ "role" : "server" ,
535+ "type" : "error" ,
536+ "content" : error ,
537+ }
538+ async_interpreter .unsent_messages .append (error_message )
539+ async_interpreter .unsent_messages .append (complete_message )
540+ print ("\n \n --- ERROR (will be sent when possible): ---\n \n " )
541+ print (error )
542+ print ("\n \n --- (ERROR ABOVE WILL BE SENT WHEN POSSIBLE) ---\n \n " )
529543
530544 # TODO
531545 @router .post ("/" )
0 commit comments