From c256f16a68e7e8a040e18fd700a08587385342cb Mon Sep 17 00:00:00 2001 From: wphan Date: Tue, 29 Jul 2025 11:28:46 -0700 Subject: [PATCH] sdk: restore grpc disconnect default behavior --- .../baseSubscribers/grpcAccountSubscriber.ts | 35 ++++++++++++------ .../grpcProgramAccountSubscriber.ts | 37 +++++++++++++------ sdk/src/accounts/types.ts | 5 +++ 3 files changed, 55 insertions(+), 22 deletions(-) diff --git a/sdk/src/accounts/baseSubscribers/grpcAccountSubscriber.ts b/sdk/src/accounts/baseSubscribers/grpcAccountSubscriber.ts index 53d99602d7..1cf04d938c 100644 --- a/sdk/src/accounts/baseSubscribers/grpcAccountSubscriber.ts +++ b/sdk/src/accounts/baseSubscribers/grpcAccountSubscriber.ts @@ -18,6 +18,7 @@ export class grpcAccountSubscriber extends WebSocketAccountSubscriber { private stream: ClientDuplexStream; private commitmentLevel: CommitmentLevel; public listenerId?: number; + private enableReconnect: boolean; private constructor( client: Client, @@ -26,11 +27,13 @@ export class grpcAccountSubscriber extends WebSocketAccountSubscriber { program: Program, accountPublicKey: PublicKey, decodeBuffer?: (buffer: Buffer) => T, - resubOpts?: ResubOpts + resubOpts?: ResubOpts, + enableReconnect = false ) { super(accountName, program, accountPublicKey, decodeBuffer, resubOpts); this.client = client; this.commitmentLevel = commitmentLevel; + this.enableReconnect = enableReconnect; } public static async create( @@ -57,7 +60,8 @@ export class grpcAccountSubscriber extends WebSocketAccountSubscriber { program, accountPublicKey, decodeBuffer, - resubOpts + resubOpts, + grpcConfigs.enableReconnect ); } @@ -92,15 +96,24 @@ export class grpcAccountSubscriber extends WebSocketAccountSubscriber { transactionsStatus: {}, }; - this.stream.on('error', (error) => { - // @ts-ignore - if (error.code === 1) { - // expected: 1 CANCELLED: Cancelled on client - return; - } else { - console.error('GRPC unexpected error caught:', error); - } - }); + if (this.enableReconnect) { + this.stream.on('error', (error) => { + // @ts-ignore + if (error.code === 1) { + // expected: 1 CANCELLED: Cancelled on client + console.error( + 'GRPC (grpcAccountSubscriber) Cancelled on client caught:', + error + ); + return; + } else { + console.error( + 'GRPC (grpcAccountSubscriber) unexpected error caught:', + error + ); + } + }); + } this.stream.on('data', (chunk: SubscribeUpdate) => { if (!chunk.account) { diff --git a/sdk/src/accounts/programAccount/grpcProgramAccountSubscriber.ts b/sdk/src/accounts/programAccount/grpcProgramAccountSubscriber.ts index 8cc26fdfcd..323ab76848 100644 --- a/sdk/src/accounts/programAccount/grpcProgramAccountSubscriber.ts +++ b/sdk/src/accounts/programAccount/grpcProgramAccountSubscriber.ts @@ -20,6 +20,7 @@ export class grpcProgramAccountSubscriber< private stream: ClientDuplexStream; private commitmentLevel: CommitmentLevel; public listenerId?: number; + private enableReconnect: boolean; private constructor( client: Client, @@ -31,7 +32,8 @@ export class grpcProgramAccountSubscriber< options: { filters: MemcmpFilter[] } = { filters: [], }, - resubOpts?: ResubOpts + resubOpts?: ResubOpts, + enableReconnect = false ) { super( subscriptionName, @@ -43,6 +45,7 @@ export class grpcProgramAccountSubscriber< ); this.client = client; this.commitmentLevel = commitmentLevel; + this.enableReconnect = enableReconnect; } public static async create( @@ -73,7 +76,8 @@ export class grpcProgramAccountSubscriber< program, decodeBufferFn, options, - resubOpts + resubOpts, + grpcConfigs.enableReconnect ); } @@ -119,15 +123,26 @@ export class grpcProgramAccountSubscriber< entry: {}, transactionsStatus: {}, }; - this.stream.on('error', (error) => { - // @ts-ignore - if (error.code === 1) { - // expected: 1 CANCELLED: Cancelled on client - return; - } else { - console.error('GRPC unexpected error caught:', error); - } - }); + + if (this.enableReconnect) { + this.stream.on('error', (error) => { + // @ts-ignore + if (error.code === 1) { + // expected: 1 CANCELLED: Cancelled on client + console.error( + 'GRPC (grpcProgramAccountSubscriber) Cancelled on client caught:', + error + ); + return; + } else { + console.error( + 'GRPC (grpcProgramAccountSubscriber) unexpected error caught:', + error + ); + } + }); + } + this.stream.on('data', (chunk: SubscribeUpdate) => { if (!chunk.account) { return; diff --git a/sdk/src/accounts/types.ts b/sdk/src/accounts/types.ts index 5891da768f..4845bd1099 100644 --- a/sdk/src/accounts/types.ts +++ b/sdk/src/accounts/types.ts @@ -219,6 +219,11 @@ export type GrpcConfigs = { token: string; commitmentLevel?: CommitmentLevel; channelOptions?: ChannelOptions; + /** + * Whether to enable automatic reconnection on connection loss . + * Defaults to false, will throw on connection loss. + */ + enableReconnect?: boolean; }; export interface HighLeverageModeConfigAccountSubscriber {