Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 82 additions & 4 deletions packages/agents-extensions/src/TwilioRealtimeTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,18 @@ export type TwilioRealtimeTransportLayerOptions =
* connection gets passed into your request handler when running your WebSocket server.
*/
twilioWebSocket: WebSocket | NodeWebSocket;
// Optional resampler hooks. They can be async or sync.
// data: ArrayBuffer audio payload, from: input audio format label, to: target audio format label
resampleIncoming?: (
data: ArrayBuffer,
from?: string,
to?: string,
) => Promise<ArrayBuffer> | ArrayBuffer;
resampleOutgoing?: (
data: ArrayBuffer,
from?: string,
to?: string,
) => Promise<ArrayBuffer> | ArrayBuffer;
};

/**
Expand Down Expand Up @@ -60,10 +72,24 @@ export class TwilioRealtimeTransportLayer extends OpenAIRealtimeWebSocket {
#lastPlayedChunkCount: number = 0;
#previousItemId: string | null = null;
#logger = getLogger('openai-agents:extensions:twilio');
#resampleIncoming?: (
data: ArrayBuffer,
from?: string,
to?: string,
) => Promise<ArrayBuffer> | ArrayBuffer;
#resampleOutgoing?: (
data: ArrayBuffer,
from?: string,
to?: string,
) => Promise<ArrayBuffer> | ArrayBuffer;
// audio format expected by Twilio (default g711_ulaw)
#twilioAudioFormat: string = 'g711_ulaw';

constructor(options: TwilioRealtimeTransportLayerOptions) {
super(options);
this.#twilioWebSocket = options.twilioWebSocket;
this.#resampleIncoming = options.resampleIncoming;
this.#resampleOutgoing = options.resampleOutgoing;
}

_setInputAndOutputAudioFormat(
Expand Down Expand Up @@ -91,10 +117,17 @@ export class TwilioRealtimeTransportLayer extends OpenAIRealtimeWebSocket {
options.initialSessionConfig = this._setInputAndOutputAudioFormat(
options.initialSessionConfig,
);

// Keep the transport's twilioAudioFormat in sync with initial session config
// (outputAudioFormat is what we will send to Twilio)
this.#twilioAudioFormat =
// @ts-expect-error - this is a valid config
options.initialSessionConfig?.outputAudioFormat ?? 'g711_ulaw';

// listen to Twilio messages as quickly as possible
this.#twilioWebSocket.addEventListener(
'message',
(message: MessageEvent | NodeMessageEvent) => {
async (message: MessageEvent | NodeMessageEvent) => {
try {
const data = JSON.parse(message.data.toString());
if (this.#logger.dontLogModelData) {
Expand All @@ -109,7 +142,30 @@ export class TwilioRealtimeTransportLayer extends OpenAIRealtimeWebSocket {
switch (data.event) {
case 'media':
if (this.status === 'connected') {
this.sendAudio(utils.base64ToArrayBuffer(data.media.payload));
// Twilio sends base64 payloads
let buffer = utils.base64ToArrayBuffer(data.media.payload);

// If user supplied a resampler, call it to convert to the internal Realtime expected format
if (this.#resampleIncoming) {
try {
const maybePromise = this.#resampleIncoming(
buffer,
// Twilio payload format (we assume Twilio->transport input)
data.media?.format ?? undefined,
// target format we used for inputAudioFormat
// (we infer from initialSessionConfig or default to g711_ulaw)
// @ts-expect-error - this is a valid config
options.initialSessionConfig?.inputAudioFormat ??
'g711_ulaw',
);
buffer = (await maybePromise) ?? buffer;
} catch (err) {
this.#logger.error('Incoming resampling failed:', err);
// fall back to original buffer
}
}

this.sendAudio(buffer);
}
break;
case 'mark':
Expand Down Expand Up @@ -199,15 +255,35 @@ export class TwilioRealtimeTransportLayer extends OpenAIRealtimeWebSocket {
super._interrupt(elapsedTime, cancelOngoingResponse);
}

protected _onAudio(audioEvent: TransportLayerAudio) {
protected async _onAudio(audioEvent: TransportLayerAudio) {
this.#logger.debug(
`Sending audio to Twilio ${audioEvent.responseId}: (${audioEvent.data.byteLength} bytes)`,
);
// Allow user-provided resampler to convert outgoing Realtime audio to Twilio format.
let twilioPayloadBuffer: ArrayBuffer = audioEvent.data;

if (this.#resampleOutgoing) {
try {
const maybePromise = this.#resampleOutgoing(
audioEvent.data,
// from: Realtime internal audio format (unknown here), leave undefined
undefined,
// to: format Twilio expects for outgoing audio
this.#twilioAudioFormat,
);
twilioPayloadBuffer = (await maybePromise) ?? audioEvent.data;
} catch (err) {
this.#logger.error('Outgoing resampling failed:', err);
// fall back to original audioEvent.data
twilioPayloadBuffer = audioEvent.data;
}
}

const audioDelta = {
event: 'media',
streamSid: this.#streamSid,
media: {
payload: utils.arrayBufferToBase64(audioEvent.data),
payload: utils.arrayBufferToBase64(twilioPayloadBuffer),
},
};
if (this.#previousItemId !== this.currentItemId && this.currentItemId) {
Expand All @@ -228,3 +304,5 @@ export class TwilioRealtimeTransportLayer extends OpenAIRealtimeWebSocket {
this.emit('audio', audioEvent);
}
}

// vim:ts=2 sw=2 et:
144 changes: 144 additions & 0 deletions packages/agents-extensions/test/TwilioResample.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import { describe, test, expect, vi, beforeEach } from 'vitest';
import { EventEmitter } from 'events';
import { TwilioRealtimeTransportLayer } from '../src/TwilioRealtimeTransport';

import type { MessageEvent as NodeMessageEvent } from 'ws';
import type { MessageEvent } from 'undici-types';

// Mock the realtime package like other tests do so we can observe sendAudio, etc.
vi.mock('@openai/agents/realtime', () => {
// eslint-disable-next-line @typescript-eslint/no-require-imports
const { EventEmitter } = require('events');
const utils = {
base64ToArrayBuffer: (b64: string) =>
Uint8Array.from(Buffer.from(b64, 'base64')).buffer,
arrayBufferToBase64: (buf: ArrayBuffer) =>
Buffer.from(new Uint8Array(buf)).toString('base64'),
};
class FakeOpenAIRealtimeWebSocket extends EventEmitter {
status: 'connected' | 'disconnected' = 'disconnected';
currentItemId: string | null = null;
}
FakeOpenAIRealtimeWebSocket.prototype.connect = vi.fn(async function (
this: any,
) {
this.status = 'connected';
});
FakeOpenAIRealtimeWebSocket.prototype.sendAudio = vi.fn();
FakeOpenAIRealtimeWebSocket.prototype.close = vi.fn();
FakeOpenAIRealtimeWebSocket.prototype._interrupt = vi.fn();
FakeOpenAIRealtimeWebSocket.prototype.updateSessionConfig = vi.fn();
return { OpenAIRealtimeWebSocket: FakeOpenAIRealtimeWebSocket, utils };
});

class FakeTwilioWebSocket extends EventEmitter {
send = vi.fn();
close = vi.fn();
}

// @ts-expect-error - make the node EventEmitter compatible with the browser style used in the transport
FakeTwilioWebSocket.prototype.addEventListener = function (
type: string,
listener: (evt: MessageEvent | NodeMessageEvent) => void,
) {
// When the transport registers addEventListener('message', ...) it expects the listener
// to receive an object with a `.data` that responds to toString(). Tests below emit the
// raw payload as the event argument and this wrapper synthesizes { data: evt }.
this.on(type, (evt) => listener(type === 'message' ? { data: evt } : evt));
};

describe('TwilioRealtimeTransportLayer resampling hooks', () => {
beforeEach(() => {
vi.clearAllMocks();
});

test('resampleIncoming is called and its result is passed to sendAudio', async () => {
const resampleIncoming = vi.fn(
async (data: ArrayBuffer, _from?: string, _to?: string) => {
// ensure we receive the original data (we won't assert exact bytes here, just that the hook was called)
return data;
},
);

const twilio = new FakeTwilioWebSocket();
const transport = new TwilioRealtimeTransportLayer({
twilioWebSocket: twilio as any,
resampleIncoming,
});

// connect the transport (mocks will set the OpenAI websocket to connected)
await transport.connect({ apiKey: 'ek_test' } as any);

// Grab the mocked OpenAIRealtimeWebSocket prototype to assert sendAudio was called with our resampled buffer
const { OpenAIRealtimeWebSocket } = await import('@openai/agents/realtime');
const sendAudioSpy = vi.mocked(OpenAIRealtimeWebSocket.prototype.sendAudio);

// Prepare a Twilio 'media' message (base64-encoded payload). Use small bytes.
const originalBytes = Buffer.from([1, 2, 3]);
const payloadB64 = originalBytes.toString('base64');
const twilioMessage = {
event: 'media',
streamSid: 'FAKE',
media: { payload: payloadB64 },
};

// Emit the message (the FakeTwilioWebSocket addEventListener wrapper will provide { data: evt })
twilio.emit('message', { toString: () => JSON.stringify(twilioMessage) });

// wait a tick for async handler to run
await Promise.resolve();

// resampleIncoming should have been called
expect(resampleIncoming).toHaveBeenCalled();
// sendAudio should have been called with the resampled buffer
expect(sendAudioSpy).toHaveBeenCalled();
const calledArg = sendAudioSpy.mock.calls[0][0] as ArrayBuffer;
expect(Array.from(new Uint8Array(calledArg))).toEqual(
Array.from(new Uint8Array(originalBytes)),
);
});

test('resampleOutgoing is called and Twilio receives its result', async () => {
const resampleOutgoing = vi.fn(
async (data: ArrayBuffer, _from?: string, _to?: string) => {
return data;
},
);

const twilio = new FakeTwilioWebSocket();
const transport = new TwilioRealtimeTransportLayer({
twilioWebSocket: twilio as any,
resampleOutgoing,
});

await transport.connect({ apiKey: 'ek_test' } as any);

// set a currentItemId so the transport resets chunk count and emits marks like real usage
// @ts-expect-error - we're setting a protected field for test
transport.currentItemId = 'test-item';

// Call the protected _onAudio to simulate outgoing audio from OpenAI -> Twilio
const outgoingBuffer = new Uint8Array([10, 11, 12]).buffer;
await transport['_onAudio']({
responseId: 'FAKE_ID',
type: 'audio',
data: outgoingBuffer,
});

// twilio.send should have been called at least twice (media and mark). Inspect the first call (media)
const sendCalls = vi.mocked(twilio.send).mock.calls;
expect(sendCalls.length).toBeGreaterThanOrEqual(1);

const firstArg = sendCalls[0][0] as string;
const parsed = JSON.parse(firstArg);
expect(parsed.event).toBe('media');
// verify media.payload decodes to the resampled bytes
const decoded = Buffer.from(parsed.media.payload, 'base64');
expect(Array.from(decoded)).toEqual(
Array.from(new Uint8Array(outgoingBuffer)),
);

// ensure the outgoing resampler was called
expect(resampleOutgoing).toHaveBeenCalled();
});
});