1- import pg , { PoolConfig } from 'pg'
2- import { DatabaseError } from 'pg-protocol'
1+ import pg from 'pg'
32import { parse as parseArray } from 'postgres-array'
4- import { PostgresMetaResult } from './types.js'
3+ import { PostgresMetaResult , PoolConfig } from './types.js'
54
65pg . types . setTypeParser ( pg . types . builtins . INT8 , ( x ) => {
76 const asNumber = Number ( x )
@@ -21,6 +20,38 @@ pg.types.setTypeParser(1185, parseArray) // _timestamptz
2120pg . types . setTypeParser ( 600 , ( x ) => x ) // point
2221pg . types . setTypeParser ( 1017 , ( x ) => x ) // _point
2322
23+ // Ensure any query will have an appropriate error handler on the pool to prevent connections errors
24+ // to bubble up all the stack eventually killing the server
25+ const poolerQueryHandleError = ( pool : pg . Pool , sql : string ) : Promise < pg . QueryResult < any > > => {
26+ return new Promise ( ( resolve , reject ) => {
27+ let rejected = false
28+ const connectionErrorHandler = ( err : any ) => {
29+ // If the error hasn't already be propagated to the catch
30+ if ( ! rejected ) {
31+ rejected = true
32+ reject ( err )
33+ }
34+ }
35+ // This listened avoid getting uncaught exceptions for errors happening at connection level within the stream
36+ // such as parse or RESULT_SIZE_EXCEEDED errors instead, handle the error gracefully by bubbling in up to the caller
37+ pool . once ( 'error' , connectionErrorHandler )
38+ pool
39+ . query ( sql )
40+ . then ( ( results : pg . QueryResult < any > ) => {
41+ if ( ! rejected ) {
42+ return resolve ( results )
43+ }
44+ } )
45+ . catch ( ( err : any ) => {
46+ // If the error hasn't already be handled within the error listener
47+ if ( ! rejected ) {
48+ rejected = true
49+ return reject ( err )
50+ }
51+ } )
52+ } )
53+ }
54+
2455export const init : ( config : PoolConfig ) => {
2556 query : ( sql : string ) => Promise < PostgresMetaResult < any > >
2657 end : ( ) => Promise < void >
@@ -60,26 +91,27 @@ export const init: (config: PoolConfig) => {
6091 // compromise: if we run `query` after `pool.end()` is called (i.e. pool is
6192 // `null`), we temporarily create a pool and close it right after.
6293 let pool : pg . Pool | null = new pg . Pool ( config )
94+
6395 return {
6496 async query ( sql ) {
6597 try {
6698 if ( ! pool ) {
6799 const pool = new pg . Pool ( config )
68- let res = await pool . query ( sql )
100+ let res = await poolerQueryHandleError ( pool , sql )
69101 if ( Array . isArray ( res ) ) {
70102 res = res . reverse ( ) . find ( ( x ) => x . rows . length !== 0 ) ?? { rows : [ ] }
71103 }
72104 await pool . end ( )
73105 return { data : res . rows , error : null }
74106 }
75107
76- let res = await pool . query ( sql )
108+ let res = await poolerQueryHandleError ( pool , sql )
77109 if ( Array . isArray ( res ) ) {
78110 res = res . reverse ( ) . find ( ( x ) => x . rows . length !== 0 ) ?? { rows : [ ] }
79111 }
80112 return { data : res . rows , error : null }
81113 } catch ( error : any ) {
82- if ( error instanceof DatabaseError ) {
114+ if ( error . constructor . name === ' DatabaseError' ) {
83115 // Roughly based on:
84116 // - https://github.com/postgres/postgres/blob/fc4089f3c65a5f1b413a3299ba02b66a8e5e37d0/src/interfaces/libpq/fe-protocol3.c#L1018
85117 // - https://github.com/brianc/node-postgres/blob/b1a8947738ce0af004cb926f79829bb2abc64aa6/packages/pg/lib/native/query.js#L33
@@ -147,7 +179,60 @@ ${' '.repeat(5 + lineNumber.toString().length + 2 + lineOffset)}^
147179 }
148180 }
149181
150- return { data : null , error : { message : error . message } }
182+ // Handle stream errors and result size exceeded errors
183+ if ( error . code === 'RESULT_SIZE_EXCEEDED' ) {
184+ // Force kill the connection without waiting for graceful shutdown
185+ const _pool = pool
186+ pool = null
187+ try {
188+ if ( _pool ) {
189+ // Force kill the connection by destroying the socket
190+ const client = ( _pool as any ) . _clients ?. [ 0 ]
191+ if ( client ?. connection ?. stream ) {
192+ client . connection . stream . destroy ( )
193+ }
194+ }
195+ } catch ( endError ) {
196+ // Ignore any errors during cleanup
197+ }
198+ return {
199+ data : null ,
200+ error : {
201+ message : `Query result size (${ error . resultSize } bytes) exceeded the configured limit (${ error . maxResultSize } bytes)` ,
202+ code : error . code ,
203+ resultSize : error . resultSize ,
204+ maxResultSize : error . maxResultSize ,
205+ } ,
206+ }
207+ }
208+
209+ // Handle other stream errors
210+ if ( error . code === 'STREAM_ERROR' ) {
211+ // Force kill the connection without waiting for graceful shutdown
212+ const _pool = pool
213+ pool = null
214+ try {
215+ if ( _pool ) {
216+ // Force kill the connection by destroying the socket
217+ const client = ( _pool as any ) . _clients ?. [ 0 ]
218+ if ( client ?. connection ?. stream ) {
219+ client . connection . stream . destroy ( )
220+ }
221+ }
222+ } catch ( endError ) {
223+ // Ignore any errors during cleanup
224+ }
225+ return {
226+ data : null ,
227+ error : {
228+ message : 'Stream error occurred while processing query' ,
229+ code : error . code ,
230+ details : error . message ,
231+ } ,
232+ }
233+ }
234+
235+ return { data : null , error : { ...error , message : error . message } }
151236 }
152237 } ,
153238
@@ -156,7 +241,11 @@ ${' '.repeat(5 + lineNumber.toString().length + 2 + lineOffset)}^
156241 pool = null
157242 // Gracefully wait for active connections to be idle, then close all
158243 // connections in the pool.
159- if ( _pool ) await _pool . end ( )
244+ if ( _pool ) {
245+ // Remove all listeners before ending to prevent memory leaks
246+ _pool . removeAllListeners ( )
247+ await _pool . end ( )
248+ }
160249 } ,
161250 }
162251}
0 commit comments