@@ -7,7 +7,7 @@ import crypto from "node:crypto";
77import { Agent , RetryAgent } from "undici" ;
88
99import { log } from "./logging" ;
10- import { validateTableName , validateColumnName } from "./validation" ;
10+ import { validateColumnName , validateTableName } from "./validation" ;
1111import { SenderOptions , HTTP , HTTPS , TCP , TCPS } from "./options" ;
1212
1313const HTTP_NO_CONTENT = 204 ; // success
@@ -121,8 +121,6 @@ class Sender {
121121 /** @private */ bufferSize ;
122122 /** @private */ maxBufferSize ;
123123 /** @private */ buffer ;
124- /** @private */ toBuffer ;
125- /** @private */ doResolve ;
126124 /** @private */ position ;
127125 /** @private */ endOfLastRow ;
128126
@@ -145,7 +143,6 @@ class Sender {
145143 /** @private */ log ;
146144 /** @private */ agent ;
147145 /** @private */ jwk ;
148- /** @private */ flushPromiseChain : Promise < boolean > ;
149146
150147 /**
151148 * Creates an instance of Sender.
@@ -157,10 +154,8 @@ class Sender {
157154 if ( ! options || ! options . protocol ) {
158155 throw new Error ( "The 'protocol' option is mandatory" ) ;
159156 }
160- replaceDeprecatedOptions ( options ) ;
161-
162157 this . log = typeof options . log === "function" ? options . log : log ;
163- this . flushPromiseChain = Promise . resolve ( true as boolean ) ;
158+ replaceDeprecatedOptions ( options , this . log ) ;
164159
165160 switch ( options . protocol ) {
166161 case HTTP :
@@ -248,8 +243,6 @@ class Sender {
248243 ? options . retry_timeout
249244 : DEFAULT_RETRY_TIMEOUT ;
250245
251- const noCopy = isBoolean ( options . copy_buffer ) && ! options . copy_buffer ;
252- this . toBuffer = noCopy ? this . toBufferView : this . toBufferNew ;
253246 this . maxBufferSize = isInteger ( options . max_buf_size , 1 )
254247 ? options . max_buf_size
255248 : DEFAULT_MAX_BUFFER_SIZE ;
@@ -440,64 +433,17 @@ class Sender {
440433 }
441434
442435 /**
443- * @ignore
444- * Compacts the buffer after data has been sent and resets pending row count.
445- * This method should only be called after a flush operation has successfully sent data.
446- * @param {number } bytesSent The number of bytes that were successfully sent and should be compacted.
447- */
448- private _compactBufferAndResetState ( bytesSent : number ) {
449- if ( bytesSent > 0 && bytesSent <= this . position ) {
450- this . buffer . copy ( this . buffer , 0 , bytesSent , this . position ) ;
451- this . position = this . position - bytesSent ;
452- } else if ( bytesSent > this . position ) {
453- // This case should ideally not happen if logic is correct, means we tried to compact more than available
454- this . position = 0 ;
455- }
456- // If bytesSent is 0 or negative, or if no actual data was at the start of the buffer to be shifted,
457- // this.position effectively remains the same relative to the start of new data.
458-
459- this . endOfLastRow = Math . max ( 0 , this . endOfLastRow - bytesSent ) ;
460- // Ensure endOfLastRow is also shifted if it was within the compacted area,
461- // or reset if it pointed to data that's now gone.
462- // If new rows were added while flushing, endOfLastRow would be > position post-compaction of old data.
463- // This needs careful handling if new data is added *during* an async flush.
464- // For now, we assume endOfLastRow is relative to the data just flushed.
465- // A simpler approach might be to always set this.endOfLastRow = 0 after a successful flush,
466- // as startNewRow() will set it correctly for the *next* new row.
467- // However, if a flush doesn't clear all pending complete rows, this needs to be accurate.
468- // The current `flush` logic sends up to `this.endOfLastRow`, so after sending `dataAmountToSend`
469- // (which was `this.endOfLastRow` at the time of prepping the flush), the new `this.endOfLastRow`
470- // should effectively be 0 relative to the start of the compacted buffer, until a new row is started.
471-
472- this . lastFlushTime = Date . now ( ) ;
473- this . pendingRowCount = 0 ; // Reset after successful flush
474- // If autoFlush was triggered by row count, this reset is crucial.
475- // If triggered by interval, this is also fine.
476- }
477-
478- /**
479- * @ignore
480- * Executes the actual data sending logic (HTTP or TCP).
481- * This is called by the `flush` method, wrapped in the promise chain.
482- * @return {Promise<boolean> } Resolves to true if data was sent.
436+ * Sends the buffer's content to the database and compacts the buffer.
437+ * If the last row is not finished it stays in the sender's buffer.
438+ *
439+ * @return {Promise<boolean> } Resolves to true when there was data in the buffer to send, and it was sent successfully.
483440 */
484- private async _executeFlush ( ) : Promise < boolean > {
485- const dataAmountToSend = this . endOfLastRow ;
486- if ( dataAmountToSend <= 0 ) {
441+ async flush ( ) : Promise < boolean > {
442+ const dataToSend = this . toBufferNew ( this . endOfLastRow ) ;
443+ if ( ! dataToSend ) {
487444 return false ; // Nothing to send
488445 }
489446
490- // Use toBufferView to get a reference, actual data copy for sending happens based on protocol needs
491- const dataView = this . toBufferView ( dataAmountToSend ) ;
492- if ( ! dataView ) {
493- return false ; // Should not happen if dataAmountToSend > 0, but a safe check
494- }
495-
496- // Create a copy for sending to avoid issues if the underlying buffer changes
497- // This is especially important for async operations.
498- const dataToSend = Buffer . allocUnsafe ( dataView . length ) ;
499- dataView . copy ( dataToSend ) ;
500-
501447 try {
502448 if ( this . http ) {
503449 const { timeout : calculatedTimeoutMillis } = createRequestOptions ( this , dataToSend ) ;
@@ -557,13 +503,11 @@ class Sender {
557503 `Unexpected message from server: ${ Buffer . from ( responseBody ) . toString ( ) } ` ,
558504 ) ;
559505 }
560- this . _compactBufferAndResetState ( dataAmountToSend ) ;
561506 return true ;
562507 } else {
563- const error = new Error (
508+ throw new Error (
564509 `HTTP request failed, statusCode=${ statusCode } , error=${ Buffer . from ( responseBody ) . toString ( ) } ` ,
565510 ) ;
566- throw error ;
567511 }
568512 } else { // TCP
569513 if ( ! this . socket || this . socket . destroyed ) {
@@ -574,7 +518,6 @@ class Sender {
574518 if ( err ) {
575519 reject ( err ) ;
576520 } else {
577- this . _compactBufferAndResetState ( dataAmountToSend ) ;
578521 resolve ( true ) ;
579522 }
580523 } ) ;
@@ -609,51 +552,27 @@ class Sender {
609552 }
610553 }
611554
612- /**
613- * Sends the buffer's content to the database and compacts the buffer.
614- * If the last row is not finished it stays in the sender's buffer.
615- * This operation is added to a queue and executed sequentially.
616- *
617- * @return {Promise<boolean> } Resolves to true when there was data in the buffer to send, and it was sent successfully.
618- */
619- async flush ( ) : Promise < boolean > {
620- // Add to the promise chain to ensure sequential execution
621- this . flushPromiseChain = this . flushPromiseChain
622- . then ( async ( ) => {
623- // Check if there's anything to flush just before execution
624- if ( this . endOfLastRow <= 0 ) {
625- return false ; // Nothing to flush
626- }
627- return this . _executeFlush ( ) ;
628- } )
629- . catch ( ( err : Error ) => {
630- // Log or handle error. If _executeFlush throws, it will be caught here.
631- // The error should have already been logged by _executeFlush.
632- // We re-throw to ensure the promise chain reflects the failure.
633- this . log ( "error" , `Flush operation failed in chain: ${ err . message } ` ) ;
634- throw err ; // Propagate error to the caller of this specific flush()
635- } ) ;
636- return this . flushPromiseChain ;
637- }
638-
639555 /**
640556 * @ignore
641- * @return {Buffer } Returns a cropped buffer ready to send to the server or null if there is nothing to send.
557+ * @return {Buffer } Returns a cropped buffer, or null if there is nothing to send.
642558 * The returned buffer is backed by the sender's buffer.
559+ * Used only in tests.
643560 */
644561 toBufferView ( pos = this . position ) : Buffer {
645562 return pos > 0 ? this . buffer . subarray ( 0 , pos ) : null ;
646563 }
647564
648565 /**
649566 * @ignore
650- * @return {Buffer|null } Returns a cropped buffer ready to send to the server or null if there is nothing to send.
567+ * @return {Buffer|null } Returns a cropped buffer ready to send to the server, or null if there is nothing to send.
651568 * The returned buffer is a copy of the sender's buffer.
569+ * It also compacts the Sender's buffer.
652570 */
653571 toBufferNew ( pos = this . position ) : Buffer | null {
654572 if ( pos > 0 ) {
655573 const data = Buffer . allocUnsafe ( pos ) ;
656574 this . buffer . copy ( data , 0 , 0 , pos ) ;
575+ compact ( this ) ;
657576 return data ;
658577 }
659578 return null ;
@@ -928,7 +847,7 @@ function createRequestOptions(
928847) : InternalHttpOptions {
929848 const timeoutMillis =
930849 ( data . length / sender . requestMinThroughput ) * 1000 + sender . requestTimeout ;
931- const options : InternalHttpOptions = {
850+ return {
932851 hostname : sender . host ,
933852 port : sender . port ,
934853 agent : sender . agent ,
@@ -937,8 +856,6 @@ function createRequestOptions(
937856 method : "POST" ,
938857 timeout : timeoutMillis ,
939858 } ;
940-
941- return options ;
942859}
943860
944861async function autoFlush ( sender : Sender ) {
@@ -950,12 +867,7 @@ async function autoFlush(sender: Sender) {
950867 ( sender . autoFlushInterval > 0 &&
951868 Date . now ( ) - sender . lastFlushTime >= sender . autoFlushInterval ) )
952869 ) {
953- // await sender.flush(); // Old call
954- sender . flush ( ) . catch ( err => {
955- // Auto-flush errors should be logged but not necessarily crash the application
956- // The error is already logged by the flush chain's catch block or _executeFlush
957- sender . log ( "error" , `Auto-flush failed: ${ err . message } ` ) ;
958- } ) ;
870+ await sender . flush ( ) ;
959871 }
960872}
961873
@@ -973,6 +885,17 @@ function checkCapacity(sender: Sender, data: string[], base = 0) {
973885 }
974886}
975887
888+ function compact ( sender : Sender ) {
889+ if ( sender . endOfLastRow > 0 ) {
890+ sender . buffer . copy ( sender . buffer , 0 , sender . endOfLastRow , sender . position ) ;
891+ sender . position = sender . position - sender . endOfLastRow ;
892+ sender . endOfLastRow = 0 ;
893+
894+ sender . lastFlushTime = Date . now ( ) ;
895+ sender . pendingRowCount = 0 ;
896+ }
897+ }
898+
976899function writeColumn (
977900 sender : Sender ,
978901 name : string ,
@@ -1073,18 +996,26 @@ function timestampToNanos(timestamp: bigint, unit: "ns" | "us" | "ms") {
1073996}
1074997
1075998type DeprecatedOptions = {
999+ /** @deprecated */
1000+ copy_buffer ?: boolean ;
10761001 /** @deprecated */
10771002 copyBuffer ?: boolean ;
10781003 /** @deprecated */
10791004 bufferSize ?: number ;
10801005} ;
1081- function replaceDeprecatedOptions ( options : SenderOptions & DeprecatedOptions ) {
1006+ function replaceDeprecatedOptions (
1007+ options : SenderOptions & DeprecatedOptions ,
1008+ log : ( level : "error" | "warn" | "info" | "debug" , message : string ) => void
1009+ ) {
10821010 // deal with deprecated options
1083- if ( options . copyBuffer ) {
1084- options . copy_buffer = options . copyBuffer ;
1085- options . copyBuffer = undefined ;
1011+ if ( options . copy_buffer !== undefined ) {
1012+ log ( "warn" , `Option 'copy_buffer' is not supported anymore, please, remove it` ) ;
1013+ }
1014+ if ( options . copyBuffer !== undefined ) {
1015+ log ( "warn" , `Option 'copyBuffer' is not supported anymore, please, remove it` ) ;
10861016 }
1087- if ( options . bufferSize ) {
1017+ if ( options . bufferSize !== undefined ) {
1018+ log ( "warn" , `Option 'bufferSize' is not supported anymore, please, replace it with 'init_buf_size'` ) ;
10881019 options . init_buf_size = options . bufferSize ;
10891020 options . bufferSize = undefined ;
10901021 }
0 commit comments