Skip to content

Commit 9847113

Browse files
committed
fix(protocol): ensure writing network and queue in sync
1 parent 6df1c66 commit 9847113

File tree

1 file changed

+40
-90
lines changed

1 file changed

+40
-90
lines changed

src/lib/ProtocolClient.ts

Lines changed: 40 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import * as C from './Common';
1818
import { EventEmitter } from 'node:events';
19-
import * as $Net from 'net';
19+
import * as $Net from 'node:net';
2020
import * as E from './Errors';
2121

2222
enum ERequestState {
@@ -508,29 +508,6 @@ export class ProtocolClient
508508
return this._command(cmd, args, cb);
509509
}
510510

511-
protected async _write2Socket(data: Buffer, socket?: $Net.Socket): Promise<void> {
512-
513-
if (!socket) {
514-
515-
socket = await this._getConnection();
516-
}
517-
518-
return new Promise<void>((resolve, reject) => {
519-
520-
socket.write(data, (e) => {
521-
522-
if (e) {
523-
524-
reject(e);
525-
}
526-
else {
527-
528-
resolve();
529-
}
530-
});
531-
});
532-
}
533-
534511
protected _checkQueueSize(): void {
535512

536513
if (this._cfg.queueSize && this._executingQueue.length >= this._cfg.queueSize) {
@@ -548,45 +525,24 @@ export class ProtocolClient
548525

549526
return this._unifyAsync(async (callback) => {
550527

551-
const execQueue = this._executingQueue;
552-
553528
this._checkQueueSize();
554529

555-
const data = this._encoder.encodeCommand(cmd, args);
530+
const socket = this._socket ?? await this._getConnection();
531+
532+
socket.write(this._encoder.encodeCommand(cmd, args));
556533

557534
const handle: IQueueItem = {
558535
callback,
559536
state: ERequestState.PENDING,
560537
};
561538

562-
await this._write2Socket(data);
563-
564-
if (execQueue !== this._executingQueue) {
565-
566-
handle.callback(new E.E_CONN_LOST());
567-
return;
568-
}
569-
570-
execQueue.push(handle);
539+
this._executingQueue.push(handle);
571540

572541
if (this._cfg.commandTimeout > 0) {
573542

574-
handle.timeout = setTimeout(() => {
575-
576-
switch (handle.state) {
577-
case ERequestState.PENDING:
578-
handle.state = ERequestState.TIMEOUT;
579-
handle.callback(new E.E_COMMAND_TIMEOUT({
580-
mode: 'mono', cmd, argsQty: args.length
581-
}));
582-
break;
583-
case ERequestState.DONE:
584-
case ERequestState.TIMEOUT:
585-
default:
586-
break;
587-
}
588-
589-
}, this._cfg.commandTimeout);
543+
this._setTimeoutForRequest(handle, () => new E.E_COMMAND_TIMEOUT({
544+
mode: 'mono', cmd, argsQty: args.length
545+
}));
590546
}
591547

592548
}, cb);
@@ -598,6 +554,10 @@ export class ProtocolClient
598554

599555
this._checkQueueSize();
600556

557+
const socket = this._socket ?? await this._getConnection();
558+
559+
socket.write(Buffer.concat(cmdList.map((x) => this._encoder.encodeCommand(x.cmd, x.args))));
560+
601561
const handle: IQueueBatchItem = {
602562
callback,
603563
expected: cmdList.length,
@@ -609,36 +569,21 @@ export class ProtocolClient
609569

610570
if (this._cfg.commandTimeout > 0) {
611571

612-
handle.timeout = setTimeout(() => {
613-
614-
switch (handle.state) {
615-
case ERequestState.PENDING:
616-
handle.state = ERequestState.TIMEOUT;
617-
handle.callback(new E.E_COMMAND_TIMEOUT({
618-
mode: 'bulk', cmdQty: handle.expected
619-
}));
620-
break;
621-
case ERequestState.DONE:
622-
case ERequestState.TIMEOUT:
623-
default:
624-
break;
625-
}
626-
627-
}, this._cfg.commandTimeout);
572+
this._setTimeoutForRequest(handle, () => new E.E_COMMAND_TIMEOUT({
573+
mode: 'bulk', cmdQty: handle.expected
574+
}));
628575
}
629576

630-
const data = Buffer.concat(cmdList.map((x) => this._encoder.encodeCommand(x.cmd, x.args)));
631-
632-
await this._write2Socket(data);
633-
634577
}, cb);
635578
}
636579

637580
protected async _commitExec(qty: number): Promise<any> {
638581

639582
return this._unifyAsync(async (callback) => {
640583

641-
const data = this._encoder.encodeCommand('EXEC', []);
584+
const socket = this._socket ?? await this._getConnection();
585+
586+
socket.write(this._encoder.encodeCommand('EXEC', []));
642587

643588
const handle: IQueueBatchItem = {
644589
callback,
@@ -647,29 +592,34 @@ export class ProtocolClient
647592
result: [],
648593
};
649594

650-
await this._write2Socket(data);
651-
652595
this._executingQueue.push(handle);
653596

654597
if (this._cfg.commandTimeout > 0) {
655598

656-
handle.timeout = setTimeout(() => {
599+
this._setTimeoutForRequest(handle, () => new E.E_COMMAND_TIMEOUT({
600+
mode: 'bulk', cmdQty: handle.expected
601+
}));
602+
}
603+
});
604+
}
657605

658-
switch (handle.state) {
659-
case ERequestState.PENDING:
660-
handle.state = ERequestState.TIMEOUT;
661-
handle.callback(new E.E_COMMAND_TIMEOUT({
662-
mode: 'bulk', cmdQty: handle.expected
663-
}));
664-
break;
665-
case ERequestState.DONE:
666-
case ERequestState.TIMEOUT:
667-
default:
668-
break;
669-
}
606+
private _setTimeoutForRequest(handle: IQueueItem, mkError: () => Error): void {
607+
608+
handle.timeout = setTimeout(() => {
670609

671-
}, this._cfg.commandTimeout);
610+
delete handle.timeout;
611+
612+
switch (handle.state) {
613+
case ERequestState.PENDING:
614+
handle.state = ERequestState.TIMEOUT;
615+
handle.callback(mkError());
616+
break;
617+
case ERequestState.DONE:
618+
case ERequestState.TIMEOUT:
619+
default:
620+
break;
672621
}
673-
});
622+
623+
}, this._cfg.commandTimeout);
674624
}
675625
}

0 commit comments

Comments
 (0)