Skip to content

Commit 85a8231

Browse files
committed
fix: the gossip agent was not passing received states to the sync agent if the states were not new (this is an error since the sync agent may need this info if some peers go offline during sync - also: it makes the sync state reporting go haywire)
1 parent ab0bc0a commit 85a8231

File tree

2 files changed

+55
-69
lines changed

2 files changed

+55
-69
lines changed

src/mesh/agents/state/StateGossipAgent.ts

Lines changed: 54 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ class StateGossipAgent extends PeeringAgentBase {
153153
localStateObjects: Map<AgentId, HashedObject>;
154154
remoteStateObjects: Map<Endpoint, Map<AgentId, HashedObject>>;
155155

156-
previousStatesCache: Map<AgentId, Array<Hash>>;
156+
previousStatesCaches: Map<AgentId, LRUCache<Hash, HashedObject>>;
157157

158158
sentStateCache: LRUCache<string, {timestamp: number, stateHash: Hash, repeats: number}>;
159159

@@ -175,7 +175,7 @@ class StateGossipAgent extends PeeringAgentBase {
175175
this.localStateObjects = new Map();
176176
this.remoteStateObjects = new Map();
177177

178-
this.previousStatesCache = new Map();
178+
this.previousStatesCaches = new Map();
179179

180180
this.sentStateCache = new LRUCache(512);
181181

@@ -233,7 +233,7 @@ class StateGossipAgent extends PeeringAgentBase {
233233

234234
untrackAgentState(agentId: AgentId) {
235235
this.trackedAgentIds.delete(agentId);
236-
this.previousStatesCache.delete(agentId);
236+
this.previousStatesCaches.delete(agentId);
237237
this.localState.delete(agentId);
238238
this.localStateObjects.delete(agentId);
239239
}
@@ -300,7 +300,7 @@ class StateGossipAgent extends PeeringAgentBase {
300300
private clearAgentState(agentId: AgentId) {
301301
this.localState.delete(agentId);
302302
this.localStateObjects.delete(agentId);
303-
this.previousStatesCache.delete(agentId);
303+
this.previousStatesCaches.delete(agentId);
304304
}
305305

306306
private clearPeerState(endpoint: Endpoint) {
@@ -321,7 +321,7 @@ class StateGossipAgent extends PeeringAgentBase {
321321
const currentState = this.localState.get(agentId);
322322

323323
if (currentState !== undefined && hash !== currentState) {
324-
this.cachePreviousStateHash(agentId, currentState);
324+
this.cachePreviousState(agentId, currentState, this.localStateObjects.get(agentId) as HashedObject);
325325
}
326326

327327
this.localState.set(agentId, hash);
@@ -383,36 +383,20 @@ class StateGossipAgent extends PeeringAgentBase {
383383
}
384384
}
385385

386-
private cachePreviousStateHash(agentId: AgentId, state: Hash) {
386+
private cachePreviousState(agentId: AgentId, stateHash: Hash, stateObj: HashedObject) {
387387

388-
let prevStates = this.previousStatesCache.get(agentId);
388+
let prevStates = this.previousStatesCaches.get(agentId);
389389

390390
if (prevStates === undefined) {
391-
prevStates = [];
392-
this.previousStatesCache.set(agentId, prevStates);
391+
prevStates = new LRUCache(this.params.maxCachedPrevStates);
392+
this.previousStatesCaches.set(agentId, prevStates);
393393
}
394394

395-
// remove if already cached
396-
let idx = prevStates.indexOf(state);
397-
if (idx >= 0) {
398-
prevStates.splice(idx, 1);
399-
}
400-
401-
// truncate array to make room for new state
402-
const maxLength = this.params.maxCachedPrevStates - 1;
403-
if (prevStates.length > maxLength) {
404-
const toDelete = prevStates.length - maxLength;
405-
prevStates.splice(maxLength, toDelete);
406-
}
407-
408-
// put state at the start of the cached states array
409-
prevStates.unshift(state);
410-
395+
prevStates.set(stateHash, stateObj);
411396
}
412397

413-
private stateHashIsInPreviousCache(agentId: AgentId, state: Hash): boolean {
414-
const cache = this.previousStatesCache.get(agentId);
415-
return cache !== undefined && cache.indexOf(state) >= 0;
398+
private getStateFromPreviousCache(agentId: AgentId, stateHash: Hash): HashedObject|undefined {
399+
return this.previousStatesCaches.get(agentId)?.get(stateHash);
416400
}
417401

418402
// handling and caching of remote states
@@ -493,7 +477,7 @@ class StateGossipAgent extends PeeringAgentBase {
493477

494478
// message handling
495479

496-
private receiveFullState(sender: Endpoint, state: PeerState) {
480+
private async receiveFullState(sender: Endpoint, state: PeerState) {
497481

498482
for(const [agentId, hash] of state.entries()) {
499483

@@ -502,33 +486,34 @@ class StateGossipAgent extends PeeringAgentBase {
502486

503487
if (agent !== undefined) {
504488

505-
const currentState = this.localState.get(agentId);
489+
//const currentState = this.localState.get(agentId);
506490

507-
if (currentState !== hash) {
508-
const cacheHit = this.stateHashIsInPreviousCache(agentId, hash);
509-
if (! cacheHit) {
510-
511-
try {
512-
513-
const stateObj = this.lookupStateObject(agentId, hash);
514-
515-
if (stateObj === undefined) {
516-
this.requestStateObject(sender, agentId);
517-
} else {
518-
this.receiveStateObject(sender, agentId, stateObj, Date.now());
519-
}
491+
//if (currentState !== hash) {
492+
493+
let stateObj = this.getStateFromPreviousCache(agentId, hash);
520494

521-
522-
} catch (e) {
523-
StateGossipAgent.controlLog.warning('Error while processing received state for ' + agentId, e);
524-
}
495+
if (stateObj === undefined) {
525496

497+
stateObj = this.lookupStateObject(agentId, hash);
526498

527499
// I _think_ it's better to not gossip in this case.
528500
} else {
529501
StateGossipAgent.controlLog.trace('Not gossiping to ' + agentId + ' because ' + hash + ' is in the prev state cache');
530502
}
531-
}
503+
504+
try {
505+
506+
if (stateObj === undefined) {
507+
this.requestStateObject(sender, agentId);
508+
} else {
509+
this.receiveStateObject(sender, agentId, stateObj, Date.now());
510+
}
511+
512+
} catch (e) {
513+
StateGossipAgent.controlLog.warning('Error while processing received state for ' + agentId, e);
514+
}
515+
516+
//}
532517
}
533518
} else {
534519
StateGossipAgent.controlLog.debug('Received state for agentId ' + agentId + ', but it is not being tracked')
@@ -540,36 +525,37 @@ class StateGossipAgent extends PeeringAgentBase {
540525

541526
private async receiveStateObject(sender: Endpoint, agentId: AgentId, stateObj: HashedObject, _timestamp: number) {
542527

543-
if (await stateObj.validate(new Map())) {
544-
const state = stateObj.hash();
528+
const stateHash = stateObj.hash();
545529

546-
this.setRemoteState(sender, agentId, state, stateObj)
530+
if (stateHash !== this.getRemoteState(sender, agentId)) {
531+
if (await stateObj.validate(new Map())) {
547532

548-
const cacheHit = this.stateHashIsInPreviousCache(agentId, state);
549-
550-
let receivedOldState = cacheHit;
551533

552-
if (!receivedOldState) {
534+
this.setRemoteState(sender, agentId, stateHash, stateObj);
535+
this.cachePreviousState(agentId, stateHash, stateObj);
553536

554-
try {
555-
receivedOldState = ! (await this.notifyAgentOfStateArrival(sender, agentId, state, stateObj));
537+
let receivedOldState = false;
556538

557-
StateGossipAgent.controlLog.trace('Received state for ' + agentId + ': (' + state + (receivedOldState? 'old' : 'new') + ')');
539+
try {
540+
receivedOldState = ! (await this.notifyAgentOfStateArrival(sender, agentId, stateHash, stateObj));
558541
} catch (e) {
559542
// maybe cache erroneous states so we don't process them over and over?
560543
StateGossipAgent.controlLog.warning('Received erroneous state from ' + sender + ' for ' + agentId, e);
561544
}
562-
563-
}
564-
565-
if (receivedOldState && this.localState.get(agentId) !== state && this.localStateObjects.get(agentId) !== undefined) {
566-
this.peerMessageLog.trace('Received old state for ' + agentId + ' from ' + sender + ', sending our own state over there.');
567-
this.sendStateObject(sender, agentId);
545+
546+
StateGossipAgent.controlLog.trace('Received state for ' + agentId + ': (' + stateHash + '-' + (receivedOldState? 'old' : 'new') + ')');
547+
548+
if (receivedOldState && this.localState.get(agentId) !== stateHash && this.localStateObjects.get(agentId) !== undefined) {
549+
this.peerMessageLog.trace('Received old state for ' + agentId + ' from ' + sender + ', sending our own state over there.');
550+
this.sendStateObject(sender, agentId);
551+
}
552+
} else {
553+
this.peerMessageLog.trace('Received invalid state for ' + agentId + ' from ' + sender + ', ignoring.');
568554
}
569-
} else {
570-
this.peerMessageLog.trace('Received invalid state for ' + agentId + ' from ' + sender + ', ignoring.');
571555
}
572556

557+
558+
573559
}
574560

575561
private async notifyAgentOfStateArrival(sender: Endpoint, agentId: AgentId, stateHash: Hash, state: HashedObject) : Promise<boolean> {
@@ -691,14 +677,15 @@ class StateGossipAgent extends PeeringAgentBase {
691677

692678
let state: {[key: Endpoint]: Hash} = {};
693679

680+
694681
for (const [endpoint, peerState] of this.remoteState.entries()) {
695682

696683
if (this.peerGroupAgent.isPeer(endpoint)) { // <-- this needs to work even if we still haven't received the
697684
const stateHash = peerState.get(agentId); // event telling us that endpoint is offline, so call directly
698685

699686
if (stateHash !== undefined) {
700687
state[endpoint] = stateHash;
701-
}
688+
}
702689
}
703690
}
704691

src/mesh/agents/state/SyncObserverAgent.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ class SyncObserverAgent implements Agent {
9393
tuple[0].addObserver(obs);
9494

9595
if (syncAgent !== undefined) {
96+
9697
obs({
9798
emitter: mut,
9899
action: SyncObserverEventTypes.SyncStateUpdate,
@@ -156,8 +157,6 @@ class SyncObserverAgent implements Agent {
156157

157158
} else if (agentEv.content.change === AgentSetChange.Removal) {
158159

159-
console.log('SENDING LAST EV FOR ' + mut.getLastHash());
160-
161160
const syncAgent = this.pod?.getAgent(syncAgentId) as StateSyncAgent|undefined;
162161
syncAgent?.getSyncEventSource().removeObserver(syncAgentObserver); // I think this will have no effect
163162
relay.emit({

0 commit comments

Comments
 (0)