diff --git a/__tests__/WebSocketChannel.test.ts b/__tests__/WebSocketChannel.test.ts index 1ed1e45b9..10eaaf30b 100644 --- a/__tests__/WebSocketChannel.test.ts +++ b/__tests__/WebSocketChannel.test.ts @@ -1,193 +1,433 @@ -import { Provider, WSSubscriptions, WebSocketChannel } from '../src'; +/* eslint-disable no-underscore-dangle */ +import { Provider, Subscription, WebSocketChannel } from '../src'; +import { logger } from '../src/global/logger'; import { StarknetChainId } from '../src/global/constants'; import { getTestAccount, getTestProvider, STRKtokenAddress, TEST_WS_URL } from './config/fixtures'; -const nodeUrl = 'wss://sepolia-pathfinder-rpc.spaceshard.io/rpc/v0_8'; +const describeIfWs = TEST_WS_URL ? describe : describe.skip; +const NODE_URL = TEST_WS_URL!; -describe('websocket specific endpoints - pathfinder test', () => { - // account provider - const provider = new Provider(getTestProvider()); - const account = getTestAccount(provider); +describeIfWs('E2E WebSocket Tests', () => { + describe('websocket specific endpoints', () => { + // account provider + const provider = new Provider(getTestProvider()); + const account = getTestAccount(provider); - // websocket - let webSocketChannel: WebSocketChannel; + // websocket + let webSocketChannel: WebSocketChannel; - beforeAll(async () => { - webSocketChannel = new WebSocketChannel({ nodeUrl: TEST_WS_URL }); - expect(webSocketChannel.isConnected()).toBe(false); - try { + beforeEach(async () => { + webSocketChannel = new WebSocketChannel({ nodeUrl: NODE_URL }); await webSocketChannel.waitForConnection(); - } catch (error: any) { - console.log(error.message); - } - expect(webSocketChannel.isConnected()).toBe(true); - }); + }); - afterAll(async () => { - expect(webSocketChannel.isConnected()).toBe(true); - webSocketChannel.disconnect(); - await expect(webSocketChannel.waitForDisconnection()).resolves.toBe(WebSocket.CLOSED); - }); + afterEach(async () => { + if (webSocketChannel.isConnected()) { + webSocketChannel.disconnect(); + await webSocketChannel.waitForDisconnection(); + } + }); - test('Test WS Error and edge cases', async () => { - webSocketChannel.disconnect(); + test('should throw an error when sending on a disconnected socket', async () => { + // This test uses its own channel to disable auto-reconnect and isolate the error behavior + const testChannel = new WebSocketChannel({ nodeUrl: NODE_URL, autoReconnect: false }); + await testChannel.waitForConnection(); - // should fail as disconnected - await expect(webSocketChannel.subscribeNewHeads()).rejects.toThrow(); + testChannel.disconnect(); + await testChannel.waitForDisconnection(); - // should reconnect - webSocketChannel.reconnect(); - await webSocketChannel.waitForConnection(); + // With autoReconnect: false, this should immediately throw, not queue. + await expect(testChannel.subscribeNewHeads()).rejects.toThrow( + 'WebSocketChannel.send() failed due to socket being disconnected' + ); + }); - // should succeed after reconnection - await expect(webSocketChannel.subscribeNewHeads()).resolves.toEqual(expect.any(Number)); + test('should allow manual reconnection after a user-initiated disconnect', async () => { + // This test uses the default channel from `beforeEach` which has autoReconnect: true + webSocketChannel.disconnect(); + await webSocketChannel.waitForDisconnection(); - // should fail because already subscribed - await expect(webSocketChannel.subscribeNewHeads()).resolves.toBe(false); - }); + // It should not have auto-reconnected because the disconnect was user-initiated + expect(webSocketChannel.isConnected()).toBe(false); - test('onUnsubscribe with unsubscribeNewHeads', async () => { - const mockOnUnsubscribe = jest.fn().mockImplementation((subId: number) => { - expect(subId).toEqual(expect.any(Number)); + // Now, manually reconnect + webSocketChannel.reconnect(); + await webSocketChannel.waitForConnection(); + expect(webSocketChannel.isConnected()).toBe(true); + + // To prove the connection is working, make a simple RPC call. + // This avoids the flakiness of creating and tearing down a real subscription. + const chainId = await webSocketChannel.sendReceive('starknet_chainId'); + expect(chainId).toBe(StarknetChainId.SN_SEPOLIA); }); - webSocketChannel.onUnsubscribe = mockOnUnsubscribe; - await webSocketChannel.subscribeNewHeads(); - await expect(webSocketChannel.unsubscribeNewHeads()).resolves.toBe(true); - await expect(webSocketChannel.unsubscribeNewHeads()).rejects.toThrow(); + test('Test subscribeNewHeads', async () => { + const sub = await webSocketChannel.subscribeNewHeads(); + expect(sub).toBeInstanceOf(Subscription); + + let i = 0; + sub.on(async (result) => { + i += 1; + expect(result).toBeDefined(); + if (i === 2) { + const status = await sub.unsubscribe(); + expect(status).toBe(true); + } + }); + + await webSocketChannel.waitForUnsubscription(sub.id); + }); + + test('Test subscribeEvents', async () => { + const sub = await webSocketChannel.subscribeEvents(); + expect(sub).toBeInstanceOf(Subscription); + + let i = 0; + sub.on(async (result) => { + i += 1; + expect(result).toBeDefined(); + if (i === 5) { + const status = await sub.unsubscribe(); + expect(status).toBe(true); + } + }); + + await webSocketChannel.waitForUnsubscription(sub.id); + }); + + test('Test subscribePendingTransaction', async () => { + const sub = await webSocketChannel.subscribePendingTransaction(true); + expect(sub).toBeInstanceOf(Subscription); + + let i = 0; + sub.on(async (result) => { + i += 1; + expect(result).toBeDefined(); + if (i === 5) { + const status = await sub.unsubscribe(); + expect(status).toBe(true); + } + }); + await webSocketChannel.waitForUnsubscription(sub.id); + }); + + test('Test subscribeTransactionStatus', async () => { + const { transaction_hash } = await account.execute({ + contractAddress: STRKtokenAddress, + entrypoint: 'transfer', + calldata: [account.address, '10', '0'], + }); + + const sub = await webSocketChannel.subscribeTransactionStatus(transaction_hash); + expect(sub).toBeInstanceOf(Subscription); - expect(mockOnUnsubscribe).toHaveBeenCalled(); - expect(webSocketChannel.subscriptions.has(WSSubscriptions.NEW_HEADS)).toBeFalsy(); + let i = 0; + sub.on(async (result) => { + i += 1; + expect(result).toBeDefined(); + if (i >= 2) { + const status = await sub.unsubscribe(); + expect(status).toBe(true); + } + }); + await webSocketChannel.waitForUnsubscription(sub.id); + }); }); - test('Test subscribeNewHeads', async () => { - await webSocketChannel.subscribeNewHeads(); - - let i = 0; - webSocketChannel.onNewHeads = async function (data) { - expect(this).toBeInstanceOf(WebSocketChannel); - i += 1; - // TODO : Add data format validation - expect(data.result).toBeDefined(); - if (i === 2) { - const status = await webSocketChannel.unsubscribeNewHeads(); - expect(status).toBe(true); - } - }; - const expectedId = webSocketChannel.subscriptions.get(WSSubscriptions.NEW_HEADS); - const subscriptionId = await webSocketChannel.waitForUnsubscription(expectedId); - expect(subscriptionId).toBe(expectedId); - expect(webSocketChannel.subscriptions.get(WSSubscriptions.NEW_HEADS)).toBe(undefined); + describe('websocket regular endpoints', () => { + let webSocketChannel: WebSocketChannel; + + beforeAll(async () => { + webSocketChannel = new WebSocketChannel({ nodeUrl: NODE_URL }); + expect(webSocketChannel.isConnected()).toBe(false); + const status = await webSocketChannel.waitForConnection(); + expect(status).toBe(WebSocket.OPEN); + }); + + afterAll(async () => { + expect(webSocketChannel.isConnected()).toBe(true); + webSocketChannel.disconnect(); + await webSocketChannel.waitForDisconnection(); + }); + + test('regular rpc endpoint', async () => { + const response = await webSocketChannel.sendReceive('starknet_chainId'); + expect(response).toBe(StarknetChainId.SN_SEPOLIA); + }); }); - test('Test subscribeEvents', async () => { - await webSocketChannel.subscribeEvents(); - - let i = 0; - webSocketChannel.onEvents = async (data) => { - i += 1; - // TODO : Add data format validation - expect(data.result).toBeDefined(); - if (i === 5) { - const status = await webSocketChannel.unsubscribeEvents(); - expect(status).toBe(true); + describe('WebSocketChannel Auto-Reconnection', () => { + let webSocketChannel: WebSocketChannel; + + afterEach(async () => { + // Ensure the channel is always disconnected after each test to prevent open handles. + if (webSocketChannel) { + webSocketChannel.disconnect(); + await webSocketChannel.waitForDisconnection(); } - }; - const expectedId = webSocketChannel.subscriptions.get(WSSubscriptions.EVENTS); - const subscriptionId = await webSocketChannel.waitForUnsubscription(expectedId); - expect(subscriptionId).toBe(expectedId); - expect(webSocketChannel.subscriptions.get(WSSubscriptions.EVENTS)).toBe(undefined); + }); + + test('should automatically reconnect on connection drop', (done) => { + // Set a very short reconnection delay for faster tests + webSocketChannel = new WebSocketChannel({ + nodeUrl: NODE_URL, + reconnectOptions: { retries: 3, delay: 100 }, + }); + + let hasReconnected = false; + webSocketChannel.on('open', () => { + // This will be called once on initial connection, and a second time on reconnection. + if (hasReconnected) { + done(); // Test is successful if we get here + } else { + // This is the first connection, now we simulate the drop + hasReconnected = true; + webSocketChannel.websocket.close(); + } + }); + }); + + test('sendReceive should time out if no response is received', async () => { + webSocketChannel = new WebSocketChannel({ + nodeUrl: NODE_URL, + requestTimeout: 100, // Set a short timeout for testing + }); + await webSocketChannel.waitForConnection(); + + // Spy on the 'send' method and prevent it from sending anything. + // This guarantees that we will never get a response and the timeout will be triggered. + const sendSpy = jest.spyOn(webSocketChannel.websocket, 'send').mockImplementation(() => {}); + + // We expect this promise to reject with a timeout error. + await expect( + webSocketChannel.sendReceive('some_method_that_will_never_get_a_response') + ).rejects.toThrow('timed out after 100ms'); + + // Restore the original implementation for other tests + sendSpy.mockRestore(); + }); + + test('should queue sendReceive requests when reconnecting and process them after', (done) => { + webSocketChannel = new WebSocketChannel({ + nodeUrl: NODE_URL, + reconnectOptions: { retries: 3, delay: 100 }, + }); + + let hasReconnected = false; + webSocketChannel.on('open', () => { + if (hasReconnected) { + // Reconnected. The promise from the queued sendReceive will resolve now. + } else { + // 1. First connection, now simulate a drop + hasReconnected = true; + webSocketChannel.websocket.close(); + + // 2. Immediately try to send a request. It should be queued. + webSocketChannel.sendReceive('starknet_chainId').then((result) => { + // 3. This assertion runs after reconnection, proving the queue was processed. + expect(result).toBe(StarknetChainId.SN_SEPOLIA); + done(); // 4. Test is done when the queued request has been successfully processed. + }); + } + }); + }); + + test('should queue subscribe requests when reconnecting and process them after', (done) => { + jest.setTimeout(30000); // Allow time for reconnect and a new block event + + webSocketChannel = new WebSocketChannel({ + nodeUrl: NODE_URL, + reconnectOptions: { retries: 3, delay: 100 }, + }); + + let hasReconnected = false; + webSocketChannel.on('open', () => { + if (hasReconnected) { + // Reconnected. The promise from the queued subscribeNewHeads will resolve now. + } else { + // 1. First connection, now simulate a drop + hasReconnected = true; + webSocketChannel.websocket.close(); + + // 2. Immediately try to subscribe. The request should be queued. + webSocketChannel.subscribeNewHeads().then((sub) => { + // 3. This should only execute after reconnection. + expect(sub).toBeInstanceOf(Subscription); + expect(webSocketChannel.isConnected()).toBe(true); + + // 4. To prove it's a real subscription, wait for one event. + sub.on((data) => { + expect(data).toBeDefined(); + done(); + }); + }); + } + }); + }); + + test('should restore active subscriptions after an automatic reconnection', (done) => { + jest.setTimeout(30000); // Allow time for reconnect and new block + + webSocketChannel = new WebSocketChannel({ + nodeUrl: NODE_URL, + reconnectOptions: { retries: 3, delay: 100 }, + }); + + let connectionCount = 0; + + const eventHandler = (data: any) => { + // The handler is called. If this is after the reconnection (connectionCount > 1), + // it proves the subscription was successfully restored. + if (connectionCount > 1) { + expect(data).toBeDefined(); + done(); + } + }; + + webSocketChannel.on('open', async () => { + connectionCount += 1; + if (connectionCount === 1) { + // First connection: set up the subscription + const sub = await webSocketChannel.subscribeNewHeads(); + sub.on(eventHandler); + // Now, simulate a drop + webSocketChannel.websocket.close(); + } + // On the second 'open' event (connectionCount === 2), the test will implicitly + // be waiting for the eventHandler to be called, which will resolve the test. + }); + }); }); +}); - test('Test subscribePendingTransaction', async () => { - await webSocketChannel.subscribePendingTransaction(true); - - let i = 0; - webSocketChannel.onPendingTransaction = async (data) => { - i += 1; - // TODO : Add data format validation - expect(data.result).toBeDefined(); - if (i === 5) { - const status = await webSocketChannel.unsubscribePendingTransaction(); - expect(status).toBe(true); - } - }; - const expectedId = webSocketChannel.subscriptions.get(WSSubscriptions.PENDING_TRANSACTION); - const subscriptionId = await webSocketChannel.waitForUnsubscription(expectedId); - expect(subscriptionId).toBe(expectedId); - expect(webSocketChannel.subscriptions.get(WSSubscriptions.PENDING_TRANSACTION)).toBe(undefined); +describe('Unit Test: WebSocketChannel Buffering', () => { + let webSocketChannel: WebSocketChannel; + let sub: Subscription; + + afterEach(async () => { + if (sub && !sub.isClosed) { + await sub.unsubscribe(); + } + if (webSocketChannel && webSocketChannel.isConnected()) { + webSocketChannel.disconnect(); + await webSocketChannel.waitForDisconnection(); + } }); - test('Test subscribeTransactionStatus', async () => { - const { transaction_hash } = await account.execute({ - contractAddress: STRKtokenAddress, - entrypoint: 'transfer', - calldata: [account.address, '10', '0'], - }); - - let i = 0; - webSocketChannel.onTransactionStatus = async (data) => { - i += 1; - // TODO : Add data format validation - expect(data.result).toBeDefined(); - if (i >= 1) { - const status = await webSocketChannel.unsubscribeTransactionStatus(); - expect(status).toBe(true); - } - }; - - const subid = await webSocketChannel.subscribeTransactionStatus(transaction_hash); - expect(subid).toEqual(expect.any(Number)); - const expectedId = webSocketChannel.subscriptions.get(WSSubscriptions.TRANSACTION_STATUS); - const subscriptionId = await webSocketChannel.waitForUnsubscription(expectedId); - expect(subscriptionId).toEqual(expectedId); - expect(webSocketChannel.subscriptions.get(WSSubscriptions.TRANSACTION_STATUS)).toBe(undefined); + test('should buffer events and process upon handler attachment', async () => { + // This test is for client-side buffering, so we don't need a real connection. + webSocketChannel = new WebSocketChannel({ + nodeUrl: 'ws://dummy-url', + autoReconnect: false, + }); + // Mock unsubscribe to prevent network errors during cleanup in afterEach. + jest.spyOn(webSocketChannel, 'unsubscribe').mockResolvedValue(true); + + // Manually create the subscription, bypassing the network. + const subId = 'mock_sub_id_buffer'; + sub = new Subscription(webSocketChannel, 'starknet_subscribeNewHeads', {}, subId, 1000); + (webSocketChannel as any).activeSubscriptions.set(subId, sub); + + const mockNewHeadsResult1 = { block_number: 1 }; + const mockNewHeadsResult2 = { block_number: 2 }; + + // 1. Simulate receiving an event BEFORE a handler is attached. + sub._handleEvent(mockNewHeadsResult1); + + const handler = jest.fn(); + + // 2. Attach handler, which should immediately process the buffer. + sub.on(handler); + + expect(handler).toHaveBeenCalledTimes(1); + expect(handler).toHaveBeenCalledWith(mockNewHeadsResult1); + + // 3. Simulate another event, which should be processed directly. + sub._handleEvent(mockNewHeadsResult2); + + expect(handler).toHaveBeenCalledTimes(2); + expect(handler).toHaveBeenCalledWith(mockNewHeadsResult2); }); - test('Test subscribeTransactionStatus and block_id', async () => { - const { transaction_hash } = await account.execute({ - contractAddress: STRKtokenAddress, - entrypoint: 'transfer', - calldata: [account.address, '10', '0'], - }); - - let i = 0; - webSocketChannel.onTransactionStatus = async (data) => { - i += 1; - // TODO : Add data format validation - expect(data.result).toBeDefined(); - if (i >= 1) { - const status = await webSocketChannel.unsubscribeTransactionStatus(); - expect(status).toBe(true); - } - }; - - const subid = await webSocketChannel.subscribeTransactionStatus(transaction_hash); - expect(subid).toEqual(expect.any(Number)); - const expectedId = webSocketChannel.subscriptions.get(WSSubscriptions.TRANSACTION_STATUS); - const subscriptionId = await webSocketChannel.waitForUnsubscription(expectedId); - expect(subscriptionId).toEqual(expectedId); - expect(webSocketChannel.subscriptions.get(WSSubscriptions.TRANSACTION_STATUS)).toBe(undefined); + test('should drop oldest events when buffer limit is reached', async () => { + // No real connection needed for this test. + webSocketChannel = new WebSocketChannel({ + nodeUrl: 'ws://dummy-url', + maxBufferSize: 2, + autoReconnect: false, + }); + jest.spyOn(webSocketChannel, 'unsubscribe').mockResolvedValue(true); + + // Manually create subscription with a buffer size of 2. + const subId = 'mock_sub_id_drop'; + sub = new Subscription(webSocketChannel, 'starknet_subscribeNewHeads', {}, subId, 2); + (webSocketChannel as any).activeSubscriptions.set(subId, sub); + + const warnSpy = jest.spyOn(logger, 'warn').mockImplementation(() => {}); + + // Simulate 3 events to overflow the buffer. + sub._handleEvent({ block_number: 1 }); + sub._handleEvent({ block_number: 2 }); + sub._handleEvent({ block_number: 3 }); // This one should cause the oldest to be dropped. + + expect(warnSpy).toHaveBeenCalledTimes(1); + + const handler = jest.fn(); + sub.on(handler); + + // The handler should be called with the two most recent events. + expect(handler).toHaveBeenCalledTimes(2); + expect(handler).toHaveBeenCalledWith({ block_number: 2 }); + expect(handler).toHaveBeenCalledWith({ block_number: 3 }); + expect(handler).not.toHaveBeenCalledWith({ block_number: 1 }); // The first event was dropped. + + warnSpy.mockRestore(); }); }); -describe('websocket regular endpoints - pathfinder test', () => { - let webSocketChannel: WebSocketChannel; +describe('Unit Test: Subscription Class', () => { + let mockChannel: WebSocketChannel; + let subscription: Subscription; + + beforeEach(() => { + // Create a mock WebSocketChannel. We don't need a real one for these tests. + mockChannel = new WebSocketChannel({ nodeUrl: 'ws://dummy-url' }); + // Mock the parts of the channel that the subscription interacts with. + mockChannel.unsubscribe = jest.fn().mockResolvedValue(true); + mockChannel.removeSubscription = jest.fn(); - beforeAll(async () => { - webSocketChannel = new WebSocketChannel({ nodeUrl }); - expect(webSocketChannel.isConnected()).toBe(false); - const status = await webSocketChannel.waitForConnection(); - expect(status).toBe(WebSocket.OPEN); + subscription = new Subscription(mockChannel, 'test_method', {}, 'sub_123', 100); }); - afterAll(async () => { - expect(webSocketChannel.isConnected()).toBe(true); - webSocketChannel.disconnect(); + test('should throw an error if .on() is called more than once', () => { + const handler1 = jest.fn(); + const handler2 = jest.fn(); + + subscription.on(handler1); // First call is fine. + + // Second call should throw. + expect(() => { + subscription.on(handler2); + }).toThrow('A handler is already attached to this subscription.'); }); - test('regular rpc endpoint', async () => { - const response = await webSocketChannel.sendReceiveAny('starknet_chainId'); - expect(response).toBe(StarknetChainId.SN_SEPOLIA); + test('unsubscribe should be idempotent and only call the channel once', async () => { + // Call unsubscribe multiple times. + const result1 = await subscription.unsubscribe(); + const result2 = await subscription.unsubscribe(); + const result3 = await subscription.unsubscribe(); + + // All calls should report success. + expect(result1).toBe(true); + expect(result2).toBe(true); + expect(result3).toBe(true); + + // But the channel's unsubscribe method should only have been called once. + expect(mockChannel.unsubscribe).toHaveBeenCalledTimes(1); + expect(mockChannel.unsubscribe).toHaveBeenCalledWith('sub_123'); + + // And the subscription should be removed from the channel once. + expect(mockChannel.removeSubscription).toHaveBeenCalledTimes(1); + expect(mockChannel.removeSubscription).toHaveBeenCalledWith('sub_123'); }); }); diff --git a/__tests__/config/fixtures.ts b/__tests__/config/fixtures.ts index 430e97605..7c07dab72 100644 --- a/__tests__/config/fixtures.ts +++ b/__tests__/config/fixtures.ts @@ -81,7 +81,9 @@ const compiledContracts = { }; export const contracts = mapContractSets(compiledContracts); -config.set('logLevel', 'DEBUG'); +if (process.env.DEBUG) { + config.set('logLevel', 'DEBUG'); +} export function getTestProvider( isProvider?: true, diff --git a/__tests__/config/helpers/strategyResolver.ts b/__tests__/config/helpers/strategyResolver.ts index ededf6c9e..d2bfc3772 100644 --- a/__tests__/config/helpers/strategyResolver.ts +++ b/__tests__/config/helpers/strategyResolver.ts @@ -89,7 +89,6 @@ class StrategyResolver { TEST_RPC_URL: process.env.TEST_RPC_URL, TEST_WS_URL: process.env.TEST_WS_URL, TX_VERSION: process.env.TX_VERSION, - SPEC_VERSION: process.env.SPEC_VERSION, }); console.table({ @@ -97,6 +96,7 @@ class StrategyResolver { IS_RPC: process.env.IS_RPC, IS_TESTNET: process.env.IS_TESTNET, 'Detected Spec Version': process.env.RPC_SPEC_VERSION, + DEBUG: process.env.DEBUG, }); console.log('Global Test Environment is Ready'); @@ -131,25 +131,26 @@ class StrategyResolver { console.log('Global Test Setup Started'); this.verifyAccountData(); - if (this.hasAllAccountEnvs) { - await this.useProvidedSetup(); - return; - } + if (!this.hasAllAccountEnvs) { + // 2. Try to detect devnet setup + console.log('Basic test parameters are missing, Auto Setup Started'); - // 2. Try to detect devnet setup - console.log('Basic test parameters are missing, Auto Setup Started'); + await this.detectDevnet(); + await accountResolver.execute(this.isDevnet); - await this.detectDevnet(); - await this.resolveRpc(); - await accountResolver.execute(this.isDevnet); - - this.verifyAccountData(true); - if (!this.hasAllAccountEnvs) console.error('Test Setup Environment is NOT Ready'); + this.verifyAccountData(true); + if (!this.hasAllAccountEnvs) console.error('Test Setup Environment is NOT Ready'); + } + await this.resolveRpc(); this.defineTestTransactionVersion(); await this.getNodeSpecVersion(); - this.logConfigInfo(); + if (this.hasAllAccountEnvs) { + await this.useProvidedSetup(); + } else { + this.logConfigInfo(); + } } } diff --git a/__tests__/config/jestGlobalSetup.ts b/__tests__/config/jestGlobalSetup.ts index c804681c7..aa30b86b1 100644 --- a/__tests__/config/jestGlobalSetup.ts +++ b/__tests__/config/jestGlobalSetup.ts @@ -5,6 +5,7 @@ * ref: order of execution jestGlobalSetup.ts -> jest.setup.ts -> fixtures.ts */ +import 'dotenv/config'; import strategyResolver from './helpers/strategyResolver'; /** diff --git a/example.env b/example.env new file mode 100644 index 000000000..f335c42d9 --- /dev/null +++ b/example.env @@ -0,0 +1,13 @@ +# Test Setup 1. +# TEST_ACCOUNT_ADDRESS= +# TEST_ACCOUNT_PRIVATE_KEY= +# TEST_RPC_URL= +# TEST_WS_URL= +# DEBUG=true + + +# Test Setup 2. +TEST_ACCOUNT_ADDRESS= +TEST_ACCOUNT_PRIVATE_KEY= +TEST_RPC_URL= +TEST_WS_URL= \ No newline at end of file diff --git a/jest.config.js b/jest.config.js new file mode 100644 index 000000000..7dfe3074d --- /dev/null +++ b/jest.config.js @@ -0,0 +1,12 @@ +module.exports = { + verbose: true, + modulePathIgnorePatterns: ['dist'], + setupFilesAfterEnv: ['./__tests__/config/jest.setup.ts'], + snapshotFormat: { + escapeString: true, + printBasicPrototype: true, + }, + testMatch: ['**/__tests__/**/(*.)+(spec|test).[jt]s?(x)'], + globalSetup: './__tests__/config/jestGlobalSetup.ts', + sandboxInjectedGlobals: ['Math'], +}; diff --git a/package-lock.json b/package-lock.json index 938d790c4..f8441405c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -40,6 +40,8 @@ "@typescript-eslint/parser": "^7.4.0", "ajv": "^8.12.0", "ajv-keywords": "^5.1.0", + "dotenv": "^16.5.0", + "dotenv-cli": "^8.0.0", "eslint": "^8.56.0", "eslint-config-airbnb-base": "^15.0.0", "eslint-config-airbnb-typescript": "^18.0.0", @@ -7311,6 +7313,45 @@ "node": ">=8" } }, + "node_modules/dotenv": { + "version": "16.5.0", + "resolved": "https://registry.npmjs.org/dotenv/-/dotenv-16.5.0.tgz", + "integrity": "sha512-m/C+AwOAr9/W1UOIZUo232ejMNnJAJtYQjUbHoNTBNTJSvqzzDh7vnrei3o3r3m9blf6ZoDkvcw0VmozNRFJxg==", + "dev": true, + "license": "BSD-2-Clause", + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://dotenvx.com" + } + }, + "node_modules/dotenv-cli": { + "version": "8.0.0", + "resolved": "https://registry.npmjs.org/dotenv-cli/-/dotenv-cli-8.0.0.tgz", + "integrity": "sha512-aLqYbK7xKOiTMIRf1lDPbI+Y+Ip/wo5k3eyp6ePysVaSqbyxjyK3dK35BTxG+rmd7djf5q2UPs4noPNH+cj0Qw==", + "dev": true, + "license": "MIT", + "dependencies": { + "cross-spawn": "^7.0.6", + "dotenv": "^16.3.0", + "dotenv-expand": "^10.0.0", + "minimist": "^1.2.6" + }, + "bin": { + "dotenv": "cli.js" + } + }, + "node_modules/dotenv-expand": { + "version": "10.0.0", + "resolved": "https://registry.npmjs.org/dotenv-expand/-/dotenv-expand-10.0.0.tgz", + "integrity": "sha512-GopVGCpVS1UKH75VKHGuQFqS1Gusej0z4FyQkPdwjil2gNIv+LNsqBlboOzpJFZKVT95GkCyWJbBSdFEFUWI2A==", + "dev": true, + "license": "BSD-2-Clause", + "engines": { + "node": ">=12" + } + }, "node_modules/dunder-proto": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/dunder-proto/-/dunder-proto-1.0.1.tgz", diff --git a/package.json b/package.json index e93863863..bd17d981d 100644 --- a/package.json +++ b/package.json @@ -31,10 +31,10 @@ "build:iife": "tsup --clean false --format iife --platform browser", "build:dts": "tsup --clean false --dts-only", "pretest": "npm run lint && npm run ts:check", - "test": "jest -i", + "test": "jest -i --detectOpenHandles", "test:coverage": "jest -i --coverage", - "posttest": "npm run format -- --log-level warn", "test:watch": "jest --watch", + "posttest": "npm run format -- --log-level warn", "docs": "cd www && npm run start", "docs:build": "cd www && GIT_REVISION_OVERRIDE=${npm_config_git_revision_override} npm run build", "docs:build:version": "v=$(npm run info:version -s) && npm run docs:build --git-revision-override=${npm_config_git_revision_override=v$v}", @@ -44,7 +44,10 @@ "lint": "eslint . --cache --fix --ext .ts", "ts:check": "tsc --noEmit --resolveJsonModule --project tsconfig.eslint.json", "ts:coverage": "type-coverage --at-least 95", - "ts:coverage:report": "typescript-coverage-report" + "ts:coverage:report": "typescript-coverage-report", + "test:acc1": "dotenv -e .env.account1 -- npm test", + "test:acc2": "dotenv -e .env.account2 -- npm test", + "test:all-accounts": "npm run test:acc1 && npm run test:acc2" }, "keywords": [ "starknet", @@ -74,6 +77,8 @@ "@typescript-eslint/parser": "^7.4.0", "ajv": "^8.12.0", "ajv-keywords": "^5.1.0", + "dotenv": "^16.5.0", + "dotenv-cli": "^8.0.0", "eslint": "^8.56.0", "eslint-config-airbnb-base": "^15.0.0", "eslint-config-airbnb-typescript": "^18.0.0", @@ -102,11 +107,11 @@ "@noble/hashes": "1.6.0", "@scure/base": "1.2.1", "@scure/starknet": "1.1.0", + "@starknet-io/starknet-types-07": "npm:@starknet-io/types-js@~0.7.10", + "@starknet-io/starknet-types-08": "npm:@starknet-io/types-js@~0.8.4", "abi-wan-kanabi": "2.2.4", "lossless-json": "^4.0.1", "pako": "^2.0.4", - "@starknet-io/starknet-types-07": "npm:@starknet-io/types-js@~0.7.10", - "@starknet-io/starknet-types-08": "npm:@starknet-io/types-js@~0.8.4", "ts-mixer": "^6.0.3" }, "engines": { @@ -116,22 +121,6 @@ "*.ts": "eslint --cache --fix", "*.{ts,js,md,yml,json}": "prettier --write" }, - "jest": { - "snapshotFormat": { - "escapeString": true, - "printBasicPrototype": true - }, - "testMatch": [ - "**/__tests__/**/(*.)+(spec|test).[jt]s?(x)" - ], - "setupFilesAfterEnv": [ - "./__tests__/config/jest.setup.ts" - ], - "globalSetup": "./__tests__/config/jestGlobalSetup.ts", - "sandboxInjectedGlobals": [ - "Math" - ] - }, "importSort": { ".js, .jsx, .ts, .tsx": { "style": "module", diff --git a/src/channel/index.ts b/src/channel/index.ts index c3ea655a6..9033e0cde 100644 --- a/src/channel/index.ts +++ b/src/channel/index.ts @@ -2,4 +2,6 @@ export * as RPC07 from './rpc_0_7_1'; export * as RPC08 from './rpc_0_8_1'; // Default channel export * from './rpc_0_8_1'; -export * from './ws_0_8'; +export { WebSocketChannel, WebSocketOptions } from './ws/ws_0_8'; +export { Subscription } from './ws/subscription'; +export { TimeoutError, WebSocketNotConnectedError } from '../utils/errors'; diff --git a/src/channel/ws/subscription.ts b/src/channel/ws/subscription.ts new file mode 100644 index 000000000..c435dc486 --- /dev/null +++ b/src/channel/ws/subscription.ts @@ -0,0 +1,162 @@ +/* eslint-disable no-underscore-dangle */ +import type { SUBSCRIPTION_ID } from '@starknet-io/starknet-types-08'; +import { logger } from '../../global/logger'; +import type { WebSocketChannel } from './ws_0_8'; +import { EventEmitter } from '../../utils/eventEmitter'; + +type SubscriptionEvents = { + event: T; + error: Error; + unsubscribe: void; +}; + +/** + * Represents an active WebSocket subscription. + * + * This class should not be instantiated directly. It is returned by the + * `subscribe` methods on the `WebSocketChannel`. + * + * @template T - The type of data expected from the subscription event. + * @example + * ```typescript + * const channel = new WebSocketChannel({ nodeUrl: 'YOUR_NODE_URL' }); + * await channel.waitForConnection(); + * + * // The 'sub' object is an instance of the Subscription class. + * const sub = await channel.subscribeNewHeads(); + * + * sub.on((data) => { + * console.log('Received new head:', data); + * }); + * + * // ... later + * await sub.unsubscribe(); + * ``` + */ +export class Subscription { + /** + * The containing `WebSocketChannel` instance. + * @internal + */ + public channel: WebSocketChannel; + + /** + * The JSON-RPC method used to create this subscription. + * @internal + */ + public method: string; + + /** + * The parameters used to create this subscription. + * @internal + */ + public params: any; + + /** + * The unique identifier for this subscription. + * @internal + */ + public id: SUBSCRIPTION_ID; + + private events = new EventEmitter>(); + + private buffer: T[] = []; + + private maxBufferSize: number; + + private handler: ((data: T) => void) | null = null; + + private _isClosed = false; + + /** + * @internal + * @param {WebSocketChannel} channel - The WebSocketChannel instance. + * @param {string} method - The RPC method used for the subscription. + * @param {any} params - The parameters for the subscription. + * @param {SUBSCRIPTION_ID} id - The subscription ID. + * @param {number} maxBufferSize - The maximum number of events to buffer. + */ + constructor( + channel: WebSocketChannel, + method: string, + params: object, + id: SUBSCRIPTION_ID, + maxBufferSize: number + ) { + this.channel = channel; + this.method = method; + this.params = params; + this.id = id; + this.maxBufferSize = maxBufferSize; + } + + /** + * Indicates if the subscription has been closed. + * @returns {boolean} `true` if unsubscribed, `false` otherwise. + */ + public get isClosed(): boolean { + return this._isClosed; + } + + /** + * Internal method to handle incoming events from the WebSocket channel. + * If a handler is attached, it's invoked immediately. Otherwise, the event is buffered. + * @internal + * @param {T} data - The event data. + */ + public _handleEvent(data: T): void { + if (this.handler) { + this.handler(data); + } else { + if (this.buffer.length >= this.maxBufferSize) { + const droppedEvent = this.buffer.shift(); // Drop the oldest event. + logger.warn(`Subscription ${this.id}: Buffer full. Dropping oldest event:`, droppedEvent); + } + this.buffer.push(data); + } + } + + /** + * Attaches a handler function to be called for each event. + * + * When a handler is attached, any buffered events will be passed to it sequentially. + * Subsequent events will be passed directly as they arrive. + * + * @param {(data: T) => void} handler - The function to call with event data. + * @throws {Error} If a handler is already attached to this subscription. + */ + public on(handler: (data: T) => void): void { + if (this.handler) { + // To avoid complexity, we only allow one handler at a time. + // Users can implement their own multi-handler logic if needed. + throw new Error('A handler is already attached to this subscription.'); + } + this.handler = handler; + + // Process the buffer. + while (this.buffer.length > 0) { + const event = this.buffer.shift(); + if (event) { + this.handler(event); + } + } + } + + /** + * Sends an unsubscribe request to the node and cleans up local resources. + * @returns {Promise} A Promise that resolves to `true` if the unsubscription was successful. + */ + public async unsubscribe(): Promise { + if (this._isClosed) { + return true; // Already unsubscribed, treat as success. + } + const success = await this.channel.unsubscribe(this.id); + if (success) { + this._isClosed = true; + this.channel.removeSubscription(this.id); + this.events.emit('unsubscribe', undefined); + this.events.clear(); // Clean up all listeners. + } + return success; + } +} diff --git a/src/channel/ws/ws_0_8.ts b/src/channel/ws/ws_0_8.ts new file mode 100644 index 000000000..7e9dee8b7 --- /dev/null +++ b/src/channel/ws/ws_0_8.ts @@ -0,0 +1,656 @@ +/* eslint-disable no-underscore-dangle */ +import type { + BLOCK_HEADER, + EMITTED_EVENT, + NEW_TXN_STATUS, + SUBSCRIPTION_ID, + TXN_HASH, + TXN_WITH_HASH, +} from '@starknet-io/starknet-types-08'; + +import { BigNumberish, SubscriptionBlockIdentifier } from '../../types'; +import { JRPC } from '../../types/api'; +import { WebSocketEvent } from '../../types/api/jsonrpc'; +import { EventEmitter } from '../../utils/eventEmitter'; +import { TimeoutError, WebSocketNotConnectedError } from '../../utils/errors'; +import WebSocket from '../../utils/connect/ws'; +import { stringify } from '../../utils/json'; +import { isString, isObject } from '../../utils/typed'; +import { bigNumberishArrayToHexadecimalStringArray, toHex } from '../../utils/num'; +import { Block } from '../../utils/provider'; +import { config } from '../../global/config'; +import { logger } from '../../global/logger'; +import { Subscription } from './subscription'; + +/** + * Options for configuring the automatic reconnection behavior of the WebSocketChannel. + */ +export type ReconnectOptions = { + /** + * The number of retries to attempt before giving up. + * @default 5 + */ + retries?: number; + /** + * The initial delay in milliseconds before the first retry. + * This delay will be doubled for each subsequent retry (exponential backoff). + * @default 2000 + */ + delay?: number; +}; + +/** + * Options for configuring the WebSocketChannel. + */ +export type WebSocketOptions = { + /** + * The URL of the WebSocket endpoint of the Starknet node. + * @example 'ws://localhost:9545' + */ + nodeUrl: string; + /** + * This parameter can be used to provide a custom WebSocket implementation. + * This is useful in environments where the global WebSocket object is not available (e.g., Node.js). + * @example + * ```typescript + * import WebSocket from 'ws'; + * const channel = new WebSocketChannel({ nodeUrl: '...', websocket: WebSocket }); + * ``` + */ + websocket?: typeof WebSocket; + /** + * The maximum number of events to buffer per subscription when no handler is attached. + * @default 1000 + */ + maxBufferSize?: number; + /** + * Whether to automatically reconnect when the connection is lost. + * @default true + */ + autoReconnect?: boolean; + /** + * Options for the automatic reconnection behavior. + */ + reconnectOptions?: ReconnectOptions; + /** + * The timeout in milliseconds for a `sendReceive` call. + * @default 60000 + */ + requestTimeout?: number; +}; + +type WebSocketChannelEvents = { + open: Event; + close: CloseEvent; + message: MessageEvent; + error: Event; + unsubscribe: SUBSCRIPTION_ID; +}; + +/** + * Manages a WebSocket connection to a Starknet node for receiving real-time updates. + * This class handles subscriptions, automatic reconnection, and request queueing. + * + * @example + * ```typescript + * const channel = new WebSocketChannel({ nodeUrl: 'YOUR_NODE_URL' }); + * await channel.waitForConnection(); + * + * const sub = await channel.subscribeNewHeads(); + * sub.on((data) => { + * console.log('New Block:', data); + * }); + * + * // ... later + * await sub.unsubscribe(); + * channel.disconnect(); + * ``` + */ +export class WebSocketChannel { + /** + * The URL of the WebSocket RPC Node. + * @example 'wss://starknet-sepolia.public.blastapi.io/rpc/v0_8' + */ + public nodeUrl: string; + + /** + * The underlying WebSocket instance. + */ + public websocket: WebSocket; + + // Store the WebSocket implementation class to allow for custom implementations. + private WsImplementation: typeof WebSocket; + + // Map of active subscriptions, keyed by their ID. + private activeSubscriptions: Map> = new Map(); + + private readonly maxBufferSize: number; + + private readonly autoReconnect: boolean; + + private readonly reconnectOptions: Required; + + private readonly requestTimeout: number; + + private isReconnecting = false; + + private reconnectAttempts = 0; + + private userInitiatedClose = false; + + private reconnectTimeoutId: NodeJS.Timeout | null = null; + + private requestQueue: Array<{ + method: string; + params?: object; + resolve: (value: any) => void; + reject: (reason?: any) => void; + }> = []; + + private events = new EventEmitter(); + + private openListener = (ev: Event) => this.events.emit('open', ev); + + private closeListener = this.onCloseProxy.bind(this); + + private messageListener = this.onMessageProxy.bind(this); + + private errorListener = (ev: Event) => this.events.emit('error', ev); + + /** + * JSON RPC latest sent message ID. + * The receiving message is expected to contain the same ID. + */ + private sendId: number = 0; + + /** + * Creates an instance of WebSocketChannel. + * @param {WebSocketOptions} options - The options for configuring the channel. + */ + constructor(options: WebSocketOptions) { + this.nodeUrl = options.nodeUrl; + this.maxBufferSize = options.maxBufferSize ?? 1000; + this.autoReconnect = options.autoReconnect ?? true; + this.reconnectOptions = { + retries: options.reconnectOptions?.retries ?? 5, + delay: options.reconnectOptions?.delay ?? 2000, + }; + this.requestTimeout = options.requestTimeout ?? 60000; + + this.WsImplementation = options.websocket || config.get('websocket') || WebSocket; + this.websocket = new this.WsImplementation(this.nodeUrl); + + this.websocket.addEventListener('open', this.openListener); + this.websocket.addEventListener('close', this.closeListener); + this.websocket.addEventListener('message', this.messageListener); + this.websocket.addEventListener('error', this.errorListener); + } + + private idResolver(id?: number) { + // An unmanaged, user-set ID. + if (id) return id; + // Managed ID, intentionally returned old and then incremented. + // eslint-disable-next-line no-plusplus + return this.sendId++; + } + + /** + * Sends a JSON-RPC request over the WebSocket connection without waiting for a response. + * This is a low-level method. Prefer `sendReceive` for most use cases. + * @param {string} method - The RPC method name. + * @param {object} [params] - The parameters for the RPC method. + * @param {number} [id] - A specific request ID. If not provided, an auto-incrementing ID is used. + * @returns {number} The ID of the sent request. + * @throws {WebSocketNotConnectedError} If the WebSocket is not connected. + */ + public send(method: string, params?: object, id?: number) { + if (!this.isConnected()) { + throw new WebSocketNotConnectedError( + 'WebSocketChannel.send() failed due to socket being disconnected' + ); + } + const usedId = this.idResolver(id); + const rpcRequestBody: JRPC.RequestBody = { + id: usedId, + jsonrpc: '2.0', + method, + ...(params && { params }), + }; + // Stringify should remove undefined params + this.websocket.send(stringify(rpcRequestBody)); + return usedId; + } + + /** + * Sends a JSON-RPC request and returns a Promise that resolves with the result. + * This method abstracts the request/response cycle over WebSockets. + * If the connection is lost, it will queue the request and send it upon reconnection. + * @template T - The expected type of the result. + * @param {string} method - The RPC method name. + * @param {object} [params] - The parameters for the RPC method. + * @returns {Promise} A Promise that resolves with the RPC response result. + * @throws {TimeoutError} If the request does not receive a response within the configured `requestTimeout`. + * @throws {WebSocketNotConnectedError} If the WebSocket is not connected and auto-reconnect is disabled. + */ + public sendReceive(method: string, params?: object): Promise { + // If we are in the process of reconnecting, or if we are disconnected but expect to reconnect, queue the request. + if ( + this.isReconnecting || + (!this.isConnected() && this.autoReconnect && !this.userInitiatedClose) + ) { + logger.info(`WebSocket: Connection unavailable, queueing request: ${method}`); + return new Promise((resolve, reject) => { + this.requestQueue.push({ method, params, resolve, reject }); + }); + } + + const sendId = this.send(method, params); + + return new Promise((resolve, reject) => { + let timeoutId: NodeJS.Timeout; + + if (!this.websocket || this.websocket.readyState !== WebSocket.OPEN) { + reject(new WebSocketNotConnectedError('WebSocket not available or not connected.')); + return; + } + + const messageHandler = (event: MessageEvent) => { + if (!isString(event.data)) { + logger.warn('WebSocket received non-string message data:', event.data); + return; + } + const message: JRPC.ResponseBody = JSON.parse(event.data); + if (message.id === sendId) { + clearTimeout(timeoutId); + this.websocket.removeEventListener('message', messageHandler); + // eslint-disable-next-line @typescript-eslint/no-use-before-define + this.websocket.removeEventListener('error', errorHandler); + + if ('result' in message) { + resolve(message.result as T); + } else { + reject( + new Error(`Error on ${method} (id: ${sendId}): ${JSON.stringify(message.error)}`) + ); + } + } + }; + + const errorHandler = (event: Event) => { + clearTimeout(timeoutId); + this.websocket.removeEventListener('message', messageHandler); + this.websocket.removeEventListener('error', errorHandler); + reject( + new Error( + `WebSocket error during ${method} (id: ${sendId}): ${event.type || 'Unknown error'}` + ) + ); + }; + + this.websocket.addEventListener('message', messageHandler); + this.websocket.addEventListener('error', errorHandler); + + timeoutId = setTimeout(() => { + // Clean up listeners + this.websocket.removeEventListener('message', messageHandler); + this.websocket.removeEventListener('error', errorHandler); + reject( + new TimeoutError( + `Request ${method} (id: ${sendId}) timed out after ${this.requestTimeout}ms` + ) + ); + }, this.requestTimeout); + }); + } + + /** + * Checks if the WebSocket connection is currently open. + * @returns {boolean} `true` if the connection is open, `false` otherwise. + */ + public isConnected() { + return this.websocket.readyState === WebSocket.OPEN; + } + + /** + * Returns a Promise that resolves when the WebSocket connection is open. + * Can be used to block execution until the connection is established. + * @returns {Promise} A Promise that resolves with the WebSocket's `readyState` when connected. + * @example + * ```typescript + * const channel = new WebSocketChannel({ nodeUrl: '...' }); + * await channel.waitForConnection(); + * console.log('Connected!'); + * ``` + */ + public async waitForConnection(): Promise { + // Wait for the websocket to connect + if (this.websocket.readyState !== WebSocket.OPEN) { + return new Promise((resolve, reject) => { + if (!this.websocket) return; + this.websocket.onopen = () => resolve(this.websocket.readyState); + this.websocket.onerror = (error) => { + return reject(error); + }; + }); + } + + return this.websocket.readyState; + } + + /** + * Closes the WebSocket connection. + * This method is user-initiated and will prevent automatic reconnection for this closure. + * @param {number} [code] - The WebSocket connection close code. + * @param {string} [reason] - The WebSocket connection close reason. + */ + public disconnect(code?: number, reason?: string) { + if (this.reconnectTimeoutId) { + clearTimeout(this.reconnectTimeoutId); + this.reconnectTimeoutId = null; + } + this.websocket.close(code, reason); + this.userInitiatedClose = true; + } + + /** + * Returns a Promise that resolves when the WebSocket connection is closed. + * @returns {Promise} A Promise that resolves with the WebSocket's `readyState` or a `CloseEvent` when disconnected. + */ + public async waitForDisconnection(): Promise { + // Wait for the websocket to disconnect + if (this.websocket.readyState !== WebSocket.CLOSED) { + return new Promise((resolve, reject) => { + if (!this.websocket) return; + this.websocket.onclose = () => resolve(this.websocket.readyState); + this.websocket.onerror = reject; + }); + } + + return this.websocket.readyState; + } + + /** + * Unsubscribes from a Starknet subscription. + * It is recommended to use the `unsubscribe()` method on the `Subscription` object instead. + * @internal + * @param {SUBSCRIPTION_ID} subscriptionId - The ID of the subscription to unsubscribe from. + * @returns {Promise} A Promise that resolves with `true` if the unsubscription was successful. + */ + public async unsubscribe(subscriptionId: SUBSCRIPTION_ID) { + const status = await this.sendReceive('starknet_unsubscribe', { + subscription_id: subscriptionId, + }); + if (status) { + this.events.emit('unsubscribe', subscriptionId); + } + return status; + } + + /** + * Returns a Promise that resolves when a specific subscription is successfully unsubscribed. + * @param {SUBSCRIPTION_ID} targetId - The ID of the subscription to wait for. + * @returns {Promise} + * @example + * ```typescript + * await channel.waitForUnsubscription(mySubscription.id); + * console.log('Successfully unsubscribed.'); + * ``` + */ + public waitForUnsubscription(targetId: SUBSCRIPTION_ID): Promise { + return new Promise((resolve) => { + const listener = (unsubId: SUBSCRIPTION_ID) => { + if (unsubId === targetId) { + this.events.off('unsubscribe', listener); + resolve(); + } + }; + this.events.on('unsubscribe', listener); + }); + } + + /** + * Manually initiates a reconnection attempt. + * This creates a new WebSocket instance and re-establishes listeners. + */ + public reconnect() { + this.userInitiatedClose = false; + this.websocket = new this.WsImplementation(this.nodeUrl); + + this.websocket.addEventListener('open', this.openListener); + this.websocket.addEventListener('close', this.closeListener); + this.websocket.addEventListener('message', this.messageListener); + this.websocket.addEventListener('error', this.errorListener); + } + + private _processRequestQueue(): void { + logger.info(`WebSocket: Processing ${this.requestQueue.length} queued requests.`); + while (this.requestQueue.length > 0) { + const { method, params, resolve, reject } = this.requestQueue.shift()!; + this.sendReceive(method, params).then(resolve).catch(reject); + } + } + + private async _restoreSubscriptions(): Promise { + const oldSubscriptions = Array.from(this.activeSubscriptions.values()); + this.activeSubscriptions.clear(); + + const restorePromises = oldSubscriptions.map(async (sub) => { + try { + const newSubId = await this.sendReceive(sub.method, sub.params); + // eslint-disable-next-line no-param-reassign + sub.id = newSubId; // Update the subscription with the new ID + this.activeSubscriptions.set(newSubId, sub); + logger.info(`Subscription ${sub.method} restored with new ID: ${newSubId}`); + } catch (error) { + logger.error(`Failed to restore subscription ${sub.method}:`, error); + // The subscription is not added back to activeSubscriptions if it fails + } + }); + + await Promise.all(restorePromises); + } + + private _startReconnect() { + if (this.isReconnecting || !this.autoReconnect) { + return; + } + + this.isReconnecting = true; + this.reconnectAttempts = 0; + + const tryReconnect = () => { + if (this.reconnectAttempts >= this.reconnectOptions.retries) { + logger.error('WebSocket: Maximum reconnection retries reached. Giving up.'); + this.isReconnecting = false; + return; + } + + this.reconnectAttempts += 1; + logger.info( + `WebSocket: Connection lost. Attempting to reconnect... (${this.reconnectAttempts}/${this.reconnectOptions.retries})` + ); + + this.reconnect(); // Attempt to reconnect + + this.websocket.onopen = async () => { + logger.info('WebSocket: Reconnection successful.'); + this.isReconnecting = false; + this.reconnectAttempts = 0; + await this._restoreSubscriptions(); + this._processRequestQueue(); + // Manually trigger the onOpen listeners as the original 'open' event was consumed. + this.events.emit('open', new Event('open')); + }; + + this.websocket.onerror = () => { + const delay = this.reconnectOptions.delay * 2 ** (this.reconnectAttempts - 1); + logger.info(`WebSocket: Reconnect attempt failed. Retrying in ${delay}ms.`); + this.reconnectTimeoutId = setTimeout(tryReconnect, delay); + }; + }; + + tryReconnect(); + } + + private onCloseProxy(ev: CloseEvent) { + this.websocket.removeEventListener('open', this.openListener); + this.websocket.removeEventListener('close', this.closeListener); + this.websocket.removeEventListener('message', this.messageListener); + this.websocket.removeEventListener('error', this.errorListener); + this.events.emit('close', ev); + + if (!this.userInitiatedClose) { + this._startReconnect(); + } + } + + private onMessageProxy(event: MessageEvent) { + let message: WebSocketEvent; + try { + message = JSON.parse(event.data); + } catch (error) { + logger.error( + `WebSocketChannel: Error parsing incoming message: ${event.data}, Error: ${error}` + ); + return; // Stop processing this malformed message. + } + + // Check if it's a subscription event. + if (message.method && isObject(message.params) && 'subscription_id' in message.params) { + const { result, subscription_id } = message.params as { + result: any; + subscription_id: SUBSCRIPTION_ID; + }; + const subscription = this.activeSubscriptions.get(subscription_id); + + if (subscription) { + subscription._handleEvent(result); + } else { + logger.warn( + `WebSocketChannel: Received event for untracked subscription ID: ${subscription_id}.` + ); + } + } + + logger.debug('onMessageProxy:', event.data); + + // Call the general onMessage handler if provided by the user for all messages. + this.events.emit('message', event); + } + + /** + * Subscribes to new block headers. + * @param {SubscriptionBlockIdentifier} [blockIdentifier] - The block to start receiving notifications from. Defaults to 'latest'. + * @returns {Promise>} A Promise that resolves with a `Subscription` object for new block headers. + */ + public async subscribeNewHeads( + blockIdentifier?: SubscriptionBlockIdentifier + ): Promise> { + const method = 'starknet_subscribeNewHeads'; + const params = { + block_id: blockIdentifier ? new Block(blockIdentifier).identifier : undefined, + }; + const subId = await this.sendReceive(method, params); + const subscription = new Subscription(this, method, params, subId, this.maxBufferSize); + this.activeSubscriptions.set(subId, subscription); + return subscription; + } + + /** + * Subscribes to events matching a given filter. + * @param {BigNumberish} [fromAddress] - The contract address to filter by. + * @param {string[][]} [keys] - The event keys to filter by. + * @param {SubscriptionBlockIdentifier} [blockIdentifier] - The block to start receiving notifications from. Defaults to 'latest'. + * @returns {Promise>} A Promise that resolves with a `Subscription` object for the specified events. + */ + public async subscribeEvents( + fromAddress?: BigNumberish, + keys?: string[][], + blockIdentifier?: SubscriptionBlockIdentifier + ): Promise> { + const method = 'starknet_subscribeEvents'; + const params = { + from_address: fromAddress !== undefined ? toHex(fromAddress) : undefined, + keys, + block_id: blockIdentifier ? new Block(blockIdentifier).identifier : undefined, + }; + const subId = await this.sendReceive(method, params); + const subscription = new Subscription(this, method, params, subId, this.maxBufferSize); + this.activeSubscriptions.set(subId, subscription); + return subscription; + } + + /** + * Subscribes to status updates for a specific transaction. + * @param {BigNumberish} transactionHash - The hash of the transaction to monitor. + * @param {SubscriptionBlockIdentifier} [blockIdentifier] - The block context. Not typically required. + * @returns {Promise>} A Promise that resolves with a `Subscription` object for the transaction's status. + */ + public async subscribeTransactionStatus( + transactionHash: BigNumberish, + blockIdentifier?: SubscriptionBlockIdentifier + ): Promise> { + const method = 'starknet_subscribeTransactionStatus'; + const params = { + transaction_hash: toHex(transactionHash), + block_id: blockIdentifier ? new Block(blockIdentifier).identifier : undefined, + }; + const subId = await this.sendReceive(method, params); + const subscription = new Subscription(this, method, params, subId, this.maxBufferSize); + this.activeSubscriptions.set(subId, subscription); + return subscription; + } + + /** + * Subscribes to pending transactions. + * @param {boolean} [transactionDetails] - If `true`, the full transaction details are included. Defaults to `false` (hash only). + * @param {BigNumberish[]} [senderAddress] - An array of sender addresses to filter by. + * @returns {Promise>} A Promise that resolves with a `Subscription` object for pending transactions. + */ + public async subscribePendingTransaction( + transactionDetails?: boolean, + senderAddress?: BigNumberish[] + ): Promise> { + const method = 'starknet_subscribePendingTransactions'; + const params = { + transaction_details: transactionDetails, + sender_address: senderAddress && bigNumberishArrayToHexadecimalStringArray(senderAddress), + }; + const subId = await this.sendReceive(method, params); + const subscription = new Subscription(this, method, params, subId, this.maxBufferSize); + this.activeSubscriptions.set(subId, subscription); + return subscription; + } + + /** + * Internal method to remove subscription from active map. + * @internal + */ + public removeSubscription(id: SUBSCRIPTION_ID) { + this.activeSubscriptions.delete(id); + } + + /** + * Adds a listener for a given event. + * @param event The event name. + * @param listener The listener function to add. + */ + public on( + event: K, + listener: (data: WebSocketChannelEvents[K]) => void + ): void { + this.events.on(event, listener); + } + + /** + * Removes a listener for a given event. + * @param event The event name. + * @param listener The listener function to remove. + */ + public off( + event: K, + listener: (data: WebSocketChannelEvents[K]) => void + ): void { + this.events.off(event, listener); + } +} diff --git a/src/channel/ws_0_8.ts b/src/channel/ws_0_8.ts deleted file mode 100644 index e0483063f..000000000 --- a/src/channel/ws_0_8.ts +++ /dev/null @@ -1,569 +0,0 @@ -import type { - SUBSCRIPTION_ID, - SubscriptionEventsResponse, - SubscriptionNewHeadsResponse, - SubscriptionPendingTransactionsResponse, - SubscriptionReorgResponse, - SubscriptionTransactionsStatusResponse, - WebSocketEvents, - WebSocketMethods, -} from '@starknet-io/starknet-types-08'; - -import { BigNumberish, SubscriptionBlockIdentifier } from '../types'; -import { JRPC } from '../types/api'; -import { WebSocketEvent } from '../types/api/jsonrpc'; -import WebSocket from '../utils/connect/ws'; -import { stringify } from '../utils/json'; -import { bigNumberishArrayToHexadecimalStringArray, toHex } from '../utils/num'; -import { Block } from '../utils/provider'; -import { config } from '../global/config'; - -export const WSSubscriptions = { - NEW_HEADS: 'newHeads', - EVENTS: 'events', - TRANSACTION_STATUS: 'transactionStatus', - PENDING_TRANSACTION: 'pendingTransactions', -} as const; - -export type WebSocketOptions = { - /** - * websocket node url address - * @example 'ws://www.host.com/path' - * @default public websocket enabled starknet node - */ - nodeUrl?: string; - /** - * This parameter should be used when working in an environment without native WebSocket support by providing - * an equivalent WebSocket object that conforms to the protocol, e.g. from the 'isows' and/or 'ws' modules - * * https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/WebSocket#protocols . - * * https://www.rfc-editor.org/rfc/rfc6455.html#section-1 . - * @default WebSocket - */ - websocket?: WebSocket; -}; - -/** - * WebSocket channel provides communication with Starknet node over long-lived socket connection - */ -export class WebSocketChannel { - /** - * WebSocket RPC Node URL - * @example 'wss://starknet-node.io/rpc/v0_8' - */ - public nodeUrl: string; - - // public headers: object; - - // readonly retries: number; - - // public requestId: number; - - // readonly blockIdentifier: BlockIdentifier; - - // private chainId?: StarknetChainId; - - // private specVersion?: string; - - // private transactionRetryIntervalFallback?: number; - - // readonly waitMode: Boolean; // behave like web2 rpc and return when tx is processed - - // private batchClient?: BatchClient; - - /** - * ws library object - */ - public websocket: WebSocket; - - /** - * Assign implementation method to get 'on reorg event data' - * @example - * ```typescript - * webSocketChannel.onReorg = async function (data) { - * // ... do something when reorg happens - * } - * ``` - */ - public onReorg: (this: WebSocketChannel, data: SubscriptionReorgResponse) => any = () => {}; - - /** - * Assign implementation method to get 'starknet block heads' - * @example - * ```typescript - * webSocketChannel.onNewHeads = async function (data) { - * // ... do something with head data - * } - * ``` - */ - public onNewHeads: (this: WebSocketChannel, data: SubscriptionNewHeadsResponse) => any = () => {}; - - /** - * Assign implementation method to get 'starknet events' - * @example - * ```typescript - * webSocketChannel.onEvents = async function (data) { - * // ... do something with event data - * } - * ``` - */ - public onEvents: (this: WebSocketChannel, data: SubscriptionEventsResponse) => any = () => {}; - - /** - * Assign method to get 'starknet transactions status' - * @example - * ```typescript - * webSocketChannel.onTransactionStatus = async function (data) { - * // ... do something with tx status data - * } - * ``` - */ - public onTransactionStatus: ( - this: WebSocketChannel, - data: SubscriptionTransactionsStatusResponse - ) => any = () => {}; - - /** - * Assign implementation method to get 'starknet pending transactions (mempool)' - * @example - * ```typescript - * webSocketChannel.onPendingTransaction = async function (data) { - * // ... do something with pending tx data - * } - * ``` - */ - public onPendingTransaction: ( - this: WebSocketChannel, - data: SubscriptionPendingTransactionsResponse - ) => any = () => {}; - - /** - * Assign implementation to this method to listen open Event - */ - public onOpen: (this: WebSocketChannel, ev: Event) => any = () => {}; - - /** - * Assign implementation to this method to listen close CloseEvent - */ - public onClose: (this: WebSocketChannel, ev: CloseEvent) => any = () => {}; - - /** - * Assign implementation to this method to listen message MessageEvent - */ - public onMessage: (this: WebSocketChannel, ev: MessageEvent) => any = () => {}; - - /** - * Assign implementation to this method to listen error Event - */ - public onError: (this: WebSocketChannel, ev: Event) => any = () => {}; - - /** - * Assign implementation to this method to listen unsubscription - */ - public onUnsubscribe: (this: WebSocketChannel, _subscriptionId: SUBSCRIPTION_ID) => any = - () => {}; - - private onUnsubscribeLocal: (this: WebSocketChannel, _subscriptionId: SUBSCRIPTION_ID) => any = - () => {}; - - /** - * JSON RPC latest sent message id - * expecting receiving message to contain same id - */ - private sendId: number = 0; - - /** - * subscriptions ids - * mapped by keys WSSubscriptions - */ - readonly subscriptions: Map = new Map(); - - /** - * Construct class and event listeners - * @param options WebSocketOptions - */ - constructor(options: WebSocketOptions = {}) { - // provided existing websocket - const nodeUrl = options.nodeUrl || 'http://localhost:3000 '; // TODO: implement getDefaultNodeUrl default node when defined by providers? - this.nodeUrl = options.websocket ? options.websocket.url : nodeUrl; - this.websocket = options.websocket || config.get('websocket') || new WebSocket(nodeUrl); - - this.websocket.addEventListener('open', this.onOpen.bind(this)); - this.websocket.addEventListener('close', this.onCloseProxy.bind(this)); - this.websocket.addEventListener('message', this.onMessageProxy.bind(this)); - this.websocket.addEventListener('error', this.onError.bind(this)); - } - - private idResolver(id?: number) { - // unmanaged user set id - if (id) return id; - // managed id, intentional return old and than increment - // eslint-disable-next-line no-plusplus - return this.sendId++; - } - - /** - * Send data over open ws connection - * * this would only send data on the line without awaiting 'response message' - * @example - * ```typescript - * const sentId = await this.send('starknet_method', params); - * ``` - */ - public send(method: string, params?: object, id?: number) { - if (!this.isConnected()) { - throw Error('WebSocketChannel.send() fail due to socket disconnected'); - } - const usedId = this.idResolver(id); - const rpcRequestBody: JRPC.RequestBody = { - id: usedId, - jsonrpc: '2.0', - method, - ...(params && { params }), - }; - // Stringify should remove undefined params - this.websocket.send(stringify(rpcRequestBody)); - return usedId; - } - - /** - * Any Starknet method not just websocket override - */ - public sendReceiveAny(method: any, params?: any) { - return this.sendReceive(method, params); - } - - /** - * Send request and receive response over ws line - * This method abstract ws messages into request/response model - * @param method rpc method name - * @param params rpc method parameters - * @example - * ```typescript - * const response = await this.sendReceive('starknet_method', params); - * ``` - */ - public sendReceive( - method: T, - params?: WebSocketMethods[T]['params'] - ): Promise { - const sendId = this.send(method, params); - - return new Promise((resolve, reject) => { - if (!this.websocket) return; - this.websocket.onmessage = ({ data }) => { - const message: JRPC.ResponseBody = JSON.parse(data); - if (message.id === sendId) { - if ('result' in message) { - resolve(message.result); - } else { - reject(Error(`error on ${method}, ${message.error}`)); - } - } - // console.log(`data from ${method} response`, data); - }; - this.websocket.onerror = reject; - }); - } - - /** - * Helper to check connection is open - */ - public isConnected() { - return this.websocket.readyState === WebSocket.OPEN; - } - - /** - * await while websocket is connected - * * could be used to block the flow until websocket is open - * @example - * ```typescript - * const readyState = await webSocketChannel.waitForConnection(); - * ``` - */ - public async waitForConnection(): Promise { - // Wait websocket to connect - if (this.websocket.readyState !== WebSocket.OPEN) { - return new Promise((resolve, reject) => { - if (!this.websocket) return; - this.websocket.onopen = () => resolve(this.websocket.readyState); - this.websocket.onerror = (error) => { - return reject(error); - }; - }); - } - - return this.websocket.readyState; - } - - /** - * Disconnect the WebSocket connection, optionally using code as the the WebSocket connection close code and reason as the the WebSocket connection close reason. - */ - public disconnect(code?: number, reason?: string) { - this.websocket.close(code, reason); - } - - /** - * await while websocket is disconnected - * @example - * ```typescript - * const readyState = await webSocketChannel.waitForDisconnection(); - * ``` - */ - public async waitForDisconnection(): Promise { - // Wait websocket to disconnect - if (this.websocket.readyState !== WebSocket.CLOSED) { - return new Promise((resolve, reject) => { - if (!this.websocket) return; - this.websocket.onclose = () => resolve(this.websocket.readyState); - this.websocket.onerror = reject; - }); - } - - return this.websocket.readyState; - } - - /** - * Unsubscribe from starknet subscription - * @param subscriptionId - * @param ref internal usage, only for managed subscriptions - */ - public async unsubscribe(subscriptionId: SUBSCRIPTION_ID, ref?: string) { - const status = (await this.sendReceive('starknet_unsubscribe', { - subscription_id: subscriptionId, - })) as boolean; - if (status) { - if (ref) { - this.subscriptions.delete(ref); - } - this.onUnsubscribeLocal(subscriptionId); - this.onUnsubscribe(subscriptionId); - } - return status; - } - - /** - * await while subscription is unsubscribed - * @param forSubscriptionId if defined trigger on subscriptionId else trigger on any - * @returns subscriptionId | onerror(Event) - * @example - * ```typescript - * const subscriptionId = await webSocketChannel.waitForUnsubscription(); - * ``` - */ - public async waitForUnsubscription(forSubscriptionId?: SUBSCRIPTION_ID) { - // unsubscribe - return new Promise((resolve, reject) => { - if (!this.websocket) return; - this.onUnsubscribeLocal = (subscriptionId) => { - if (forSubscriptionId === undefined) { - resolve(subscriptionId); - } else if (subscriptionId === forSubscriptionId) { - resolve(subscriptionId); - } - }; - this.websocket.onerror = reject; - }); - } - - /** - * Reconnect re-create this.websocket instance - */ - public reconnect() { - this.websocket = new WebSocket(this.nodeUrl); - - this.websocket.addEventListener('open', this.onOpen.bind(this)); - this.websocket.addEventListener('close', this.onCloseProxy.bind(this)); - this.websocket.addEventListener('message', this.onMessageProxy.bind(this)); - this.websocket.addEventListener('error', this.onError.bind(this)); - } - - // TODO: Add/Test ping service. It seems this work out of the box from pathfinder. If net disc. it will auto replay. - private reconnectAndUpdate() { - this.reconnect(); - // TODO: attempt n reconnection times - // TODO: replay data from last block received (including it) up to latest - } - - private onCloseProxy(ev: CloseEvent) { - this.websocket.removeEventListener('open', this.onOpen); - this.websocket.removeEventListener('close', this.onCloseProxy); - this.websocket.removeEventListener('message', this.onMessageProxy); - this.websocket.removeEventListener('error', this.onError); - this.onClose(ev); - } - - private onMessageProxy(event: MessageEvent) { - const message: WebSocketEvent = JSON.parse(event.data); - const eventName = message.method as keyof WebSocketEvents; - - switch (eventName) { - case 'starknet_subscriptionReorg': - this.onReorg(message.params as SubscriptionReorgResponse); - break; - case 'starknet_subscriptionNewHeads': - this.onNewHeads(message.params as SubscriptionNewHeadsResponse); - break; - case 'starknet_subscriptionEvents': - this.onEvents(message.params as SubscriptionEventsResponse); - break; - case 'starknet_subscriptionTransactionStatus': - this.onTransactionStatus(message.params as SubscriptionTransactionsStatusResponse); - break; - case 'starknet_subscriptionPendingTransactions': - this.onPendingTransaction(message.params as SubscriptionPendingTransactionsResponse); - break; - default: - break; - } - this.onMessage(event); - } - - /** - * subscribe to new block heads - * * you can subscribe to this event multiple times and you need to manage subscriptions manually - */ - public subscribeNewHeadsUnmanaged(blockIdentifier?: SubscriptionBlockIdentifier) { - const block_id = blockIdentifier ? new Block(blockIdentifier).identifier : undefined; - - return this.sendReceive('starknet_subscribeNewHeads', { - ...{ block_id }, - }) as Promise; - } - - /** - * subscribe to new block heads - */ - public async subscribeNewHeads(blockIdentifier?: SubscriptionBlockIdentifier) { - if (this.subscriptions.get(WSSubscriptions.NEW_HEADS)) return false; - const subId = await this.subscribeNewHeadsUnmanaged(blockIdentifier); - this.subscriptions.set(WSSubscriptions.NEW_HEADS, subId); - return subId; - } - - /** - * Unsubscribe newHeads subscription - */ - public async unsubscribeNewHeads() { - const subId = this.subscriptions.get(WSSubscriptions.NEW_HEADS); - if (!subId) throw Error('There is no subscription on this event'); - return this.unsubscribe(subId, WSSubscriptions.NEW_HEADS); - } - - /** - * subscribe to 'starknet events' - * * you can subscribe to this event multiple times and you need to manage subscriptions manually - */ - public subscribeEventsUnmanaged( - fromAddress?: BigNumberish, - keys?: string[][], - blockIdentifier?: SubscriptionBlockIdentifier - ) { - const block_id = blockIdentifier ? new Block(blockIdentifier).identifier : undefined; - return this.sendReceive('starknet_subscribeEvents', { - ...{ from_address: fromAddress !== undefined ? toHex(fromAddress) : undefined }, - ...{ keys }, - ...{ block_id }, - }) as Promise; - } - - /** - * subscribe to 'starknet events' - */ - public async subscribeEvents( - fromAddress?: BigNumberish, - keys?: string[][], - blockIdentifier?: SubscriptionBlockIdentifier - ) { - if (this.subscriptions.get(WSSubscriptions.EVENTS)) return false; - // eslint-disable-next-line prefer-rest-params - const subId = await this.subscribeEventsUnmanaged(fromAddress, keys, blockIdentifier); - this.subscriptions.set(WSSubscriptions.EVENTS, subId); - return subId; - } - - /** - * Unsubscribe 'starknet events' subscription - */ - public unsubscribeEvents() { - const subId = this.subscriptions.get(WSSubscriptions.EVENTS); - if (!subId) throw Error('There is no subscription ID for this event'); - return this.unsubscribe(subId, WSSubscriptions.EVENTS); - } - - /** - * subscribe to transaction status - * * you can subscribe to this event multiple times and you need to manage subscriptions manually - */ - public subscribeTransactionStatusUnmanaged( - transactionHash: BigNumberish, - blockIdentifier?: SubscriptionBlockIdentifier - ) { - const transaction_hash = toHex(transactionHash); - const block_id = blockIdentifier ? new Block(blockIdentifier).identifier : undefined; - return this.sendReceive('starknet_subscribeTransactionStatus', { - transaction_hash, - ...{ block_id }, - }) as Promise; - } - - /** - * subscribe to transaction status - */ - public async subscribeTransactionStatus(transactionHash: BigNumberish) { - if (this.subscriptions.get(WSSubscriptions.TRANSACTION_STATUS)) return false; - const subId = await this.subscribeTransactionStatusUnmanaged(transactionHash); - this.subscriptions.set(WSSubscriptions.TRANSACTION_STATUS, subId); - return subId; - } - - /** - * unsubscribe 'transaction status' subscription - */ - public async unsubscribeTransactionStatus() { - const subId = this.subscriptions.get(WSSubscriptions.TRANSACTION_STATUS); - if (!subId) throw Error('There is no subscription ID for this event'); - return this.unsubscribe(subId, WSSubscriptions.TRANSACTION_STATUS); - } - - /** - * subscribe to pending transactions (mempool) - * * you can subscribe to this event multiple times and you need to manage subscriptions manually - */ - public subscribePendingTransactionUnmanaged( - transactionDetails?: boolean, - senderAddress?: BigNumberish[] - ) { - return this.sendReceive('starknet_subscribePendingTransactions', { - ...{ transaction_details: transactionDetails }, - ...{ - sender_address: senderAddress && bigNumberishArrayToHexadecimalStringArray(senderAddress), - }, - }) as Promise; - } - - /** - * subscribe to pending transactions (mempool) - */ - public async subscribePendingTransaction( - transactionDetails?: boolean, - senderAddress?: BigNumberish[] - ) { - if (this.subscriptions.get(WSSubscriptions.TRANSACTION_STATUS)) return false; - // eslint-disable-next-line no-param-reassign - const subId = await this.subscribePendingTransactionUnmanaged( - transactionDetails, - senderAddress - ); - this.subscriptions.set(WSSubscriptions.PENDING_TRANSACTION, subId); - return subId; - } - - /** - * unsubscribe 'pending transaction' subscription - */ - public async unsubscribePendingTransaction() { - const subId = this.subscriptions.get(WSSubscriptions.PENDING_TRANSACTION); - if (!subId) throw Error('There is no subscription ID for this event'); - return this.unsubscribe(subId, WSSubscriptions.PENDING_TRANSACTION); - } -} diff --git a/src/provider/index.ts b/src/provider/index.ts index 027a54085..c43c2b2ed 100644 --- a/src/provider/index.ts +++ b/src/provider/index.ts @@ -1,7 +1,7 @@ import { RpcProvider } from './rpc'; export { RpcProvider as Provider } from './extensions/default'; // backward-compatibility -export * from '../utils/errors'; +export { LibraryError, RpcError } from '../utils/errors'; export * from './interface'; export * from './extensions/default'; diff --git a/src/utils/errors/index.ts b/src/utils/errors/index.ts index 2207a6049..f8ee068db 100644 --- a/src/utils/errors/index.ts +++ b/src/utils/errors/index.ts @@ -76,3 +76,25 @@ export class RpcError extends LibraryE return rpcErrors[typeName] === this.code; } } + +/** + * Thrown when a WebSocket request does not receive a response within the configured timeout period. + * @property {string} name - The name of the error, always 'TimeoutError'. + */ +export class TimeoutError extends LibraryError { + constructor(message: string) { + super(message); + this.name = 'TimeoutError'; + } +} + +/** + * Thrown when an operation is attempted on a WebSocket that is not connected. + * @property {string} name - The name of the error, always 'WebSocketNotConnectedError'. + */ +export class WebSocketNotConnectedError extends LibraryError { + constructor(message: string) { + super(message); + this.name = 'WebSocketNotConnectedError'; + } +} diff --git a/src/utils/eventEmitter.ts b/src/utils/eventEmitter.ts new file mode 100644 index 000000000..8823fc073 --- /dev/null +++ b/src/utils/eventEmitter.ts @@ -0,0 +1,30 @@ +/* eslint-disable max-classes-per-file */ +type Listener = (data: T) => void; + +export class EventEmitter> { + private listeners: { [K in keyof T]?: Listener[] } = {}; + + public on(event: K, listener: Listener): void { + if (!this.listeners[event]) { + this.listeners[event] = []; + } + this.listeners[event]!.push(listener); + } + + public off(event: K, listener: Listener): void { + if (!this.listeners[event]) { + return; + } + this.listeners[event] = this.listeners[event]!.filter((l) => l !== listener); + } + + public emit(event: K, data: T[K]): void { + if (this.listeners[event]) { + this.listeners[event]!.forEach((listener) => listener(data)); + } + } + + public clear(): void { + this.listeners = {}; + } +} diff --git a/www/docs/guides/websocket_channel.md b/www/docs/guides/websocket_channel.md index f448f5584..fd85b2abc 100644 --- a/www/docs/guides/websocket_channel.md +++ b/www/docs/guides/websocket_channel.md @@ -2,70 +2,139 @@ sidebar_position: 3 --- -# Channel (WebSocket, Rpc) +# WebSocket Channel -Channel is the lowest purest object you can use to interact with the network. -Channel represent implementation of the Starknet specification in it's strictly defined form. +The `WebSocketChannel` provides a robust, real-time connection to a Starknet RPC Node, enabling you to subscribe to events and receive updates as they happen. It's designed for production use with features like automatic reconnection, request queueing, and a modern subscription management API. -## WebSocket Channel +Ensure that you are using a node that supports the required RPC spec (e.g., v0.8.0). -WebSocket channel provide convenient way to establish websocket connection to the [Starknet RPC Node](https://www.starknet.io/fullnodes-rpc-services/). +## Key Features -Ensure that you are using node supporting the required RPC spec >= v0.8.0. Details regarding Starknet Nodes and supported RPC versions could be found on each node github/page. +- **Modern API**: Uses a `Subscription` object to manage event streams. +- **Automatic Reconnection**: Automatically detects connection drops and reconnects with an exponential backoff strategy. +- **Request Queueing**: Queues any requests made while the connection is down and executes them upon reconnection. +- **Event Buffering**: Buffers events for a subscription if no handler is attached, preventing event loss. +- **Custom Errors**: Throws specific, catchable errors like `TimeoutError` for more reliable error handling. -Websocket Channel implements specification methods defined by [@starknet-io/types-js](https://github.com/starknet-io/types-js/blob/b7d38ca30a1def28e89370068efff81b3a3062b7/src/api/methods.ts#L421) +## Importing -### Import +To get started, import the necessary classes and types from the `starknet` library. ```typescript -import { WebSocketChannel } from 'starknet'; +import { + WebSocketChannel, + WebSocketOptions, + Subscription, + TimeoutError, + WebSocketNotConnectedError, +} from 'starknet'; ``` -### Create instance +## Creating a WebSocket Channel + +Instantiate `WebSocketChannel` with your node's WebSocket URL. ```typescript -// create new ws channel -const webSocketChannel = new WebSocketChannel({ - nodeUrl: 'wss://sepolia-pathfinder-rpc.server.io/rpc/v0_8', +const channel = new WebSocketChannel({ + nodeUrl: 'wss://your-starknet-node/rpc/v0_8', }); -// ensure ws channel is open -await webSocketChannel.waitForConnection(); - -// ... use webSocketChannel +// It's good practice to wait for the initial connection. +await channel.waitForConnection(); ``` -If the environment doesn't have a detectable global `WebSocket`, an appropriate `WebSocket` implementation should be used and set with the `websocket` constructor parameter. +If you are in an environment without a native `WebSocket` object (like Node.js), you can provide a custom implementation (e.g., from the `ws` library). ```typescript -import { WebSocket } from 'ws'; +import WebSocket from 'ws'; -const webSocketChannel = new WebSocketChannel({ - websocket: new WebSocket('wss://sepolia-pathfinder-rpc.server.io/rpc/v0_8'), +const channel = new WebSocketChannel({ + nodeUrl: '...', + websocket: WebSocket, // Provide the implementation class }); + +await channel.waitForConnection(); ``` -### Usage +### Advanced Configuration -```typescript -// subscribe to event -await webSocketChannel.subscribeNewHeads(); +You can customize the channel's behavior with `WebSocketOptions`. -// define listener method -webSocketChannel.onNewHeads = async function (data) { - //... on event new head data +```typescript +const options: WebSocketOptions = { + nodeUrl: '...', + autoReconnect: true, // Default: true + reconnectOptions: { + retries: 5, // Default: 5 + delay: 2000, // Default: 2000ms + }, + requestTimeout: 60000, // Default: 60000ms + maxBufferSize: 1000, // Default: 1000 events per subscription }; + +const channel = new WebSocketChannel(options); ``` -Available subscriptions are: +## Subscribing to Events + +When you call a subscription method (e.g., `subscribeNewHeads`), it returns a `Promise` that resolves with a `Subscription` object. This object is your handle to that specific event stream. + +You attach a listener with `.on()` and stop listening with `.unsubscribe()`. + +```typescript +// 1. Subscribe to an event stream. +const sub: Subscription = await channel.subscribeNewHeads(); + +// 2. Attach a handler to process incoming data. +sub.on((data) => { + console.log('Received new block header:', data.block_number); +}); + +// 3. When you're done, unsubscribe. +// This is automatically handled if the channel disconnects and restores the subscription. +// You only need to call this when you explicitly want to stop listening. +await sub.unsubscribe(); +``` + +### Event Buffering + +If you `await` a subscription but don't immediately attach an `.on()` handler, the `Subscription` object will buffer incoming events. Once you attach a handler, all buffered events will be delivered in order before any new events are processed. This prevents event loss during asynchronous setup. + +The buffer size is limited by the `maxBufferSize` in the channel options. If the buffer is full, the oldest events are dropped. + +## Automatic Reconnection and Queueing + +The channel is designed to be resilient. If the connection drops, it will automatically try to reconnect. While reconnecting: + +- Any API calls (e.g., `sendReceive`, `subscribeNewHeads`) will be queued. +- Once the connection is restored, the queue will be processed automatically. +- All previously active subscriptions will be **automatically re-subscribed**. The original `Subscription` objects you hold will continue to work without any need for manual intervention. + +## Error Handling + +The channel throws specific errors, allowing for precise error handling. + +```typescript +try { + const result = await channel.sendReceive('starknet_chainId'); +} catch (e) { + if (e instanceof TimeoutError) { + console.error('The request timed out!'); + } else if (e instanceof WebSocketNotConnectedError) { + console.error('The WebSocket is not connected.'); + } else { + console.error('An unknown error occurred:', e); + } +} +``` -- subscribeNewHeads -- subscribeEvents -- subscribeTransactionStatus -- subscribePendingTransaction +## Available Subscription Methods -Complete API can be found on [websocket API section](/docs/next/API/classes/WebSocketChannel) +Each of these methods returns a `Promise`. -### Unmanaged subscriptions +- `subscribeNewHeads` +- `subscribeEvents` +- `subscribeTransactionStatus` +- `subscribePendingTransaction` -Websocket channel manage subscription id, but it is limited to one subscription per event type. If you need multiple subscriptions of the same type use \*Unmanaged methods and handle subscriptions manually. +For more details, see the complete [API documentation](/docs/next/API/classes/WebSocketChannel).