Skip to content

Commit 9697571

Browse files
committed
observability for peer group & sync state - WIP
1 parent 15fd9e7 commit 9697571

File tree

6 files changed

+202
-10
lines changed

6 files changed

+202
-10
lines changed

src/mesh/agents/peer/PeerGroupAgent.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import { Identity } from 'data/identity';
1717
import { Logger, LogLevel } from 'util/logging';
1818
import { Lock } from 'util/concurrency';
1919
import { LRUCache } from 'util/caching';
20+
import { PeerGroupState } from './PeerGroupState';
2021

2122
type PeerInfo = { endpoint: Endpoint, identityHash: Hash, identity?: Identity };
2223

@@ -549,6 +550,24 @@ class PeerGroupAgent implements Agent {
549550

550551
}
551552

553+
getState(): PeerGroupState {
554+
555+
const remote = new Map<Endpoint, PeerInfo>();
556+
557+
for (const ep of this.connectionsPerEndpoint.keys()) {
558+
const connId = this.findWorkingConnectionId(ep);
559+
const conn = connId === undefined? undefined : this.connections.get(connId);
560+
if (conn !== undefined) {
561+
remote.set(ep, conn.peer);
562+
}
563+
}
564+
565+
return {
566+
local: this.localPeer,
567+
remote: remote
568+
};
569+
}
570+
552571
shutdown() {
553572
if (this.tickTimerRef !== undefined) {
554573
clearInterval(this.tickTimerRef);
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import { Endpoint } from '../network';
2+
import { PeerInfo } from './PeerGroupAgent';
3+
4+
5+
type PeerGroupState = {
6+
local: PeerInfo,
7+
remote: Map<Endpoint, PeerInfo>
8+
};
9+
10+
export type { PeerGroupState };

src/mesh/agents/state/SyncState.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import { Hash } from 'data/model';
2+
3+
type SyncState = {
4+
objectHash: Hash,
5+
6+
fetchedEverything: boolean, // all remote known ops have been fetched
7+
sentEverything: boolean, // all local ops have been sent to all current peers
8+
synchronzing: boolean, // ops are being exchanged at the moment
9+
10+
opsToReceive: number // how many ops we know we need to fetch
11+
};
12+
13+
export type { SyncState };

src/mesh/service/Mesh.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import { Logger, LogLevel } from 'util/logging';
1616
import { Identity } from 'data/identity';
1717
import { ObjectSpawnAgent, SpawnCallback } from 'mesh/agents/spawn/ObjectSpawnAgent';
1818
import { ObjectInvokeAgent } from 'mesh/agents/spawn/ObjectInvokeAgent';
19+
import { PeerGroupState } from 'mesh/agents/peer/PeerGroupState';
1920

2021

2122

@@ -207,6 +208,19 @@ class Mesh {
207208

208209
}
209210

211+
async getPeerGroupState(peerGroupId: string): Promise<PeerGroupState|undefined> {
212+
213+
const agentId = PeerGroupAgent.agentIdForPeerGroup(peerGroupId);
214+
215+
const agent = this.pod.getAgent(agentId) as PeerGroupAgent;
216+
217+
if (agent === undefined) {
218+
return undefined;
219+
} else {
220+
return agent.getState();
221+
}
222+
}
223+
210224
/*getConnectedPeers(peerGroupId: string): Array<PeerInfo> {
211225
let agent = this.pod.getAgent(PeerGroupAgent.agentIdForPeerGroup(peerGroupId)) as PeerGroupAgent;
212226

src/mesh/service/remoting/MeshHost.ts

Lines changed: 67 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
import { PeerGroupAgentConfig, PeerInfo, PeerSource } from '../../agents/peer';
22
import { Mesh, SyncMode, UsageToken } from '../../service/Mesh';
3+
import { SpawnCallback } from '../../agents/spawn';
4+
import { PeerGroupState } from '../../agents/peer/PeerGroupState';
5+
import { ObjectDiscoveryReply } from '../../agents/discovery';
6+
import { Endpoint } from '../../agents/network';
7+
8+
import { Identity, RSAKeyPair } from 'data/identity';
39
import { Context, HashedObject, LiteralContext } from 'data/model';
410
import { Hash } from 'data/model';
5-
import { Endpoint } from 'mesh/agents/network';
11+
612
import { AsyncStream } from 'util/streams';
7-
import { ObjectDiscoveryReply } from 'mesh/agents/discovery';
13+
814
import { RNGImpl } from 'crypto/random';
9-
import { Identity, RSAKeyPair } from 'data/identity';
15+
1016
import { Store } from 'storage/store';
1117
import { LinkupAddress } from 'net/linkup';
12-
import { SpawnCallback } from 'mesh/agents/spawn';
1318

1419
/* Run a mesh remotely, and access it through a MeshProxy */
1520

@@ -25,7 +30,7 @@ import { SpawnCallback } from 'mesh/agents/spawn';
2530
*
2631
* Ah, the things we do for you, Hyper Hyper Space. */
2732

28-
type MeshCommand = JoinPeerGroup | LeavePeerGroup |
33+
type MeshCommand = JoinPeerGroup | LeavePeerGroup | ForwardPeerGroupState |
2934
SyncObjectsWithPeerGroup | StopSyncObjectsWithPeerGroup |
3035
StartObjectBroadcast | StopObjectBroadcast |
3136
FindObjectByHash | FindObjectByHashSuffix | Shutdown |
@@ -50,6 +55,12 @@ type LeavePeerGroup = {
5055
usageToken: UsageToken
5156
}
5257

58+
type ForwardPeerGroupState = {
59+
type: 'forward-peer-group-state',
60+
requestId: string,
61+
peerGroupId: string
62+
}
63+
5364
type SyncObjectsWithPeerGroup = {
5465
type: 'sync-objects-with-peer-group',
5566
peerGroupId: string,
@@ -147,7 +158,7 @@ type ForwardGetPeerForEndpointReply = {
147158

148159
type PeerInfoContext = { endpoint: Endpoint, identityHash: Hash, identity?: LiteralContext };
149160

150-
type CommandStreamedReply = LiteralObjectDiscoveryReply | DiscoveryEndReply | ObjectSpawnCallback;
161+
type CommandStreamedReply = LiteralObjectDiscoveryReply | DiscoveryEndReply | ObjectSpawnCallback | PeerGroupStateReply;
151162

152163
type LiteralObjectDiscoveryReply = {
153164
type: 'object-discovery-reply'
@@ -173,6 +184,15 @@ type ObjectSpawnCallback = {
173184
senderEndpoint: string
174185
}
175186

187+
type LiteralPeerInfo = { endpoint: Endpoint, identityHash: Hash, identity?: LiteralContext };
188+
189+
type PeerGroupStateReply = {
190+
type: 'peer-group-state-reply',
191+
requestId: string,
192+
local?: LiteralPeerInfo,
193+
remote?: Array<LiteralPeerInfo>
194+
}
195+
176196

177197
type PeerSourceRequest = GetPeersRequest | GetPeerForEndpointRequest;
178198

@@ -199,6 +219,7 @@ class MeshHost {
199219
return (type === 'join-peer-group' ||
200220
type === 'check-peer-group-usage' ||
201221
type === 'leave-peer-group' ||
222+
type === 'forward-peer-group-state' ||
202223
type === 'sync-objects-with-peer-group' ||
203224
type === 'stop-sync-objects-with-peer-group' ||
204225
type === 'start-object-broadcast' ||
@@ -220,7 +241,8 @@ class MeshHost {
220241

221242
return (type === 'object-discovery-reply' ||
222243
type === 'object-discovery-end' ||
223-
type === 'object-spawn-callback');
244+
type === 'object-spawn-callback' ||
245+
type === 'peer-group-state-reply');
224246
}
225247

226248
static isPeerSourceRequest(msg: any): boolean {
@@ -275,6 +297,42 @@ class MeshHost {
275297
} else if (command.type === 'leave-peer-group') {
276298
const leave = command as LeavePeerGroup;
277299
this.mesh.leavePeerGroup(leave.usageToken);
300+
301+
} else if (command.type === 'forward-peer-group-state') {
302+
303+
this.mesh.getPeerGroupState(command.peerGroupId).then((state: PeerGroupState|undefined) => {
304+
305+
const reply: PeerGroupStateReply = {
306+
type: 'peer-group-state-reply',
307+
requestId: command.requestId,
308+
}
309+
310+
if (state !== undefined) {
311+
const local: LiteralPeerInfo = {endpoint: state.local.endpoint, identityHash: state.local.identityHash};
312+
313+
if (state.local.identity !== undefined) {
314+
local.identity = state.local.identity.toLiteralContext();
315+
}
316+
317+
reply.local = local;
318+
319+
reply.remote = [];
320+
321+
for (const peerInfo of state.remote.values()) {
322+
const lit: LiteralPeerInfo = {endpoint: peerInfo.endpoint, identityHash: peerInfo.identityHash};
323+
324+
if (peerInfo.identity !== undefined) {
325+
lit.identity = peerInfo.identity.toLiteralContext();
326+
}
327+
328+
reply.remote.push(lit);
329+
}
330+
}
331+
332+
this.streamedReplyCb(reply);
333+
334+
})
335+
278336
} else if (command.type === 'sync-objects-with-peer-group') {
279337
const syncObjs = command as SyncObjectsWithPeerGroup;
280338

@@ -570,11 +628,11 @@ class PeerSourceProxy implements PeerSource {
570628
}
571629

572630
export { MeshHost, MeshCommand,
573-
JoinPeerGroup, LeavePeerGroup,
631+
JoinPeerGroup, LeavePeerGroup, ForwardPeerGroupState,
574632
SyncObjectsWithPeerGroup, StopSyncObjectsWithPeerGroup,
575633
StartObjectBroadcast, StopObjectBroadcast,
576634
FindObjectByHash, FindObjectByHashSuffix, AddObjectSpawnCallback, SendObjectSpawnRequest,
577635
Shutdown,
578-
CommandStreamedReply, LiteralObjectDiscoveryReply, DiscoveryEndReply, ObjectSpawnCallback,
636+
CommandStreamedReply, LiteralObjectDiscoveryReply, DiscoveryEndReply, ObjectSpawnCallback, PeerGroupStateReply, LiteralPeerInfo,
579637
ForwardGetPeersReply, ForwardGetPeerForEndpointReply,
580638
PeerSourceRequest, GetPeersRequest, GetPeerForEndpointRequest, PeerInfoContext };

src/mesh/service/remoting/MeshProxy.ts

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ import { MeshCommand,
1010
Shutdown,
1111
PeerInfoContext,
1212
AddObjectSpawnCallback,
13-
SendObjectSpawnRequest} from './MeshHost';
13+
SendObjectSpawnRequest,
14+
ForwardPeerGroupState} from './MeshHost';
1415

1516
import { RNGImpl } from 'crypto/random';
1617
import { Context, Hash, HashedObject } from 'data/model';
@@ -21,9 +22,12 @@ import { LinkupAddress, LinkupManager, LinkupManagerCommand, LinkupManagerProxy
2122
import { WebRTCConnectionEvent, WebRTCConnectionsHost } from 'net/transport';
2223
import { Identity } from 'data/identity';
2324
import { ObjectSpawnAgent, SpawnCallback } from 'mesh/agents/spawn';
25+
import { PeerGroupState } from 'mesh/agents/peer/PeerGroupState';
2426

2527
/* Access a mesh remotely, see the MeshHost class. */
2628

29+
type RequestId = string;
30+
2731
class MeshProxy {
2832

2933
commandForwardingFn: (cmd: MeshCommand) => void;
@@ -37,6 +41,8 @@ class MeshProxy {
3741
peerSources: Map<string, PeerSource>;
3842
peerSourceRequestIngestFn: (req: PeerSourceRequest) => void;
3943

44+
pendingPeerGroupStates: Map<RequestId, {resolve: (result: PeerGroupState|undefined) => void, reject: (reason: any) => void, timeout: any}>;
45+
4046

4147

4248
constructor(meshCommandFwdFn: (cmd: MeshCommand) => void, linkupCommandFwdFn?: (cmd: LinkupManagerCommand) => void, webRTCConnEventIngestFn?: (ev: WebRTCConnectionEvent) => void) {
@@ -78,6 +84,47 @@ class MeshProxy {
7884
const sender = HashedObject.fromLiteralContext(reply.sender) as Identity;
7985
cb(object, sender, reply.senderEndpoint);
8086
}
87+
} else if (reply.type === 'peer-group-state-reply') {
88+
const cb = this.pendingPeerGroupStates.get(reply.requestId)?.resolve;
89+
const to = this.pendingPeerGroupStates.get(reply.requestId)?.timeout;
90+
91+
this.pendingPeerGroupStates.delete(reply.requestId);
92+
93+
if (to !== undefined) {
94+
window.clearTimeout(to);
95+
}
96+
97+
if (cb !== undefined) {
98+
99+
if (reply.local !== undefined && reply.remote !== undefined) {
100+
const remote = new Map<Endpoint, PeerInfo>();
101+
102+
const state: PeerGroupState = {
103+
local: {endpoint: reply.local.endpoint, identityHash: reply.local.identityHash},
104+
remote: remote
105+
}
106+
107+
if (reply.local.identity !== undefined) {
108+
state.local.identity = HashedObject.fromLiteralContext(reply.local.identity) as Identity;
109+
}
110+
111+
for (const litPeerInfo of reply.remote) {
112+
113+
const peerInfo: PeerInfo = {endpoint: litPeerInfo.endpoint, identityHash: litPeerInfo.identityHash};
114+
115+
if (litPeerInfo.identity !== undefined) {
116+
peerInfo.identity = HashedObject.fromLiteralContext(litPeerInfo.identity) as Identity;
117+
}
118+
119+
remote.set(peerInfo.endpoint, peerInfo);
120+
}
121+
122+
cb(state);
123+
} else {
124+
cb(undefined);
125+
}
126+
127+
}
81128
}
82129
}
83130

@@ -143,6 +190,8 @@ class MeshProxy {
143190
}
144191
}
145192
};
193+
194+
this.pendingPeerGroupStates = new Map();
146195
}
147196

148197
getCommandStreamedReplyIngestFn() {
@@ -182,6 +231,35 @@ class MeshProxy {
182231
this.commandForwardingFn(cmd);
183232
}
184233

234+
async getPeerGroupState(peerGroupId: string, timeout=10000): Promise<PeerGroupState|undefined> {
235+
236+
const p = new Promise<PeerGroupState|undefined>((resolve: (result: PeerGroupState|undefined) => void, reject: (reason: any) => void) => {
237+
const requestId = new RNGImpl().randomHexString(128);
238+
239+
const cmd: ForwardPeerGroupState = {
240+
type: 'forward-peer-group-state',
241+
requestId: requestId,
242+
peerGroupId: peerGroupId
243+
};
244+
245+
246+
247+
const to = window.setTimeout(() => {
248+
if (this.pendingPeerGroupStates.has(requestId)) {
249+
this.pendingPeerGroupStates.delete(requestId)
250+
reject('timeout');
251+
}
252+
}, timeout);
253+
254+
this.pendingPeerGroupStates.set(requestId, {resolve: resolve, reject: reject, timeout: to});
255+
256+
this.commandForwardingFn(cmd);
257+
258+
});
259+
260+
return p;
261+
}
262+
185263
syncObjectWithPeerGroup(peerGroupId: string, obj: HashedObject, mode:SyncMode=SyncMode.full, gossipId?: string, usageToken?: UsageToken): UsageToken {
186264

187265
const ctx = obj.toContext();

0 commit comments

Comments
 (0)