diff --git a/.changeset/early-numbers-build.md b/.changeset/early-numbers-build.md new file mode 100644 index 0000000000..2876f3fed2 --- /dev/null +++ b/.changeset/early-numbers-build.md @@ -0,0 +1,5 @@ +--- +"livekit-client": patch +--- + +Typesafe error propagation in signal connection path diff --git a/eslint.config.mjs b/eslint.config.mjs index 246c84842a..5c061396e6 100644 --- a/eslint.config.mjs +++ b/eslint.config.mjs @@ -2,6 +2,7 @@ import js from '@eslint/js'; import { configs, plugins, rules } from 'eslint-config-airbnb-extended'; import { rules as prettierConfigRules } from 'eslint-config-prettier'; +import neverthrowMustUse from 'eslint-plugin-neverthrow-must-use'; import prettierPlugin from 'eslint-plugin-prettier'; const strictness = 'off'; @@ -31,6 +32,15 @@ const typescriptConfig = [ rules.typescript.typescriptEslintStrict, ]; +const neverthrowConfig = [ + { + name: 'neverthrow-must-use', + plugins: { + 'neverthrow-must-use': neverthrowMustUse, + }, + }, +]; + const prettierConfig = [ // Prettier Plugin { @@ -56,6 +66,7 @@ export default [ ...typescriptConfig, // Prettier Config ...prettierConfig, + ...neverthrowConfig, { languageOptions: { parserOptions: { @@ -158,6 +169,7 @@ export default [ 'one-var': strictness, 'no-multi-assign': strictness, 'new-cap': strictness, + 'require-yield': strictness, radix: strictness, eqeqeq: strictness, diff --git a/package.json b/package.json index 3fa231d8fb..b089fe9274 100644 --- a/package.json +++ b/package.json @@ -60,6 +60,7 @@ "events": "^3.3.0", "jose": "^6.1.0", "loglevel": "^1.9.2", + "neverthrow": "^8.2.0", "sdp-transform": "^2.15.0", "ts-debounce": "^4.0.0", "tslib": "2.8.1", @@ -96,6 +97,7 @@ "eslint-config-prettier": "10.1.8", "eslint-plugin-compat": "^6.0.2", "eslint-plugin-import-x": "^4.16.1", + "eslint-plugin-neverthrow-must-use": "^0.1.2", "eslint-plugin-prettier": "^5.5.4", "gh-pages": "6.3.0", "happy-dom": "^17.2.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index c4ec70fc34..ce99d24059 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -26,6 +26,9 @@ importers: loglevel: specifier: ^1.9.2 version: 1.9.2 + neverthrow: + specifier: ^8.2.0 + version: 8.2.0 sdp-transform: specifier: ^2.15.0 version: 2.15.0 @@ -120,6 +123,9 @@ importers: eslint-plugin-import-x: specifier: ^4.16.1 version: 4.16.1(@typescript-eslint/utils@8.47.0(eslint@9.39.1(jiti@2.4.2))(typescript@5.8.3))(eslint-import-resolver-node@0.3.9)(eslint@9.39.1(jiti@2.4.2)) + eslint-plugin-neverthrow-must-use: + specifier: ^0.1.2 + version: 0.1.2(@typescript-eslint/parser@7.18.0(eslint@9.39.1(jiti@2.4.2))(typescript@5.8.3))(eslint@9.39.1(jiti@2.4.2)) eslint-plugin-prettier: specifier: ^5.5.4 version: 5.5.4(@types/eslint@8.44.7)(eslint-config-prettier@10.1.8(eslint@9.39.1(jiti@2.4.2)))(eslint@9.39.1(jiti@2.4.2))(prettier@3.6.2) @@ -2270,6 +2276,13 @@ packages: peerDependencies: eslint: '>=8.23.0' + eslint-plugin-neverthrow-must-use@0.1.2: + resolution: {integrity: sha512-Wt/u1wjnH8rWtbc8zqTK5yOcB79zVlCCWXi6ChJNer5ACkqldNQ6/+RKVUErACbv0Oex9aqaKYoTd0OqLe4o3Q==} + engines: {node: '>=16'} + peerDependencies: + '@typescript-eslint/parser': ^8.0.0 + eslint: ^9.0.0 + eslint-plugin-prettier@5.5.4: resolution: {integrity: sha512-swNtI95SToIz05YINMA6Ox5R057IMAmWZ26GqPxusAp1TZzj+IdY9tXNWWD3vkF/wEqydCONcwjTFpxybBqZsg==} engines: {node: ^14.18.0 || >=16.0.0} @@ -2982,6 +2995,10 @@ packages: neo-async@2.6.2: resolution: {integrity: sha512-Yd3UES5mWCSqR+qNT93S3UoYUkqAZ9lLg8a7g9rimsWmYGK8cVToA4/sF3RrshdyV3sAGMXVUmpMYOw+dLpOuw==} + neverthrow@8.2.0: + resolution: {integrity: sha512-kOCT/1MCPAxY5iUV3wytNFUMUolzuwd/VF/1KCx7kf6CutrOsTie+84zTGTpgQycjvfLdBBdvBvFLqFD2c0wkQ==} + engines: {node: '>=18'} + node-fetch@2.7.0: resolution: {integrity: sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A==} engines: {node: 4.x || >=6.0.0} @@ -3624,8 +3641,8 @@ packages: engines: {node: '>=14.17'} hasBin: true - typescript@6.0.0-dev.20251120: - resolution: {integrity: sha512-dkvZw2/09r7JIltGCeubJXLYE7+NapbKj68BtGtm47TiwjyKxTDTG2nWZu8Gpopzi0ub9bNVn0rEgh5CgOlE4w==} + typescript@6.0.0-dev.20251121: + resolution: {integrity: sha512-TrGhGS4hOAKgwizhMuH/3pbTNNBMCpxRA7ia8Lrv4HRMOAOzI5lWhP5uoKRDmmaF3pUVe90MBYjSieM498zUqQ==} engines: {node: '>=14.17'} hasBin: true @@ -6023,7 +6040,7 @@ snapshots: dependencies: semver: 7.6.0 shelljs: 0.8.5 - typescript: 6.0.0-dev.20251120 + typescript: 6.0.0-dev.20251121 dunder-proto@1.0.1: dependencies: @@ -6331,6 +6348,11 @@ snapshots: - typescript optional: true + eslint-plugin-neverthrow-must-use@0.1.2(@typescript-eslint/parser@7.18.0(eslint@9.39.1(jiti@2.4.2))(typescript@5.8.3))(eslint@9.39.1(jiti@2.4.2)): + dependencies: + '@typescript-eslint/parser': 7.18.0(eslint@9.39.1(jiti@2.4.2))(typescript@5.8.3) + eslint: 9.39.1(jiti@2.4.2) + eslint-plugin-prettier@5.5.4(@types/eslint@8.44.7)(eslint-config-prettier@10.1.8(eslint@9.39.1(jiti@2.4.2)))(eslint@9.39.1(jiti@2.4.2))(prettier@3.6.2): dependencies: eslint: 9.39.1(jiti@2.4.2) @@ -7088,6 +7110,10 @@ snapshots: neo-async@2.6.2: {} + neverthrow@8.2.0: + optionalDependencies: + '@rollup/rollup-linux-x64-gnu': 4.53.2 + node-fetch@2.7.0: dependencies: whatwg-url: 5.0.0 @@ -7789,7 +7815,7 @@ snapshots: typescript@5.8.3: {} - typescript@6.0.0-dev.20251120: {} + typescript@6.0.0-dev.20251121: {} uc.micro@2.1.0: {} diff --git a/src/api/SignalClient.test.ts b/src/api/SignalClient.test.ts index 0359a50295..ffec63ce39 100644 --- a/src/api/SignalClient.test.ts +++ b/src/api/SignalClient.test.ts @@ -5,11 +5,12 @@ import { SignalRequest, SignalResponse, } from '@livekit/protocol'; +import { Result, ResultAsync } from 'neverthrow'; import { beforeEach, describe, expect, it, vi } from 'vitest'; import { ConnectionError, ConnectionErrorReason } from '../room/errors'; -import { SignalClient, SignalConnectionState } from './SignalClient'; +import { SignalClient, SignalConnectionState, type ValidationType } from './SignalClient'; import type { WebSocketCloseInfo, WebSocketConnection } from './WebSocketStream'; -import { WebSocketStream } from './WebSocketStream'; +import { WebSocketError, WebSocketStream } from './WebSocketStream'; // Mock the WebSocketStream vi.mock('./WebSocketStream'); @@ -57,16 +58,27 @@ function createMockConnection(readable: ReadableStream): WebSocketC interface MockWebSocketStreamOptions { connection?: WebSocketConnection; - opened?: Promise; - closed?: Promise; + opened?: ResultAsync, WebSocketError>; + closed?: ResultAsync; readyState?: number; } function mockWebSocketStream(options: MockWebSocketStreamOptions = {}) { const { connection, - opened = connection ? Promise.resolve(connection) : new Promise(() => {}), - closed = new Promise(() => {}), + // eslint-disable-next-line neverthrow-must-use/must-use-result + opened = connection + ? ResultAsync.fromPromise(Promise.resolve(connection), (error) => ({ + type: 'connection' as const, + error: error as Event, + })) + : // eslint-disable-next-line neverthrow-must-use/must-use-result + ResultAsync.fromPromise(new Promise(() => {}), (error) => ({ + type: 'connection' as const, + error: error as Event, + })), + // eslint-disable-next-line neverthrow-must-use/must-use-result + closed = ResultAsync.fromPromise(new Promise(() => {}), (error) => error as WebSocketError), readyState = 1, } = options; @@ -109,7 +121,7 @@ describe('SignalClient.connect', () => { const result = await signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions); - expect(result).toEqual(joinResponse); + expect(result._unsafeUnwrap()).toEqual(joinResponse); expect(signalClient.currentState).toBe(SignalConnectionState.CONNECTED); }); }); @@ -138,7 +150,7 @@ describe('SignalClient.connect', () => { const result = await signalClient.reconnect('wss://test.livekit.io', 'test-token', 'sid-123'); - expect(result).toEqual(reconnectResponse); + expect(result._unsafeUnwrap()).toEqual(reconnectResponse); expect(signalClient.currentState).toBe(SignalConnectionState.CONNECTED); }); @@ -163,7 +175,7 @@ describe('SignalClient.connect', () => { const result = await signalClient.reconnect('wss://test.livekit.io', 'test-token', 'sid-123'); // This is an edge case: reconnect resolves with undefined when non-reconnect message is received - expect(result).toBeUndefined(); + expect(result._unsafeUnwrap()).toBeUndefined(); expect(signalClient.currentState).toBe(SignalConnectionState.CONNECTED); }, 1000); }); @@ -177,9 +189,14 @@ describe('SignalClient.connect', () => { websocketTimeout: 100, }; - await expect( - signalClient.join('wss://test.livekit.io', 'test-token', shortTimeoutOptions), - ).rejects.toThrow(ConnectionError); + const result = await signalClient.join( + 'wss://test.livekit.io', + 'test-token', + shortTimeoutOptions, + ); + + expect(result.isErr()).toBe(true); + expect(result._unsafeUnwrapErr()).toBeInstanceOf(ConnectionError); }); }); @@ -191,23 +208,35 @@ describe('SignalClient.connect', () => { // Simulate abort setTimeout(() => abortController.abort(new Error('User aborted connection')), 50); + // eslint-disable-next-line neverthrow-must-use/must-use-result + const opened = ResultAsync.fromPromise(new Promise(() => {}), (error) => ({ + type: 'connection' as const, + error: error as Event, + })); + // eslint-disable-next-line neverthrow-must-use/must-use-result + const closed = ResultAsync.fromPromise( + new Promise(() => {}), + (error) => error as WebSocketError, + ); + return { url: 'wss://test.livekit.io', - opened: new Promise(() => {}), // Never resolves - closed: new Promise(() => {}), + opened, + closed, close: vi.fn(), readyState: 0, } as any; }); - await expect( - signalClient.join( - 'wss://test.livekit.io', - 'test-token', - defaultOptions, - abortController.signal, - ), - ).rejects.toThrow('User aborted connection'); + const result = await signalClient.join( + 'wss://test.livekit.io', + 'test-token', + defaultOptions, + abortController.signal, + ); + + expect(result.isErr()).toBe(true); + expect(result._unsafeUnwrapErr().message).toBe('AbortSignal invoked'); }); it('should send leave request before closing when AbortSignal is triggered during connection', async () => { @@ -249,10 +278,21 @@ describe('SignalClient.connect', () => { }; vi.mocked(WebSocketStream).mockImplementation(() => { + // eslint-disable-next-line neverthrow-must-use/must-use-result + const opened = ResultAsync.fromPromise(Promise.resolve(mockConnection), (error) => ({ + type: 'connection' as const, + error: error as Event, + })); + // eslint-disable-next-line neverthrow-must-use/must-use-result + const closed = ResultAsync.fromPromise( + new Promise(() => {}), + (error) => error as WebSocketError, + ); + return { url: 'wss://test.livekit.io', - opened: Promise.resolve(mockConnection), - closed: new Promise(() => {}), + opened, + closed, close: vi.fn(), readyState: 1, } as any; @@ -270,10 +310,12 @@ describe('SignalClient.connect', () => { await streamWriterReadyPromise; // Now abort the connection (after WS opens, before join response) - abortController.abort(new Error('User aborted connection')); + abortController.abort(); - // joinPromise should reject - await expect(joinPromise).rejects.toThrow('User aborted connection'); + // joinPromise should return Err result + const result = await joinPromise; + expect(result.isErr()).toBe(true); + expect(result._unsafeUnwrapErr().message).toBe('AbortSignal invoked'); // Verify that a leave request was sent before closing const leaveRequestSent = writtenMessages.some((data) => { @@ -296,8 +338,13 @@ describe('SignalClient.connect', () => { describe('Failure Case - WebSocket Connection Errors', () => { it('should reject with NotAllowed error for 4xx HTTP status', async () => { + // eslint-disable-next-line neverthrow-must-use/must-use-result + const opened = ResultAsync.fromPromise( + Promise.reject(ConnectionError.websocket('Connection failed')), + (error) => error as WebSocketError, + ); mockWebSocketStream({ - opened: Promise.reject(new Error('Connection failed')), + opened, readyState: 3, }); @@ -307,54 +354,85 @@ describe('SignalClient.connect', () => { text: async () => 'Forbidden', }); - await expect( - signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions), - ).rejects.toMatchObject({ - message: 'Forbidden', - reason: ConnectionErrorReason.NotAllowed, - status: 403, - }); + const result = await signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions); + + expect(result.isErr()).toBe(true); + const error = result._unsafeUnwrapErr(); + expect(error).toBeInstanceOf(ConnectionError); + expect(error.message).toBe('Forbidden'); + expect((error as ConnectionError).reason).toBe(ConnectionErrorReason.NotAllowed); + expect((error as ConnectionError).status).toBe(403); }); it('should reject with ServerUnreachable when fetch fails', async () => { + // eslint-disable-next-line neverthrow-must-use/must-use-result + const opened = ResultAsync.fromPromise( + Promise.reject(ConnectionError.websocket('Connection failed')), + (error) => error as WebSocketError, + ); mockWebSocketStream({ - opened: Promise.reject(new Error('Connection failed')), + opened, readyState: 3, }); // Mock fetch to throw (network error) (global.fetch as any).mockRejectedValueOnce(new Error('Network error')); - await expect( - signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions), - ).rejects.toMatchObject({ - reason: ConnectionErrorReason.ServerUnreachable, - }); + const result = await signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions); + + expect(result.isErr()).toBe(true); + const error = result._unsafeUnwrapErr(); + expect(error).toBeInstanceOf(ConnectionError); + expect((error as ConnectionError).reason).toBe(ConnectionErrorReason.ServerUnreachable); }); - it('should handle ConnectionError from WebSocket rejection', async () => { - const customError = new ConnectionError( - 'Custom error', - ConnectionErrorReason.InternalError, - 500, + it('should handle WebsocketError from WebSocket rejection as unreachable if server is not reachable', async () => { + const customError = ConnectionError.websocket('Custom error'); + + // eslint-disable-next-line neverthrow-must-use/must-use-result + const opened = ResultAsync.fromPromise( + Promise.reject(customError), + (error) => error as WebSocketError, ); + mockWebSocketStream({ + opened, + readyState: 3, + }); + + (global.fetch as any).mockRejectedValueOnce(new Error('Network error')); + + const result = await signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions); + expect(result.isErr()).toBe(true); + const error = result._unsafeUnwrapErr(); + expect(error).toBeInstanceOf(ConnectionError); + expect((error as ConnectionError).reason).toBe(ConnectionErrorReason.ServerUnreachable); + }); + it('should handle WebsocketError from WebSocket rejection as websocket error if server is reachable', async () => { + const customError = ConnectionError.websocket('Custom error'); + + // eslint-disable-next-line neverthrow-must-use/must-use-result + const opened = ResultAsync.fromPromise( + Promise.reject(customError), + (error) => error as WebSocketError, + ); mockWebSocketStream({ - opened: Promise.reject(customError), + opened, readyState: 3, }); - // Mock fetch to return 500 + // Mock fetch to return 200 (global.fetch as any).mockResolvedValueOnce({ - status: 500, - text: async () => 'Internal Server Error', + status: 200, + text: async () => 'testplaceholder', }); - await expect( - signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions), - ).rejects.toMatchObject({ - reason: ConnectionErrorReason.InternalError, - }); + const result = await signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions); + + expect(result.isErr()).toBe(true); + const error = result._unsafeUnwrapErr(); + expect(error).toBeInstanceOf(ConnectionError); + expect((error as ConnectionError).reason).toBe(ConnectionErrorReason.WebSocket); }); }); @@ -370,12 +448,13 @@ describe('SignalClient.connect', () => { mockWebSocketStream({ connection: mockConnection }); - await expect( - signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions), - ).rejects.toMatchObject({ - message: 'no message received as first message', - reason: ConnectionErrorReason.InternalError, - }); + const result = await signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions); + + expect(result.isErr()).toBe(true); + const error = result._unsafeUnwrapErr(); + expect(error).toBeInstanceOf(ConnectionError); + expect(error.message).toBe('no message received as first message'); + expect((error as ConnectionError).reason).toBe(ConnectionErrorReason.InternalError); }); }); @@ -390,16 +469,14 @@ describe('SignalClient.connect', () => { mockWebSocketStream({ connection: mockConnection }); - await expect( - signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions), - ).rejects.toMatchObject( - new ConnectionError( - 'Received leave request while trying to (re)connect', - ConnectionErrorReason.LeaveRequest, - undefined, - 1, - ), - ); + const result = await signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions); + + expect(result.isErr()).toBe(true); + const error = result._unsafeUnwrapErr(); + expect(error).toBeInstanceOf(ConnectionError); + expect(error.message).toBe('Received leave request while trying to (re)connect'); + expect((error as ConnectionError).reason).toBe(ConnectionErrorReason.LeaveRequest); + expect((error as ConnectionError).context).toBe(1); }); }); @@ -415,43 +492,41 @@ describe('SignalClient.connect', () => { mockWebSocketStream({ connection: mockConnection }); - await expect( - signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions), - ).rejects.toMatchObject({ - message: 'did not receive join response, got reconnect instead', - reason: ConnectionErrorReason.InternalError, - }); + const result = await signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions); + + expect(result.isErr()).toBe(true); + const error = result._unsafeUnwrapErr(); + expect(error).toBeInstanceOf(ConnectionError); + expect(error.message).toBe('did not receive join response, got reconnect instead'); + expect((error as ConnectionError).reason).toBe(ConnectionErrorReason.InternalError); }); }); describe('Failure Case - WebSocket Closed During Connection', () => { it('should reject when WebSocket closes during connection attempt', async () => { - let closedResolve: (value: WebSocketCloseInfo) => void; - const closedPromise = new Promise((resolve) => { - closedResolve = resolve; - }); + mockWebSocketStream({ readyState: 3 }); // CLOSED - vi.mocked(WebSocketStream).mockImplementation(() => { - // Simulate close during connection - queueMicrotask(() => { - closedResolve({ closeCode: 1006, reason: 'Connection lost' }); - }); + const shortTimeoutOptions = { + ...defaultOptions, + websocketTimeout: 100, + }; - return { - url: 'wss://test.livekit.io', - opened: new Promise(() => {}), // Never resolves - closed: closedPromise, - close: vi.fn(), - readyState: 2, // CLOSING - } as any; + // Mock fetch to return 200 + (global.fetch as any).mockResolvedValueOnce({ + status: 200, + text: async () => 'testplaceholder', }); - await expect( - signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions), - ).rejects.toMatchObject({ - message: 'Websocket got closed during a (re)connection attempt: Connection lost', - reason: ConnectionErrorReason.InternalError, - }); + const result = await signalClient.join( + 'wss://test.livekit.io', + 'test-token', + shortTimeoutOptions, + ); + + expect(result.isErr()).toBe(true); + const error = result._unsafeUnwrapErr(); + expect(error).toBeInstanceOf(ConnectionError); + expect(error.reason).toBe(ConnectionErrorReason.WebSocket); }); }); @@ -587,11 +662,13 @@ describe('SignalClient.validateFirstMessage', () => { const joinResponse = createJoinResponse(); const signalResponse = createSignalResponse('join', joinResponse); - const validateMethod = (signalClient as any).validateFirstMessage; + const validateMethod = (signalClient as any).validateFirstMessage as ( + msg: any, + isReconnect: boolean, + ) => Result; if (validateMethod) { const result = validateMethod.call(signalClient, signalResponse, false); - expect(result.isValid).toBe(true); - expect(result.response).toEqual(joinResponse); + expect(result._unsafeUnwrap().response).toEqual(joinResponse); } }); @@ -611,11 +688,13 @@ describe('SignalClient.validateFirstMessage', () => { const reconnectResponse = new ReconnectResponse({ iceServers: [] }); const signalResponse = createSignalResponse('reconnect', reconnectResponse); - const validateMethod = (signalClient as any).validateFirstMessage; + const validateMethod = (signalClient as any).validateFirstMessage as ( + msg: any, + isReconnect: boolean, + ) => Result; if (validateMethod) { const result = validateMethod.call(signalClient, signalResponse, true); - expect(result.isValid).toBe(true); - expect(result.response).toEqual(reconnectResponse); + expect(result._unsafeUnwrap().response).toEqual(reconnectResponse); } }); @@ -634,12 +713,14 @@ describe('SignalClient.validateFirstMessage', () => { const updateSignalResponse = createSignalResponse('update', { participants: [] }); - const validateMethod = (signalClient as any).validateFirstMessage; + const validateMethod = (signalClient as any).validateFirstMessage as ( + msg: any, + isReconnect: boolean, + ) => Result; if (validateMethod) { const result = validateMethod.call(signalClient, updateSignalResponse, true); - expect(result.isValid).toBe(true); - expect(result.response).toBeUndefined(); - expect(result.shouldProcessFirstMessage).toBe(true); + expect(result._unsafeUnwrap().response).toBeUndefined(); + expect(result._unsafeUnwrap().shouldProcessFirstMessage).toBe(true); } }); @@ -650,12 +731,14 @@ describe('SignalClient.validateFirstMessage', () => { const leaveRequest = new LeaveRequest({ reason: 1 }); const signalResponse = createSignalResponse('leave', leaveRequest); - const validateMethod = (signalClient as any).validateFirstMessage; + const validateMethod = (signalClient as any).validateFirstMessage as ( + msg: any, + isReconnect: boolean, + ) => Result; if (validateMethod) { const result = validateMethod.call(signalClient, signalResponse, false); - expect(result.isValid).toBe(false); - expect(result.error).toBeInstanceOf(ConnectionError); - expect(result.error?.reason).toBe(ConnectionErrorReason.LeaveRequest); + expect(result._unsafeUnwrapErr()).toBeInstanceOf(ConnectionError); + expect(result._unsafeUnwrapErr().reason).toBe(ConnectionErrorReason.LeaveRequest); } }); @@ -663,12 +746,14 @@ describe('SignalClient.validateFirstMessage', () => { const reconnectResponse = new ReconnectResponse({ iceServers: [] }); const signalResponse = createSignalResponse('reconnect', reconnectResponse); - const validateMethod = (signalClient as any).validateFirstMessage; + const validateMethod = (signalClient as any).validateFirstMessage as ( + msg: any, + isReconnect: boolean, + ) => Result; if (validateMethod) { const result = validateMethod.call(signalClient, signalResponse, false); - expect(result.isValid).toBe(false); - expect(result.error).toBeInstanceOf(ConnectionError); - expect(result.error?.reason).toBe(ConnectionErrorReason.InternalError); + expect(result._unsafeUnwrapErr()).toBeInstanceOf(ConnectionError); + expect(result._unsafeUnwrapErr().reason).toBe(ConnectionErrorReason.InternalError); } }); }); @@ -692,19 +777,17 @@ describe('SignalClient.handleConnectionError', () => { const error = new Error('Connection failed'); const result = await handleMethod.call(signalClient, error, 'wss://test.livekit.io/validate'); - expect(result).toBeInstanceOf(ConnectionError); - expect(result.reason).toBe(ConnectionErrorReason.NotAllowed); - expect(result.status).toBe(403); - expect(result.message).toBe('Forbidden'); + expect(result.isErr()).toBe(true); + const err = result._unsafeUnwrapErr(); + expect(err).toBeInstanceOf(ConnectionError); + expect(err.reason).toBe(ConnectionErrorReason.NotAllowed); + expect(err.status).toBe(403); + expect(err.message).toBe('Forbidden'); } }); it('should return ConnectionError as-is if it is already a ConnectionError', async () => { - const connectionError = new ConnectionError( - 'Custom error', - ConnectionErrorReason.InternalError, - 500, - ); + const connectionError = ConnectionError.internal('Custom error'); (global.fetch as any).mockResolvedValueOnce({ status: 500, @@ -719,8 +802,10 @@ describe('SignalClient.handleConnectionError', () => { 'wss://test.livekit.io/validate', ); - expect(result).toBe(connectionError); - expect(result.reason).toBe(ConnectionErrorReason.InternalError); + expect(result.isErr()).toBe(true); + const err = result._unsafeUnwrapErr(); + expect(err).toBe(connectionError); + expect(err.reason).toBe(ConnectionErrorReason.InternalError); } }); @@ -735,9 +820,11 @@ describe('SignalClient.handleConnectionError', () => { const error = new Error('Connection failed'); const result = await handleMethod.call(signalClient, error, 'wss://test.livekit.io/validate'); - expect(result).toBeInstanceOf(ConnectionError); - expect(result.reason).toBe(ConnectionErrorReason.InternalError); - expect(result.status).toBe(500); + expect(result.isErr()).toBe(true); + const err = result._unsafeUnwrapErr(); + expect(err).toBeInstanceOf(ConnectionError); + expect(err.reason).toBe(ConnectionErrorReason.InternalError); + expect(err.status).toBe(500); } }); @@ -749,13 +836,15 @@ describe('SignalClient.handleConnectionError', () => { const error = new Error('Connection failed'); const result = await handleMethod.call(signalClient, error, 'wss://test.livekit.io/validate'); - expect(result).toBeInstanceOf(ConnectionError); - expect(result.reason).toBe(ConnectionErrorReason.ServerUnreachable); + expect(result.isErr()).toBe(true); + const err = result._unsafeUnwrapErr(); + expect(err).toBeInstanceOf(ConnectionError); + expect(err.reason).toBe(ConnectionErrorReason.ServerUnreachable); } }); it('should handle fetch throwing ConnectionError', async () => { - const fetchError = new ConnectionError('Fetch failed', ConnectionErrorReason.ServerUnreachable); + const fetchError = ConnectionError.serverUnreachable('Fetch failed'); (global.fetch as any).mockRejectedValueOnce(fetchError); const handleMethod = (signalClient as any).handleConnectionError; @@ -763,7 +852,8 @@ describe('SignalClient.handleConnectionError', () => { const error = new Error('Connection failed'); const result = await handleMethod.call(signalClient, error, 'wss://test.livekit.io/validate'); - expect(result).toBe(fetchError); + expect(result.isErr()).toBe(true); + expect(result._unsafeUnwrapErr()).toBe(fetchError); } }); }); diff --git a/src/api/SignalClient.ts b/src/api/SignalClient.ts index abf44e563a..43a355431f 100644 --- a/src/api/SignalClient.ts +++ b/src/api/SignalClient.ts @@ -44,6 +44,7 @@ import { WrappedJoinRequest, protoInt64, } from '@livekit/protocol'; +import { Result, ResultAsync, err, errAsync, ok, okAsync, safeTry } from 'neverthrow'; import log, { LoggerNames, getLogger } from '../logger'; import { ConnectionError, ConnectionErrorReason } from '../room/errors'; import CriticalTimers from '../room/timers'; @@ -54,14 +55,15 @@ import { type WebSocketConnection, WebSocketStream } from './WebSocketStream'; import { createRtcUrl, createValidateUrl, - getAbortReasonAsString, parseSignalResponse, + raceResults, + withAbort, + withMutex, + withTimeout, } from './utils'; // internal options interface ConnectOpts extends SignalOptions { - /** internal */ - reconnect?: boolean; /** internal */ reconnectReason?: number; /** internal */ @@ -85,7 +87,6 @@ type SignalKind = NonNullable['case']; const passThroughQueueSignals: Array = [ 'syncState', 'trickle', - 'offer', 'answer', 'simulate', 'leave', @@ -241,229 +242,174 @@ export class SignalClient { return this.loggerContextCb?.() ?? {}; } - async join( - url: string, - token: string, - opts: SignalOptions, - abortSignal?: AbortSignal, - ): Promise { + async join(url: string, token: string, opts: SignalOptions, abortSignal?: AbortSignal) { // during a full reconnect, we'd want to start the sequence even if currently // connected this.state = SignalConnectionState.CONNECTING; this.options = opts; - const res = await this.connect(url, token, opts, abortSignal); - return res as JoinResponse; + return this.connect(url, token, false, opts, abortSignal); } - async reconnect( - url: string, - token: string, - sid?: string, - reason?: ReconnectReason, - ): Promise { + reconnect(url: string, token: string, sid?: string, reason?: ReconnectReason) { if (!this.options) { - this.log.warn( - 'attempted to reconnect without signal options being set, ignoring', - this.logContext, + return errAsync( + ConnectionError.internal('attempted to reconnect without signal options being set'), ); - return; } this.state = SignalConnectionState.RECONNECTING; // clear ping interval and restart it once reconnected this.clearPingInterval(); - const res = (await this.connect(url, token, { + return this.connect(url, token, true, { ...this.options, - reconnect: true, sid, reconnectReason: reason, - })) as ReconnectResponse | undefined; - return res; + }); } - private async connect( - url: string, - token: string, - opts: ConnectOpts, - abortSignal?: AbortSignal, - ): Promise { - const unlock = await this.connectionLock.lock(); - - this.connectOptions = opts; - const clientInfo = getClientInfo(); - const params = opts.singlePeerConnection - ? createJoinRequestConnectionParams(token, clientInfo, opts) - : createConnectionParams(token, clientInfo, opts); - const rtcUrl = createRtcUrl(url, params); - const validateUrl = createValidateUrl(rtcUrl); - - return new Promise(async (resolve, reject) => { - try { - let alreadyAborted = false; - const abortHandler = async (eventOrError: Event | Error) => { - if (alreadyAborted) { - return; - } - alreadyAborted = true; - const target = eventOrError instanceof Event ? eventOrError.currentTarget : eventOrError; - const reason = getAbortReasonAsString(target, 'Abort handler called'); - // send leave if we have an active stream writer (connection is open) - if (this.streamWriter && !this.isDisconnected) { - this.sendLeave() - .then(() => this.close(reason)) - .catch((e) => { - this.log.error(e); - this.close(); - }); - } else { - this.close(); - } - cleanupAbortHandlers(); - reject(target instanceof AbortSignal ? target.reason : target); - }; - - abortSignal?.addEventListener('abort', abortHandler); - - const cleanupAbortHandlers = () => { - clearTimeout(wsTimeout); - abortSignal?.removeEventListener('abort', abortHandler); - }; - - const wsTimeout = setTimeout(() => { - abortHandler( - new ConnectionError( - 'room connection has timed out (signal)', - ConnectionErrorReason.ServerUnreachable, - ), - ); - }, opts.websocketTimeout); + private connect< + T extends boolean, + U extends T extends false ? JoinResponse : ReconnectResponse | undefined, + >(url: string, token: string, isReconnect: T, opts: ConnectOpts, abortSignal?: AbortSignal) { + const self = this; - const handleSignalConnected = ( - connection: WebSocketConnection, - firstMessage?: SignalResponse, - ) => { - this.handleSignalConnected(connection, wsTimeout, firstMessage); - }; + return withMutex( + safeTry(async function* () { + self.connectOptions = opts; + + const clientInfo = getClientInfo(); + const params = opts.singlePeerConnection + ? createJoinRequestConnectionParams(token, clientInfo, opts, isReconnect) + : createConnectionParams(token, clientInfo, opts, isReconnect); + const rtcUrl = createRtcUrl(url, params); + const validateUrl = createValidateUrl(rtcUrl); const redactedUrl = new URL(rtcUrl); if (redactedUrl.searchParams.has('access_token')) { redactedUrl.searchParams.set('access_token', ''); } - this.log.debug(`connecting to ${redactedUrl}`, { - reconnect: opts.reconnect, + self.log.debug(`connecting to ${redactedUrl}`, { + reconnect: isReconnect, reconnectReason: opts.reconnectReason, - ...this.logContext, + ...self.logContext, }); - if (this.ws) { - await this.close(false); + + if (self.ws) { + await self.close( + false, + opts?.reconnectReason ? ReconnectReason[opts.reconnectReason] : undefined, + ); } - this.ws = new WebSocketStream(rtcUrl); - - try { - this.ws.closed - .then((closeInfo) => { - if (this.isEstablishingConnection) { - reject( - new ConnectionError( - `Websocket got closed during a (re)connection attempt: ${closeInfo.reason}`, - ConnectionErrorReason.InternalError, - ), - ); + + const ws = new WebSocketStream(rtcUrl); + self.ws = ws; + + const wsConnectionResult = withTimeout(ws.opened, opts.websocketTimeout).mapErr( + async (error) => { + // retrieve info about what error was causing the connection failure and enhance the returned error + if (self.state !== SignalConnectionState.CONNECTED) { + self.state = SignalConnectionState.DISCONNECTED; + const connectionError = await withAbort( + withTimeout(self.fetchErrorInfo(error.message, validateUrl), 3_000), + abortSignal, + ); + + const closeReason = `${error.reason}: ${error.message}`; + + self.close(undefined, closeReason); + if (connectionError.isErr()) { + return connectionError.error; } - if (closeInfo.closeCode !== 1000) { - this.log.warn(`websocket closed`, { - ...this.logContext, + } + return error; + }, + ); + + const wsConnection = yield* withAbort( + wsConnectionResult.andTee((connection) => { + self.streamWriter = connection.writable.getWriter(); + }), + abortSignal, + ).orTee((error) => { + self.close(undefined, error.message); + }); + + const firstMessageOrClose = raceResults([ + self.processInitialSignalMessage(wsConnection), + // Return the close promise as error if it resolves first + ws.closed + .orTee((error) => { + self.handleWSError(error); + }) + .andThen((closeInfo) => { + if ( + // we only log the warning here if the current ws connection is still the same, we don't care about closing of older ws connections that have been replaced + ws === self.ws + ) { + self.log.warn(`websocket closed`, { + ...self.logContext, reason: closeInfo.reason, code: closeInfo.closeCode, wasClean: closeInfo.closeCode === 1000, - state: this.state, + state: self.state, }); - if (this.state === SignalConnectionState.CONNECTED) { - this.handleOnClose(closeInfo.reason ?? 'Unexpected WS error'); + if (self.state == SignalConnectionState.CONNECTED) { + self.handleOnClose(closeInfo.reason ?? 'Websocket closed unexpectedly'); + } else { + self.log.warn( + `ws closed unexpectedly in state ${SignalConnectionState[self.state]}`, + ); } } - return; - }) - .catch((reason) => { - if (this.isEstablishingConnection) { - reject( - new ConnectionError( - `Websocket error during a (re)connection attempt: ${reason}`, - ConnectionErrorReason.InternalError, - ), - ); - } - }); - const connection = await this.ws.opened.catch(async (reason: unknown) => { - if (this.state !== SignalConnectionState.CONNECTED) { - this.state = SignalConnectionState.DISCONNECTED; - clearTimeout(wsTimeout); - const error = await this.handleConnectionError(reason, validateUrl); - reject(error); - return; - } - // other errors, handle - this.handleWSError(reason); - reject(reason); - return; - }); - clearTimeout(wsTimeout); - if (!connection) { - return; - } - const signalReader = connection.readable.getReader(); - this.streamWriter = connection.writable.getWriter(); - const firstMessage = await signalReader.read(); - signalReader.releaseLock(); - if (!firstMessage.value) { - throw new ConnectionError( - 'no message received as first message', - ConnectionErrorReason.InternalError, - ); - } - - const firstSignalResponse = parseSignalResponse(firstMessage.value); - // Validate the first message - const validation = this.validateFirstMessage( - firstSignalResponse, - opts.reconnect ?? false, - ); - - if (!validation.isValid) { - reject(validation.error); - return; - } - - // Handle join response - set up ping configuration - if (firstSignalResponse.message?.case === 'join') { - this.pingTimeoutDuration = firstSignalResponse.message.value.pingTimeout; - this.pingIntervalDuration = firstSignalResponse.message.value.pingInterval; - - if (this.pingTimeoutDuration && this.pingTimeoutDuration > 0) { - this.log.debug('ping config', { - ...this.logContext, - timeout: this.pingTimeoutDuration, - interval: this.pingIntervalDuration, + return err( + ConnectionError.internal( + closeInfo.reason ?? 'Websocket closed during (re)connection attempt', + ), + ); + }), + ]); + + const firstSignalResponse = yield* await withAbort( + withTimeout(firstMessageOrClose, 5_000), + abortSignal, + ).orTee((error) => { + self.log.warn('signal connection failed', error); + if (error.reason === ConnectionErrorReason.Cancelled) { + self + .sendLeave() + .then(() => self.close()) + .catch((e) => { + self.log.error(e); + self.close(); }); - } } + }); - // Handle successful connection - const firstMessageToProcess = validation.shouldProcessFirstMessage - ? firstSignalResponse - : undefined; - handleSignalConnected(connection, firstMessageToProcess); - resolve(validation.response); - } catch (e) { - reject(e); - } finally { - cleanupAbortHandlers(); + const validation = yield* self.validateFirstMessage(firstSignalResponse, isReconnect); + + // Handle join response - set up ping configuration + if (firstSignalResponse.message?.case === 'join') { + self.pingTimeoutDuration = firstSignalResponse.message.value.pingTimeout; + self.pingIntervalDuration = firstSignalResponse.message.value.pingInterval; + if (self.pingTimeoutDuration && self.pingTimeoutDuration > 0) { + self.log.debug('ping config', { + ...self.logContext, + timeout: self.pingTimeoutDuration, + interval: self.pingIntervalDuration, + }); + } } - } finally { - unlock(); - } - }); + + self.handleSignalConnected( + wsConnection, + validation.shouldProcessFirstMessage ? firstSignalResponse : undefined, + ); + + return ok(validation.response as U); + }), + this.connectionLock, + ); } async startReadingLoop( @@ -512,24 +458,31 @@ export class SignalClient { return; } const unlock = await this.closingLock.lock(); + try { this.clearPingInterval(); if (updateState) { this.state = SignalConnectionState.DISCONNECTING; } if (this.ws) { - this.ws.close({ closeCode: 1000, reason }); - - // calling `ws.close()` only starts the closing handshake (CLOSING state), prefer to wait until state is actually CLOSED - const closePromise = this.ws.closed; + const ws = this.ws; this.ws = undefined; this.streamWriter = undefined; + ws.close({ closeCode: 1000, reason }); + + // calling `ws.close()` only starts the closing handshake (CLOSING state), prefer to wait until state is actually CLOSED + const closePromise = ws.closed.match( + (closeInfo) => closeInfo, + (error) => error, + ); + await Promise.race([closePromise, sleep(MAX_WS_CLOSE_TIME)]); + this.log.info('closed websocket', { reason }); } } catch (e) { this.log.debug('websocket error while closing', { ...this.logContext, error: e }); } finally { - if (updateState) { + if (updateState && this.state === SignalConnectionState.DISCONNECTING) { this.state = SignalConnectionState.DISCONNECTED; } unlock(); @@ -538,8 +491,13 @@ export class SignalClient { // initial offer after joining sendOffer(offer: RTCSessionDescriptionInit, offerId: number) { - this.log.debug('sending offer', { ...this.logContext, offerSdp: offer.sdp }); - this.sendRequest({ + this.log.debug('sending offer', { + ...this.logContext, + offerSdp: offer.sdp, + state: SignalConnectionState[this.state], + wsState: this.ws?.readyState, + }); + return this.sendRequest({ case: 'offer', value: toProtoSessionDescription(offer, offerId), }); @@ -847,13 +805,13 @@ export class SignalClient { if (this.state === SignalConnectionState.DISCONNECTED) return; const onCloseCallback = this.onClose; await this.close(undefined, reason); - this.log.debug(`websocket connection closed: ${reason}`, { ...this.logContext, reason }); + this.log.debug(`websocket connection closing: ${reason}`, { ...this.logContext, reason }); if (onCloseCallback) { onCloseCallback(reason); } } - private handleWSError(error: unknown) { + private handleWSError(error: ReturnType) { this.log.error('websocket error', { ...this.logContext, error }); } @@ -915,17 +873,30 @@ export class SignalClient { * @param firstMessage Optional first message to process * @internal */ - private handleSignalConnected( - connection: WebSocketConnection, - timeoutHandle: ReturnType, - firstMessage?: SignalResponse, - ) { + private handleSignalConnected(connection: WebSocketConnection, firstMessage?: SignalResponse) { this.state = SignalConnectionState.CONNECTED; - clearTimeout(timeoutHandle); this.startPingInterval(); this.startReadingLoop(connection.readable.getReader(), firstMessage); } + private processInitialSignalMessage( + connection: WebSocketConnection, + ): ResultAsync { + // TODO: If inferring from the return type this could be more granular here than ConnectionError + return safeTry(async function* () { + const signalReader = connection.readable.getReader(); + + const firstMessage = await signalReader.read().finally(() => signalReader.releaseLock()); + if (!firstMessage.value) { + return err(ConnectionError.internal('no message received as first message')); + } + + const firstSignalResponse = parseSignalResponse(firstMessage.value); + + return okAsync(firstSignalResponse); + }); + } + /** * Validates the first message received from the signal server * @param firstSignalResponse The first signal response received @@ -936,63 +907,54 @@ export class SignalClient { private validateFirstMessage( firstSignalResponse: SignalResponse, isReconnect: boolean, - ): { - isValid: boolean; - response?: JoinResponse | ReconnectResponse; - error?: ConnectionError; - shouldProcessFirstMessage?: boolean; - } { - if (firstSignalResponse.message?.case === 'join') { - return { - isValid: true, + ): Result< + ValidationType, + // TODO, this should probably not be a ConnectionError? + ConnectionError + > { + if (isReconnect === false && firstSignalResponse.message?.case === 'join') { + return ok({ response: firstSignalResponse.message.value, - }; + shouldProcessFirstMessage: false, + }); } else if ( + isReconnect === true && this.state === SignalConnectionState.RECONNECTING && firstSignalResponse.message?.case !== 'leave' ) { if (firstSignalResponse.message?.case === 'reconnect') { - return { - isValid: true, + return ok({ response: firstSignalResponse.message.value, - }; + shouldProcessFirstMessage: false, + }); } else { // in reconnecting, any message received means signal reconnected and we still need to process it this.log.debug( 'declaring signal reconnected without reconnect response received', this.logContext, ); - return { - isValid: true, + return ok({ response: undefined, shouldProcessFirstMessage: true, - }; + }); } } else if (this.isEstablishingConnection && firstSignalResponse.message?.case === 'leave') { - return { - isValid: false, - error: new ConnectionError( + return err( + ConnectionError.leaveRequest( 'Received leave request while trying to (re)connect', - ConnectionErrorReason.LeaveRequest, - undefined, firstSignalResponse.message.value.reason, ), - }; + ); } else if (!isReconnect) { // non-reconnect case, should receive join response first - return { - isValid: false, - error: new ConnectionError( + + return err( + ConnectionError.internal( `did not receive join response, got ${firstSignalResponse.message?.case} instead`, - ConnectionErrorReason.InternalError, ), - }; + ); } - - return { - isValid: false, - error: new ConnectionError('Unexpected first message', ConnectionErrorReason.InternalError), - }; + return err(ConnectionError.internal('Unexpected first message')); } /** @@ -1002,31 +964,35 @@ export class SignalClient { * @returns A ConnectionError with appropriate reason and status * @internal */ - private async handleConnectionError( + private async fetchErrorInfo( reason: unknown, validateUrl: string, - ): Promise { + ): Promise> { try { const resp = await fetch(validateUrl); + if (resp.status.toFixed(0).startsWith('4')) { const msg = await resp.text(); - return new ConnectionError(msg, ConnectionErrorReason.NotAllowed, resp.status); + return err(ConnectionError.notAllowed(msg, resp.status)); } else if (reason instanceof ConnectionError) { - return reason; + return err(reason); } else { - return new ConnectionError( - `Encountered unknown websocket error during connection: ${reason}`, - ConnectionErrorReason.InternalError, - resp.status, + return err( + ConnectionError.websocket( + `Encountered unknown websocket error during connection: ${reason}`, + resp?.status, + resp?.statusText, + ), ); } } catch (e) { - return e instanceof ConnectionError - ? e - : new ConnectionError( - e instanceof Error ? e.message : 'server was not reachable', - ConnectionErrorReason.ServerUnreachable, - ); + return err( + e instanceof ConnectionError + ? e + : ConnectionError.serverUnreachable( + e instanceof Error ? `${e.name}: ${e.message}` : 'server was not reachable', + ), + ); } } } @@ -1065,12 +1031,13 @@ function createConnectionParams( token: string, info: ClientInfo, opts: ConnectOpts, + isReconnect: boolean, ): URLSearchParams { const params = new URLSearchParams(); params.set('access_token', token); // opts - if (opts.reconnect) { + if (isReconnect) { params.set('reconnect', '1'); if (opts.sid) { params.set('sid', opts.sid); @@ -1120,6 +1087,7 @@ function createJoinRequestConnectionParams( token: string, info: ClientInfo, opts: ConnectOpts, + isReconnect: boolean, ): URLSearchParams { const params = new URLSearchParams(); params.set('access_token', token); @@ -1130,7 +1098,7 @@ function createJoinRequestConnectionParams( autoSubscribe: !!opts.autoSubscribe, adaptiveStream: !!opts.adaptiveStream, }), - reconnect: !!opts.reconnect, + reconnect: isReconnect, participantSid: opts.sid ? opts.sid : undefined, }); if (opts.reconnectReason) { @@ -1143,3 +1111,8 @@ function createJoinRequestConnectionParams( return params; } + +export type ValidationType = + | { response: JoinResponse; shouldProcessFirstMessage: false } + | { response: ReconnectResponse; shouldProcessFirstMessage: false } + | { response: undefined; shouldProcessFirstMessage: true }; diff --git a/src/api/WebSocketStream.test.ts b/src/api/WebSocketStream.test.ts index 3445348042..32d1a5d5ad 100644 --- a/src/api/WebSocketStream.test.ts +++ b/src/api/WebSocketStream.test.ts @@ -1,5 +1,6 @@ /* eslint-disable @typescript-eslint/no-unused-vars */ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { ConnectionErrorReason } from '../room/errors'; import { WebSocketStream } from './WebSocketStream'; // Mock WebSocket @@ -122,6 +123,16 @@ vi.mock('../room/utils', () => ({ sleep: vi.fn((duration: number) => new Promise((resolve) => setTimeout(resolve, duration))), })); +// Helper function to unwrap Result from opened promise +async function getConnectionOrFail(wsStream: WebSocketStream) { + const result = await wsStream.opened; + expect(result.isOk()).toBe(true); + if (!result.isOk()) { + throw new Error('Failed to open connection'); + } + return result.value; +} + describe('WebSocketStream', () => { let mockWebSocket: MockWebSocket; let originalWebSocket: typeof WebSocket; @@ -174,7 +185,7 @@ describe('WebSocketStream', () => { new WebSocketStream('wss://test.example.com', { signal: abortController.signal, }); - }).toThrow('This operation was aborted'); + }).toThrow('Aborted before WS was initialized'); }); it('should close when abort signal is triggered', () => { @@ -201,21 +212,29 @@ describe('WebSocketStream', () => { const removeEventListenerSpy = vi.spyOn(mockWebSocket, 'removeEventListener'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; - - expect(connection.readable).toBeInstanceOf(ReadableStream); - expect(connection.writable).toBeInstanceOf(WritableStream); - expect(connection.protocol).toBe('test-protocol'); - expect(connection.extensions).toBe('test-extension'); + const result = await wsStream.opened; + + expect(result.isOk()).toBe(true); + if (result.isOk()) { + const connection = result.value; + expect(connection.readable).toBeInstanceOf(ReadableStream); + expect(connection.writable).toBeInstanceOf(WritableStream); + expect(connection.protocol).toBe('test-protocol'); + expect(connection.extensions).toBe('test-extension'); + } expect(removeEventListenerSpy).toHaveBeenCalledWith('error', expect.any(Function)); }); - it('should reject when WebSocket errors before opening', async () => { + it('should return error Result when WebSocket errors before opening', async () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerError(); - await expect(wsStream.opened).rejects.toThrow(); + const result = await wsStream.opened; + expect(result.isErr()).toBe(true); + if (result.isErr()) { + expect(result.error.reason).toBe(ConnectionErrorReason.WebSocket); + } }); }); @@ -227,10 +246,13 @@ describe('WebSocketStream', () => { mockWebSocket.triggerOpen(); mockWebSocket.triggerClose(1001, 'Going away'); - const closeInfo = await wsStream.closed; + const result = await wsStream.closed; - expect(closeInfo.closeCode).toBe(1001); - expect(closeInfo.reason).toBe('Going away'); + expect(result.isOk()).toBe(true); + if (result.isOk()) { + expect(result.value.closeCode).toBe(1001); + expect(result.value.reason).toBe('Going away'); + } expect(removeEventListenerSpy).toHaveBeenCalledWith('error', expect.any(Function)); }); @@ -241,13 +263,16 @@ describe('WebSocketStream', () => { mockWebSocket.triggerError(); mockWebSocket.triggerClose(1006, 'Connection failed'); - const closeInfo = await wsStream.closed; + const result = await wsStream.closed; - expect(closeInfo.closeCode).toBe(1006); - expect(closeInfo.reason).toBe('Connection failed'); + expect(result.isOk()).toBe(true); + if (result.isOk()) { + expect(result.value.closeCode).toBe(1006); + expect(result.value.reason).toBe('Connection failed'); + } }); - it('should reject when error occurs without timely close event', async () => { + it('should return error Result when error occurs without timely close event', async () => { const { sleep } = await import('../room/utils'); vi.mocked(sleep).mockResolvedValue(undefined); @@ -256,9 +281,14 @@ describe('WebSocketStream', () => { mockWebSocket.triggerOpen(); mockWebSocket.triggerError(); - await expect(wsStream.closed).rejects.toThrow( - 'Encountered unspecified websocket error without a timely close event', - ); + const result = await wsStream.closed; + expect(result.isErr()).toBe(true); + if (result.isErr()) { + expect(result.error.reason).toBe(ConnectionErrorReason.WebSocket); + expect(result.error.message).toBe( + 'Encountered unspecified websocket error without a timely close event', + ); + } }); }); @@ -267,8 +297,11 @@ describe('WebSocketStream', () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const result = await wsStream.opened; + expect(result.isOk()).toBe(true); + if (!result.isOk()) return; + const connection = result.value; const reader = connection.readable.getReader(); const message1 = new ArrayBuffer(8); @@ -292,23 +325,22 @@ describe('WebSocketStream', () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const connection = await getConnectionOrFail(wsStream); const reader = connection.readable.getReader(); mockWebSocket.triggerError(); - await Promise.all([ - expect(reader.read()).rejects.toBeDefined(), - expect(wsStream.closed).rejects.toBeDefined(), - ]); + const closedResult = await wsStream.closed; + await expect(reader.read()).rejects.toBeDefined(); + expect(closedResult.isErr()).toBe(true); }); it('should close WebSocket with custom close info when cancelled', async () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const connection = await getConnectionOrFail(wsStream); const reader = connection.readable.getReader(); const closeSpy = vi.spyOn(mockWebSocket, 'close'); @@ -322,7 +354,7 @@ describe('WebSocketStream', () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const connection = await getConnectionOrFail(wsStream); const reader1 = connection.readable.getReader(); @@ -337,7 +369,7 @@ describe('WebSocketStream', () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const connection = await getConnectionOrFail(wsStream); const writer = connection.writable.getWriter(); const sendSpy = vi.spyOn(mockWebSocket, 'send'); @@ -362,7 +394,7 @@ describe('WebSocketStream', () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const connection = await getConnectionOrFail(wsStream); const writer = connection.writable.getWriter(); const closeSpy = vi.spyOn(mockWebSocket, 'close'); @@ -376,7 +408,7 @@ describe('WebSocketStream', () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const connection = await getConnectionOrFail(wsStream); const writer = connection.writable.getWriter(); @@ -418,7 +450,7 @@ describe('WebSocketStream', () => { }); mockWebSocket.triggerOpen(); - await wsStream.opened; + await getConnectionOrFail(wsStream); const closeSpy = vi.spyOn(mockWebSocket, 'close'); @@ -433,7 +465,7 @@ describe('WebSocketStream', () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const connection = await getConnectionOrFail(wsStream); const reader = connection.readable.getReader(); @@ -467,7 +499,7 @@ describe('WebSocketStream', () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const connection = await getConnectionOrFail(wsStream); const reader = connection.readable.getReader(); const writer = connection.writable.getWriter(); @@ -493,7 +525,7 @@ describe('WebSocketStream', () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const connection = await getConnectionOrFail(wsStream); const sourceData = [new ArrayBuffer(8), new ArrayBuffer(16), new ArrayBuffer(32)]; let dataIndex = 0; @@ -524,7 +556,7 @@ describe('WebSocketStream', () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const connection = await getConnectionOrFail(wsStream); const msg1 = new ArrayBuffer(8); const msg2 = new ArrayBuffer(16); @@ -552,7 +584,7 @@ describe('WebSocketStream', () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const connection = await getConnectionOrFail(wsStream); const reader = connection.readable.getReader(); @@ -562,17 +594,16 @@ describe('WebSocketStream', () => { // Trigger error while read is pending mockWebSocket.triggerError(); - await Promise.all([ - expect(readPromise).rejects.toBeDefined(), - expect(wsStream.closed).rejects.toBeDefined(), - ]); + const closedResult = await wsStream.closed; + await expect(readPromise).rejects.toBeDefined(); + expect(closedResult.isErr()).toBe(true); }); it('should support zero-length and empty messages', async () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const connection = await getConnectionOrFail(wsStream); const reader = connection.readable.getReader(); const writer = connection.writable.getWriter(); @@ -599,7 +630,7 @@ describe('WebSocketStream', () => { const wsStream = new WebSocketStream('wss://test.example.com'); mockWebSocket.triggerOpen(); - const connection = await wsStream.opened; + const connection = await getConnectionOrFail(wsStream); const reader = connection.readable.getReader(); diff --git a/src/api/WebSocketStream.ts b/src/api/WebSocketStream.ts index d930c212b9..6e25c223f7 100644 --- a/src/api/WebSocketStream.ts +++ b/src/api/WebSocketStream.ts @@ -1,4 +1,6 @@ // https://github.com/CarterLi/websocketstream-polyfill +import { ResultAsync } from 'neverthrow'; +import { ConnectionError } from '../room/errors'; import { sleep } from '../room/utils'; export interface WebSocketConnection { @@ -18,6 +20,8 @@ export interface WebSocketStreamOptions { signal?: AbortSignal; } +export type WebSocketError = ReturnType; + /** * [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) with [Streams API](https://developer.mozilla.org/en-US/docs/Web/API/Streams_API) * @@ -26,11 +30,11 @@ export interface WebSocketStreamOptions { export class WebSocketStream { readonly url: string; - readonly opened: Promise>; + readonly opened: ResultAsync, WebSocketError>; - readonly closed: Promise; + readonly closed: ResultAsync; - readonly close: (closeInfo?: WebSocketCloseInfo) => void; + readonly close!: (closeInfo?: WebSocketCloseInfo) => void; get readyState(): number { return this.ws.readyState; @@ -40,77 +44,120 @@ export class WebSocketStream ws.close(code, reason); - this.opened = new Promise((resolve, reject) => { - ws.onopen = () => { - resolve({ - readable: new ReadableStream({ - start(controller) { - ws.onmessage = ({ data }) => controller.enqueue(data); - ws.onerror = (e) => controller.error(e); - }, - cancel: closeWithInfo, - }), - writable: new WritableStream({ - write(chunk) { - ws.send(chunk); - }, - abort() { - ws.close(); - }, - close: closeWithInfo, - }), - protocol: ws.protocol, - extensions: ws.extensions, - }); - ws.removeEventListener('error', reject); - }; - ws.addEventListener('error', reject); - }); - - this.closed = new Promise((resolve, reject) => { - const rejectHandler = async () => { - const closePromise = new Promise((res) => { - if (ws.readyState === WebSocket.CLOSED) return; - else { - ws.addEventListener( - 'close', - (closeEv: CloseEvent) => { - res(closeEv); + // eslint-disable-next-line neverthrow-must-use/must-use-result + this.opened = ResultAsync.fromPromise, WebSocketError>( + new Promise((resolve, r) => { + const reject = (err: WebSocketError) => r(err); + const errorHandler = (e: Event) => { + console.error(e); + reject( + ConnectionError.websocket('Encountered websocket error while establishing connection'), + ); + ws.removeEventListener('open', openHandler); + }; + + const onCloseDuringOpen = (ev: CloseEvent) => { + reject( + ConnectionError.websocket( + `WS closed during connection establishment: ${ev.reason}`, + ev.code, + ev.reason, + ), + ); + }; + + const openHandler = () => { + resolve({ + readable: new ReadableStream({ + start(controller) { + ws.onmessage = ({ data }) => controller.enqueue(data); + ws.onerror = (e) => controller.error(e); }, - { once: true }, + cancel: closeWithInfo, + }), + writable: new WritableStream({ + write(chunk) { + ws.send(chunk); + }, + abort() { + ws.close(); + }, + close: closeWithInfo, + }), + protocol: ws.protocol, + extensions: ws.extensions, + }); + ws.removeEventListener('error', errorHandler); + ws.removeEventListener('close', onCloseDuringOpen); + }; + + console.log('websocket setup registering event listeners'); + + ws.addEventListener('open', openHandler, { once: true }); + ws.addEventListener('error', errorHandler, { once: true }); + ws.addEventListener('close', onCloseDuringOpen, { once: true }); + }), + (error) => error as WebSocketError, + ); + + // eslint-disable-next-line neverthrow-must-use/must-use-result + this.closed = ResultAsync.fromPromise( + new Promise((resolve, r) => { + const reject = (err: WebSocketError) => r(err); + const errorHandler = async () => { + const closePromise = new Promise((res) => { + if (ws.readyState === WebSocket.CLOSED) return; + else { + ws.addEventListener( + 'close', + (closeEv: CloseEvent) => { + res(closeEv); + }, + { once: true }, + ); + } + }); + const reason = await Promise.race([sleep(250), closePromise]); + if (!reason) { + reject( + ConnectionError.websocket( + 'Encountered unspecified websocket error without a timely close event', + ), ); + } else { + // if we can infer the close reason from the close event then resolve with ok, we don't need to throw + resolve({ closeCode: reason.code, reason: reason.reason }); } - }); - const reason = await Promise.race([sleep(250), closePromise]); - if (!reason) { - reject(new Error('Encountered unspecified websocket error without a timely close event')); - } else { - // if we can infer the close reason from the close event then resolve the promise, we don't need to throw - resolve(reason); + }; + + if (ws.readyState === WebSocket.CLOSED) { + reject(ConnectionError.websocket('Websocket already closed at initialization time')); + return; } - }; - ws.onclose = ({ code, reason }) => { - resolve({ closeCode: code, reason }); - ws.removeEventListener('error', rejectHandler); - }; - ws.addEventListener('error', rejectHandler); - }); + ws.onclose = ({ code, reason }) => { + resolve({ closeCode: code, reason }); + ws.removeEventListener('error', errorHandler); + }; + + ws.addEventListener('error', errorHandler); + }), + (error) => error as WebSocketError, + ); if (options.signal) { - options.signal.onabort = () => ws.close(); + options.signal.onabort = () => ws.close(undefined, 'AbortSignal triggered'); } this.close = closeWithInfo; diff --git a/src/api/utils.ts b/src/api/utils.ts index 3fb538a9a4..74666588f3 100644 --- a/src/api/utils.ts +++ b/src/api/utils.ts @@ -1,4 +1,7 @@ import { SignalResponse } from '@livekit/protocol'; +import { Result, ResultAsync, errAsync } from 'neverthrow'; +import type { Mutex } from '@livekit/mutex'; +import { ConnectionError } from '../room/errors'; import { toHttpUrl, toWebsocketUrl } from '../room/utils'; export function createRtcUrl(url: string, searchParams: URLSearchParams) { @@ -49,3 +52,128 @@ export function getAbortReasonAsString( return 'toString' in reason ? reason.toString() : defaultMessage; } } + +export function withTimeout( + ra: ResultAsyncLike, + ms: number, +): ResultAsync> { + const timeout = ResultAsync.fromPromise( + new Promise((_, reject) => + setTimeout(() => { + reject(ConnectionError.timeout('Timeout')); + }, ms), + ), + (e) => e as ReturnType, + ); + + return raceResults([ra, timeout]); +} + +export function withAbort( + ra: ResultAsyncLike, + signal: AbortSignal | undefined, +): ResultAsync> { + if (signal?.aborted) { + return errAsync(ConnectionError.cancelled('AbortSignal invoked')); + } + + const abortResult = ResultAsync.fromPromise( + new Promise((_, reject) => { + const onAbortHandler = () => { + reject(ConnectionError.cancelled('AbortSignal invoked')); + }; + signal?.addEventListener('abort', onAbortHandler, { once: true }); + }), + (e) => e as ReturnType, + ); + + return raceResults([ra, abortResult]); +} + +export function withMutex( + fn: ResultAsyncLike, + mutex: Mutex, +): ResultAsync { + return ResultAsync.fromSafePromise(mutex.lock()).andThen((unlock) => withFinally(fn, unlock)); +} + +/** + * Executes a callback after a ResultAsync completes, regardless of success or failure. + * Similar to Promise.finally() but for ResultAsync. + * + * @param ra - The ResultAsync to execute + * @param onFinally - Callback to run after completion (receives no arguments) + * @returns A new ResultAsync with the same result, but runs onFinally first + * + * @example + * ```ts + * withFinally( + * someOperation(), + * () => cleanup() + * ) + * ``` + */ +export function withFinally( + ra: ResultAsyncLike, + onFinally: () => void | Promise, +): ResultAsync { + return ResultAsync.fromPromise( + (async () => { + try { + const result = await ra; + return result.match( + (value) => value, + (error) => { + throw error as Error; + }, + ); + } catch (error) { + throw error as Error; + } finally { + await onFinally(); + } + })(), + (e) => e as E, + ); +} + +/** + * Races multiple ResultAsync operations and returns whichever completes first. + * If all fail, returns the error from the first one to reject. + * API-compatible with Promise.race, supporting heterogeneous types. + * + * @param values - Array of ResultAsync operations to race (can have different types) + * @returns A new ResultAsync with the result of whichever completes first + * + * @example + * ```ts + * // Race a connection attempt against a timeout + * raceResults([ + * connectToServer(), // ResultAsync + * delay(5000).andThen(() => errAsync(new TimeoutError())) // ResultAsync + * ]) // ResultAsync + * ``` + */ +export function raceResults[]>( + values: T, +): ResultAsync< + T[number] extends ResultAsync ? V : never, + T[number] extends ResultAsync ? E : never +> { + type Value = T[number] extends ResultAsync ? V : never; + type Err = T[number] extends ResultAsync ? E : never; + + const settledPromises = values.map( + (ra): PromiseLike => + ra.then((res) => + res.match( + (v) => Promise.resolve(v), + (err) => Promise.reject(err), + ), + ), + ); + + return ResultAsync.fromPromise(Promise.race(settledPromises), (e) => e as Err); +} + +export type ResultAsyncLike = ResultAsync | Promise>; diff --git a/src/connectionHelper/checks/turn.ts b/src/connectionHelper/checks/turn.ts index 3aef7f05e1..4eabb6fa22 100644 --- a/src/connectionHelper/checks/turn.ts +++ b/src/connectionHelper/checks/turn.ts @@ -8,7 +8,7 @@ export class TURNCheck extends Checker { async perform(): Promise { const signalClient = new SignalClient(); - const joinRes = await signalClient.join(this.url, this.token, { + const joinResult = await signalClient.join(this.url, this.token, { autoSubscribe: true, maxRetries: 0, e2eeEnabled: false, @@ -16,6 +16,9 @@ export class TURNCheck extends Checker { singlePeerConnection: false, }); + // TODO fix unsafe usage + const joinRes = joinResult._unsafeUnwrap(); + let hasTLS = false; let hasTURN = false; let hasSTUN = false; diff --git a/src/connectionHelper/checks/websocket.ts b/src/connectionHelper/checks/websocket.ts index ab9afa34d3..40592e4617 100644 --- a/src/connectionHelper/checks/websocket.ts +++ b/src/connectionHelper/checks/websocket.ts @@ -13,13 +13,15 @@ export class WebSocketCheck extends Checker { } let signalClient = new SignalClient(); - const joinRes = await signalClient.join(this.url, this.token, { - autoSubscribe: true, - maxRetries: 0, - e2eeEnabled: false, - websocketTimeout: 15_000, - singlePeerConnection: false, - }); + const joinRes = ( + await signalClient.join(this.url, this.token, { + autoSubscribe: true, + maxRetries: 0, + e2eeEnabled: false, + websocketTimeout: 15_000, + singlePeerConnection: false, + }) + )._unsafeUnwrap(); this.appendMessage(`Connected to server, version ${joinRes.serverVersion}.`); if (joinRes.serverInfo?.edition === ServerInfo_Edition.Cloud && joinRes.serverInfo?.region) { this.appendMessage(`LiveKit Cloud: ${joinRes.serverInfo?.region}`); diff --git a/src/e2ee/E2eeManager.ts b/src/e2ee/E2eeManager.ts index b10676a891..8ef26dacfe 100644 --- a/src/e2ee/E2eeManager.ts +++ b/src/e2ee/E2eeManager.ts @@ -227,7 +227,7 @@ export class E2EEManager }; private onWorkerError = (ev: ErrorEvent) => { - log.error('e2ee worker encountered an error:', { error: ev.error }); + log.error('e2ee worker encountered an error:', { error: ev }); this.emit(EncryptionEvent.EncryptionError, ev.error, undefined); }; diff --git a/src/room/PCTransport.ts b/src/room/PCTransport.ts index c84ae8e965..14c0591d9f 100644 --- a/src/room/PCTransport.ts +++ b/src/room/PCTransport.ts @@ -67,7 +67,7 @@ export default class PCTransport extends EventEmitter { remoteNackMids: string[] = []; - onOffer?: (offer: RTCSessionDescriptionInit, offerId: number) => void; + onOffer?: (offer: RTCSessionDescriptionInit, offerId: number) => Promise; onIceCandidate?: (candidate: RTCIceCandidate) => void; @@ -352,7 +352,7 @@ export default class PCTransport extends EventEmitter { return; } await this.setMungedSDP(offer, write(sdpParsed)); - this.onOffer(offer, this.latestOfferId); + await this.onOffer(offer, this.latestOfferId); } finally { unlock(); } diff --git a/src/room/PCTransportManager.ts b/src/room/PCTransportManager.ts index e805de8ed2..376419f562 100644 --- a/src/room/PCTransportManager.ts +++ b/src/room/PCTransportManager.ts @@ -1,9 +1,10 @@ import { Mutex } from '@livekit/mutex'; import { SignalTarget } from '@livekit/protocol'; +import { ResultAsync, ok, okAsync } from 'neverthrow'; import log, { LoggerNames, getLogger } from '../logger'; import PCTransport, { PCEvents } from './PCTransport'; import { roomConnectOptionDefaults } from './defaults'; -import { ConnectionError, ConnectionErrorReason } from './errors'; +import { ConnectionError } from './errors'; import CriticalTimers from './timers'; import type { LoggerOptions } from './types'; import { sleep } from './utils'; @@ -49,7 +50,7 @@ export class PCTransportManager { public onTrack?: (ev: RTCTrackEvent) => void; - public onPublisherOffer?: (offer: RTCSessionDescriptionInit, offerId: number) => void; + public onPublisherOffer?: (offer: RTCSessionDescriptionInit, offerId: number) => Promise; private isPublisherConnectionRequired: boolean; @@ -99,8 +100,8 @@ export class PCTransportManager { this.onTrack?.(ev); }; - this.publisher.onOffer = (offer, offerId) => { - this.onPublisherOffer?.(offer, offerId); + this.publisher.onOffer = async (offer, offerId) => { + return this.onPublisherOffer?.(offer, offerId); }; this.state = PCTransportState.NEW; @@ -198,7 +199,10 @@ export class PCTransportManager { } } - async ensurePCTransportConnection(abortController?: AbortController, timeout?: number) { + async ensurePCTransportConnection( + abortController?: AbortController, + timeout?: number, + ): Promise> { const unlock = await this.connectionLock.lock(); try { if ( @@ -209,11 +213,11 @@ export class PCTransportManager { this.log.debug('negotiation required, start negotiating', this.logContext); this.publisher.negotiate(); } - await Promise.all( + return await ResultAsync.combine( this.requiredTransports?.map((transport) => this.ensureTransportConnected(transport, abortController, timeout), ), - ); + ).andThen(() => ok()); } finally { unlock(); } @@ -330,58 +334,46 @@ export class PCTransportManager { } }; - private async ensureTransportConnected( + private ensureTransportConnected( pcTransport: PCTransport, abortController?: AbortController, timeout: number = this.peerConnectionTimeout, - ) { + ): ResultAsync { const connectionState = pcTransport.getConnectionState(); if (connectionState === 'connected') { - return; + return okAsync(); } - return new Promise(async (resolve, reject) => { - const abortHandler = () => { - this.log.warn('abort transport connection', this.logContext); - CriticalTimers.clearTimeout(connectTimeout); + return ResultAsync.fromPromise( + new Promise(async (resolve, reject) => { + const abortHandler = () => { + this.log.warn('abort transport connection', this.logContext); + CriticalTimers.clearTimeout(connectTimeout); - reject( - new ConnectionError( - 'room connection has been cancelled', - ConnectionErrorReason.Cancelled, - ), - ); - }; - if (abortController?.signal.aborted) { - abortHandler(); - } - abortController?.signal.addEventListener('abort', abortHandler); - - const connectTimeout = CriticalTimers.setTimeout(() => { - abortController?.signal.removeEventListener('abort', abortHandler); - reject( - new ConnectionError( - 'could not establish pc connection', - ConnectionErrorReason.InternalError, - ), - ); - }, timeout); - - while (this.state !== PCTransportState.CONNECTED) { - await sleep(50); // FIXME we shouldn't rely on `sleep` in the connection paths, as it invokes `setTimeout` which can be drastically throttled by browser implementations + reject(ConnectionError.cancelled('room connection has been cancelled')); + }; if (abortController?.signal.aborted) { - reject( - new ConnectionError( - 'room connection has been cancelled', - ConnectionErrorReason.Cancelled, - ), - ); - return; + abortHandler(); } - } - CriticalTimers.clearTimeout(connectTimeout); - abortController?.signal.removeEventListener('abort', abortHandler); - resolve(); - }); + abortController?.signal.addEventListener('abort', abortHandler); + + const connectTimeout = CriticalTimers.setTimeout(() => { + abortController?.signal.removeEventListener('abort', abortHandler); + reject(ConnectionError.internal('could not establish pc connection')); + }, timeout); + + while (this.state !== PCTransportState.CONNECTED) { + await sleep(50); // FIXME we shouldn't rely on `sleep` in the connection paths, as it invokes `setTimeout` which can be drastically throttled by browser implementations + if (abortController?.signal.aborted) { + reject(ConnectionError.cancelled('room connection has been cancelled')); + return; + } + } + CriticalTimers.clearTimeout(connectTimeout); + abortController?.signal.removeEventListener('abort', abortHandler); + resolve(); + }), + (e: unknown) => e as ConnectionError, + ); } } diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index c440928a10..9de406370e 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -39,6 +39,7 @@ import { type UserPacket, } from '@livekit/protocol'; import { EventEmitter } from 'events'; +import { type Result, err, errAsync, ok, okAsync, safeTry } from 'neverthrow'; import type { MediaAttributes } from 'sdp-transform'; import type TypedEventEmitter from 'typed-emitter'; import type { SignalOptions } from '../api/SignalClient'; @@ -62,6 +63,8 @@ import { ConnectionError, ConnectionErrorReason, NegotiationError, + SignalReconnectError, + SimulatedError, TrackInvalidError, UnexpectedConnectionState, } from './errors'; @@ -264,38 +267,20 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit token: string, opts: SignalOptions, abortSignal?: AbortSignal, - ): Promise { + ): Promise> { this.url = url; this.token = token; this.signalOpts = opts; this.maxJoinAttempts = opts.maxRetries; - try { - this.joinAttempts += 1; - - this.setupSignalClientCallbacks(); - const joinResponse = await this.client.join(url, token, opts, abortSignal); - this._isClosed = false; - this.latestJoinResponse = joinResponse; + this.joinAttempts += 1; - this.subscriberPrimary = joinResponse.subscriberPrimary; - if (!this.pcManager) { - await this.configure(joinResponse); - } - - // create offer - if (!this.subscriberPrimary || joinResponse.fastPublish) { - this.negotiate().catch((err) => { - log.error(err, this.logContext); - }); - } + this.setupSignalClientCallbacks(); + const joinResult = await this.client.join(url, token, opts, abortSignal); - this.registerOnLineListener(); - this.clientConfiguration = joinResponse.clientConfiguration; - this.emit(EngineEvent.SignalConnected, joinResponse); - return joinResponse; - } catch (e) { - if (e instanceof ConnectionError) { - if (e.reason === ConnectionErrorReason.ServerUnreachable) { + if (joinResult.isErr()) { + const error = joinResult.error; + if (error instanceof ConnectionError) { + if (error.reason === ConnectionErrorReason.ServerUnreachable) { this.log.warn( `Couldn't connect to server, attempt ${this.joinAttempts} of ${this.maxJoinAttempts}`, this.logContext, @@ -305,8 +290,30 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } } } - throw e; + return err(error); } + + const joinResponse = joinResult.value; + + this._isClosed = false; + this.latestJoinResponse = joinResponse; + + this.subscriberPrimary = joinResponse.subscriberPrimary; + if (!this.pcManager) { + await this.configure(joinResponse); + } + + // create offer + if (!this.subscriberPrimary || joinResponse.fastPublish) { + this.negotiate().catch((error) => { + log.error(error, this.logContext); + }); + } + + this.registerOnLineListener(); + this.clientConfiguration = joinResponse.clientConfiguration; + this.emit(EngineEvent.SignalConnected, joinResponse); + return ok(joinResponse); } async close() { @@ -381,10 +388,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit const publicationTimeout = setTimeout(() => { delete this.pendingTrackResolvers[req.cid]; reject( - new ConnectionError( - 'publication of local track timed out, no response from server', - ConnectionErrorReason.Timeout, - ), + ConnectionError.timeout('publication of local track timed out, no response from server'), ); }, 10_000); this.pendingTrackResolvers[req.cid] = { @@ -468,7 +472,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit }; this.pcManager.onPublisherOffer = (offer, offerId) => { - this.client.sendOffer(offer, offerId); + return this.client.sendOffer(offer, offerId); }; this.pcManager.onDataChannel = this.handleDataChannel; @@ -1021,23 +1025,26 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.fullReconnectOnNext = true; } - try { - this.attemptingReconnect = true; - if (this.fullReconnectOnNext) { - await this.restartConnection(); - } else { - await this.resumeConnection(reason); - } - this.clearPendingReconnect(); - this.fullReconnectOnNext = false; - } catch (e) { + let result: Result; + this.attemptingReconnect = true; + if (this.fullReconnectOnNext) { + result = await this.restartConnection(); + } else { + result = await this.resumeConnection(reason); + } + this.clearPendingReconnect(); + this.fullReconnectOnNext = false; + if (result.isErr()) { + const error = result.error; this.reconnectAttempts += 1; let recoverable = true; - if (e instanceof UnexpectedConnectionState) { - this.log.debug('received unrecoverable error', { ...this.logContext, error: e }); + // TODO this needs proper handling to define which errors are actually unexpected and non recoverable + // Currently all connection related errors are ConnectionErrors + if (error instanceof UnexpectedConnectionState) { + this.log.debug('received unrecoverable error', { ...this.logContext, error }); // unrecoverable recoverable = false; - } else if (!(e instanceof SignalReconnectError)) { + } else if (!(error instanceof SignalReconnectError)) { // cannot resume this.fullReconnectOnNext = true; } @@ -1054,9 +1061,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.emit(EngineEvent.Disconnected); await this.close(); } - } finally { - this.attemptingReconnect = false; } + this.attemptingReconnect = false; } private getNextRetryDelay(context: ReconnectContext) { @@ -1070,108 +1076,123 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit return null; } - private async restartConnection(regionUrl?: string) { - try { - if (!this.url || !this.token) { + private async restartConnection( + regionUrl?: string, + ): Promise< + Result< + void, + UnexpectedConnectionState | SignalReconnectError | ConnectionError | SimulatedError + > + > { + const self = this; + const restartResultAsync = safeTry(async function* () { + if (!self.url || !self.token) { // permanent failure, don't attempt reconnection - throw new UnexpectedConnectionState('could not reconnect, url or token not saved'); + return err(new UnexpectedConnectionState('could not reconnect, url or token not saved')); } - this.log.info(`reconnecting, attempt: ${this.reconnectAttempts}`, this.logContext); - this.emit(EngineEvent.Restarting); + self.log.info(`reconnecting, attempt: ${self.reconnectAttempts}`, self.logContext); + self.emit(EngineEvent.Restarting); - if (!this.client.isDisconnected) { - await this.client.sendLeave(); + if (!self.client.isDisconnected) { + await self.client.sendLeave(); } - await this.cleanupPeerConnections(); - await this.cleanupClient(); + await self.cleanupPeerConnections(); + await self.cleanupClient(); - let joinResponse: JoinResponse; - try { - if (!this.signalOpts) { - this.log.warn( - 'attempted connection restart, without signal options present', - this.logContext, - ); - throw new SignalReconnectError(); - } - // in case a regionUrl is passed, the region URL takes precedence - joinResponse = await this.join(regionUrl ?? this.url, this.token, this.signalOpts); - } catch (e) { - if (e instanceof ConnectionError && e.reason === ConnectionErrorReason.NotAllowed) { - throw new UnexpectedConnectionState('could not reconnect, token might be expired'); + if (!self.signalOpts) { + self.log.warn( + 'attempted connection restart, without signal options present', + self.logContext, + ); + return err(new SignalReconnectError()); + } + // in case a regionUrl is passed, the region URL takes precedence + const joinResult = await self.join(regionUrl ?? self.url, self.token, self.signalOpts); + if (joinResult.isErr()) { + const error = joinResult.error; + if (error instanceof ConnectionError && error.reason === ConnectionErrorReason.NotAllowed) { + return err(new UnexpectedConnectionState('could not reconnect, token might be expired')); } - throw new SignalReconnectError(); + return err(new SignalReconnectError()); } - if (this.shouldFailNext) { - this.shouldFailNext = false; - throw new Error('simulated failure'); + if (self.shouldFailNext) { + self.shouldFailNext = false; + return err(new SimulatedError()); } - this.client.setReconnected(); - this.emit(EngineEvent.SignalRestarted, joinResponse); + self.client.setReconnected(); + self.emit(EngineEvent.SignalRestarted, joinResult.value); - await this.waitForPCReconnected(); + yield* await self.waitForPCReconnected(); // re-check signal connection state before setting engine as resumed - if (this.client.currentState !== SignalConnectionState.CONNECTED) { - throw new SignalReconnectError('Signal connection got severed during reconnect'); + if (self.client.currentState !== SignalConnectionState.CONNECTED) { + return err(new SignalReconnectError('Signal connection got severed during reconnect')); } - this.regionUrlProvider?.resetAttempts(); + self.regionUrlProvider?.resetAttempts(); // reconnect success - this.emit(EngineEvent.Restarted); - } catch (error) { + self.emit(EngineEvent.Restarted); + return ok(); + }); + + const restartResult = await restartResultAsync; + + if (restartResult.isErr()) { const nextRegionUrl = await this.regionUrlProvider?.getNextBestRegionUrl(); if (nextRegionUrl) { - await this.restartConnection(nextRegionUrl); - return; + this.log.info('retrying signal connection'); + return this.restartConnection(nextRegionUrl); } else { // no more regions to try (or we're not on cloud) this.regionUrlProvider?.resetAttempts(); - throw error; + return err(restartResult.error); } } + return ok(restartResult.value); } - private async resumeConnection(reason?: ReconnectReason): Promise { + async resumeConnection( + reason?: ReconnectReason, + ): Promise> { if (!this.url || !this.token) { // permanent failure, don't attempt reconnection - throw new UnexpectedConnectionState('could not reconnect, url or token not saved'); + return errAsync(new UnexpectedConnectionState('could not reconnect, url or token not saved')); } // trigger publisher reconnect if (!this.pcManager) { - throw new UnexpectedConnectionState('publisher and subscriber connections unset'); + return errAsync(new UnexpectedConnectionState('publisher and subscriber connections unset')); } this.log.info(`resuming signal connection, attempt ${this.reconnectAttempts}`, this.logContext); this.emit(EngineEvent.Resuming); - let res: ReconnectResponse | undefined; - try { - this.setupSignalClientCallbacks(); - res = await this.client.reconnect(this.url, this.token, this.participantSid, reason); - } catch (error) { - let message = ''; - if (error instanceof Error) { - message = error.message; - this.log.error(error.message, { ...this.logContext, error }); - } - if (error instanceof ConnectionError && error.reason === ConnectionErrorReason.NotAllowed) { - throw new UnexpectedConnectionState('could not reconnect, token might be expired'); - } - if (error instanceof ConnectionError && error.reason === ConnectionErrorReason.LeaveRequest) { - throw error; - } - throw new SignalReconnectError(message); + this.setupSignalClientCallbacks(); + + const reconnectResult = await this.client.reconnect( + this.url, + this.token, + this.participantSid, + reason, + ); + + if (reconnectResult.isErr()) { + return errAsync( + new SignalReconnectError( + `${reconnectResult.error.reasonName}: ${reconnectResult.error.message}`, + ), + ); } + this.emit(EngineEvent.SignalResumed); - if (res) { - const rtcConfig = this.makeRTCConfiguration(res); + const reconnectResponse = reconnectResult.value; + if (reconnectResponse) { + const rtcConfig = this.makeRTCConfiguration(reconnectResponse); this.pcManager.updateConfiguration(rtcConfig); if (this.latestJoinResponse) { - this.latestJoinResponse.serverInfo = res.serverInfo; + this.latestJoinResponse.serverInfo = reconnectResponse.serverInfo; } } else { this.log.warn('Did not receive reconnect response', this.logContext); @@ -1179,7 +1200,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit if (this.shouldFailNext) { this.shouldFailNext = false; - throw new Error('simulated failure'); + return err(new SimulatedError()); } await this.pcManager.triggerIceRestart(); @@ -1188,7 +1209,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit // re-check signal connection state before setting engine as resumed if (this.client.currentState !== SignalConnectionState.CONNECTED) { - throw new SignalReconnectError('Signal connection got severed during reconnect'); + return err(new SignalReconnectError('Signal connection got severed during reconnect')); } this.client.setReconnected(); @@ -1199,39 +1220,44 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.createDataChannels(); } - if (res?.lastMessageSeq) { - this.resendReliableMessagesForResume(res.lastMessageSeq); + if (reconnectResponse?.lastMessageSeq) { + this.resendReliableMessagesForResume(reconnectResponse.lastMessageSeq); } // resume success this.emit(EngineEvent.Resumed); + + return okAsync(); } async waitForPCInitialConnection(timeout?: number, abortController?: AbortController) { if (!this.pcManager) { throw new UnexpectedConnectionState('PC manager is closed'); } - await this.pcManager.ensurePCTransportConnection(abortController, timeout); + return this.pcManager.ensurePCTransportConnection(abortController, timeout); } - private async waitForPCReconnected() { + private async waitForPCReconnected(): Promise< + Result + > { this.pcState = PCState.Reconnecting; this.log.debug('waiting for peer connection to reconnect', this.logContext); try { await sleep(minReconnectWait); // FIXME setTimeout again not ideal for a connection critical path if (!this.pcManager) { - throw new UnexpectedConnectionState('PC manager is closed'); + return err(new UnexpectedConnectionState('PC manager is closed')); } - await this.pcManager.ensurePCTransportConnection(undefined, this.peerConnectionTimeout); + const res = await this.pcManager.ensurePCTransportConnection( + undefined, + this.peerConnectionTimeout, + ); this.pcState = PCState.Connected; + return res; } catch (e: any) { // TODO do we need a `failed` state here for the PC? this.pcState = PCState.Disconnected; - throw new ConnectionError( - `could not establish PC connection, ${e.message}`, - ConnectionErrorReason.InternalError, - ); + return err(ConnectionError.internal(`could not establish PC connection: ${e.message}`)); } } @@ -1412,10 +1438,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit const transport = subscriber ? this.pcManager.subscriber : this.pcManager.publisher; const transportName = subscriber ? 'Subscriber' : 'Publisher'; if (!transport) { - throw new ConnectionError( - `${transportName} connection not set`, - ConnectionErrorReason.InternalError, - ); + throw ConnectionError.internal(`${transportName} connection not set`); } let needNegotiation = false; @@ -1434,8 +1457,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } if (needNegotiation) { // start negotiation - this.negotiate().catch((err) => { - log.error(err, this.logContext); + this.negotiate().catch((error) => { + log.error(error, this.logContext); }); } @@ -1456,9 +1479,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit await sleep(50); } - throw new ConnectionError( + throw ConnectionError.internal( `could not establish ${transportName} connection, state: ${transport.getICEConnectionState()}`, - ConnectionErrorReason.InternalError, ); } @@ -1748,8 +1770,6 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } } -class SignalReconnectError extends Error {} - export type EngineEventCallbacks = { connected: (joinResp: JoinResponse) => void; disconnected: (reason?: DisconnectReason) => void; diff --git a/src/room/RegionUrlProvider.test.ts b/src/room/RegionUrlProvider.test.ts index 0d027c476b..bb27a72d62 100644 --- a/src/room/RegionUrlProvider.test.ts +++ b/src/room/RegionUrlProvider.test.ts @@ -180,7 +180,9 @@ describe('RegionUrlProvider', () => { const provider = new RegionUrlProvider('wss://test.livekit.cloud', 'token'); fetchMock.mockResolvedValue(createMockResponse(401)); - await expect(provider.fetchRegionSettings()).rejects.toThrow(ConnectionError); + await expect(provider.fetchRegionSettings()).rejects.toThrow( + ConnectionError.notAllowed('Could not fetch region settings: Unauthorized', 401), + ); await expect(provider.fetchRegionSettings()).rejects.toMatchObject({ reason: ConnectionErrorReason.NotAllowed, status: 401, @@ -191,10 +193,14 @@ describe('RegionUrlProvider', () => { const provider = new RegionUrlProvider('wss://test.livekit.cloud', 'token'); fetchMock.mockResolvedValue(createMockResponse(500)); - await expect(provider.fetchRegionSettings()).rejects.toThrow(ConnectionError); + await expect(provider.fetchRegionSettings()).rejects.toThrow( + ConnectionError.internal('Could not fetch region settings: Internal Server Error', { + status: 500, + }), + ); await expect(provider.fetchRegionSettings()).rejects.toMatchObject({ reason: ConnectionErrorReason.InternalError, - status: 500, + context: { status: 500 }, }); }); diff --git a/src/room/RegionUrlProvider.ts b/src/room/RegionUrlProvider.ts index dbd578f928..a3d4f6fe2f 100644 --- a/src/room/RegionUrlProvider.ts +++ b/src/room/RegionUrlProvider.ts @@ -45,25 +45,26 @@ export class RegionUrlProvider { const regionSettings = (await regionSettingsResponse.json()) as RegionSettings; return { regionSettings, updatedAtInMs: Date.now(), maxAgeInMs }; } else { - throw new ConnectionError( - `Could not fetch region settings: ${regionSettingsResponse.statusText}`, - regionSettingsResponse.status === 401 - ? ConnectionErrorReason.NotAllowed - : ConnectionErrorReason.InternalError, - regionSettingsResponse.status, - ); + throw regionSettingsResponse.status === 401 + ? ConnectionError.notAllowed( + `Could not fetch region settings: ${regionSettingsResponse.statusText}`, + regionSettingsResponse.status, + ) + : ConnectionError.internal( + `Could not fetch region settings: ${regionSettingsResponse.statusText}`, + { status: regionSettingsResponse.status }, + ); } } catch (e: unknown) { if (e instanceof ConnectionError) { // rethrow connection errors throw e; } else if (signal?.aborted) { - throw new ConnectionError(`Region fetching was aborted`, ConnectionErrorReason.Cancelled); + throw ConnectionError.cancelled(`Region fetching was aborted`); } else { // wrap other errors as connection errors (e.g. timeouts) - throw new ConnectionError( + throw ConnectionError.serverUnreachable( `Could not fetch region settings, ${e instanceof Error ? `${e.name}: ${e.message}` : e}`, - ConnectionErrorReason.ServerUnreachable, 500, // using 500 as a catch-all manually set error code here ); } diff --git a/src/room/Room.ts b/src/room/Room.ts index 9257a1c542..1280d9b813 100644 --- a/src/room/Room.ts +++ b/src/room/Room.ts @@ -686,7 +686,7 @@ class Room extends (EventEmitter as new () => TypedEmitter) try { await BackOffStrategy.getInstance().getBackOffPromise(url); if (abortController.signal.aborted) { - throw new ConnectionError('Connection attempt aborted', ConnectionErrorReason.Cancelled); + ConnectionError.cancelled('Connection attempt aborted'); } await this.attemptConnection(regionUrl ?? url, token, opts, abortController); this.abortController = undefined; @@ -773,7 +773,7 @@ class Room extends (EventEmitter as new () => TypedEmitter) roomOptions: InternalRoomOptions, abortController: AbortController, ): Promise => { - const joinResponse = await engine.join( + const joinResult = await engine.join( url, token, { @@ -788,6 +788,13 @@ class Room extends (EventEmitter as new () => TypedEmitter) abortController.signal, ); + // TODO continue propagating Result, we don't need to throw here + if (joinResult.isErr()) { + throw joinResult.error; + } + + const joinResponse = joinResult.value; + let serverInfo: Partial | undefined = joinResponse.serverInfo; if (!serverInfo) { serverInfo = { version: joinResponse.serverVersion, region: joinResponse.serverRegion }; @@ -894,12 +901,9 @@ class Room extends (EventEmitter as new () => TypedEmitter) } catch (err) { await this.engine.close(); this.recreateEngine(); - const resultingError = new ConnectionError( - `could not establish signal connection`, - abortController.signal.aborted - ? ConnectionErrorReason.Cancelled - : ConnectionErrorReason.ServerUnreachable, - ); + const resultingError = abortController.signal.aborted + ? ConnectionError.cancelled(`could not establish signal connection`) + : ConnectionError.serverUnreachable(`could not establish signal connection`); if (err instanceof Error) { resultingError.message = `${resultingError.message}: ${err.message}`; } @@ -917,14 +921,19 @@ class Room extends (EventEmitter as new () => TypedEmitter) if (abortController.signal.aborted) { await this.engine.close(); this.recreateEngine(); - throw new ConnectionError(`Connection attempt aborted`, ConnectionErrorReason.Cancelled); + throw ConnectionError.cancelled(`Connection attempt aborted`); } try { - await this.engine.waitForPCInitialConnection( + const result = await this.engine.waitForPCInitialConnection( this.connOptions.peerConnectionTimeout, abortController, ); + if (result.isErr()) { + await this.engine.close(); + this.recreateEngine(); + throw result.error; + } } catch (e) { await this.engine.close(); this.recreateEngine(); @@ -974,9 +983,7 @@ class Room extends (EventEmitter as new () => TypedEmitter) this.log.warn(msg, this.logContext); this.abortController?.abort(msg); // in case the abort controller didn't manage to cancel the connection attempt, reject the connect promise explicitly - this.connectFuture?.reject?.( - new ConnectionError('Client initiated disconnect', ConnectionErrorReason.Cancelled), - ); + this.connectFuture?.reject?.(ConnectionError.cancelled('Client initiated disconnect')); this.connectFuture = undefined; } @@ -1925,7 +1932,7 @@ class Room extends (EventEmitter as new () => TypedEmitter) }); if (byteLength(response) > MAX_PAYLOAD_BYTES) { responseError = RpcError.builtIn('RESPONSE_PAYLOAD_TOO_LARGE'); - console.warn(`RPC Response payload too large for ${method}`); + this.log.warn(`RPC Response payload too large for ${method}`, this.logContext); } else { responsePayload = response; } @@ -1933,9 +1940,9 @@ class Room extends (EventEmitter as new () => TypedEmitter) if (error instanceof RpcError) { responseError = error; } else { - console.warn( + this.log.warn( `Uncaught error returned by RPC handler for ${method}. Returning APPLICATION_ERROR instead.`, - error, + { ...this.logContext, error }, ); responseError = RpcError.builtIn('APPLICATION_ERROR'); } diff --git a/src/room/errors.ts b/src/room/errors.ts index 5c4c842aab..efd52edf78 100644 --- a/src/room/errors.ts +++ b/src/room/errors.ts @@ -10,6 +10,14 @@ export class LivekitError extends Error { } } +export class SimulatedError extends LivekitError { + readonly name = 'simulated'; + + constructor(message = 'Simulated failure') { + super(-1, message); + } +} + export enum ConnectionErrorReason { NotAllowed, ServerUnreachable, @@ -17,80 +25,197 @@ export enum ConnectionErrorReason { Cancelled, LeaveRequest, Timeout, + WebSocket, } -export class ConnectionError extends LivekitError { +type NotAllowed = { + reason: ConnectionErrorReason.NotAllowed; + status: number; + context?: unknown; +}; + +type InternalError = { + reason: ConnectionErrorReason.InternalError; + status: never; + context?: { status?: number; statusText?: string }; +}; + +type ConnectionTimeout = { + reason: ConnectionErrorReason.Timeout; + status: never; + context: never; +}; + +type LeaveRequest = { + reason: ConnectionErrorReason.LeaveRequest; + status: never; + context: DisconnectReason; +}; + +type Cancelled = { + reason: ConnectionErrorReason.Cancelled; + status: never; + context: never; +}; + +type ServerUnreachable = { + reason: ConnectionErrorReason.ServerUnreachable; + status?: number; + context?: never; +}; + +type WebSocket = { + reason: ConnectionErrorReason.WebSocket; status?: number; + context?: string; +}; + +type ConnectionErrorVariants = + | NotAllowed + | ConnectionTimeout + | LeaveRequest + | InternalError + | Cancelled + | ServerUnreachable + | WebSocket; - context?: unknown | DisconnectReason; +export class ConnectionError< + Variant extends ConnectionErrorVariants = ConnectionErrorVariants, +> extends LivekitError { + status?: Variant['status']; - reason: ConnectionErrorReason; + context: Variant['context']; + + reason: Variant['reason']; reasonName: string; - constructor( + readonly name = 'ConnectionError'; + + protected constructor( message: string, - reason: ConnectionErrorReason, - status?: number, - context?: unknown | DisconnectReason, + reason: Variant['reason'], + status?: Variant['status'], + context?: Variant['context'], ) { super(1, message); - this.name = 'ConnectionError'; this.status = status; this.reason = reason; this.context = context; this.reasonName = ConnectionErrorReason[reason]; } + + static notAllowed(message: string, status: number, context?: unknown) { + return new ConnectionError( + message, + ConnectionErrorReason.NotAllowed, + status, + context, + ); + } + + static timeout(message: string) { + return new ConnectionError(message, ConnectionErrorReason.Timeout); + } + + static leaveRequest(message: string, context: DisconnectReason) { + return new ConnectionError( + message, + ConnectionErrorReason.LeaveRequest, + undefined, + context, + ); + } + + static internal(message: string, context?: { status?: number; statusText?: string }) { + return new ConnectionError( + message, + ConnectionErrorReason.InternalError, + undefined, + context, + ); + } + + static cancelled(message: string) { + return new ConnectionError(message, ConnectionErrorReason.Cancelled); + } + + static serverUnreachable(message: string, status?: number) { + return new ConnectionError( + message, + ConnectionErrorReason.ServerUnreachable, + status, + ); + } + + static websocket(message: string, status?: number, reason?: string) { + return new ConnectionError(message, ConnectionErrorReason.WebSocket, status, reason); + } +} + +export class SignalReconnectError extends LivekitError { + readonly name = 'SignalReconnectError'; + + constructor(message?: string) { + super(12, message); + } } export class DeviceUnsupportedError extends LivekitError { + readonly name = 'DeviceUnsupportedError'; + constructor(message?: string) { super(21, message ?? 'device is unsupported'); - this.name = 'DeviceUnsupportedError'; } } export class TrackInvalidError extends LivekitError { + readonly name = 'TrackInvalidError'; + constructor(message?: string) { super(20, message ?? 'track is invalid'); - this.name = 'TrackInvalidError'; } } export class UnsupportedServer extends LivekitError { + readonly name = 'UnsupportedServer'; + constructor(message?: string) { super(10, message ?? 'unsupported server'); - this.name = 'UnsupportedServer'; } } export class UnexpectedConnectionState extends LivekitError { + readonly name = 'UnexpectedConnectionState'; + constructor(message?: string) { super(12, message ?? 'unexpected connection state'); - this.name = 'UnexpectedConnectionState'; } } export class NegotiationError extends LivekitError { + readonly name = 'NegotiationError'; + constructor(message?: string) { super(13, message ?? 'unable to negotiate'); - this.name = 'NegotiationError'; } } export class PublishDataError extends LivekitError { + readonly name = 'PublishDataError'; + constructor(message?: string) { super(14, message ?? 'unable to publish data'); - this.name = 'PublishDataError'; } } export class PublishTrackError extends LivekitError { + readonly name = 'PublishTrackError'; + status: number; constructor(message: string, status: number) { super(15, message); - this.name = 'PublishTrackError'; this.status = status; } } @@ -100,6 +225,8 @@ export type RequestErrorReason = | 'TimeoutError'; export class SignalRequestError extends LivekitError { + readonly name = 'SignalRequestError'; + reason: RequestErrorReason; reasonName: string; @@ -136,13 +263,14 @@ export enum DataStreamErrorReason { } export class DataStreamError extends LivekitError { + readonly name = 'DataStreamError'; + reason: DataStreamErrorReason; reasonName: string; constructor(message: string, reason: DataStreamErrorReason) { super(16, message); - this.name = 'DataStreamError'; this.reason = reason; this.reasonName = DataStreamErrorReason[reason]; }