Skip to content

Commit 40b4cd6

Browse files
committed
renamed HashedObject's addMutationObserver to addObserver
added new sync state visibility APIs (WIP...)
1 parent 0a3f319 commit 40b4cd6

File tree

18 files changed

+282
-126
lines changed

18 files changed

+282
-126
lines changed

src/data/model/immutable/HashedObject.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,11 +341,11 @@ abstract class HashedObject {
341341
return new EventRelay(this, subObservers);
342342
}
343343

344-
addMutationObserver(obs: MutationObserver) {
344+
addObserver(obs: MutationObserver) {
345345
this.getMutationEventSource().addObserver(obs);
346346
}
347347

348-
removeMutationObserver(obs: MutationObserver) {
348+
removeObserver(obs: MutationObserver) {
349349
this._mutationEventSource?.removeObserver(obs);
350350
}
351351

src/data/model/mutable/MutableObject.ts

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { HashedObject } from '../immutable/HashedObject';
44
import { Context } from '../literals/Context';
55
import { Hash } from '../hashing/Hashing';
66
import { Logger, LogLevel } from 'util/logging';
7-
import { HeaderBasedSyncAgent, StateSyncAgent, StateFilter } from 'mesh/agents/state';
7+
import { HeaderBasedSyncAgent, StateSyncAgent, StateFilter, SyncState, SyncObserver } from 'mesh/agents/state';
88
import { PeerGroupAgent } from 'mesh/agents/peer';
99
import { HashedSet } from '../immutable/HashedSet';
1010
import { HashReference } from '../immutable/HashReference';
@@ -603,7 +603,7 @@ abstract class MutableObject extends HashedObject {
603603
for (const elmt of aliases) {
604604
if (!seen.has(elmt)) {
605605
seen.add(elmt);
606-
MutableObject.addEventRelayForElmt(own, hash, elmt);
606+
MutableObject.removeEventRelayForElmt(own, hash, elmt);
607607
}
608608
}
609609
}
@@ -615,10 +615,40 @@ abstract class MutableObject extends HashedObject {
615615
//return new TerminalOpsSyncAgent(peerGroupAgent, this.getLastHash(), this.getStore(), this._acceptedMutationOpClasses);
616616
}
617617

618+
getSyncAgentId(peerGroupId: string) {
619+
return HeaderBasedSyncAgent.syncAgentIdFor(this.getLastHash(), peerGroupId);
620+
}
621+
618622
getSyncAgentStateFilter() : StateFilter | undefined {
619623
return undefined;
620624
}
621625

626+
async getSyncState(peerGroupId?: string): Promise<SyncState|undefined> {
627+
return this.getResources()?.mesh.getSyncState(this, peerGroupId);
628+
}
629+
630+
addSyncObserver(obs: SyncObserver, peerGroupId?: string) {
631+
632+
const mesh = this.getResources()?.mesh;
633+
634+
if (mesh === undefined) {
635+
throw new Error('Trying to add a sync observer, but object ' + this.hash() + ' does not have a mesh resource.');
636+
}
637+
638+
mesh.addSyncObserver(obs, this, peerGroupId);
639+
}
640+
641+
removeSyncObserver(obs: SyncObserver, peerGroupId?: string) {
642+
643+
const mesh = this.getResources()?.mesh;
644+
645+
if (mesh === undefined) {
646+
throw new Error('Trying to add a sync observer, but object ' + this.hash() + ' does not have a mesh resource.');
647+
}
648+
649+
mesh.removeSyncObserver(obs, this, peerGroupId);
650+
}
651+
622652
getAcceptedMutationOpClasses() : Array<string> {
623653
return this._acceptedMutationOpClasses;
624654
}

src/mesh/agents/state.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ export * from './state/StateSyncAgent';
33
export * from './state/TerminalOpsState';
44
export * from './state/TerminalOpsSyncAgent';
55
export * from './state/HeaderBasedSyncAgent';
6+
export * from './state/SyncObserverAgent';
67
export * from './state/history/HeaderBasedState';
78
export * from './state/history/HistoryProvider';
89
export * from './state/history/HistorySynchronizer';

src/mesh/agents/state/HeaderBasedSyncAgent.ts

Lines changed: 69 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@ import { Endpoint } from '../network/NetworkAgent';
66
import { PeerGroupAgent } from '../peer/PeerGroupAgent';
77
import { PeeringAgentBase } from '../peer/PeeringAgentBase';
88
import { HeaderBasedState } from './history/HeaderBasedState';
9-
import { AgentStateUpdateEvent, GossipEventTypes } from './StateGossipAgent';
9+
import { AgentStateUpdateEvent, GossipEventTypes, StateGossipAgent } from './StateGossipAgent';
1010
import { StateSyncAgent } from './StateSyncAgent';
1111

1212
import { HistorySynchronizer } from './history/HistorySynchronizer';
1313
import { HistoryProvider, MessageType, SyncMsg } from './history/HistoryProvider';
1414
import { OpHeader, OpHeaderLiteral } from 'data/history/OpHeader';
1515
import { Resources } from 'spaces/Resources';
16+
import { EventRelay } from 'util/events';
17+
import { SyncObserverEventTypes, SyncState, SyncStateUpdateEvent } from './SyncObserverAgent';
1618

1719
/*
1820
* Important notice: The constructor of the HeaderBasedSyncAgent can receive either an instance or just the
@@ -36,7 +38,7 @@ class HeaderBasedSyncAgent extends PeeringAgentBase implements StateSyncAgent {
3638

3739
static MaxRequestsPerRemote = 2;
3840

39-
mutableObj?: MutableObject;
41+
mutableObj: MutableObject;
4042
mutableObjHash: Hash;
4143
acceptedMutationOpClasses: string[];
4244
stateOpFilter?: StateFilter;
@@ -55,20 +57,19 @@ class HeaderBasedSyncAgent extends PeeringAgentBase implements StateSyncAgent {
5557
synchronizer : HistorySynchronizer;
5658
provider : HistoryProvider;
5759

60+
syncEventSource?: EventRelay<HashedObject>;
61+
5862
terminated = false;
5963

6064
controlLog: Logger;
6165
messageLog: Logger;
6266

63-
constructor(peerGroupAgent: PeerGroupAgent, mutableObjOrHash: MutableObject|Hash, resources: Resources, acceptedMutationOpClasses : string[], stateOpFilter?: StateFilter) {
67+
constructor(peerGroupAgent: PeerGroupAgent, mutableObj: MutableObject, resources: Resources, acceptedMutationOpClasses : string[], stateOpFilter?: StateFilter) {
6468
super(peerGroupAgent);
6569

66-
if (mutableObjOrHash instanceof MutableObject) {
67-
this.mutableObj = mutableObjOrHash;
68-
this.mutableObjHash = mutableObjOrHash.hash();
69-
} else {
70-
this.mutableObjHash = mutableObjOrHash;
71-
}
70+
this.mutableObj = mutableObj;
71+
this.mutableObjHash = mutableObj.hash();
72+
7273
this.acceptedMutationOpClasses = acceptedMutationOpClasses;
7374
this.stateOpFilter = stateOpFilter;
7475

@@ -86,16 +87,14 @@ class HeaderBasedSyncAgent extends PeeringAgentBase implements StateSyncAgent {
8687
this.messageLog = HeaderBasedSyncAgent.messageLog;
8788
}
8889

89-
90-
91-
9290
getAgentId(): string {
9391
return HeaderBasedSyncAgent.syncAgentIdFor(this.mutableObjHash, this.peerGroupAgent.peerGroupId);
9492
}
9593

9694
ready(pod: AgentPod): void {
9795

9896
this.pod = pod;
97+
9998
this.updateStateFromStore().then(async () => {
10099
if (this.stateOpHeadersByOpHash !== undefined) {
101100
for (const opHistory of this.stateOpHeadersByOpHash.values()) {
@@ -112,6 +111,14 @@ class HeaderBasedSyncAgent extends PeeringAgentBase implements StateSyncAgent {
112111

113112
shutdown(): void {
114113

114+
const ev: SyncStateUpdateEvent = {
115+
emitter: this.mutableObj,
116+
action: SyncObserverEventTypes.SyncStateUpdate,
117+
data: {inSync: false, synchronizing: false, opsToFetch: 0, remoteStateHashes: {}}
118+
};
119+
120+
this.syncEventSource?.emit(ev);
121+
115122
this.terminated = true;
116123
this.synchronizer.shutdown();
117124

@@ -206,6 +213,16 @@ class HeaderBasedSyncAgent extends PeeringAgentBase implements StateSyncAgent {
206213
await this.synchronizer.onNewLocalOp(op);
207214
await this.updateStateFromStore();
208215
}
216+
217+
if (this.syncEventSource !== undefined) {
218+
const ev: SyncStateUpdateEvent = {
219+
emitter: this.mutableObj,
220+
action: SyncObserverEventTypes.SyncStateUpdate,
221+
data: this.getSyncState()
222+
};
223+
224+
this.syncEventSource.emit(ev);
225+
}
209226
};
210227

211228
literalIsValidOp(literal?: Literal, log=false): boolean {
@@ -324,6 +341,46 @@ class HeaderBasedSyncAgent extends PeeringAgentBase implements StateSyncAgent {
324341
return this.synchronizer.expectingMoreOps(receivedOpHashes);
325342
}
326343

344+
getSyncEventSource(): EventRelay<HashedObject> {
345+
346+
if (this.syncEventSource === undefined) {
347+
this.syncEventSource = this.createSyncEventSource();
348+
}
349+
350+
return this.syncEventSource;
351+
}
352+
353+
createSyncEventSource(): EventRelay<HashedObject> {
354+
return new EventRelay(this.mutableObj as HashedObject, new Map());
355+
}
356+
357+
getSyncState(): SyncState {
358+
359+
const gossipAgent = this.pod?.getAgent(StateGossipAgent.agentIdForGossipId(this.getAgentId())) as StateGossipAgent;
360+
361+
const remoteStateHashes = gossipAgent.getAllRemoteStateForAgent(this.getAgentId());
362+
const localStateHash = gossipAgent.getLocalStateForAgent(this.getAgentId());
363+
364+
let inSync = true;
365+
for (const remoteHash of Object.values(remoteStateHashes)) {
366+
if (remoteHash !== localStateHash) {
367+
inSync = false;
368+
break;
369+
}
370+
}
371+
372+
const opsToFetch = this.synchronizer.discoveredHistory.contents.size;
373+
374+
return {remoteStateHashes: remoteStateHashes, localStateHash: localStateHash, inSync: inSync, opsToFetch: opsToFetch, synchronizing: true}
375+
}
376+
377+
getMutableObject(): MutableObject {
378+
return this.mutableObj;
379+
}
380+
381+
getPeerGroupId(): string {
382+
return this.peerGroupAgent.peerGroupId;
383+
}
327384
}
328385

329386
export { SyncMsg as HistoryMsg, HeaderBasedSyncAgent, StateFilter }

src/mesh/agents/state/StateGossipAgent.ts

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ enum GossipType {
7070

7171
interface SendFullState {
7272
type: GossipType.SendFullState,
73-
state: {entries: [AgentId, Hash][], hashes: Hash[]} //HashedMap<AgentId, Hash>.toArrays
73+
state: {entries: [AgentId, Hash][], hashes: Hash[]}
7474
};
7575

7676
interface SendStateObject {
@@ -121,7 +121,7 @@ enum SendingReason {
121121

122122
class StateGossipAgent extends PeeringAgentBase {
123123

124-
static agentIdForGossip(gossipId: string) {
124+
static agentIdForGossipId(gossipId: string) {
125125
return 'state-gossip-agent-for-' + gossipId;
126126
}
127127

@@ -164,9 +164,9 @@ class StateGossipAgent extends PeeringAgentBase {
164164

165165

166166

167-
constructor(topic: string, peerNetwork: PeerGroupAgent) {
167+
constructor(gossipId: string, peerNetwork: PeerGroupAgent) {
168168
super(peerNetwork);
169-
this.gossipId = topic;
169+
this.gossipId = gossipId;
170170

171171
this.trackedAgentIds = new Set();
172172
this.localState = new Map();
@@ -211,7 +211,7 @@ class StateGossipAgent extends PeeringAgentBase {
211211
}
212212

213213
getAgentId(): string {
214-
return StateGossipAgent.agentIdForGossip(this.gossipId);
214+
return StateGossipAgent.agentIdForGossipId(this.gossipId);
215215
}
216216

217217
getNetwork() : AgentPod {
@@ -471,7 +471,12 @@ class StateGossipAgent extends PeeringAgentBase {
471471

472472
if (gossip.type === GossipType.RequestStateObject) {
473473
this.controlLog.trace('Recevied state request for ' + gossip.agentId);
474-
this.sendStateObject(source, gossip.agentId, SendingReason.Request);
474+
if (this.localStateObjects.get(gossip.agentId)) {
475+
this.sendStateObject(source, gossip.agentId, SendingReason.Request);
476+
} else {
477+
this.controlLog.debug('Recevied state request for ' + gossip.agentId + ', but no state was found: not sending')
478+
}
479+
475480
}
476481
}
477482

@@ -557,7 +562,7 @@ class StateGossipAgent extends PeeringAgentBase {
557562

558563
}
559564

560-
if (receivedOldState && this.localState.get(agentId) !== state) {
565+
if (receivedOldState && this.localState.get(agentId) !== state && this.localStateObjects.get(agentId) !== undefined) {
561566
this.peerMessageLog.trace('Received old state for ' + agentId + ' from ' + sender + ', sending our own state over there.');
562567
this.sendStateObject(sender, agentId);
563568
}
@@ -680,6 +685,26 @@ class StateGossipAgent extends PeeringAgentBase {
680685

681686
}
682687

688+
// for other agents
689+
690+
getAllRemoteStateForAgent(agentId: AgentId): {[key: Endpoint]: Hash} {
691+
692+
let state: {[key: Endpoint]: Hash} = {};
693+
694+
for (const [endpoint, peerState] of this.remoteState.entries()) {
695+
const stateHash = peerState.get(agentId);
696+
697+
if (stateHash !== undefined) {
698+
state[endpoint] = stateHash;
699+
}
700+
}
701+
702+
return state;
703+
}
704+
705+
getLocalStateForAgent(agentId: AgentId): Hash|undefined {
706+
return this.localState.get(agentId);
707+
}
683708

684709
}
685710

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,19 @@
1-
import { Hash, HashedObject } from 'data/model';
1+
import { Hash, HashedObject, MutableObject } from 'data/model';
2+
import { EventRelay } from 'util/events';
23
import { Agent } from '../../service/Agent';
34
import { Endpoint } from '../network/NetworkAgent';
5+
import { SyncState } from './SyncObserverAgent';
46

57
interface StateSyncAgent extends Agent {
68

79
receiveRemoteState(sender: Endpoint, stateHash: Hash, state: HashedObject) : Promise<boolean>;
810
expectingMoreOps(receivedOpHashes?: Set<Hash>): boolean;
911

12+
getMutableObject(): MutableObject;
13+
getPeerGroupId(): string;
14+
15+
getSyncState(): SyncState;
16+
getSyncEventSource(): EventRelay<HashedObject>;
1017
}
1118

1219
export { StateSyncAgent }

src/mesh/agents/state/SyncObserver.ts

Whitespace-only changes.

src/mesh/agents/state/SyncState.ts

Lines changed: 0 additions & 13 deletions
This file was deleted.

0 commit comments

Comments
 (0)