diff --git a/examples/realtime/twilio/twilio_handler.py b/examples/realtime/twilio/twilio_handler.py index 567015dfc..30b75451f 100644 --- a/examples/realtime/twilio/twilio_handler.py +++ b/examples/realtime/twilio/twilio_handler.py @@ -34,7 +34,10 @@ def get_current_time() -> str: agent = RealtimeAgent( name="Twilio Assistant", - instructions="You are a helpful assistant that starts every conversation with a creative greeting. Keep responses concise and friendly since this is a phone conversation.", + instructions=( + "You are a helpful assistant that starts every conversation with a creative greeting. " + "Keep responses concise and friendly since this is a phone conversation." + ), tools=[get_weather, get_current_time], ) @@ -46,21 +49,39 @@ def __init__(self, twilio_websocket: WebSocket): self.session: RealtimeSession | None = None self.playback_tracker = RealtimePlaybackTracker() - # Audio buffering configuration (matching CLI demo) - self.CHUNK_LENGTH_S = 0.05 # 50ms chunks like CLI demo - self.SAMPLE_RATE = 8000 # Twilio uses 8kHz for g711_ulaw - self.BUFFER_SIZE_BYTES = int(self.SAMPLE_RATE * self.CHUNK_LENGTH_S) # 50ms worth of audio + # Audio chunking (matches CLI demo) + self.CHUNK_LENGTH_S = 0.05 # 50ms chunks + self.SAMPLE_RATE = 8000 # Twilio g711_ulaw at 8kHz + self.BUFFER_SIZE_BYTES = int(self.SAMPLE_RATE * self.CHUNK_LENGTH_S) # ~400 bytes per 50ms self._stream_sid: str | None = None self._audio_buffer: bytearray = bytearray() self._last_buffer_send_time = time.time() - # Mark event tracking for playback + # Playback tracking for outbound audio self._mark_counter = 0 self._mark_data: dict[ str, tuple[str, int, int] ] = {} # mark_id -> (item_id, content_index, byte_count) + # ---- Deterministic startup warm-up (preferred over sleep) ---- + # Buffer the first N chunks before sending to OpenAI; then mark warmed. 
+ try: + self.STARTUP_BUFFER_CHUNKS = max(0, int(os.getenv("TWILIO_STARTUP_BUFFER_CHUNKS", "3"))) + except Exception: + self.STARTUP_BUFFER_CHUNKS = 3 + + self._startup_buffer = bytearray() + self._startup_warmed = ( + self.STARTUP_BUFFER_CHUNKS == 0 + ) # if 0, considered warmed immediately + + # Optional delay (defaults 0.0 because buffering is preferred) + try: + self.STARTUP_DELAY_S = float(os.getenv("TWILIO_STARTUP_DELAY_S", "0.0")) + except Exception: + self.STARTUP_DELAY_S = 0.0 + async def start(self) -> None: """Start the session.""" runner = RealtimeRunner(agent) @@ -89,6 +110,11 @@ async def start(self) -> None: await self.twilio_websocket.accept() print("Twilio WebSocket connection accepted") + # Optional tiny delay (kept configurable; default 0.0) + if self.STARTUP_DELAY_S > 0: + await asyncio.sleep(self.STARTUP_DELAY_S) + + # Start loops after handshake self._realtime_session_task = asyncio.create_task(self._realtime_session_loop()) self._message_loop_task = asyncio.create_task(self._twilio_message_loop()) self._buffer_flush_task = asyncio.create_task(self._buffer_flush_loop()) @@ -197,7 +223,7 @@ async def _handle_media_event(self, message: dict[str, Any]) -> None: # Add original µ-law to buffer for OpenAI (they expect µ-law) self._audio_buffer.extend(ulaw_bytes) - # Send buffered audio if we have enough data + # Send buffered audio if we have enough data for one chunk if len(self._audio_buffer) >= self.BUFFER_SIZE_BYTES: await self._flush_audio_buffer() @@ -210,39 +236,47 @@ async def _handle_mark_event(self, message: dict[str, Any]) -> None: mark_data = message.get("mark", {}) mark_id = mark_data.get("name", "") - # Look up stored data for this mark ID if mark_id in self._mark_data: item_id, item_content_index, byte_count = self._mark_data[mark_id] - - # Convert byte count back to bytes for playback tracker - audio_bytes = b"\x00" * byte_count # Placeholder bytes - - # Update playback tracker + audio_bytes = b"\x00" * byte_count # Placeholder bytes 
async def _flush_audio_buffer(self) -> None:
    """Forward buffered caller audio to the realtime session.

    Until the startup warm-up completes, flushed audio is accumulated in
    ``self._startup_buffer`` and nothing is sent.  Once at least
    ``BUFFER_SIZE_BYTES * STARTUP_BUFFER_CHUNKS`` bytes have been collected,
    the whole warm-up buffer is sent in one call and every later flush is
    forwarded immediately.

    All shared state (``_audio_buffer``, ``_startup_buffer``,
    ``_startup_warmed``) is updated *before* the single ``await``.  This
    coroutine is invoked from the media-event handler, and presumably also
    from the periodic flush loop, which ``start`` runs as separate tasks; a
    second call may therefore run while the first is suspended in
    ``send_audio``.  Detaching the bytes first guarantees each byte is sent
    exactly once: the second call can neither re-send the warm-up buffer nor
    have its data wiped by a late ``clear()``.

    Send failures are reported and the affected audio is dropped
    (best-effort), so a failing session cannot make the warm-up buffer grow
    without bound.
    """
    if not self._audio_buffer or not self.session:
        return

    # Detach the pending bytes up front so audio appended while we are
    # awaiting the send stays in the buffer for the next flush.
    outgoing = bytes(self._audio_buffer)
    self._audio_buffer.clear()
    self._last_buffer_send_time = time.time()

    if not self._startup_warmed:
        self._startup_buffer.extend(outgoing)

        # Warm-up threshold: N chunks * bytes-per-chunk.
        warmup_target = self.BUFFER_SIZE_BYTES * max(0, self.STARTUP_BUFFER_CHUNKS)
        if len(self._startup_buffer) < warmup_target:
            # Not enough audio yet; keep accumulating.
            return

        # Warm-up complete: take ownership of everything collected so far
        # and flip the flag before yielding control to the event loop.
        outgoing = bytes(self._startup_buffer)
        self._startup_buffer.clear()
        self._startup_warmed = True

    try:
        await self.session.send_audio(outgoing)
    except Exception as e:
        # Best-effort: report and drop this audio rather than crash the call.
        print(f"Error sending buffered audio to OpenAI: {e}")