Skip to content

Commit 19647e5

Browse files
committed
bridged sync observing to/from WebWorker-based mesh.
1 parent 9583de4 commit 19647e5

File tree

8 files changed

+282
-40
lines changed

8 files changed

+282
-40
lines changed

src/mesh/agents/state/HeaderBasedSyncAgent.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -362,11 +362,8 @@ class HeaderBasedSyncAgent extends PeeringAgentBase implements StateSyncAgent {
362362
action: SyncObserverEventTypes.SyncStateUpdate,
363363
data: this.getSyncState()
364364
};
365-
366-
console.log('new sync state event:', ev);
365+
367366
this.syncEventSource.emit(ev);
368-
} else {
369-
console.log('not emitting new sync state event: no relay')
370367
}
371368
}
372369

src/mesh/agents/state/StateGossipAgent.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -696,11 +696,9 @@ class StateGossipAgent extends PeeringAgentBase {
696696

697697
if (stateHash !== undefined) {
698698
state[endpoint] = stateHash;
699-
console.log('ep=', endpoint, ', hash=', stateHash);
700699
}
701700
}
702701

703-
console.log(state);
704702
return state;
705703
}
706704

src/mesh/agents/state/SyncObserverAgent.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,6 @@ class SyncObserverAgent implements Agent {
6161

6262
addSyncObserver(obs: SyncObserver, mut: MutableObject, peerGroupId: PeerGroupId) {
6363

64-
console.log('adding sync observer ', obs, ' for ', mut.getLastHash(), ' in peer group ', peerGroupId)
65-
6664
if (this.pod === undefined) {
6765
throw new Error('Trying to add a sync observer, but the SyncObserverAgent is not ready.');
6866
}
@@ -139,12 +137,10 @@ class SyncObserverAgent implements Agent {
139137
const syncAgent = this.pod?.getAgent(syncAgentId) as StateSyncAgent;
140138
const [_relay, syncAgentObserver] = pair;
141139
syncAgent.getSyncEventSource().addObserver(syncAgentObserver);
142-
console.log('+++ adding agent relay ', syncAgent.getSyncEventSource(), ' to ', _relay)
143140
} else if (agentEv.content.change === AgentSetChange.Removal) {
144141
const syncAgent = this.pod?.getAgent(syncAgentId) as StateSyncAgent|undefined;
145142
const [_relay, syncAgentObserver] = pair;
146143
syncAgent?.getSyncEventSource().addObserver(syncAgentObserver);
147-
console.log('+++ removing agent relay from ', _relay)
148144
}
149145

150146
}

src/mesh/service/Mesh.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -328,18 +328,16 @@ class Mesh implements MeshInterface {
328328
}
329329
}
330330

331-
addSyncObserver(obs: SyncObserver, mut: MutableObject, peerGroupId?: PeerGroupId) {
331+
async addSyncObserver(obs: SyncObserver, mut: MutableObject, peerGroupId?: PeerGroupId) {
332332

333333
if (peerGroupId === undefined) {
334334
peerGroupId = this.inferPeerGroupId(mut);
335335
}
336336

337-
console.log('mesh: observer to ', mut.getLastHash(), ' peerGroupId=', peerGroupId);
338-
339337
this.syncObserver.addSyncObserver(obs, mut, peerGroupId);
340338
}
341339

342-
removeSyncObserver(obs: SyncObserver, mut: MutableObject, peerGroupId?: PeerGroupId) {
340+
async removeSyncObserver(obs: SyncObserver, mut: MutableObject, peerGroupId?: PeerGroupId) {
343341

344342
if (peerGroupId === undefined) {
345343
peerGroupId = this.inferPeerGroupId(mut);
@@ -875,7 +873,6 @@ class Mesh implements MeshInterface {
875873
if (agents.size !== 1) {
876874
throw new CannotInferPeerGroup(mutHash);
877875
} else {
878-
console.log('agents ', agents)
879876
return agents.values().next().value as string;
880877
}
881878
}

src/mesh/service/remoting/MeshHost.ts

Lines changed: 125 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,3 @@
1-
import { PeerGroupAgentConfig, PeerInfo, PeerSource } from '../../agents/peer';
2-
import { CannotInferPeerGroup, Mesh, SyncMode, UsageToken } from '../../service/Mesh';
3-
import { SpawnCallback } from '../../agents/spawn';
4-
import { PeerGroupState } from '../../agents/peer/PeerGroupState';
5-
import { ObjectDiscoveryReply } from '../../agents/discovery';
6-
import { Endpoint } from '../../agents/network';
7-
81
import { Identity, RSAKeyPair } from 'data/identity';
92
import { Context, HashedObject, LiteralContext, MutableObject } from 'data/model';
103
import { Hash } from 'data/model';
@@ -15,7 +8,14 @@ import { RNGImpl } from 'crypto/random';
158

169
import { Store } from 'storage/store';
1710
import { LinkupAddress } from 'net/linkup';
18-
import { SyncState } from 'mesh/agents/state';
11+
12+
import { PeerGroupAgentConfig, PeerInfo, PeerSource } from '../../agents/peer';
13+
import { CannotInferPeerGroup, Mesh, SyncMode, UsageToken } from '../../service/Mesh';
14+
import { SpawnCallback } from '../../agents/spawn';
15+
import { PeerGroupState } from '../../agents/peer/PeerGroupState';
16+
import { ObjectDiscoveryReply } from '../../agents/discovery';
17+
import { Endpoint } from '../../agents/network';
18+
import { SyncEvent, SyncObserver, SyncState } from '../../agents/state';
1919

2020
/* Run a mesh remotely, and access it through a MeshProxy */
2121

@@ -31,13 +31,15 @@ import { SyncState } from 'mesh/agents/state';
3131
*
3232
* Ah, the things we do for you, Hyper Hyper Space. */
3333

34+
type ObserverId = string;
35+
3436
type MeshCommand = JoinPeerGroup | LeavePeerGroup | ForwardPeerGroupState |
3537
SyncObjectsWithPeerGroup | StopSyncObjectsWithPeerGroup |
3638
StartObjectBroadcast | StopObjectBroadcast |
3739
FindObjectByHash | FindObjectByHashSuffix |
3840
ForwardGetPeersReply | ForwardGetPeerForEndpointReply |
3941
AddObjectSpawnCallback | SendObjectSpawnRequest |
40-
ForwardSyncState |
42+
ForwardSyncState | AddSyncObserver | RemoveSyncObserver |
4143
Shutdown;
4244

4345
type JoinPeerGroup = {
@@ -147,6 +149,20 @@ type ForwardSyncState = {
147149
peerGroupId?: string
148150
}
149151

152+
type AddSyncObserver = {
153+
type: 'add-sync-observer',
154+
observerId: ObserverId,
155+
mutLiteralContext: LiteralContext,
156+
peerGroupId?: string
157+
}
158+
159+
type RemoveSyncObserver = {
160+
type: 'remove-sync-observer',
161+
observerId: ObserverId,
162+
mutLiteralContext: LiteralContext,
163+
peerGroupId?: string
164+
}
165+
150166
type Shutdown = {
151167
type: 'shutdown'
152168
}
@@ -167,7 +183,8 @@ type ForwardGetPeerForEndpointReply = {
167183

168184
type PeerInfoContext = { endpoint: Endpoint, identityHash: Hash, identity?: LiteralContext };
169185

170-
type CommandStreamedReply = LiteralObjectDiscoveryReply | DiscoveryEndReply | ObjectSpawnCallback | PeerGroupStateReply | SyncStateReply;
186+
type CommandStreamedReply = LiteralObjectDiscoveryReply | DiscoveryEndReply | ObjectSpawnCallback | PeerGroupStateReply |
187+
SyncStateReply | AddSyncObserverReply | RemoveSyncObserverReply | SyncObserverEventReply;
171188

172189
type LiteralObjectDiscoveryReply = {
173190
type: 'object-discovery-reply'
@@ -210,6 +227,27 @@ type SyncStateReply = {
210227
errorType?: 'infer-peer-group'
211228
}
212229

230+
type AddSyncObserverReply = {
231+
type: 'add-sync-observer-reply',
232+
observerId: ObserverId,
233+
errorType?: 'infer-peer-group',
234+
error?: string
235+
}
236+
237+
type RemoveSyncObserverReply = {
238+
type: 'remove-sync-observer-reply',
239+
observerId: ObserverId,
240+
errorType?: 'infer-peer-group',
241+
error?: string
242+
}
243+
244+
type SyncObserverEventReply = {
245+
type: 'sync-observer-event-reply',
246+
observerId: ObserverId,
247+
action: string,
248+
data: SyncState
249+
}
250+
213251
type PeerSourceRequest = GetPeersRequest | GetPeerForEndpointRequest;
214252

215253
type GetPeersRequest = {
@@ -249,17 +287,23 @@ class MeshHost {
249287
type === 'forward-get-peer-for-endpoint-reply' ||
250288
type === 'add-object-spawn-callback' ||
251289
type === 'send-object-spawn-callback' ||
252-
type === 'forward-sync-state'
290+
type === 'forward-sync-state' ||
291+
type === 'add-sync-observer' ||
292+
type === 'remove-sync-observer'
253293
);
254294
}
255295

256296
static isStreamedReply(msg: any): boolean {
257297
const type = msg?.type;
258298

259299
return (type === 'object-discovery-reply' ||
260-
type === 'object-discovery-end' ||
261-
type === 'object-spawn-callback' ||
262-
type === 'peer-group-state-reply');
300+
type === 'object-discovery-end' ||
301+
type === 'object-spawn-callback' ||
302+
type === 'peer-group-state-reply' ||
303+
type === 'sync-state-reply' ||
304+
type === 'add-sync-observer-reply' ||
305+
type === 'remove-sync-observer-reply' ||
306+
type === 'sync-observer-event-reply');
263307
}
264308

265309
static isPeerSourceRequest(msg: any): boolean {
@@ -280,6 +324,8 @@ class MeshHost {
280324

281325
stores: Map<string, Map<string, Store>>;
282326

327+
syncObservers: Map<ObserverId, SyncObserver>;
328+
283329
constructor(mesh: Mesh, streamedReplyCb: (resp: CommandStreamedReply) => void, peerSourceReqCb: (req: PeerSourceRequest) => void) {
284330
this.mesh = mesh;
285331
this.streamedReplyCb = streamedReplyCb;
@@ -288,6 +334,7 @@ class MeshHost {
288334
this.pendingPeerForEndpointRequests = new Map();
289335
this.spawnCallbacks = new Map();
290336
this.stores = new Map();
337+
this.syncObservers = new Map();
291338
}
292339

293340
execute(command: MeshCommand) : void {
@@ -371,11 +418,72 @@ class MeshHost {
371418
reply.errorType = 'infer-peer-group';
372419
}
373420

374-
reply.error = reason;
421+
reply.error = reason.toString();
375422

376423
this.streamedReplyCb(reply);
377424
});
378-
425+
426+
} else if (command.type === 'add-sync-observer') {
427+
let obs: SyncObserver = (ev: SyncEvent) => {
428+
429+
const reply: SyncObserverEventReply = {
430+
type: 'sync-observer-event-reply',
431+
observerId: command.observerId,
432+
action: ev.action,
433+
data: ev.data
434+
}
435+
436+
this.streamedReplyCb(reply);
437+
}
438+
439+
const reply: AddSyncObserverReply = {
440+
type: 'add-sync-observer-reply',
441+
observerId: command.observerId
442+
}
443+
444+
try {
445+
let mut = HashedObject.fromLiteralContext(command.mutLiteralContext) as MutableObject;
446+
this.mesh.addSyncObserver(obs, mut, command.peerGroupId);
447+
} catch (e: any) {
448+
if (e instanceof CannotInferPeerGroup) {
449+
reply.errorType = 'infer-peer-group'
450+
}
451+
reply.error = e.toString();
452+
}
453+
454+
if (reply.error === undefined && reply.errorType === undefined) {
455+
this.syncObservers.set(command.observerId, obs);
456+
}
457+
458+
this.streamedReplyCb(reply);
459+
460+
} else if (command.type === 'remove-sync-observer') {
461+
462+
const reply: RemoveSyncObserverReply = {
463+
type: 'remove-sync-observer-reply',
464+
observerId: command.observerId
465+
}
466+
467+
try {
468+
const obs = this.syncObservers.get(command.observerId);
469+
470+
if (obs !== undefined) {
471+
let mut = HashedObject.fromLiteralContext(command.mutLiteralContext) as MutableObject;
472+
this.mesh.removeSyncObserver(obs, mut, command.peerGroupId);
473+
}
474+
} catch (e: any) {
475+
if (e instanceof CannotInferPeerGroup) {
476+
reply.errorType = 'infer-peer-group'
477+
}
478+
reply.error = e.toString();
479+
}
480+
481+
if (reply.error === undefined && reply.errorType === undefined) {
482+
this.syncObservers.delete(command.observerId);
483+
}
484+
485+
this.streamedReplyCb(reply);
486+
379487
} else if (command.type === 'sync-objects-with-peer-group') {
380488
const syncObjs = command as SyncObjectsWithPeerGroup;
381489

@@ -675,7 +783,7 @@ export { MeshHost, MeshCommand,
675783
SyncObjectsWithPeerGroup, StopSyncObjectsWithPeerGroup,
676784
StartObjectBroadcast, StopObjectBroadcast,
677785
FindObjectByHash, FindObjectByHashSuffix, AddObjectSpawnCallback, SendObjectSpawnRequest,
678-
ForwardSyncState,
786+
ForwardSyncState, AddSyncObserver, RemoveSyncObserver,
679787
Shutdown,
680788
CommandStreamedReply, LiteralObjectDiscoveryReply, DiscoveryEndReply, ObjectSpawnCallback, PeerGroupStateReply, LiteralPeerInfo,
681789
ForwardGetPeersReply, ForwardGetPeerForEndpointReply,

src/mesh/service/remoting/MeshInterface.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ interface MeshInterface {
2525
stopSyncManyObjectsWithPeerGroup(tokens: IterableIterator<UsageToken>): void;
2626

2727
getSyncState(mut: MutableObject, peerGroupId?: string): Promise<SyncState|undefined>;
28-
addSyncObserver(obs: SyncObserver, mut: MutableObject, peerGroupId?: PeerGroupId): void;
29-
removeSyncObserver(obs: SyncObserver, mut: MutableObject, peerGroupId?: PeerGroupId): void;
28+
addSyncObserver(obs: SyncObserver, mut: MutableObject, peerGroupId?: PeerGroupId): Promise<void>;
29+
removeSyncObserver(obs: SyncObserver, mut: MutableObject, peerGroupId?: PeerGroupId): Promise<void>;
3030

3131
startObjectBroadcast(object: HashedObject, linkupServers: string[], replyEndpoints: Endpoint[], broadcastedSuffixBits?: number, usageToken?: UsageToken): UsageToken;
3232
stopObjectBroadcast(token: UsageToken): void;

0 commit comments

Comments
 (0)