1212from typing_extensions import assert_never
1313
1414from agents .realtime import RealtimeRunner , RealtimeSession , RealtimeSessionEvent
15+ from agents .realtime .config import RealtimeUserInputMessage
16+ from agents .realtime .model_inputs import RealtimeModelSendRawMessage
1517
1618# Import TwilioHandler class - handle both module and package use cases
1719if TYPE_CHECKING :
@@ -64,6 +66,34 @@ async def send_audio(self, session_id: str, audio_bytes: bytes):
6466 if session_id in self .active_sessions :
6567 await self .active_sessions [session_id ].send_audio (audio_bytes )
6668
async def send_client_event(self, session_id: str, event: dict[str, Any]):
    """Forward a raw client event to the realtime model behind a session.

    The event's ``"type"`` entry becomes the raw message type; every other
    entry is passed along unchanged under ``"other_data"``. Nothing is sent
    when ``session_id`` has no entry in ``self.active_sessions``.

    Raises:
        KeyError: If ``event`` has no ``"type"`` key (only reached when the
            session exists).
    """
    target = self.active_sessions.get(session_id)
    if target:
        # Work on a shallow copy so the caller's dict is left untouched.
        other_data = dict(event)
        event_type = other_data.pop("type")
        await target.model.send_event(
            RealtimeModelSendRawMessage(
                message={"type": event_type, "other_data": other_data}
            )
        )
82+
async def send_user_message(self, session_id: str, message: RealtimeUserInputMessage):
    """Deliver a structured user message to a session via ``send_message``.

    This goes through the session-level API instead of a raw model event.
    Does nothing when ``session_id`` is not in ``self.active_sessions``.
    """
    target = self.active_sessions.get(session_id)
    if target:
        await target.send_message(message)
89+
async def interrupt(self, session_id: str) -> None:
    """Ask the session registered under ``session_id`` to interrupt.

    Returns without doing anything when no such session is active.
    """
    target = self.active_sessions.get(session_id)
    if target:
        await target.interrupt()
96+
6797 async def _process_events (self , session_id : str ):
6898 try :
6999 session = self .active_sessions [session_id ]
@@ -101,7 +131,11 @@ async def _serialize_event(self, event: RealtimeSessionEvent) -> dict[str, Any]:
101131 elif event .type == "history_updated" :
102132 base_event ["history" ] = [item .model_dump (mode = "json" ) for item in event .history ]
103133 elif event .type == "history_added" :
104- pass
134+ # Provide the added item so the UI can render incrementally.
135+ try :
136+ base_event ["item" ] = event .item .model_dump (mode = "json" )
137+ except Exception :
138+ base_event ["item" ] = None
105139 elif event .type == "guardrail_tripped" :
106140 base_event ["guardrail_results" ] = [
107141 {"name" : result .guardrail .name } for result in event .guardrail_results
@@ -134,6 +168,7 @@ async def lifespan(app: FastAPI):
134168@app .websocket ("/ws/{session_id}" )
135169async def websocket_endpoint (websocket : WebSocket , session_id : str ):
136170 await manager .connect (websocket , session_id )
171+ image_buffers : dict [str , dict [str , Any ]] = {}
137172 try :
138173 while True :
139174 data = await websocket .receive_text ()
@@ -144,6 +179,124 @@ async def websocket_endpoint(websocket: WebSocket, session_id: str):
144179 int16_data = message ["data" ]
145180 audio_bytes = struct .pack (f"{ len (int16_data )} h" , * int16_data )
146181 await manager .send_audio (session_id , audio_bytes )
182+ elif message ["type" ] == "image" :
183+ logger .info ("Received image message from client (session %s)." , session_id )
184+ # Build a conversation.item.create with input_image (and optional input_text)
185+ data_url = message .get ("data_url" )
186+ prompt_text = message .get ("text" ) or "Please describe this image."
187+ if data_url :
188+ logger .info (
189+ "Forwarding image (structured message) to Realtime API (len=%d)." ,
190+ len (data_url ),
191+ )
192+ user_msg : RealtimeUserInputMessage = {
193+ "type" : "message" ,
194+ "role" : "user" ,
195+ "content" : (
196+ [
197+ {"type" : "input_image" , "image_url" : data_url , "detail" : "high" },
198+ {"type" : "input_text" , "text" : prompt_text },
199+ ]
200+ if prompt_text
201+ else [
202+ {"type" : "input_image" , "image_url" : data_url , "detail" : "high" }
203+ ]
204+ ),
205+ }
206+ await manager .send_user_message (session_id , user_msg )
207+ # Acknowledge to client UI
208+ await websocket .send_text (
209+ json .dumps (
210+ {
211+ "type" : "client_info" ,
212+ "info" : "image_enqueued" ,
213+ "size" : len (data_url ),
214+ }
215+ )
216+ )
217+ else :
218+ await websocket .send_text (
219+ json .dumps (
220+ {
221+ "type" : "error" ,
222+ "error" : "No data_url for image message." ,
223+ }
224+ )
225+ )
226+ elif message ["type" ] == "commit_audio" :
227+ # Force close the current input audio turn
228+ await manager .send_client_event (session_id , {"type" : "input_audio_buffer.commit" })
229+ elif message ["type" ] == "image_start" :
230+ img_id = str (message .get ("id" ))
231+ image_buffers [img_id ] = {
232+ "text" : message .get ("text" ) or "Please describe this image." ,
233+ "chunks" : [],
234+ }
235+ await websocket .send_text (
236+ json .dumps ({"type" : "client_info" , "info" : "image_start_ack" , "id" : img_id })
237+ )
238+ elif message ["type" ] == "image_chunk" :
239+ img_id = str (message .get ("id" ))
240+ chunk = message .get ("chunk" , "" )
241+ if img_id in image_buffers :
242+ image_buffers [img_id ]["chunks" ].append (chunk )
243+ if len (image_buffers [img_id ]["chunks" ]) % 10 == 0 :
244+ await websocket .send_text (
245+ json .dumps (
246+ {
247+ "type" : "client_info" ,
248+ "info" : "image_chunk_ack" ,
249+ "id" : img_id ,
250+ "count" : len (image_buffers [img_id ]["chunks" ]),
251+ }
252+ )
253+ )
254+ elif message ["type" ] == "image_end" :
255+ img_id = str (message .get ("id" ))
256+ buf = image_buffers .pop (img_id , None )
257+ if buf is None :
258+ await websocket .send_text (
259+ json .dumps ({"type" : "error" , "error" : "Unknown image id for image_end." })
260+ )
261+ else :
262+ data_url = "" .join (buf ["chunks" ]) if buf ["chunks" ] else None
263+ prompt_text = buf ["text" ]
264+ if data_url :
265+ logger .info (
266+ "Forwarding chunked image (structured message) to Realtime API (len=%d)." ,
267+ len (data_url ),
268+ )
269+ user_msg2 : RealtimeUserInputMessage = {
270+ "type" : "message" ,
271+ "role" : "user" ,
272+ "content" : (
273+ [
274+ {"type" : "input_image" , "image_url" : data_url , "detail" : "high" },
275+ {"type" : "input_text" , "text" : prompt_text },
276+ ]
277+ if prompt_text
278+ else [
279+ {"type" : "input_image" , "image_url" : data_url , "detail" : "high" }
280+ ]
281+ ),
282+ }
283+ await manager .send_user_message (session_id , user_msg2 )
284+ await websocket .send_text (
285+ json .dumps (
286+ {
287+ "type" : "client_info" ,
288+ "info" : "image_enqueued" ,
289+ "id" : img_id ,
290+ "size" : len (data_url ),
291+ }
292+ )
293+ )
294+ else :
295+ await websocket .send_text (
296+ json .dumps ({"type" : "error" , "error" : "Empty image." })
297+ )
298+ elif message ["type" ] == "interrupt" :
299+ await manager .interrupt (session_id )
147300
148301 except WebSocketDisconnect :
149302 await manager .disconnect (session_id )
@@ -160,4 +313,10 @@ async def read_index():
if __name__ == "__main__":
    import uvicorn

    # Cap WebSocket messages at 16 MiB so image data URLs sent by the client
    # fit in one message.
    # NOTE(review): 16 MiB appears to equal uvicorn's default ws_max_size, so
    # this pins the limit explicitly rather than raising it; confirm.
    uvicorn.run(app, host="0.0.0.0", port=8000, ws_max_size=16 * 1024 * 1024)
0 commit comments