Skip to content

Commit 29840a8

Browse files
committed
implemented retry logic for synchronizer
1 parent 5d24441 commit 29840a8

File tree

2 files changed

+44
-11
lines changed

2 files changed

+44
-11
lines changed

src/mesh/agents/state/causal/CausalHistoryProvider.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -606,7 +606,7 @@ class CausalHistoryProvider {
606606
private startStreamingResponse(respInfo: ResponseInfo) {
607607
this.streamingResponses = this.streamingResponses + 1;
608608

609-
if (this.streamingResponses === 1 && this.streamingResponsesInterval === undefined) {
609+
if (this.streamingResponsesInterval === undefined) {
610610
this.streamingResponsesInterval = setInterval(this.continueStreamingResponses, 100);
611611
}
612612

src/mesh/agents/state/causal/CausalHistorySynchronizer.ts

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import { RequestMsg, ResponseMsg, CancelRequestMsg } from './CausalHistoryProvid
1818
const MaxRequestsPerRemote = 2;
1919
const MaxPendingOps = 1024;
2020

21-
const RequestTimeout = 20;
21+
const RequestTimeout = 25;
2222
const LiteralArrivalTimeout = 10;
2323

2424
type RequestInfo = {
@@ -79,6 +79,8 @@ class CausalHistorySynchronizer {
7979
newRequestsLock: Lock;
8080
needToRetryNewRequests: boolean;
8181

82+
checkRequestTimeoutsInterval?: any;
83+
8284
readonly logPrefix: Hash;
8385
controlLog : Logger;
8486
sourcesLog : Logger;
@@ -110,6 +112,8 @@ class CausalHistorySynchronizer {
110112
this.newRequestsLock = new Lock();
111113
this.needToRetryNewRequests = false;
112114

115+
this.checkRequestTimeouts = this.checkRequestTimeouts.bind(this);
116+
113117
this.logPrefix = 'On peer ' + this.syncAgent.peerGroupAgent.localPeer.identity?.hash() as Hash + ':';
114118

115119
this.controlLog = CausalHistorySynchronizer.controlLog;
@@ -144,6 +148,19 @@ class CausalHistorySynchronizer {
144148
this.attemptNewRequests();
145149
}
146150

151+
checkRequestTimeouts() {
152+
153+
let cancelled = false;
154+
155+
for (const reqInfo of this.requests.values()) {
156+
cancelled = cancelled || this.checkRequestRemoval(reqInfo);
157+
}
158+
159+
if (cancelled) {
160+
this.attemptNewRequests();
161+
}
162+
}
163+
147164
private async attemptNewRequests() {
148165

149166
if (this.newRequestsLock.acquire()) {
@@ -211,8 +228,6 @@ class CausalHistorySynchronizer {
211228

212229
const opHistoryFromGossip = Array.from(this.endpointsForUnknownHistory.keys());
213230

214-
215-
216231
for (const hash of opHistoryFromGossip) {
217232

218233
const isUnrequested = this.opHistoryIsUnrequested(hash);
@@ -245,6 +260,8 @@ class CausalHistorySynchronizer {
245260

246261
const isUnrequested = this.opHistoryIsUnrequested(hash);
247262
const isMissingFromStore = isUnrequested && await this.opHistoryIsMissingFromStore(hash);
263+
// (*) the above is short circuited like that only for performance: if it is not unrequested
264+
// it doesn't matter wheter it is in the store or not, we will not ask for it again.
248265

249266
if (isUnrequested && isMissingFromStore) {
250267

@@ -284,8 +301,6 @@ class CausalHistorySynchronizer {
284301

285302
sortedOpHistorySources.sort((s1:[Endpoint, Set<Hash>], s2:[Endpoint, Set<Hash>]) => s2[1].size - s1[1].size);
286303

287-
288-
289304
const opHistoriesToRequest = new Array<[Endpoint, Set<Hash>]>();
290305

291306
this.controlLog.trace('\n'+this.logPrefix+'\nWill check ' + sortedOpHistorySources.length + ' remote sources');
@@ -321,10 +336,12 @@ class CausalHistorySynchronizer {
321336
}
322337
}
323338

339+
const startingOpHistories = this.computeStartingOpHistories();
340+
324341
for (const [remote, opHistories] of opHistoriesToRequest) {
325342

326-
const startingOpHistories = this.computeStartingOpHistories();
327-
const startingOps = this.computeStartingOps(remote);
343+
344+
const startingOps = this.computeStartingOps(remote);
328345

329346
const ops = this.findOpsToRequest(remote, startingOps);
330347

@@ -449,13 +466,28 @@ class CausalHistorySynchronizer {
449466

450467
let sent = this.sendRequest(reqInfo);
451468

452-
if (!sent) {
469+
if (sent) {
470+
this.checkRequestTimeoutsTimer();
471+
} else {
453472
this.cleanupRequest(reqInfo);
454473
}
455474

456475
return sent;
457476
}
458477

478+
private checkRequestTimeoutsTimer() {
479+
if (this.requests.size > 0) {
480+
if (this.checkRequestTimeoutsInterval === undefined) {
481+
this.checkRequestTimeoutsInterval = setInterval(this.checkRequestTimeouts, 5000);
482+
}
483+
} else {
484+
if (this.checkRequestTimeoutsInterval !== undefined) {
485+
clearInterval(this.checkRequestTimeoutsInterval);
486+
this.checkRequestTimeoutsInterval = undefined;
487+
}
488+
}
489+
}
490+
459491
// Do some intelligence over which ops can be requested from an endpoint
460492

461493
private findOpsToRequest(remote: Endpoint, startingOps: Set<Hash>) {
@@ -852,8 +884,6 @@ class CausalHistorySynchronizer {
852884
await this.syncAgent.store.save(op);
853885

854886
this.opXferLog.debug('\n'+this.logPrefix+'\nReceived op ' + literal.hash + ' from request ' + reqInfo.request.requestId);
855-
856-
this.checkRequestRemoval(reqInfo);
857887

858888
const removed = this.checkRequestRemoval(reqInfo);
859889

@@ -1400,6 +1430,9 @@ class CausalHistorySynchronizer {
14001430

14011431
this.requests.delete(reqInfo.request.requestId);
14021432

1433+
// see if we can shut down the timer checking for timeouts
1434+
this.checkRequestTimeoutsTimer();
1435+
14031436
}
14041437

14051438
private async logStoreContents() {

0 commit comments

Comments
 (0)