Skip to content

Commit 8fecd82

Browse files
committed
fixes to sync observability - WIP
1 parent 5dea7f9 commit 8fecd82

File tree

4 files changed

+88
-9
lines changed

4 files changed

+88
-9
lines changed

src/mesh/service/Mesh.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import { ObjectSpawnAgent, SpawnCallback } from 'mesh/agents/spawn/ObjectSpawnAg
1818
import { ObjectInvokeAgent } from 'mesh/agents/spawn/ObjectInvokeAgent';
1919
import { PeerGroupState } from 'mesh/agents/peer/PeerGroupState';
2020
import { Resources } from 'spaces/Resources';
21+
import { MeshInterface } from './remoting/MeshInterface';
2122

2223

2324
/* Connect to the Hyper Hyper Space service mesh.
@@ -106,7 +107,7 @@ class CannotInferPeerGroup extends Error {
106107
}
107108
}
108109

109-
class Mesh {
110+
class Mesh implements MeshInterface {
110111

111112
static syncCommandsLog = new Logger('mesh-sync-commands', LogLevel.INFO);
112113

@@ -257,7 +258,7 @@ class Mesh {
257258
return this.registerUsage({type: 'object-sync', objHash: obj.getLastHash(), peerGroupId: peerGroupId}, usageToken);
258259
}
259260

260-
syncManyObjectsWithPeerGroup(peerGroupId: string, objs: IterableIterator<HashedObject>, mode:SyncMode=SyncMode.full, usageTokens?: Map<Hash, UsageToken>): Map<Hash, UsageToken>{
261+
syncManyObjectsWithPeerGroup(peerGroupId: string, objs: IterableIterator<HashedObject>, mode:SyncMode=SyncMode.full, usageTokens?: Map<Hash, UsageToken>): Map<Hash, UsageToken> {
261262

262263
const tokens = new Map<Hash, UsageToken>();
263264

@@ -457,9 +458,6 @@ class Mesh {
457458
objectInvokeAgent.sendRequest(object, receiver, receiverLinkupServers, senderEndpoint);
458459
}
459460

460-
461-
462-
463461
getSyncAgentFor(peerGroupId: PeerGroupId, mutHash: Hash): StateSyncAgent|undefined {
464462
return this.syncAgents.get(peerGroupId)?.get(mutHash);
465463
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import { Identity } from 'data/identity';
2+
import { Hash, HashedObject, MutableObject } from 'data/model';
3+
import { LinkupAddress } from 'net/linkup';
4+
import { ObjectDiscoveryReply } from '../../agents/discovery';
5+
import { SpawnCallback } from '../../agents/spawn';
6+
7+
import { AsyncStream } from 'util/streams';
8+
import { Endpoint } from '../../agents/network';
9+
import { PeerGroupAgentConfig, PeerGroupState } from '../../agents/peer';
10+
import { SyncObserver, SyncState } from '../../agents/state';
11+
import { PeerGroupInfo, SyncMode, UsageToken } from '../Mesh';
12+
13+
type PeerGroupId = string;
14+
15+
interface MeshInterface {
16+
17+
joinPeerGroup(pg: PeerGroupInfo, config?: PeerGroupAgentConfig, usageToken?: UsageToken): UsageToken;
18+
leavePeerGroup(token: UsageToken): void;
19+
20+
getPeerGroupState(peerGroupId: PeerGroupId): Promise<PeerGroupState|undefined>;
21+
22+
syncObjectWithPeerGroup(peerGroupId: PeerGroupId, obj: HashedObject, mode?:SyncMode, usageToken?: UsageToken): UsageToken;
23+
syncManyObjectsWithPeerGroup(peerGroupId: PeerGroupId, objs: IterableIterator<HashedObject>, mode?:SyncMode, usageTokens?: Map<Hash, UsageToken>): Map<Hash, UsageToken>;
24+
stopSyncObjectWithPeerGroup(usageToken: UsageToken): void;
25+
stopSyncManyObjectsWithPeerGroup(tokens: IterableIterator<UsageToken>): void;
26+
27+
getSyncState(mut: MutableObject, peerGroupId?: string): Promise<SyncState|undefined>;
28+
addSyncObserver(obs: SyncObserver, mut: MutableObject, peerGroupId?: PeerGroupId): void;
29+
removeSyncObserver(obs: SyncObserver, mut: MutableObject, peerGroupId?: PeerGroupId): void;
30+
31+
startObjectBroadcast(object: HashedObject, linkupServers: string[], replyEndpoints: Endpoint[], broadcastedSuffixBits?: number, usageToken?: UsageToken): UsageToken;
32+
stopObjectBroadcast(token: UsageToken): void;
33+
34+
findObjectByHash(hash: Hash, linkupServers: string[], replyAddress: LinkupAddress, count?: number, maxAge?: number, strictEndpoints?: boolean, includeErrors?: boolean) : AsyncStream<ObjectDiscoveryReply>;
35+
findObjectByHashSuffix(hashSuffix: string, linkupServers: string[], replyAddress: LinkupAddress, count?: number, maxAge?: number, strictEndpoints?: boolean, includeErrors?: boolean) : AsyncStream<ObjectDiscoveryReply>;
36+
findObjectByHashRetry(hash: Hash, linkupServers: string[], replyAddress: LinkupAddress, count?: number): void;
37+
findObjectByHashSuffixRetry(hashSuffix: string, linkupServers: string[], replyAddress: LinkupAddress, count?: number): void;
38+
39+
addObjectSpawnCallback(callback: SpawnCallback, receiver: Identity, linkupServers: Array<string>, spawnId?: string): void;
40+
sendObjectSpawnRequest(object: HashedObject, sender: Identity, receiver: Identity, senderEndpoint: Endpoint, receiverLinkupServers: Array<string>, spawnId?: string): void;
41+
42+
shutdown(): void;
43+
44+
}
45+
46+
export { MeshInterface }

src/mesh/service/remoting/MeshProxy.ts

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { PeerGroupAgentConfig, PeerInfo, PeerSource } from 'mesh/agents/peer';
1+
import { ObjectDiscoveryPeerSource, PeerGroupAgentConfig, PeerInfo, PeerSource } from 'mesh/agents/peer';
22
import { Mesh, PeerGroupInfo, SyncMode, UsageToken } from 'mesh/service/Mesh';
33
import { MeshCommand,
44
JoinPeerGroup, LeavePeerGroup,
@@ -14,7 +14,7 @@ import { MeshCommand,
1414
ForwardPeerGroupState} from './MeshHost';
1515

1616
import { RNGImpl } from 'crypto/random';
17-
import { Context, Hash, HashedObject } from 'data/model';
17+
import { Context, Hash, HashedObject, MutableObject } from 'data/model';
1818
import { AsyncStream, BufferedAsyncStream, BufferingAsyncStreamSource } from 'util/streams';
1919
import { ObjectDiscoveryReply } from 'mesh/agents/discovery';
2020
import { Endpoint } from 'mesh/agents/network';
@@ -23,12 +23,15 @@ import { WebRTCConnectionEvent, WebRTCConnectionsHost } from 'net/transport';
2323
import { Identity } from 'data/identity';
2424
import { ObjectSpawnAgent, SpawnCallback } from 'mesh/agents/spawn';
2525
import { PeerGroupState } from 'mesh/agents/peer/PeerGroupState';
26+
import { Resources } from 'spaces/Resources';
27+
import { MeshInterface } from './MeshInterface';
28+
import { SyncState, SyncObserver } from 'mesh/agents/state';
2629

2730
/* Access a mesh remotely, see the MeshHost class. */
2831

2932
type RequestId = string;
3033

31-
class MeshProxy {
34+
class MeshProxy implements MeshInterface {
3235

3336
commandForwardingFn: (cmd: MeshCommand) => void;
3437
discoveryStreamSources: Map<string, BufferingAsyncStreamSource<ObjectDiscoveryReply>>;
@@ -497,6 +500,38 @@ class MeshProxy {
497500

498501
}
499502

503+
// We do not need to bridge this request to the MeshHost: the ObjectDiscoveryPeerSource receives a reference
504+
// to this mesh, that's already bridged.
505+
async getDiscoveryPeerGroup(obj: HashedObject, resources?: Resources) : Promise<PeerGroupInfo> {
506+
507+
resources = resources || obj.getResources();
508+
509+
if (resources === undefined) {
510+
throw new Error('Could not find a valid resources object to use for the discovery peer group.');
511+
}
512+
513+
let localPeer = resources.getPeersForDiscovery()[0];
514+
let peerSource = new ObjectDiscoveryPeerSource(this as any as Mesh, obj, resources.config.linkupServers, LinkupAddress.fromURL(localPeer.endpoint, localPeer.identity), resources.getEndointParserForDiscovery());
515+
516+
return {
517+
id: Mesh.discoveryPeerGroupId(obj),
518+
localPeer: localPeer,
519+
peerSource: peerSource
520+
};
521+
522+
}
523+
524+
getSyncState(_mut: MutableObject, _peerGroupId?: string | undefined): Promise<SyncState | undefined> {
525+
throw new Error('Method not implemented.');
526+
}
527+
528+
addSyncObserver(_obs: SyncObserver, _mut: MutableObject, _peerGroupId?: string | undefined): void {
529+
throw new Error('Method not implemented.');
530+
}
531+
532+
removeSyncObserver(_obs: SyncObserver, _mut: MutableObject, _peerGroupId?: string | undefined): void {
533+
throw new Error('Method not implemented.');
534+
}
500535

501536
}
502537

src/mesh/service/webworker/WebWorkerMeshProxy.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ class WebWorkerMeshProxy {
137137
}
138138

139139
getMesh(): Mesh {
140-
return this.proxy as any as Mesh;
140+
return this.proxy as unknown as Mesh;
141141
}
142142

143143
}

0 commit comments

Comments
 (0)