From a55f001a8443f9399cd00c89eeb1281e1fd2b4ee Mon Sep 17 00:00:00 2001 From: Peter Lehnhardt Date: Fri, 21 Nov 2025 13:53:10 +0100 Subject: [PATCH] Fix unhandled errors on custom connection errors This change fixes crashes due to unhandled errors if more than one packet with unknown correlation ID are received in succession. The connection pool attaches an error handler via `connection.once('error', ...)` which detaches the error handler after being invoked once. Further emitted errors will be handled as uncaught exceptions and exit the program. Also, the socket is not closed when emitting this error and keeps listening for incoming messages. By destroying the socket and providing the appropriate error we prevent the socket from handling any more packets, thus preventing uncaught exceptions. Furthermore, by providing the error as parameter to the socket destruction we stay within the normal socket lifecycle and all clean up on socket closure is done, which was previously skipped entirely. on-behalf-of: @SAP ospo@sap.com --- src/network/connection.ts | 3 +-- test/network/connection.test.ts | 35 ++++++++++++++++++++++++++++++--- 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/src/network/connection.ts b/src/network/connection.ts index 06e062f..20f97cb 100644 --- a/src/network/connection.ts +++ b/src/network/connection.ts @@ -604,8 +604,7 @@ export class Connection extends EventEmitter { const request = this.#inflightRequests.get(correlationId) if (!request) { - this.emit( - 'error', + this.#socket.destroy( new UnexpectedCorrelationIdError(`Received unexpected response with correlation_id=${correlationId}`, { raw: this.#responseReader.buffer.slice(0, this.#nextMessage + INT32_SIZE) }) diff --git a/test/network/connection.test.ts b/test/network/connection.test.ts index 139aa30..b654318 100644 --- a/test/network/connection.test.ts +++ b/test/network/connection.test.ts @@ -32,6 +32,18 @@ import { mockedErrorMessage, mockedOperationId } from '../helpers.ts' +import { scheduler } from 'node:timers/promises' + +process + .on('unhandledRejection', (reason, p) => { + console.error('Unhandled Rejection at:', p) + console.error(reason) + }) + .on('uncaughtException', err => { + console.error('Uncaught Exception thrown') + console.error(err) + process.exit(1) + }) // Create passwords as Confluent Kafka images don't support it via environment const saslBroker = parseBroker(kafkaSaslBootstrapServers[0]) @@ -569,7 +581,20 @@ test('Connection should handle unexpected correlation IDs', async t => { response.writeInt32BE(8, 0) // size (8 bytes payload) response.writeInt32BE(99999, 4) // Unexpected correlationId response.writeInt32BE(123, 8) // mock result - socket.end(response) + socket.write(response) + + // Put next write on event loop to ensure onData is first processing only the first response + setTimeout(() => { + if (socket.writable) { + // Send another response with another unexpected correlation ID + // If this is received and processed by the connection, it will lead to an unhandled exception + const response2 = Buffer.alloc(4 + 4 + 4) // size + correlationId + result + response2.writeInt32BE(8, 0) // size (8 bytes payload) + response2.writeInt32BE(100000, 4) // Unexpected correlationId + response2.writeInt32BE(124, 8) // mock result + socket.write(response2) + } + }, 0) }) }) @@ -604,8 +629,12 @@ test('Connection should handle unexpected correlation IDs', async t => { // Wait for error const error = await errorPromise - ok(error instanceof UnexpectedCorrelationIdError) - strictEqual(error.message, 'Received unexpected response with correlation_id=99999') + ok(error instanceof NetworkError) + ok(error.cause instanceof UnexpectedCorrelationIdError) + strictEqual(error.cause.message, 'Received unexpected response with correlation_id=99999') + + // Put test finish on event loop to allow server to send the second unexpected correlation ID + await scheduler.wait(0) }) test('Connection should handle socket errors', async t => {