Skip to content

Commit 5dea7f9

Browse files
committed
fixes to sync state event system
1 parent 73f4c7d commit 5dea7f9

File tree

6 files changed

+139
-73
lines changed

6 files changed

+139
-73
lines changed

src/mesh/agents/state/HeaderBasedSyncAgent.ts

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ class HeaderBasedSyncAgent extends PeeringAgentBase implements StateSyncAgent {
5757
synchronizer : HistorySynchronizer;
5858
provider : HistoryProvider;
5959

60-
syncEventSource?: EventRelay<HashedObject>;
60+
syncEventSource?: EventRelay<HashedObject, SyncState>;
6161

6262
terminated = false;
6363

@@ -104,8 +104,11 @@ class HeaderBasedSyncAgent extends PeeringAgentBase implements StateSyncAgent {
104104
}
105105
}
106106
);
107+
107108
this.watchStoreForOps();
108109

110+
this.emitSyncState();
111+
109112
this.controlLog.debug('Started agent for ' + this.mutableObjHash);
110113
}
111114

@@ -114,7 +117,7 @@ class HeaderBasedSyncAgent extends PeeringAgentBase implements StateSyncAgent {
114117
const ev: SyncStateUpdateEvent = {
115118
emitter: this.mutableObj,
116119
action: SyncObserverEventTypes.SyncStateUpdate,
117-
data: {inSync: false, synchronizing: false, opsToFetch: 0, remoteStateHashes: {}}
120+
data: {allPeersInSync: false, synchronizing: false, opsToFetch: 0, remoteStateHashes: {}}
118121
};
119122

120123
this.syncEventSource?.emit(ev);
@@ -157,6 +160,9 @@ class HeaderBasedSyncAgent extends PeeringAgentBase implements StateSyncAgent {
157160
this.synchronizer.onNewHistory(sender, unknown);
158161
}
159162

163+
this.emitSyncState();
164+
165+
160166
}
161167

162168
}
@@ -214,15 +220,7 @@ class HeaderBasedSyncAgent extends PeeringAgentBase implements StateSyncAgent {
214220
await this.updateStateFromStore();
215221
}
216222

217-
if (this.syncEventSource !== undefined) {
218-
const ev: SyncStateUpdateEvent = {
219-
emitter: this.mutableObj,
220-
action: SyncObserverEventTypes.SyncStateUpdate,
221-
data: this.getSyncState()
222-
};
223-
224-
this.syncEventSource.emit(ev);
225-
}
223+
this.emitSyncState();
226224
};
227225

228226
literalIsValidOp(literal?: Literal, log=false): boolean {
@@ -354,9 +352,27 @@ class HeaderBasedSyncAgent extends PeeringAgentBase implements StateSyncAgent {
354352
return new EventRelay(this.mutableObj as HashedObject, new Map());
355353
}
356354

355+
// by default use our own syncEventSource, but allow the caller to specify another event relay
356+
// (used by the SyncObserverAgent to send the initial state)
357+
emitSyncState() {
358+
359+
if (this.syncEventSource !== undefined) {
360+
const ev: SyncStateUpdateEvent = {
361+
emitter: this.mutableObj,
362+
action: SyncObserverEventTypes.SyncStateUpdate,
363+
data: this.getSyncState()
364+
};
365+
366+
console.log('new sync state event:', ev);
367+
this.syncEventSource.emit(ev);
368+
} else {
369+
console.log('not emitting new sync state event: no relay')
370+
}
371+
}
372+
357373
getSyncState(): SyncState {
358374

359-
const gossipAgent = this.pod?.getAgent(StateGossipAgent.agentIdForGossipId(this.getAgentId())) as StateGossipAgent;
375+
const gossipAgent = this.pod?.getAgent(StateGossipAgent.agentIdForGossipId(this.peerGroupAgent.peerGroupId)) as StateGossipAgent;
360376

361377
const remoteStateHashes = gossipAgent.getAllRemoteStateForAgent(this.getAgentId());
362378
const localStateHash = gossipAgent.getLocalStateForAgent(this.getAgentId());
@@ -371,7 +387,7 @@ class HeaderBasedSyncAgent extends PeeringAgentBase implements StateSyncAgent {
371387

372388
const opsToFetch = this.synchronizer.discoveredHistory.contents.size;
373389

374-
return {remoteStateHashes: remoteStateHashes, localStateHash: localStateHash, inSync: inSync, opsToFetch: opsToFetch, synchronizing: true}
390+
return {remoteStateHashes: remoteStateHashes, localStateHash: localStateHash, allPeersInSync: inSync, opsToFetch: opsToFetch, synchronizing: true}
375391
}
376392

377393
getMutableObject(): MutableObject {

src/mesh/agents/state/StateGossipAgent.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -696,9 +696,11 @@ class StateGossipAgent extends PeeringAgentBase {
696696

697697
if (stateHash !== undefined) {
698698
state[endpoint] = stateHash;
699+
console.log('ep=', endpoint, ', hash=', stateHash);
699700
}
700701
}
701702

703+
console.log(state);
702704
return state;
703705
}
704706

src/mesh/agents/state/SyncObserverAgent.ts

Lines changed: 50 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@ import { StateSyncAgent } from './StateSyncAgent';
2424
* - addSyncObserver() is called -> ... to be continued
2525
*/
2626

27-
type SyncObserver = Observer<HashedObject>;
27+
type SyncState = { remoteStateHashes: {[key: Endpoint]: Hash}, localStateHash?: Hash, allPeersInSync: boolean, opsToFetch: number, synchronizing: boolean };
28+
29+
type SyncObserver = Observer<HashedObject, SyncState>;
2830
type SyncEvent = Event<HashedObject, SyncState>;
2931

30-
type SyncState = { remoteStateHashes: {[key: Endpoint]: Hash}, localStateHash?: Hash, inSync: boolean, opsToFetch: number, synchronizing: boolean }
32+
3133

3234
enum SyncObserverEventTypes {
3335
SyncStateUpdate = 'sync-state-update'
@@ -43,7 +45,7 @@ class SyncObserverAgent implements Agent {
4345

4446
pod?: AgentPod;
4547

46-
relays: Map<AgentId, EventRelay<HashedObject>>;
48+
relays: Map<AgentId, [EventRelay<HashedObject>, SyncObserver]>;
4749

4850
constructor() {
4951
this.relays = new Map();
@@ -59,27 +61,44 @@ class SyncObserverAgent implements Agent {
5961

6062
addSyncObserver(obs: SyncObserver, mut: MutableObject, peerGroupId: PeerGroupId) {
6163

64+
console.log('adding sync observer ', obs, ' for ', mut.getLastHash(), ' in peer group ', peerGroupId)
65+
6266
if (this.pod === undefined) {
6367
throw new Error('Trying to add a sync observer, but the SyncObserverAgent is not ready.');
6468
}
6569

6670
const syncAgentId = mut.getSyncAgentId(peerGroupId);
71+
const syncAgent = this.pod.getAgent(syncAgentId) as StateSyncAgent|undefined;
6772

68-
let relay = this.relays.get(syncAgentId);
73+
let pair = this.relays.get(syncAgentId);
74+
6975

70-
if (relay === undefined) {
71-
const upstream = new Map<string, EventRelay<HashedObject>>();
76+
if (pair === undefined) {
7277

73-
const syncAgent = this.pod.getAgent(syncAgentId) as StateSyncAgent;
74-
if (syncAgent !== undefined) {
75-
upstream.set('agent', syncAgent.getSyncEventSource());
78+
const relay = new EventRelay<HashedObject>(mut);
79+
const syncAgentObs = (ev: SyncEvent) => {
80+
relay.emit(ev);
7681
}
7782

78-
relay = new EventRelay<HashedObject>(mut, upstream);
79-
this.relays.set(syncAgentId, relay);
83+
pair = [relay, syncAgentObs];
84+
85+
this.relays.set(syncAgentId, [relay, syncAgentObs]);
86+
87+
88+
if (syncAgent !== undefined) {
89+
syncAgent.getSyncEventSource().addObserver(syncAgentObs);
90+
}
8091
}
8192

82-
relay.addObserver(obs);
93+
pair[0].addObserver(obs);
94+
95+
if (syncAgent !== undefined) {
96+
obs({
97+
emitter: mut,
98+
action: SyncObserverEventTypes.SyncStateUpdate,
99+
data: syncAgent.getSyncState()
100+
});
101+
}
83102
}
84103

85104
removeSyncObserver(obs: SyncObserver, mut: MutableObject, peerGroupId: PeerGroupId) {
@@ -88,13 +107,20 @@ class SyncObserverAgent implements Agent {
88107
}
89108

90109
const syncAgentId = mut.getSyncAgentId(peerGroupId);
91-
let relay = this.relays.get(syncAgentId);
110+
const pair = this.relays.get(syncAgentId);
92111

93-
if (relay !== undefined) {
112+
if (pair !== undefined) {
113+
const [relay, syncAgentObs] = pair;
94114
relay.removeObserver(obs);
95115

96116
if (relay.observers.size === 0) {
97-
relay.removeAllUpstreamRelays();
117+
118+
const syncAgent = this.pod.getAgent(syncAgentId) as StateSyncAgent|undefined;
119+
120+
if (syncAgent !== undefined) {
121+
syncAgent.getSyncEventSource().removeObserver(syncAgentObs);
122+
}
123+
98124
this.relays.delete(syncAgentId);
99125
}
100126
}
@@ -106,14 +132,19 @@ class SyncObserverAgent implements Agent {
106132
const agentEv = ev as AgentSetChangeEvent;
107133

108134
const syncAgentId = agentEv.content.agentId;
109-
const relay = this.relays.get(syncAgentId);
135+
const pair = this.relays.get(syncAgentId);
110136

111-
if (relay !== undefined) {
137+
if (pair !== undefined) {
112138
if (agentEv.content.change === AgentSetChange.Addition) {
113139
const syncAgent = this.pod?.getAgent(syncAgentId) as StateSyncAgent;
114-
relay.addUpstreamRelay('agent', syncAgent.getSyncEventSource());
140+
const [_relay, syncAgentObserver] = pair;
141+
syncAgent.getSyncEventSource().addObserver(syncAgentObserver);
142+
console.log('+++ adding agent relay ', syncAgent.getSyncEventSource(), ' to ', _relay)
115143
} else if (agentEv.content.change === AgentSetChange.Removal) {
116-
relay.removeUpstreamRelay('agent');
144+
const syncAgent = this.pod?.getAgent(syncAgentId) as StateSyncAgent|undefined;
145+
const [_relay, syncAgentObserver] = pair;
146+
syncAgent?.getSyncEventSource().addObserver(syncAgentObserver);
147+
console.log('+++ removing agent relay from ', _relay)
117148
}
118149

119150
}

src/mesh/service/Mesh.ts

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { Store } from 'storage/store';
44
import { MultiMap } from 'util/multimap';
55

66
import { Endpoint, NetworkAgent, NetworkAgentProxyConfig, SecureNetworkAgent } from '../agents/network';
7-
import { PeerInfo, PeerSource, PeerGroupAgent, PeerGroupAgentConfig } from '../agents/peer';
7+
import { PeerInfo, PeerSource, PeerGroupAgent, PeerGroupAgentConfig, ObjectDiscoveryPeerSource } from '../agents/peer';
88
import { StateGossipAgent, StateSyncAgent, SyncObserver, SyncObserverAgent, SyncState } from 'mesh/agents/state';
99
import { ObjectBroadcastAgent, ObjectDiscoveryAgent, ObjectDiscoveryReply, ObjectDiscoveryReplyParams } from '../agents/discovery';
1010

@@ -17,6 +17,7 @@ import { Identity } from 'data/identity';
1717
import { ObjectSpawnAgent, SpawnCallback } from 'mesh/agents/spawn/ObjectSpawnAgent';
1818
import { ObjectInvokeAgent } from 'mesh/agents/spawn/ObjectInvokeAgent';
1919
import { PeerGroupState } from 'mesh/agents/peer/PeerGroupState';
20+
import { Resources } from 'spaces/Resources';
2021

2122

2223
/* Connect to the Hyper Hyper Space service mesh.
@@ -101,7 +102,7 @@ enum SyncMode {
101102

102103
class CannotInferPeerGroup extends Error {
103104
constructor(mutHash: Hash) {
104-
super('The sync state for ' + mutHash + ' was requested, but no peerGroupId was specified, and there are more than one peer groups synchronizing this object!');
105+
super('The sync state for ' + mutHash + ' was requested, but no peerGroupId was specified, and there is not exactly one peer group synchronizing this object, so we cannot infer which one was intended!');
105106
}
106107
}
107108

@@ -332,6 +333,8 @@ class Mesh {
332333
peerGroupId = this.inferPeerGroupId(mut);
333334
}
334335

336+
console.log('mesh: observer to ', mut.getLastHash(), ' peerGroupId=', peerGroupId);
337+
335338
this.syncObserver.addSyncObserver(obs, mut, peerGroupId);
336339
}
337340

@@ -843,14 +846,38 @@ class Mesh {
843846

844847
}
845848

849+
async getDiscoveryPeerGroup(obj: HashedObject, resources?: Resources) : Promise<PeerGroupInfo> {
850+
851+
resources = resources || obj.getResources();
852+
853+
if (resources === undefined) {
854+
throw new Error('Could not find a valid resources object to use for the discovery peer group.');
855+
}
856+
857+
let localPeer = resources.getPeersForDiscovery()[0];
858+
let peerSource = new ObjectDiscoveryPeerSource(this, obj, resources.config.linkupServers, LinkupAddress.fromURL(localPeer.endpoint, localPeer.identity), resources.getEndointParserForDiscovery());
859+
860+
return {
861+
id: Mesh.discoveryPeerGroupId(obj),
862+
localPeer: localPeer,
863+
peerSource: peerSource
864+
};
865+
866+
}
867+
868+
static discoveryPeerGroupId(obj: HashedObject) {
869+
return 'sync-for-' + obj.getLastHash();
870+
}
871+
846872
private inferPeerGroupId(mut: MutableObject) {
847873
const mutHash = mut.getLastHash();
848874

849875
const agents = this.gossipIdsPerObject.get(mutHash);
850876

851-
if (agents.size > 0) {
877+
if (agents.size !== 1) {
852878
throw new CannotInferPeerGroup(mutHash);
853879
} else {
880+
console.log('agents ', agents)
854881
return agents.values().next().value as string;
855882
}
856883
}

src/mesh/service/MeshNode.ts

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22
import { Identity } from 'data/identity';
33
import { Hash, HashedObject } from 'data/model';
44
import { Endpoint } from 'mesh/agents/network';
5-
import { ObjectDiscoveryPeerSource, PeerGroupState, PeerInfo } from 'mesh/agents/peer';
5+
import { PeerGroupState, PeerInfo } from 'mesh/agents/peer';
66
import { ObjectSpawnAgent, SpawnCallback } from 'mesh/agents/spawn';
77
import { LinkupAddress, LinkupManager } from 'net/linkup';
88
import { Resources } from 'spaces/spaces';
99
import { MultiMap } from 'util/multimap';
10-
import { PeerGroupInfo, SyncMode, UsageToken } from './Mesh';
10+
import { Mesh, PeerGroupInfo, SyncMode, UsageToken } from './Mesh';
1111

1212
type Key = string;
1313

@@ -64,15 +64,15 @@ class MeshNode {
6464
}
6565

6666
async getPeerGroupStateForSyncObj(obj: HashedObject) {
67-
const peerGroup = await this.discoveryPeerGroupInfo(obj);
67+
const peerGroup = await this.getDiscoveryPeerGroup(obj);
6868

6969
return this.getPeerGroupState(peerGroup.id);
7070
}
7171

7272
async sync(obj: HashedObject, mode :SyncMode = SyncMode.full, peerGroup?: PeerGroupInfo): Promise<void> {
7373

7474
if (peerGroup === undefined) {
75-
peerGroup = await this.discoveryPeerGroupInfo(obj);
75+
peerGroup = await this.getDiscoveryPeerGroup(obj);
7676
}
7777

7878
const peerGroupKey = MeshNode.generateKey([peerGroup.id]);
@@ -97,7 +97,7 @@ class MeshNode {
9797

9898
async stopSync(obj: HashedObject, peerGroupId?: string, gossipId?: string) : Promise<void> {
9999
if (peerGroupId === undefined) {
100-
peerGroupId = MeshNode.discoveryPeerGroupInfoId(obj);
100+
peerGroupId = Mesh.discoveryPeerGroupId(obj);
101101
}
102102

103103
const syncKey = MeshNode.generateKey([obj.hash(), peerGroupId, gossipId]);
@@ -129,20 +129,8 @@ class MeshNode {
129129
this.resources.mesh.sendObjectSpawnRequest(object, sender, receiver, senderEndpoint, receiverLinkupServers, spawnId)
130130
}
131131

132-
private async discoveryPeerGroupInfo(obj: HashedObject) : Promise<PeerGroupInfo> {
133-
let localPeer = this.resources.getPeersForDiscovery()[0];
134-
let peerSource = new ObjectDiscoveryPeerSource(this.resources.mesh, obj, this.resources.config.linkupServers, LinkupAddress.fromURL(localPeer.endpoint, localPeer.identity), this.resources.getEndointParserForDiscovery());
135-
136-
return {
137-
id: MeshNode.discoveryPeerGroupInfoId(obj),
138-
localPeer: localPeer,
139-
peerSource: peerSource
140-
};
141-
142-
}
143-
144-
private static discoveryPeerGroupInfoId(obj: HashedObject) {
145-
return 'sync-for-' + obj.hash();
132+
async getDiscoveryPeerGroup(obj: HashedObject) : Promise<PeerGroupInfo> {
133+
return this.resources.mesh.getDiscoveryPeerGroup(obj, this.resources);
146134
}
147135

148136
private static generateKey(parts: (string|undefined)[]): string {
@@ -164,7 +152,7 @@ class MeshNode {
164152
async expectingMoreOps(obj: HashedObject, receivedOps?: Set<Hash>, peerGroupId?: string, rootObject?: HashedObject): Promise<boolean> {
165153

166154
if (peerGroupId === undefined) {
167-
const peerGroup = await this.discoveryPeerGroupInfo(rootObject !== undefined? rootObject : obj);
155+
const peerGroup = await this.getDiscoveryPeerGroup(rootObject !== undefined? rootObject : obj);
168156
peerGroupId = peerGroup.id;
169157
}
170158

0 commit comments

Comments
 (0)