Skip to content

Commit 9583de4

Browse files
committed
bridged getSyncSate to the web worker! WIP
1 parent 8fecd82 commit 9583de4

File tree

3 files changed

+111
-13
lines changed

3 files changed

+111
-13
lines changed

src/mesh/service/Mesh.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -882,4 +882,4 @@ class Mesh implements MeshInterface {
882882

883883
}
884884

885-
export { Mesh, PeerGroupInfo, SyncMode, UsageToken }
885+
export { Mesh, PeerGroupInfo, SyncMode, UsageToken, CannotInferPeerGroup }

src/mesh/service/remoting/MeshHost.ts

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import { PeerGroupAgentConfig, PeerInfo, PeerSource } from '../../agents/peer';
2-
import { Mesh, SyncMode, UsageToken } from '../../service/Mesh';
2+
import { CannotInferPeerGroup, Mesh, SyncMode, UsageToken } from '../../service/Mesh';
33
import { SpawnCallback } from '../../agents/spawn';
44
import { PeerGroupState } from '../../agents/peer/PeerGroupState';
55
import { ObjectDiscoveryReply } from '../../agents/discovery';
66
import { Endpoint } from '../../agents/network';
77

88
import { Identity, RSAKeyPair } from 'data/identity';
9-
import { Context, HashedObject, LiteralContext } from 'data/model';
9+
import { Context, HashedObject, LiteralContext, MutableObject } from 'data/model';
1010
import { Hash } from 'data/model';
1111

1212
import { AsyncStream } from 'util/streams';
@@ -15,6 +15,7 @@ import { RNGImpl } from 'crypto/random';
1515

1616
import { Store } from 'storage/store';
1717
import { LinkupAddress } from 'net/linkup';
18+
import { SyncState } from 'mesh/agents/state';
1819

1920
/* Run a mesh remotely, and access it through a MeshProxy */
2021

@@ -33,9 +34,11 @@ import { LinkupAddress } from 'net/linkup';
3334
type MeshCommand = JoinPeerGroup | LeavePeerGroup | ForwardPeerGroupState |
3435
SyncObjectsWithPeerGroup | StopSyncObjectsWithPeerGroup |
3536
StartObjectBroadcast | StopObjectBroadcast |
36-
FindObjectByHash | FindObjectByHashSuffix | Shutdown |
37+
FindObjectByHash | FindObjectByHashSuffix |
3738
ForwardGetPeersReply | ForwardGetPeerForEndpointReply |
38-
AddObjectSpawnCallback | SendObjectSpawnRequest;
39+
AddObjectSpawnCallback | SendObjectSpawnRequest |
40+
ForwardSyncState |
41+
Shutdown;
3942

4043
type JoinPeerGroup = {
4144
type: 'join-peer-group';
@@ -137,6 +140,13 @@ type SendObjectSpawnRequest = {
137140
spawnId: string
138141
}
139142

143+
type ForwardSyncState = {
144+
type: 'forward-sync-state',
145+
requestId: string,
146+
mutLiteralContext: LiteralContext,
147+
peerGroupId?: string
148+
}
149+
140150
type Shutdown = {
141151
type: 'shutdown'
142152
}
@@ -157,7 +167,7 @@ type ForwardGetPeerForEndpointReply = {
157167

158168
type PeerInfoContext = { endpoint: Endpoint, identityHash: Hash, identity?: LiteralContext };
159169

160-
type CommandStreamedReply = LiteralObjectDiscoveryReply | DiscoveryEndReply | ObjectSpawnCallback | PeerGroupStateReply;
170+
type CommandStreamedReply = LiteralObjectDiscoveryReply | DiscoveryEndReply | ObjectSpawnCallback | PeerGroupStateReply | SyncStateReply;
161171

162172
type LiteralObjectDiscoveryReply = {
163173
type: 'object-discovery-reply'
@@ -192,6 +202,13 @@ type PeerGroupStateReply = {
192202
remote?: Array<LiteralPeerInfo>
193203
}
194204

205+
type SyncStateReply = {
206+
type: 'sync-state-reply',
207+
requestId: string,
208+
state?: SyncState
209+
error?: string,
210+
errorType?: 'infer-peer-group'
211+
}
195212

196213
type PeerSourceRequest = GetPeersRequest | GetPeerForEndpointRequest;
197214

@@ -231,7 +248,8 @@ class MeshHost {
231248
type === 'forward-get-peers-reply' ||
232249
type === 'forward-get-peer-for-endpoint-reply' ||
233250
type === 'add-object-spawn-callback' ||
234-
type === 'send-object-spawn-callback'
251+
type === 'send-object-spawn-callback' ||
252+
type === 'forward-sync-state'
235253
);
236254
}
237255

@@ -330,8 +348,34 @@ class MeshHost {
330348

331349
this.streamedReplyCb(reply);
332350

333-
})
351+
});
352+
353+
} else if (command.type === 'forward-sync-state') {
354+
355+
const mut = HashedObject.fromLiteralContext(command.mutLiteralContext) as MutableObject;
356+
357+
const reply: SyncStateReply = {
358+
type: 'sync-state-reply',
359+
requestId: command.requestId,
360+
}
361+
362+
this.mesh.getSyncState(mut, command.peerGroupId)
363+
.then((state: SyncState|undefined) => {
364+
365+
reply.state = state;
366+
367+
this.streamedReplyCb(reply);
368+
369+
}).catch((reason: any) => {
370+
if (reason instanceof CannotInferPeerGroup) {
371+
reply.errorType = 'infer-peer-group';
372+
}
373+
374+
reply.error = reason;
334375

376+
this.streamedReplyCb(reply);
377+
});
378+
335379
} else if (command.type === 'sync-objects-with-peer-group') {
336380
const syncObjs = command as SyncObjectsWithPeerGroup;
337381

@@ -631,6 +675,7 @@ export { MeshHost, MeshCommand,
631675
SyncObjectsWithPeerGroup, StopSyncObjectsWithPeerGroup,
632676
StartObjectBroadcast, StopObjectBroadcast,
633677
FindObjectByHash, FindObjectByHashSuffix, AddObjectSpawnCallback, SendObjectSpawnRequest,
678+
ForwardSyncState,
634679
Shutdown,
635680
CommandStreamedReply, LiteralObjectDiscoveryReply, DiscoveryEndReply, ObjectSpawnCallback, PeerGroupStateReply, LiteralPeerInfo,
636681
ForwardGetPeersReply, ForwardGetPeerForEndpointReply,

src/mesh/service/remoting/MeshProxy.ts

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { ObjectDiscoveryPeerSource, PeerGroupAgentConfig, PeerInfo, PeerSource } from 'mesh/agents/peer';
2-
import { Mesh, PeerGroupInfo, SyncMode, UsageToken } from 'mesh/service/Mesh';
2+
import { CannotInferPeerGroup, Mesh, PeerGroupInfo, SyncMode, UsageToken } from 'mesh/service/Mesh';
33
import { MeshCommand,
44
JoinPeerGroup, LeavePeerGroup,
55
SyncObjectsWithPeerGroup, StopSyncObjectsWithPeerGroup,
@@ -11,7 +11,8 @@ import { MeshCommand,
1111
PeerInfoContext,
1212
AddObjectSpawnCallback,
1313
SendObjectSpawnRequest,
14-
ForwardPeerGroupState} from './MeshHost';
14+
ForwardPeerGroupState,
15+
ForwardSyncState} from './MeshHost';
1516

1617
import { RNGImpl } from 'crypto/random';
1718
import { Context, Hash, HashedObject, MutableObject } from 'data/model';
@@ -45,7 +46,7 @@ class MeshProxy implements MeshInterface {
4546
peerSourceRequestIngestFn: (req: PeerSourceRequest) => void;
4647

4748
pendingPeerGroupStates: Map<RequestId, {resolve: (result: PeerGroupState|undefined) => void, reject: (reason: any) => void, timeout: any}>;
48-
49+
pendingSyncStates: Map<RequestId, {resolve: (result: SyncState|undefined) => void, reject: (reason: any) => void, mut: MutableObject, timeout: any}>;
4950

5051

5152
constructor(meshCommandFwdFn: (cmd: MeshCommand) => void, linkupCommandFwdFn?: (cmd: LinkupManagerCommand) => void, webRTCConnEventIngestFn?: (ev: WebRTCConnectionEvent) => void) {
@@ -128,6 +129,33 @@ class MeshProxy implements MeshInterface {
128129
}
129130

130131
}
132+
} else if (reply.type === 'sync-state-reply') {
133+
const pending = this.pendingSyncStates.get(reply.requestId);
134+
const cb = pending?.resolve;
135+
const err = pending?.reject;
136+
const to = pending?.timeout;
137+
const mut = pending?.mut;
138+
139+
this.pendingSyncStates.delete(reply.requestId);
140+
141+
if (to !== undefined) {
142+
window.clearTimeout(to);
143+
}
144+
145+
if (reply.state !== undefined) {
146+
147+
if (cb !== undefined) {
148+
cb(reply.state);
149+
}
150+
} else {
151+
if (err !== undefined) {
152+
if (reply.errorType === 'infer-peer-group' && mut !== undefined) {
153+
err(new CannotInferPeerGroup(mut.getLastHash()));
154+
} else {
155+
err(reply.error);
156+
}
157+
}
158+
}
131159
}
132160
}
133161

@@ -195,6 +223,7 @@ class MeshProxy implements MeshInterface {
195223
};
196224

197225
this.pendingPeerGroupStates = new Map();
226+
this.pendingSyncStates = new Map();
198227
}
199228

200229
getCommandStreamedReplyIngestFn() {
@@ -521,8 +550,32 @@ class MeshProxy implements MeshInterface {
521550

522551
}
523552

524-
getSyncState(_mut: MutableObject, _peerGroupId?: string | undefined): Promise<SyncState | undefined> {
525-
throw new Error('Method not implemented.');
553+
getSyncState(mut: MutableObject, peerGroupId?: string | undefined, timeout=10000): Promise<SyncState | undefined> {
554+
const p = new Promise<SyncState|undefined>((resolve: (result: SyncState|undefined) => void, reject: (reason: any) => void) => {
555+
const requestId = new RNGImpl().randomHexString(128);
556+
557+
const cmd: ForwardSyncState = {
558+
type: 'forward-sync-state',
559+
requestId: requestId,
560+
peerGroupId: peerGroupId,
561+
mutLiteralContext: mut.toLiteralContext()
562+
};
563+
564+
565+
566+
const to = window.setTimeout(() => {
567+
if (this.pendingSyncStates.has(requestId)) {
568+
this.pendingSyncStates.delete(requestId)
569+
reject('timeout');
570+
}
571+
}, timeout);
572+
573+
this.pendingSyncStates.set(requestId, {resolve: resolve, reject: reject, timeout: to, mut: mut});
574+
575+
this.commandForwardingFn(cmd);
576+
});
577+
578+
return p;
526579
}
527580

528581
addSyncObserver(_obs: SyncObserver, _mut: MutableObject, _peerGroupId?: string | undefined): void {

0 commit comments

Comments
 (0)