Skip to content

Commit 1952338

Browse files
committed
fix(twilio): deterministic startup warm-up by buffering first N chunks; default N=3
1 parent e6fa378 commit 1952338

File tree

1 file changed

+54
-43
lines changed

1 file changed

+54
-43
lines changed

examples/realtime/twilio/twilio_handler.py

Lines changed: 54 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@ def get_current_time() -> str:
3434

3535
agent = RealtimeAgent(
3636
name="Twilio Assistant",
37-
instructions="You are a helpful assistant that starts every conversation with a creative greeting. Keep responses concise and friendly since this is a phone conversation.",
37+
instructions=(
38+
"You are a helpful assistant that starts every conversation with a creative greeting. "
39+
"Keep responses concise and friendly since this is a phone conversation."
40+
),
3841
tools=[get_weather, get_current_time],
3942
)
4043

@@ -46,20 +49,34 @@ def __init__(self, twilio_websocket: WebSocket):
4649
self.session: RealtimeSession | None = None
4750
self.playback_tracker = RealtimePlaybackTracker()
4851

49-
# Audio buffering configuration (matching CLI demo)
50-
self.CHUNK_LENGTH_S = 0.05 # 50ms chunks like CLI demo
51-
self.SAMPLE_RATE = 8000 # Twilio uses 8kHz for g711_ulaw
52-
self.BUFFER_SIZE_BYTES = int(self.SAMPLE_RATE * self.CHUNK_LENGTH_S) # 50ms worth of audio
52+
# Audio chunking (matches CLI demo)
53+
self.CHUNK_LENGTH_S = 0.05 # 50ms chunks
54+
self.SAMPLE_RATE = 8000 # Twilio g711_ulaw at 8kHz
55+
self.BUFFER_SIZE_BYTES = int(self.SAMPLE_RATE * self.CHUNK_LENGTH_S) # ~400 bytes per 50ms
5356

5457
self._stream_sid: str | None = None
5558
self._audio_buffer: bytearray = bytearray()
5659
self._last_buffer_send_time = time.time()
5760

58-
# Mark event tracking for playback
61+
# Playback tracking for outbound audio
5962
self._mark_counter = 0
60-
self._mark_data: dict[
61-
str, tuple[str, int, int]
62-
] = {} # mark_id -> (item_id, content_index, byte_count)
63+
self._mark_data: dict[str, tuple[str, int, int]] = {} # mark_id -> (item_id, content_index, byte_count)
64+
65+
# ---- Deterministic startup warm-up (preferred over sleep) ----
66+
# Buffer the first N chunks before sending to OpenAI; then mark warmed.
67+
try:
68+
self.STARTUP_BUFFER_CHUNKS = max(0, int(os.getenv("TWILIO_STARTUP_BUFFER_CHUNKS", "3")))
69+
except Exception:
70+
self.STARTUP_BUFFER_CHUNKS = 3
71+
72+
self._startup_buffer = bytearray()
73+
self._startup_warmed = self.STARTUP_BUFFER_CHUNKS == 0 # if 0, considered warmed immediately
74+
75+
# Optional delay (defaults 0.0 because buffering is preferred)
76+
try:
77+
self.STARTUP_DELAY_S = float(os.getenv("TWILIO_STARTUP_DELAY_S", "0.0"))
78+
except Exception:
79+
self.STARTUP_DELAY_S = 0.0
6380

6481
async def start(self) -> None:
6582
"""Start the session."""
@@ -89,23 +106,11 @@ async def start(self) -> None:
89106
await self.twilio_websocket.accept()
90107
print("Twilio WebSocket connection accepted")
91108

92-
# ---------------------------
93-
# STARTUP DELAY (fix jitter)
94-
# ---------------------------
95-
# Small configurable delay to allow websockets/handshakes and buffers
96-
# to settle before audio starts. Default is 0.5 seconds.
97-
try:
98-
startup_delay = float(os.getenv("TWILIO_STARTUP_DELAY_S", "0.5"))
99-
except Exception:
100-
startup_delay = 0.5
101-
102-
# Only perform the sleep if a positive value is configured
103-
if startup_delay > 0:
104-
# allow other coroutines to run while waiting
105-
await asyncio.sleep(startup_delay)
106-
# ---------------------------
109+
# Optional tiny delay (kept configurable; default 0.0)
110+
if self.STARTUP_DELAY_S > 0:
111+
await asyncio.sleep(self.STARTUP_DELAY_S)
107112

108-
# create tasks after warmup so we avoid missing the first audio frames
113+
# Start loops after handshake
109114
self._realtime_session_task = asyncio.create_task(self._realtime_session_loop())
110115
self._message_loop_task = asyncio.create_task(self._twilio_message_loop())
111116
self._buffer_flush_task = asyncio.create_task(self._buffer_flush_loop())
@@ -214,7 +219,7 @@ async def _handle_media_event(self, message: dict[str, Any]) -> None:
214219
# Add original µ-law to buffer for OpenAI (they expect µ-law)
215220
self._audio_buffer.extend(ulaw_bytes)
216221

217-
# Send buffered audio if we have enough data
222+
# Send buffered audio if we have enough data for one chunk
218223
if len(self._audio_buffer) >= self.BUFFER_SIZE_BYTES:
219224
await self._flush_audio_buffer()
220225

@@ -227,47 +232,53 @@ async def _handle_mark_event(self, message: dict[str, Any]) -> None:
227232
mark_data = message.get("mark", {})
228233
mark_id = mark_data.get("name", "")
229234

230-
# Look up stored data for this mark ID
231235
if mark_id in self._mark_data:
232236
item_id, item_content_index, byte_count = self._mark_data[mark_id]
233-
234-
# Convert byte count back to bytes for playback tracker
235-
audio_bytes = b"\x00" * byte_count # Placeholder bytes
236-
237-
# Update playback tracker
237+
audio_bytes = b"\x00" * byte_count # Placeholder bytes for tracker
238238
self.playback_tracker.on_play_bytes(item_id, item_content_index, audio_bytes)
239-
print(
240-
f"Playback tracker updated: {item_id}, index {item_content_index}, {byte_count} bytes"
241-
)
242-
243-
# Clean up the stored data
239+
print(f"Playback tracker updated: {item_id}, index {item_content_index}, {byte_count} bytes")
244240
del self._mark_data[mark_id]
245241

246242
except Exception as e:
247243
print(f"Error handling mark event: {e}")
248244

249245
async def _flush_audio_buffer(self) -> None:
    """Send buffered audio to OpenAI with deterministic startup warm-up.

    Pending bytes in ``_audio_buffer`` are drained on every call. Until
    ``STARTUP_BUFFER_CHUNKS * BUFFER_SIZE_BYTES`` bytes have accumulated, the
    drained bytes are parked in ``_startup_buffer`` and nothing is sent. Once
    that threshold is reached, the whole warm-up buffer is sent in a single
    call and every later flush is forwarded immediately.

    All buffer and flag updates happen *before* the ``await`` on
    ``send_audio``. This method is invoked from more than one task (the media
    handler and the periodic flush loop), so a second flush may start while
    the first is suspended in ``send_audio``; updating state first guarantees
    that second flush can neither resend the warm-up audio nor have its bytes
    wiped by a late ``clear()``.

    Returns:
        None. Does nothing if there is no pending audio or no session.

    Errors raised while sending are printed, not propagated; the audio
    taken for that send is dropped.
    """
    if not self._audio_buffer or not self.session:
        return

    try:
        # Take ownership of the pending bytes before any await so audio that
        # arrives while we are suspended lands in a fresh buffer.
        payload = bytes(self._audio_buffer)
        self._audio_buffer.clear()
        self._last_buffer_send_time = time.time()

        if not self._startup_warmed:
            # During startup, accumulate the first N chunks before sending.
            self._startup_buffer.extend(payload)
            target_bytes = self.BUFFER_SIZE_BYTES * self.STARTUP_BUFFER_CHUNKS
            if len(self._startup_buffer) < target_bytes:
                # Not enough yet; keep buffering.
                return

            # Warm-up complete: hand off everything buffered so far, in order,
            # and mark warmed *before* awaiting so a concurrent flush takes
            # the direct-send path instead of resending this data.
            payload = bytes(self._startup_buffer)
            self._startup_buffer.clear()
            self._startup_warmed = True

        await self.session.send_audio(payload)

    except Exception as e:
        print(f"Error sending buffered audio to OpenAI: {e}")
265276

266277
async def _buffer_flush_loop(self) -> None:
267278
"""Periodically flush audio buffer to prevent stale data."""
268279
try:
269280
while True:
270-
await asyncio.sleep(self.CHUNK_LENGTH_S) # Check every 50ms
281+
await asyncio.sleep(self.CHUNK_LENGTH_S) # check every 50ms
271282

272283
# If buffer has data and it's been too long since last send, flush it
273284
current_time = time.time()

0 commit comments

Comments
 (0)