76 changes: 55 additions & 21 deletions examples/realtime/twilio/twilio_handler.py
@@ -34,7 +34,10 @@ def get_current_time() -> str:

agent = RealtimeAgent(
name="Twilio Assistant",
instructions="You are a helpful assistant that starts every conversation with a creative greeting. Keep responses concise and friendly since this is a phone conversation.",
instructions=(
"You are a helpful assistant that starts every conversation with a creative greeting. "
"Keep responses concise and friendly since this is a phone conversation."
),
tools=[get_weather, get_current_time],
)

@@ -46,21 +49,39 @@ def __init__(self, twilio_websocket: WebSocket):
self.session: RealtimeSession | None = None
self.playback_tracker = RealtimePlaybackTracker()

# Audio buffering configuration (matching CLI demo)
self.CHUNK_LENGTH_S = 0.05 # 50ms chunks like CLI demo
self.SAMPLE_RATE = 8000 # Twilio uses 8kHz for g711_ulaw
self.BUFFER_SIZE_BYTES = int(self.SAMPLE_RATE * self.CHUNK_LENGTH_S) # 50ms worth of audio
# Audio chunking (matches CLI demo)
self.CHUNK_LENGTH_S = 0.05 # 50ms chunks
self.SAMPLE_RATE = 8000 # Twilio g711_ulaw at 8kHz
self.BUFFER_SIZE_BYTES = int(self.SAMPLE_RATE * self.CHUNK_LENGTH_S) # 400 bytes per 50ms (1 byte/sample µ-law)

self._stream_sid: str | None = None
self._audio_buffer: bytearray = bytearray()
self._last_buffer_send_time = time.time()

# Mark event tracking for playback
# Playback tracking for outbound audio
self._mark_counter = 0
self._mark_data: dict[
str, tuple[str, int, int]
] = {} # mark_id -> (item_id, content_index, byte_count)

# ---- Deterministic startup warm-up (preferred over sleep) ----
# Buffer the first N chunks before sending to OpenAI; then mark warmed.
try:
self.STARTUP_BUFFER_CHUNKS = max(0, int(os.getenv("TWILIO_STARTUP_BUFFER_CHUNKS", "3")))
except Exception:
self.STARTUP_BUFFER_CHUNKS = 3

self._startup_buffer = bytearray()
self._startup_warmed = (
self.STARTUP_BUFFER_CHUNKS == 0
) # if 0, considered warmed immediately

# Optional delay (defaults 0.0 because buffering is preferred)
try:
self.STARTUP_DELAY_S = float(os.getenv("TWILIO_STARTUP_DELAY_S", "0.0"))
except Exception:
self.STARTUP_DELAY_S = 0.0

async def start(self) -> None:
"""Start the session."""
runner = RealtimeRunner(agent)
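
For reference, a minimal standalone sketch of the warm-up accumulation configured above (hypothetical names, not part of this PR). With the default of 3 chunks, roughly the first 150 ms of caller audio is held back and then delivered to OpenAI as one combined payload, after which chunks pass through as they arrive.

# Hypothetical model of the startup warm-up (illustrative only).
CHUNK_BYTES = 400          # 8000 Hz * 0.05 s * 1 byte/sample (g711_ulaw)
STARTUP_BUFFER_CHUNKS = 3  # mirrors TWILIO_STARTUP_BUFFER_CHUNKS

def simulate(chunks: list[bytes]) -> list[bytes]:
    """Return the sequence of payloads that would be sent upstream."""
    sent: list[bytes] = []
    startup = bytearray()
    warmed = STARTUP_BUFFER_CHUNKS == 0
    for chunk in chunks:
        if not warmed:
            startup.extend(chunk)
            if len(startup) >= CHUNK_BYTES * STARTUP_BUFFER_CHUNKS:
                sent.append(bytes(startup))  # single combined warm-up send
                startup.clear()
                warmed = True
        else:
            sent.append(chunk)  # pass-through once warmed
    return sent

print([len(p) for p in simulate([b"\x00" * CHUNK_BYTES] * 5)])  # [1200, 400, 400]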
@@ -89,6 +110,11 @@ async def start(self) -> None:
await self.twilio_websocket.accept()
print("Twilio WebSocket connection accepted")

# Optional tiny delay (kept configurable; default 0.0)
if self.STARTUP_DELAY_S > 0:
await asyncio.sleep(self.STARTUP_DELAY_S)

# Start loops after handshake
self._realtime_session_task = asyncio.create_task(self._realtime_session_loop())
self._message_loop_task = asyncio.create_task(self._twilio_message_loop())
self._buffer_flush_task = asyncio.create_task(self._buffer_flush_loop())
@@ -197,7 +223,7 @@ async def _handle_media_event(self, message: dict[str, Any]) -> None:
# Add original µ-law to buffer for OpenAI (they expect µ-law)
self._audio_buffer.extend(ulaw_bytes)

# Send buffered audio if we have enough data
# Send buffered audio if we have enough data for one chunk
if len(self._audio_buffer) >= self.BUFFER_SIZE_BYTES:
await self._flush_audio_buffer()
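
Background on this threshold: Twilio Media Streams deliver base64-encoded g711_ulaw audio in media.payload, typically in ~20 ms (160-byte) frames at 8 kHz, so several frames accumulate before one 50 ms chunk (8000 × 0.05 = 400 bytes) is flushed. A rough sketch of the decode-and-threshold step, assuming Twilio's message shape; this is illustrative, not the handler's actual code path:

import base64

SAMPLE_RATE = 8000
CHUNK_LENGTH_S = 0.05
BUFFER_SIZE_BYTES = int(SAMPLE_RATE * CHUNK_LENGTH_S)  # 400 bytes of µ-law per 50 ms

def extract_ulaw(message: dict) -> bytes:
    # Twilio "media" events carry base64-encoded µ-law audio in media.payload.
    return base64.b64decode(message.get("media", {}).get("payload", ""))

buffer = bytearray()
frame = {"event": "media", "media": {"payload": base64.b64encode(b"\xff" * 160).decode()}}
buffer.extend(extract_ulaw(frame))
print(len(buffer) >= BUFFER_SIZE_BYTES)  # False: one ~20 ms frame is below the 50 ms chunk size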

@@ -210,47 +236,55 @@ async def _handle_mark_event(self, message: dict[str, Any]) -> None:
mark_data = message.get("mark", {})
mark_id = mark_data.get("name", "")

# Look up stored data for this mark ID
if mark_id in self._mark_data:
item_id, item_content_index, byte_count = self._mark_data[mark_id]

# Convert byte count back to bytes for playback tracker
audio_bytes = b"\x00" * byte_count # Placeholder bytes

# Update playback tracker
audio_bytes = b"\x00" * byte_count # Placeholder bytes for tracker
self.playback_tracker.on_play_bytes(item_id, item_content_index, audio_bytes)
print(
f"Playback tracker updated: {item_id}, index {item_content_index}, {byte_count} bytes"
)

# Clean up the stored data
del self._mark_data[mark_id]

except Exception as e:
print(f"Error handling mark event: {e}")

async def _flush_audio_buffer(self) -> None:
"""Send buffered audio to OpenAI."""
"""Send buffered audio to OpenAI with deterministic startup warm-up."""
if not self._audio_buffer or not self.session:
return

try:
# Send the buffered audio
buffer_data = bytes(self._audio_buffer)
await self.session.send_audio(buffer_data)

# Clear the buffer
self._audio_buffer.clear()
self._last_buffer_send_time = time.time()

# During startup, accumulate first N chunks before sending anything
if not self._startup_warmed:
self._startup_buffer.extend(buffer_data)

# target bytes = N chunks * bytes-per-chunk
target_bytes = self.BUFFER_SIZE_BYTES * max(0, self.STARTUP_BUFFER_CHUNKS)

if len(self._startup_buffer) >= target_bytes:
# Warm-up complete: flush all buffered data in order
await self.session.send_audio(bytes(self._startup_buffer))
self._startup_buffer.clear()
self._startup_warmed = True
else:
# Not enough yet; keep buffering and return
return
else:
# Already warmed: send immediately
await self.session.send_audio(buffer_data)

except Exception as e:
print(f"Error sending buffered audio to OpenAI: {e}")

async def _buffer_flush_loop(self) -> None:
"""Periodically flush audio buffer to prevent stale data."""
try:
while True:
await asyncio.sleep(self.CHUNK_LENGTH_S) # Check every 50ms
await asyncio.sleep(self.CHUNK_LENGTH_S) # check every 50ms

# If buffer has data and it's been too long since last send, flush it
current_time = time.time()
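
Both knobs added in this PR are read from the environment in __init__, so the warm-up depth and the optional delay can be tuned per deployment without code changes. The values below are illustrative and must be set before the handler is constructed:

import os

# Hold back the first 5 chunks (5 × 50 ms = 250 ms) before the first send to OpenAI,
# and keep the optional pre-loop sleep disabled.
os.environ["TWILIO_STARTUP_BUFFER_CHUNKS"] = "5"
os.environ["TWILIO_STARTUP_DELAY_S"] = "0.0"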