diff --git a/packages/agents-extensions/src/TwilioRealtimeTransport.ts b/packages/agents-extensions/src/TwilioRealtimeTransport.ts index f63d9464..e59c2788 100644 --- a/packages/agents-extensions/src/TwilioRealtimeTransport.ts +++ b/packages/agents-extensions/src/TwilioRealtimeTransport.ts @@ -25,6 +25,18 @@ export type TwilioRealtimeTransportLayerOptions = * connection gets passed into your request handler when running your WebSocket server. */ twilioWebSocket: WebSocket | NodeWebSocket; + // Optional resampler hooks. They can be async or sync. + // data: ArrayBuffer audio payload, from: input audio format label, to: target audio format label + resampleIncoming?: ( + data: ArrayBuffer, + from?: string, + to?: string, + ) => Promise | ArrayBuffer; + resampleOutgoing?: ( + data: ArrayBuffer, + from?: string, + to?: string, + ) => Promise | ArrayBuffer; }; /** @@ -60,10 +72,24 @@ export class TwilioRealtimeTransportLayer extends OpenAIRealtimeWebSocket { #lastPlayedChunkCount: number = 0; #previousItemId: string | null = null; #logger = getLogger('openai-agents:extensions:twilio'); + #resampleIncoming?: ( + data: ArrayBuffer, + from?: string, + to?: string, + ) => Promise | ArrayBuffer; + #resampleOutgoing?: ( + data: ArrayBuffer, + from?: string, + to?: string, + ) => Promise | ArrayBuffer; + // audio format expected by Twilio (default g711_ulaw) + #twilioAudioFormat: string = 'g711_ulaw'; constructor(options: TwilioRealtimeTransportLayerOptions) { super(options); this.#twilioWebSocket = options.twilioWebSocket; + this.#resampleIncoming = options.resampleIncoming; + this.#resampleOutgoing = options.resampleOutgoing; } _setInputAndOutputAudioFormat( @@ -91,10 +117,17 @@ export class TwilioRealtimeTransportLayer extends OpenAIRealtimeWebSocket { options.initialSessionConfig = this._setInputAndOutputAudioFormat( options.initialSessionConfig, ); + + // Keep the transport's twilioAudioFormat in sync with initial session config + // (outputAudioFormat is what we will send to Twilio) + this.#twilioAudioFormat = + // @ts-expect-error - this is a valid config + options.initialSessionConfig?.outputAudioFormat ?? 'g711_ulaw'; + // listen to Twilio messages as quickly as possible this.#twilioWebSocket.addEventListener( 'message', - (message: MessageEvent | NodeMessageEvent) => { + async (message: MessageEvent | NodeMessageEvent) => { try { const data = JSON.parse(message.data.toString()); if (this.#logger.dontLogModelData) { @@ -109,7 +142,30 @@ export class TwilioRealtimeTransportLayer extends OpenAIRealtimeWebSocket { switch (data.event) { case 'media': if (this.status === 'connected') { - this.sendAudio(utils.base64ToArrayBuffer(data.media.payload)); + // Twilio sends base64 payloads + let buffer = utils.base64ToArrayBuffer(data.media.payload); + + // If user supplied a resampler, call it to convert to the internal Realtime expected format + if (this.#resampleIncoming) { + try { + const maybePromise = this.#resampleIncoming( + buffer, + // Twilio payload format (we assume Twilio->transport input) + data.media?.format ?? undefined, + // target format we used for inputAudioFormat + // (we infer from initialSessionConfig or default to g711_ulaw) + // @ts-expect-error - this is a valid config + options.initialSessionConfig?.inputAudioFormat ?? + 'g711_ulaw', + ); + buffer = (await maybePromise) ?? buffer; + } catch (err) { + this.#logger.error('Incoming resampling failed:', err); + // fall back to original buffer + } + } + + this.sendAudio(buffer); } break; case 'mark': @@ -199,15 +255,35 @@ export class TwilioRealtimeTransportLayer extends OpenAIRealtimeWebSocket { super._interrupt(elapsedTime, cancelOngoingResponse); } - protected _onAudio(audioEvent: TransportLayerAudio) { + protected async _onAudio(audioEvent: TransportLayerAudio) { this.#logger.debug( `Sending audio to Twilio ${audioEvent.responseId}: (${audioEvent.data.byteLength} bytes)`, ); + // Allow user-provided resampler to convert outgoing Realtime audio to Twilio format. + let twilioPayloadBuffer: ArrayBuffer = audioEvent.data; + + if (this.#resampleOutgoing) { + try { + const maybePromise = this.#resampleOutgoing( + audioEvent.data, + // from: Realtime internal audio format (unknown here), leave undefined + undefined, + // to: format Twilio expects for outgoing audio + this.#twilioAudioFormat, + ); + twilioPayloadBuffer = (await maybePromise) ?? audioEvent.data; + } catch (err) { + this.#logger.error('Outgoing resampling failed:', err); + // fall back to original audioEvent.data + twilioPayloadBuffer = audioEvent.data; + } + } + const audioDelta = { event: 'media', streamSid: this.#streamSid, media: { - payload: utils.arrayBufferToBase64(audioEvent.data), + payload: utils.arrayBufferToBase64(twilioPayloadBuffer), }, }; if (this.#previousItemId !== this.currentItemId && this.currentItemId) { @@ -228,3 +304,5 @@ export class TwilioRealtimeTransportLayer extends OpenAIRealtimeWebSocket { this.emit('audio', audioEvent); } } + +// vim:ts=2 sw=2 et: diff --git a/packages/agents-extensions/test/TwilioResample.test.ts b/packages/agents-extensions/test/TwilioResample.test.ts new file mode 100644 index 00000000..6d453a29 --- /dev/null +++ b/packages/agents-extensions/test/TwilioResample.test.ts @@ -0,0 +1,144 @@ +import { describe, test, expect, vi, beforeEach } from 'vitest'; +import { EventEmitter } from 'events'; +import { TwilioRealtimeTransportLayer } from '../src/TwilioRealtimeTransport'; + +import type { MessageEvent as NodeMessageEvent } from 'ws'; +import type { MessageEvent } from 'undici-types'; + +// Mock the realtime package like other tests do so we can observe sendAudio, etc. +vi.mock('@openai/agents/realtime', () => { + // eslint-disable-next-line @typescript-eslint/no-require-imports + const { EventEmitter } = require('events'); + const utils = { + base64ToArrayBuffer: (b64: string) => + Uint8Array.from(Buffer.from(b64, 'base64')).buffer, + arrayBufferToBase64: (buf: ArrayBuffer) => + Buffer.from(new Uint8Array(buf)).toString('base64'), + }; + class FakeOpenAIRealtimeWebSocket extends EventEmitter { + status: 'connected' | 'disconnected' = 'disconnected'; + currentItemId: string | null = null; + } + FakeOpenAIRealtimeWebSocket.prototype.connect = vi.fn(async function ( + this: any, + ) { + this.status = 'connected'; + }); + FakeOpenAIRealtimeWebSocket.prototype.sendAudio = vi.fn(); + FakeOpenAIRealtimeWebSocket.prototype.close = vi.fn(); + FakeOpenAIRealtimeWebSocket.prototype._interrupt = vi.fn(); + FakeOpenAIRealtimeWebSocket.prototype.updateSessionConfig = vi.fn(); + return { OpenAIRealtimeWebSocket: FakeOpenAIRealtimeWebSocket, utils }; +}); + +class FakeTwilioWebSocket extends EventEmitter { + send = vi.fn(); + close = vi.fn(); +} + +// @ts-expect-error - make the node EventEmitter compatible with the browser style used in the transport +FakeTwilioWebSocket.prototype.addEventListener = function ( + type: string, + listener: (evt: MessageEvent | NodeMessageEvent) => void, +) { + // When the transport registers addEventListener('message', ...) it expects the listener + // to receive an object with a `.data` that responds to toString(). Tests below emit the + // raw payload as the event argument and this wrapper synthesizes { data: evt }. + this.on(type, (evt) => listener(type === 'message' ? { data: evt } : evt)); +}; + +describe('TwilioRealtimeTransportLayer resampling hooks', () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + test('resampleIncoming is called and its result is passed to sendAudio', async () => { + const resampleIncoming = vi.fn( + async (data: ArrayBuffer, _from?: string, _to?: string) => { + // ensure we receive the original data (we won't assert exact bytes here, just that the hook was called) + return data; + }, + ); + + const twilio = new FakeTwilioWebSocket(); + const transport = new TwilioRealtimeTransportLayer({ + twilioWebSocket: twilio as any, + resampleIncoming, + }); + + // connect the transport (mocks will set the OpenAI websocket to connected) + await transport.connect({ apiKey: 'ek_test' } as any); + + // Grab the mocked OpenAIRealtimeWebSocket prototype to assert sendAudio was called with our resampled buffer + const { OpenAIRealtimeWebSocket } = await import('@openai/agents/realtime'); + const sendAudioSpy = vi.mocked(OpenAIRealtimeWebSocket.prototype.sendAudio); + + // Prepare a Twilio 'media' message (base64-encoded payload). Use small bytes. + const originalBytes = Buffer.from([1, 2, 3]); + const payloadB64 = originalBytes.toString('base64'); + const twilioMessage = { + event: 'media', + streamSid: 'FAKE', + media: { payload: payloadB64 }, + }; + + // Emit the message (the FakeTwilioWebSocket addEventListener wrapper will provide { data: evt }) + twilio.emit('message', { toString: () => JSON.stringify(twilioMessage) }); + + // wait a tick for async handler to run + await Promise.resolve(); + + // resampleIncoming should have been called + expect(resampleIncoming).toHaveBeenCalled(); + // sendAudio should have been called with the resampled buffer + expect(sendAudioSpy).toHaveBeenCalled(); + const calledArg = sendAudioSpy.mock.calls[0][0] as ArrayBuffer; + expect(Array.from(new Uint8Array(calledArg))).toEqual( + Array.from(new Uint8Array(originalBytes)), + ); + }); + + test('resampleOutgoing is called and Twilio receives its result', async () => { + const resampleOutgoing = vi.fn( + async (data: ArrayBuffer, _from?: string, _to?: string) => { + return data; + }, + ); + + const twilio = new FakeTwilioWebSocket(); + const transport = new TwilioRealtimeTransportLayer({ + twilioWebSocket: twilio as any, + resampleOutgoing, + }); + + await transport.connect({ apiKey: 'ek_test' } as any); + + // set a currentItemId so the transport resets chunk count and emits marks like real usage + // @ts-expect-error - we're setting a protected field for test + transport.currentItemId = 'test-item'; + + // Call the protected _onAudio to simulate outgoing audio from OpenAI -> Twilio + const outgoingBuffer = new Uint8Array([10, 11, 12]).buffer; + await transport['_onAudio']({ + responseId: 'FAKE_ID', + type: 'audio', + data: outgoingBuffer, + }); + + // twilio.send should have been called at least twice (media and mark). Inspect the first call (media) + const sendCalls = vi.mocked(twilio.send).mock.calls; + expect(sendCalls.length).toBeGreaterThanOrEqual(1); + + const firstArg = sendCalls[0][0] as string; + const parsed = JSON.parse(firstArg); + expect(parsed.event).toBe('media'); + // verify media.payload decodes to the resampled bytes + const decoded = Buffer.from(parsed.media.payload, 'base64'); + expect(Array.from(decoded)).toEqual( + Array.from(new Uint8Array(outgoingBuffer)), + ); + + // ensure the outgoing resampler was called + expect(resampleOutgoing).toHaveBeenCalled(); + }); +});