From 4622e6d1e0541d1016baba6da5936cca491e05f9 Mon Sep 17 00:00:00 2001 From: Shraman Hazra Date: Mon, 3 Nov 2025 11:48:49 +0530 Subject: [PATCH 1/4] fix(twilio): add configurable startup delay to avoid initial audio jitter (fixes #1906) --- examples/realtime/twilio/twilio_handler.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/examples/realtime/twilio/twilio_handler.py b/examples/realtime/twilio/twilio_handler.py index 567015dfc..925148f22 100644 --- a/examples/realtime/twilio/twilio_handler.py +++ b/examples/realtime/twilio/twilio_handler.py @@ -89,6 +89,23 @@ async def start(self) -> None: await self.twilio_websocket.accept() print("Twilio WebSocket connection accepted") + # --------------------------- + # STARTUP DELAY (fix jitter) + # --------------------------- + # Small configurable delay to allow websockets/handshakes and buffers + # to settle before audio starts. Default is 0.5 seconds. + try: + startup_delay = float(os.getenv("TWILIO_STARTUP_DELAY_S", "0.5")) + except Exception: + startup_delay = 0.5 + + # Only perform the sleep if a positive value is configured + if startup_delay > 0: + # allow other coroutines to run while waiting + await asyncio.sleep(startup_delay) + # --------------------------- + + # create tasks after warmup so we avoid missing the first audio frames self._realtime_session_task = asyncio.create_task(self._realtime_session_loop()) self._message_loop_task = asyncio.create_task(self._twilio_message_loop()) self._buffer_flush_task = asyncio.create_task(self._buffer_flush_loop()) From 339ce0b7c17f7f30cfc0b136e8a430dd5859a52c Mon Sep 17 00:00:00 2001 From: Shraman Hazra Date: Tue, 4 Nov 2025 20:11:21 +0530 Subject: [PATCH 2/4] fix(twilio): deterministic startup warm-up by buffering first N chunks; default N=3 --- examples/realtime/twilio/twilio_handler.py | 97 ++++++++++++---------- 1 file changed, 54 insertions(+), 43 deletions(-) diff --git a/examples/realtime/twilio/twilio_handler.py b/examples/realtime/twilio/twilio_handler.py index 925148f22..00764b092 100644 --- a/examples/realtime/twilio/twilio_handler.py +++ b/examples/realtime/twilio/twilio_handler.py @@ -34,7 +34,10 @@ def get_current_time() -> str: agent = RealtimeAgent( name="Twilio Assistant", - instructions="You are a helpful assistant that starts every conversation with a creative greeting. Keep responses concise and friendly since this is a phone conversation.", + instructions=( + "You are a helpful assistant that starts every conversation with a creative greeting. " + "Keep responses concise and friendly since this is a phone conversation." + ), tools=[get_weather, get_current_time], ) @@ -46,20 +49,34 @@ def __init__(self, twilio_websocket: WebSocket): self.session: RealtimeSession | None = None self.playback_tracker = RealtimePlaybackTracker() - # Audio buffering configuration (matching CLI demo) - self.CHUNK_LENGTH_S = 0.05 # 50ms chunks like CLI demo - self.SAMPLE_RATE = 8000 # Twilio uses 8kHz for g711_ulaw - self.BUFFER_SIZE_BYTES = int(self.SAMPLE_RATE * self.CHUNK_LENGTH_S) # 50ms worth of audio + # Audio chunking (matches CLI demo) + self.CHUNK_LENGTH_S = 0.05 # 50ms chunks + self.SAMPLE_RATE = 8000 # Twilio g711_ulaw at 8kHz + self.BUFFER_SIZE_BYTES = int(self.SAMPLE_RATE * self.CHUNK_LENGTH_S) # ~400 bytes per 50ms self._stream_sid: str | None = None self._audio_buffer: bytearray = bytearray() self._last_buffer_send_time = time.time() - # Mark event tracking for playback + # Playback tracking for outbound audio self._mark_counter = 0 - self._mark_data: dict[ - str, tuple[str, int, int] - ] = {} # mark_id -> (item_id, content_index, byte_count) + self._mark_data: dict[str, tuple[str, int, int]] = {} # mark_id -> (item_id, content_index, byte_count) + + # ---- Deterministic startup warm-up (preferred over sleep) ---- + # Buffer the first N chunks before sending to OpenAI; then mark warmed. + try: + self.STARTUP_BUFFER_CHUNKS = max(0, int(os.getenv("TWILIO_STARTUP_BUFFER_CHUNKS", "3"))) + except Exception: + self.STARTUP_BUFFER_CHUNKS = 3 + + self._startup_buffer = bytearray() + self._startup_warmed = self.STARTUP_BUFFER_CHUNKS == 0 # if 0, considered warmed immediately + + # Optional delay (defaults 0.0 because buffering is preferred) + try: + self.STARTUP_DELAY_S = float(os.getenv("TWILIO_STARTUP_DELAY_S", "0.0")) + except Exception: + self.STARTUP_DELAY_S = 0.0 async def start(self) -> None: """Start the session.""" @@ -89,23 +106,11 @@ async def start(self) -> None: await self.twilio_websocket.accept() print("Twilio WebSocket connection accepted") - # --------------------------- - # STARTUP DELAY (fix jitter) - # --------------------------- - # Small configurable delay to allow websockets/handshakes and buffers - # to settle before audio starts. Default is 0.5 seconds. - try: - startup_delay = float(os.getenv("TWILIO_STARTUP_DELAY_S", "0.5")) - except Exception: - startup_delay = 0.5 - - # Only perform the sleep if a positive value is configured - if startup_delay > 0: - # allow other coroutines to run while waiting - await asyncio.sleep(startup_delay) - # --------------------------- + # Optional tiny delay (kept configurable; default 0.0) + if self.STARTUP_DELAY_S > 0: + await asyncio.sleep(self.STARTUP_DELAY_S) - # create tasks after warmup so we avoid missing the first audio frames + # Start loops after handshake self._realtime_session_task = asyncio.create_task(self._realtime_session_loop()) self._message_loop_task = asyncio.create_task(self._twilio_message_loop()) self._buffer_flush_task = asyncio.create_task(self._buffer_flush_loop()) @@ -214,7 +219,7 @@ async def _handle_media_event(self, message: dict[str, Any]) -> None: # Add original µ-law to buffer for OpenAI (they expect µ-law) self._audio_buffer.extend(ulaw_bytes) - # Send buffered audio if we have enough data + # Send buffered audio if we have enough data for one chunk if len(self._audio_buffer) >= self.BUFFER_SIZE_BYTES: await self._flush_audio_buffer() @@ -227,39 +232,45 @@ async def _handle_mark_event(self, message: dict[str, Any]) -> None: mark_data = message.get("mark", {}) mark_id = mark_data.get("name", "") - # Look up stored data for this mark ID if mark_id in self._mark_data: item_id, item_content_index, byte_count = self._mark_data[mark_id] - - # Convert byte count back to bytes for playback tracker - audio_bytes = b"\x00" * byte_count # Placeholder bytes - - # Update playback tracker + audio_bytes = b"\x00" * byte_count # Placeholder bytes for tracker self.playback_tracker.on_play_bytes(item_id, item_content_index, audio_bytes) - print( - f"Playback tracker updated: {item_id}, index {item_content_index}, {byte_count} bytes" - ) - - # Clean up the stored data + print(f"Playback tracker updated: {item_id}, index {item_content_index}, {byte_count} bytes") del self._mark_data[mark_id] except Exception as e: print(f"Error handling mark event: {e}") async def _flush_audio_buffer(self) -> None: - """Send buffered audio to OpenAI.""" + """Send buffered audio to OpenAI with deterministic startup warm-up.""" if not self._audio_buffer or not self.session: return try: - # Send the buffered audio buffer_data = bytes(self._audio_buffer) - await self.session.send_audio(buffer_data) - - # Clear the buffer self._audio_buffer.clear() self._last_buffer_send_time = time.time() + # During startup, accumulate first N chunks before sending anything + if not self._startup_warmed: + self._startup_buffer.extend(buffer_data) + + # target bytes = N chunks * bytes-per-chunk + target_bytes = self.BUFFER_SIZE_BYTES * max(0, self.STARTUP_BUFFER_CHUNKS) + + if len(self._startup_buffer) >= target_bytes: + # Warm-up complete: flush all buffered data in order + await self.session.send_audio(bytes(self._startup_buffer)) + self._startup_buffer.clear() + self._startup_warmed = True + else: + # Not enough yet; keep buffering and return + return + else: + # Already warmed: send immediately + await self.session.send_audio(buffer_data) + except Exception as e: print(f"Error sending buffered audio to OpenAI: {e}") @@ -267,7 +278,7 @@ async def _buffer_flush_loop(self) -> None: """Periodically flush audio buffer to prevent stale data.""" try: while True: - await asyncio.sleep(self.CHUNK_LENGTH_S) # Check every 50ms + await asyncio.sleep(self.CHUNK_LENGTH_S) # check every 50ms # If buffer has data and it's been too long since last send, flush it current_time = time.time() From abab19c4463b83e523a9aa16b6c04cf5e14ad0c8 Mon Sep 17 00:00:00 2001 From: Shraman Hazra Date: Wed, 5 Nov 2025 13:49:50 +0530 Subject: [PATCH 3/4] style: fix lint issues in twilio_handler.py --- examples/realtime/twilio/twilio_handler.py | 33 ++++++++++++++++------ 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/examples/realtime/twilio/twilio_handler.py b/examples/realtime/twilio/twilio_handler.py index 00764b092..18aa244c6 100644 --- a/examples/realtime/twilio/twilio_handler.py +++ b/examples/realtime/twilio/twilio_handler.py @@ -51,8 +51,10 @@ def __init__(self, twilio_websocket: WebSocket): # Audio chunking (matches CLI demo) self.CHUNK_LENGTH_S = 0.05 # 50ms chunks - self.SAMPLE_RATE = 8000 # Twilio g711_ulaw at 8kHz - self.BUFFER_SIZE_BYTES = int(self.SAMPLE_RATE * self.CHUNK_LENGTH_S) # ~400 bytes per 50ms + self.SAMPLE_RATE = 8000 # Twilio g711_ulaw at 8kHz + self.BUFFER_SIZE_BYTES = int( + self.SAMPLE_RATE * self.CHUNK_LENGTH_S + ) # ~400 bytes per 50ms self._stream_sid: str | None = None self._audio_buffer: bytearray = bytearray() @@ -60,17 +62,23 @@ def __init__(self, twilio_websocket: WebSocket): # Playback tracking for outbound audio self._mark_counter = 0 - self._mark_data: dict[str, tuple[str, int, int]] = {} # mark_id -> (item_id, content_index, byte_count) + self._mark_data: dict[str, tuple[str, int, int]] = ( + {} + ) # mark_id -> (item_id, content_index, byte_count) # ---- Deterministic startup warm-up (preferred over sleep) ---- # Buffer the first N chunks before sending to OpenAI; then mark warmed. try: - self.STARTUP_BUFFER_CHUNKS = max(0, int(os.getenv("TWILIO_STARTUP_BUFFER_CHUNKS", "3"))) + self.STARTUP_BUFFER_CHUNKS = max( + 0, int(os.getenv("TWILIO_STARTUP_BUFFER_CHUNKS", "3")) + ) except Exception: self.STARTUP_BUFFER_CHUNKS = 3 self._startup_buffer = bytearray() - self._startup_warmed = self.STARTUP_BUFFER_CHUNKS == 0 # if 0, considered warmed immediately + self._startup_warmed = ( + self.STARTUP_BUFFER_CHUNKS == 0 + ) # if 0, considered warmed immediately # Optional delay (defaults 0.0 because buffering is preferred) try: @@ -235,8 +243,12 @@ async def _handle_mark_event(self, message: dict[str, Any]) -> None: if mark_id in self._mark_data: item_id, item_content_index, byte_count = self._mark_data[mark_id] audio_bytes = b"\x00" * byte_count # Placeholder bytes for tracker - self.playback_tracker.on_play_bytes(item_id, item_content_index, audio_bytes) - print(f"Playback tracker updated: {item_id}, index {item_content_index}, {byte_count} bytes") + self.playback_tracker.on_play_bytes( + item_id, item_content_index, audio_bytes + ) + print( + f"Playback tracker updated: {item_id}, index {item_content_index}, {byte_count} bytes" + ) del self._mark_data[mark_id] except Exception as e: @@ -257,7 +269,9 @@ async def _flush_audio_buffer(self) -> None: self._startup_buffer.extend(buffer_data) # target bytes = N chunks * bytes-per-chunk - target_bytes = self.BUFFER_SIZE_BYTES * max(0, self.STARTUP_BUFFER_CHUNKS) + target_bytes = self.BUFFER_SIZE_BYTES * max( + 0, self.STARTUP_BUFFER_CHUNKS + ) if len(self._startup_buffer) >= target_bytes: # Warm-up complete: flush all buffered data in order @@ -284,7 +298,8 @@ async def _buffer_flush_loop(self) -> None: current_time = time.time() if ( self._audio_buffer - and current_time - self._last_buffer_send_time > self.CHUNK_LENGTH_S * 2 + and current_time - self._last_buffer_send_time + > self.CHUNK_LENGTH_S * 2 ): await self._flush_audio_buffer() From 6ee7b88235764361059f1c8cd0d6849b1039502f Mon Sep 17 00:00:00 2001 From: Shraman Hazra Date: Wed, 5 Nov 2025 22:55:00 +0530 Subject: [PATCH 4/4] style: fix format issues in twilio_handler.py --- examples/realtime/twilio/twilio_handler.py | 25 +++++++--------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/examples/realtime/twilio/twilio_handler.py b/examples/realtime/twilio/twilio_handler.py index 18aa244c6..30b75451f 100644 --- a/examples/realtime/twilio/twilio_handler.py +++ b/examples/realtime/twilio/twilio_handler.py @@ -52,9 +52,7 @@ def __init__(self, twilio_websocket: WebSocket): # Audio chunking (matches CLI demo) self.CHUNK_LENGTH_S = 0.05 # 50ms chunks self.SAMPLE_RATE = 8000 # Twilio g711_ulaw at 8kHz - self.BUFFER_SIZE_BYTES = int( - self.SAMPLE_RATE * self.CHUNK_LENGTH_S - ) # ~400 bytes per 50ms + self.BUFFER_SIZE_BYTES = int(self.SAMPLE_RATE * self.CHUNK_LENGTH_S) # ~400 bytes per 50ms self._stream_sid: str | None = None self._audio_buffer: bytearray = bytearray() @@ -62,16 +60,14 @@ def __init__(self, twilio_websocket: WebSocket): # Playback tracking for outbound audio self._mark_counter = 0 - self._mark_data: dict[str, tuple[str, int, int]] = ( - {} - ) # mark_id -> (item_id, content_index, byte_count) + self._mark_data: dict[ + str, tuple[str, int, int] + ] = {} # mark_id -> (item_id, content_index, byte_count) # ---- Deterministic startup warm-up (preferred over sleep) ---- # Buffer the first N chunks before sending to OpenAI; then mark warmed. try: - self.STARTUP_BUFFER_CHUNKS = max( - 0, int(os.getenv("TWILIO_STARTUP_BUFFER_CHUNKS", "3")) - ) + self.STARTUP_BUFFER_CHUNKS = max(0, int(os.getenv("TWILIO_STARTUP_BUFFER_CHUNKS", "3"))) except Exception: self.STARTUP_BUFFER_CHUNKS = 3 @@ -243,9 +239,7 @@ async def _handle_mark_event(self, message: dict[str, Any]) -> None: if mark_id in self._mark_data: item_id, item_content_index, byte_count = self._mark_data[mark_id] audio_bytes = b"\x00" * byte_count # Placeholder bytes for tracker - self.playback_tracker.on_play_bytes( - item_id, item_content_index, audio_bytes - ) + self.playback_tracker.on_play_bytes(item_id, item_content_index, audio_bytes) print( f"Playback tracker updated: {item_id}, index {item_content_index}, {byte_count} bytes" ) @@ -269,9 +263,7 @@ async def _flush_audio_buffer(self) -> None: self._startup_buffer.extend(buffer_data) # target bytes = N chunks * bytes-per-chunk - target_bytes = self.BUFFER_SIZE_BYTES * max( - 0, self.STARTUP_BUFFER_CHUNKS - ) + target_bytes = self.BUFFER_SIZE_BYTES * max(0, self.STARTUP_BUFFER_CHUNKS) if len(self._startup_buffer) >= target_bytes: # Warm-up complete: flush all buffered data in order @@ -298,8 +290,7 @@ async def _buffer_flush_loop(self) -> None: current_time = time.time() if ( self._audio_buffer - and current_time - self._last_buffer_send_time - > self.CHUNK_LENGTH_S * 2 + and current_time - self._last_buffer_send_time > self.CHUNK_LENGTH_S * 2 ): await self._flush_audio_buffer()