Skip to content

Commit 52fb9f9

Browse files
committed
fix: object discovery was not using identities for listening addresses, per the latest changes
1 parent f9bf568 commit 52fb9f9

File tree

12 files changed

+82
-48
lines changed

12 files changed

+82
-48
lines changed

src/data/collections/Types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import { HashedSet } from 'data/model';
22
import { HashedObject } from '../model/immutable/HashedObject';
33

4+
// FIXME: the types thing should be a HashedSet, not a friggin array. What was I thinking?
5+
46
abstract class Types {
57

68
static HashedObject = 'HashedObject';

src/mesh/agents/discovery/ObjectBroadcastAgent.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ type ObjectBroadcastReply = {
1919

2020
class ObjectBroadcastAgent implements Agent {
2121

22-
static log = new Logger(ObjectBroadcastAgent.name, LogLevel.INFO);
22+
static log = new Logger(ObjectBroadcastAgent.name, LogLevel.DEBUG);
2323

2424
static defaultBroadcastedSuffixBits = 36;
2525

src/mesh/agents/discovery/ObjectDiscoveryAgent.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ type ObjectDiscoveryReplyParams = {maxAge?: number, linkupServers?: string[], lo
2020

2121
class ObjectDiscoveryAgent implements Agent {
2222

23-
static log = new Logger(ObjectDiscoveryAgent.name, LogLevel.INFO);
23+
static log = new Logger(ObjectDiscoveryAgent.name, LogLevel.DEBUG);
2424

2525
static agentIdForHexHashSuffix(suffix: string) {
2626
return 'object-discovery-for-' + suffix;
@@ -70,7 +70,7 @@ class ObjectDiscoveryAgent implements Agent {
7070
this.pod = pod;
7171
}
7272

73-
query(linkupServers: string[], localEndpoint: Endpoint, count=1) {
73+
query(linkupServers: string[], localAddress: LinkupAddress, count=1) {
7474

7575
if (this.pod === undefined) {
7676
throw new Error('This ObjectDiscoveryAgent has not been registered to a mesh so it cannot accept queries yet.');
@@ -87,9 +87,12 @@ class ObjectDiscoveryAgent implements Agent {
8787

8888
}
8989

90+
const localEndpoint = localAddress.url();
91+
const localIdentity = localAddress.identity;
92+
9093
if (!this.localEndpoints.has(localEndpoint)) {
9194
ObjectDiscoveryAgent.log.trace('listening on ' + localEndpoint);
92-
this.getNetworkAgent().listenForLinkupMessages(localEndpoint);
95+
this.getNetworkAgent().listenForLinkupMessages(localEndpoint, localIdentity);
9396
this.localEndpoints.add(localEndpoint);
9497
}
9598

src/mesh/agents/network/NetworkAgent.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -512,8 +512,8 @@ class NetworkAgent implements Agent {
512512
this.linkupManager.listenForMessagesNewCall(address, this.newConnectionRequestCallback);
513513
}
514514

515-
listenForLinkupMessages(endpoint: Endpoint) {
516-
let address = LinkupAddress.fromURL(endpoint);
515+
listenForLinkupMessages(endpoint: Endpoint, identity?: Identity) {
516+
let address = LinkupAddress.fromURL(endpoint, identity);
517517
this.linkupMessageListening.add(endpoint);
518518
this.linkupManager.listenForRawMessages(address, this.linkupMessageCallback);
519519
}

src/mesh/agents/peer/sources/ObjectDiscoveryPeerSource.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { Hash, HashedObject } from 'data/model';
22
import { ObjectDiscoveryReply } from 'mesh/agents/discovery/ObjectDiscoveryAgent';
33
import { Endpoint } from 'mesh/agents/network/NetworkAgent';
44
import { Mesh } from 'mesh/service/Mesh';
5+
import { LinkupAddress } from 'net/linkup';
56
import { AsyncStream } from 'util/streams';
67
import { PeerInfo } from '../PeerGroupAgent';
78
import { PeerSource } from '../PeerSource';
@@ -14,19 +15,19 @@ class ObjectDiscoveryPeerSource implements PeerSource {
1415
parseEndpoint: (ep: Endpoint) => Promise<PeerInfo | undefined>;
1516

1617
linkupServers: string[];
17-
replyEndpoint: Endpoint;
18+
replyAddress: LinkupAddress;
1819
timeoutMillis: number;
1920

2021
hash: Hash;
2122
replyStream?: AsyncStream<ObjectDiscoveryReply>;
2223

23-
constructor(mesh: Mesh, object: HashedObject, linkupServers: string[], replyEndpoint: Endpoint, parseEndpoint: (ep: Endpoint) => Promise<PeerInfo | undefined>, timeout=3) {
24+
constructor(mesh: Mesh, object: HashedObject, linkupServers: string[], replyAddress: LinkupAddress, parseEndpoint: (ep: Endpoint) => Promise<PeerInfo | undefined>, timeout=3) {
2425
this.mesh = mesh;
2526
this.object = object;
2627
this.parseEndpoint = parseEndpoint;
2728

2829
this.linkupServers = linkupServers;
29-
this.replyEndpoint = replyEndpoint;
30+
this.replyAddress = replyAddress;
3031
this.timeoutMillis = timeout * 1000;
3132

3233
this.hash = object.hash();
@@ -94,11 +95,11 @@ class ObjectDiscoveryPeerSource implements PeerSource {
9495
}
9596

9697
private tryObjectDiscovery(count: number) : AsyncStream<ObjectDiscoveryReply> {
97-
return this.mesh.findObjectByHash(this.hash, this.linkupServers, this.replyEndpoint, count);
98+
return this.mesh.findObjectByHash(this.hash, this.linkupServers, this.replyAddress, count);
9899
}
99100

100101
private retryObjectDiscovery(count: number) {
101-
this.mesh.findObjectByHashRetry(this.hash, this.linkupServers, this.replyEndpoint, count);
102+
this.mesh.findObjectByHashRetry(this.hash, this.linkupServers, this.replyAddress, count);
102103
}
103104
}
104105

src/mesh/service/Mesh.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { ObjectBroadcastAgent, ObjectDiscoveryAgent, ObjectDiscoveryReply, Objec
1010

1111
import { AgentPod } from './AgentPod';
1212
import { AsyncStream } from 'util/streams';
13-
import { LinkupManager } from 'net/linkup';
13+
import { LinkupAddress, LinkupManager } from 'net/linkup';
1414
import { RNGImpl } from 'crypto/random';
1515
import { Logger, LogLevel } from 'util/logging';
1616

@@ -347,39 +347,39 @@ class Mesh {
347347
// are present in the received suffix!). To fix it, either change ObjectDiscoveryAgent to use all the
348348
// received bits, or pass the number of bits explicitly when calling the constructor.
349349

350-
findObjectByHash(hash: Hash, linkupServers: string[], replyEndpoint: Endpoint, count=1, maxAge=30, strictEndpoints=false, includeErrors=false) : AsyncStream<ObjectDiscoveryReply> {
350+
findObjectByHash(hash: Hash, linkupServers: string[], replyAddress: LinkupAddress, count=1, maxAge=30, strictEndpoints=false, includeErrors=false) : AsyncStream<ObjectDiscoveryReply> {
351351
const suffix = Hashing.toHex(hash);
352-
return this.findObjectByHashSuffix(suffix, linkupServers, replyEndpoint, count, maxAge, strictEndpoints, includeErrors);
352+
return this.findObjectByHashSuffix(suffix, linkupServers, replyAddress, count, maxAge, strictEndpoints, includeErrors);
353353
}
354354

355-
findObjectByHashSuffix(hashSuffix: string, linkupServers: string[], replyEndpoint: Endpoint, count=1, maxAge=30, strictEndpoints=false, includeErrors=false) : AsyncStream<ObjectDiscoveryReply> {
355+
findObjectByHashSuffix(hashSuffix: string, linkupServers: string[], replyAddress: LinkupAddress, count=1, maxAge=30, strictEndpoints=false, includeErrors=false) : AsyncStream<ObjectDiscoveryReply> {
356356

357357
const discoveryAgent = this.getDiscoveryAgentFor(hashSuffix);
358358

359-
discoveryAgent.query(linkupServers, replyEndpoint, count);
359+
discoveryAgent.query(linkupServers, replyAddress, count);
360360

361361
let params: ObjectDiscoveryReplyParams = {};
362362

363363
params.maxAge = maxAge;
364364

365365
if (strictEndpoints) {
366366
params.linkupServers = linkupServers;
367-
params.localEndpoints = [replyEndpoint];
367+
params.localEndpoints = [replyAddress.url()];
368368
}
369369

370370
params.includeErrors = includeErrors;
371371

372372
return discoveryAgent.getReplyStream(params);
373373
}
374374

375-
findObjectByHashRetry(hash: Hash, linkupServers: string[], replyEndpoint: Endpoint, count=1): void {
375+
findObjectByHashRetry(hash: Hash, linkupServers: string[], replyAddress: LinkupAddress, count=1): void {
376376
const suffix = Hashing.toHex(hash);
377-
this.findObjectByHashSuffixRetry(suffix, linkupServers, replyEndpoint, count);
377+
this.findObjectByHashSuffixRetry(suffix, linkupServers, replyAddress, count);
378378
}
379379

380-
findObjectByHashSuffixRetry(hashSuffix: string, linkupServers: string[], replyEndpoint: Endpoint, count=1): void {
380+
findObjectByHashSuffixRetry(hashSuffix: string, linkupServers: string[], replyAddress: LinkupAddress, count=1): void {
381381
const discoveryAgent = this.getDiscoveryAgentFor(hashSuffix);
382-
discoveryAgent.query(linkupServers, replyEndpoint, count);
382+
discoveryAgent.query(linkupServers, replyAddress, count);
383383
}
384384

385385
getSyncAgentFor(peerGroupId: PeerGroupId, mutHash: Hash): StateSyncAgent|undefined {

src/mesh/service/PeerNode.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11

22
import { Hash, HashedObject } from 'data/model';
33
import { ObjectDiscoveryPeerSource, PeerInfo } from 'mesh/agents/peer';
4+
import { LinkupAddress } from 'net/linkup';
45
import { Resources } from 'spaces/spaces';
56
import { MultiMap } from 'util/multimap';
67
import { PeerGroupInfo, SyncMode, UsageToken } from './Mesh';
@@ -109,7 +110,7 @@ class PeerNode {
109110

110111
private async discoveryPeerGroupInfo(obj: HashedObject) : Promise<PeerGroupInfo> {
111112
let localPeer = this.resources.getPeersForDiscovery()[0];
112-
let peerSource = new ObjectDiscoveryPeerSource(this.resources.mesh, obj, this.resources.config.linkupServers, localPeer.endpoint, this.resources.getEndointParserForDiscovery());
113+
let peerSource = new ObjectDiscoveryPeerSource(this.resources.mesh, obj, this.resources.config.linkupServers, LinkupAddress.fromURL(localPeer.endpoint, localPeer.identity), this.resources.getEndointParserForDiscovery());
113114

114115
return {
115116
id: PeerNode.discoveryPeerGroupInfoId(obj),

src/mesh/service/remoting/MeshHost.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { ObjectDiscoveryReply } from 'mesh/agents/discovery';
88
import { RNGImpl } from 'crypto/random';
99
import { Identity, RSAKeyPair } from 'data/identity';
1010
import { Store } from 'storage/store';
11+
import { LinkupAddress } from 'net/linkup';
1112

1213
type MeshCommand = JoinPeerGroup | LeavePeerGroup |
1314
SyncObjectsWithPeerGroup | StopSyncObjectsWithPeerGroup |
@@ -21,8 +22,8 @@ type JoinPeerGroup = {
2122
peerGroupId: string,
2223
localPeerEndpoint: Endpoint;
2324
localPeerIdentityHash: Hash;
24-
localPeerIdentity?: LiteralContext | undefined,
25-
localPeerIdentityKeyPair?: LiteralContext | undefined,
25+
localPeerIdentity?: LiteralContext,
26+
localPeerIdentityKeyPair?: LiteralContext,
2627
//localPeer: PeerInfo,
2728
config?: PeerGroupAgentConfig;
2829
usageToken?: UsageToken
@@ -67,6 +68,7 @@ type FindObjectByHash = {
6768
hash: Hash,
6869
linkupServers: Array<string>,
6970
replyEndpoint: Endpoint,
71+
replyIdentity?: LiteralContext,
7072
count?: number,
7173
maxAge?: number,
7274
strictEndpoints?: boolean,
@@ -80,6 +82,7 @@ type FindObjectByHashSuffix = {
8082
hashSuffix: string,
8183
linkupServers: Array<string>,
8284
replyEndpoint: Endpoint,
85+
replyIdentity?: LiteralContext,
8386
count?: number,
8487
maxAge?: number,
8588
includeErrors?: boolean,
@@ -293,18 +296,21 @@ class MeshHost {
293296
command.type === 'find-object-by-hash-suffix') {
294297
const find = command as FindObjectByHash | FindObjectByHashSuffix;
295298

299+
const id = find.replyIdentity === undefined? undefined : HashedObject.fromLiteralContext(find.replyIdentity) as Identity;
300+
const replyAddress = LinkupAddress.fromURL(find.replyEndpoint, id);
301+
296302
if (!find.retry) {
297303

298304
const streamId = command.streamId;
299305
let replyStream: AsyncStream<ObjectDiscoveryReply>;
300306

301307
if (command.type === 'find-object-by-hash') {
302308
replyStream = this.mesh.findObjectByHash(
303-
(find as FindObjectByHash).hash, find.linkupServers, find.replyEndpoint, find.count, find.maxAge, find.strictEndpoints, find.includeErrors
309+
(find as FindObjectByHash).hash, find.linkupServers, replyAddress, find.count, find.maxAge, find.strictEndpoints, find.includeErrors
304310
);
305311
} else {
306312
replyStream = this.mesh.findObjectByHashSuffix(
307-
(find as FindObjectByHashSuffix).hashSuffix, find.linkupServers, find.replyEndpoint, find.count, find.maxAge, find.strictEndpoints, find.includeErrors
313+
(find as FindObjectByHashSuffix).hashSuffix, find.linkupServers, replyAddress, find.count, find.maxAge, find.strictEndpoints, find.includeErrors
308314
);
309315
}
310316

@@ -356,11 +362,11 @@ class MeshHost {
356362
} else {
357363
if (command.type === 'find-object-by-hash') {
358364
this.mesh.findObjectByHashRetry(
359-
(find as FindObjectByHash).hash, find.linkupServers, find.replyEndpoint, find.count
365+
(find as FindObjectByHash).hash, find.linkupServers, replyAddress, find.count
360366
);
361367
} else {
362368
this.mesh.findObjectByHashSuffixRetry(
363-
(find as FindObjectByHashSuffix).hashSuffix, find.linkupServers, find.replyEndpoint, find.count
369+
(find as FindObjectByHashSuffix).hashSuffix, find.linkupServers, replyAddress, find.count
364370
);
365371
}
366372
}

src/mesh/service/remoting/MeshProxy.ts

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import { Context, Hash, HashedObject } from 'data/model';
1414
import { AsyncStream, BufferedAsyncStream, BufferingAsyncStreamSource } from 'util/streams';
1515
import { ObjectDiscoveryReply } from 'mesh/agents/discovery';
1616
import { Endpoint } from 'mesh/agents/network';
17-
import { LinkupManager, LinkupManagerCommand, LinkupManagerProxy } from 'net/linkup';
17+
import { LinkupAddress, LinkupManager, LinkupManagerCommand, LinkupManagerProxy } from 'net/linkup';
1818
import { WebRTCConnectionEvent, WebRTCConnectionsHost } from 'net/transport';
1919

2020
class MeshProxy {
@@ -267,7 +267,7 @@ class MeshProxy {
267267
this.commandForwardingFn(cmd);
268268
}
269269

270-
findObjectByHash(hash: Hash, linkupServers: string[], replyEndpoint: Endpoint, count=1, maxAge=30, strictEndpoints=false) : AsyncStream<ObjectDiscoveryReply> {
270+
findObjectByHash(hash: Hash, linkupServers: string[], replyAddress: LinkupAddress, count=1, maxAge=30, strictEndpoints=false) : AsyncStream<ObjectDiscoveryReply> {
271271
const streamId = new RNGImpl().randomHexString(64);
272272

273273
const src = new BufferingAsyncStreamSource<ObjectDiscoveryReply>()
@@ -278,7 +278,8 @@ class MeshProxy {
278278
type: 'find-object-by-hash',
279279
hash: hash,
280280
linkupServers: linkupServers,
281-
replyEndpoint: replyEndpoint,
281+
replyEndpoint: replyAddress.url(),
282+
replyIdentity: replyAddress.identity === undefined? undefined: replyAddress.identity.toLiteralContext(),
282283
count: count,
283284
maxAge: maxAge,
284285
strictEndpoints: strictEndpoints,
@@ -291,7 +292,7 @@ class MeshProxy {
291292
return new BufferedAsyncStream<ObjectDiscoveryReply>(src);
292293
}
293294

294-
findObjectByHashSuffix(hashSuffix: string, linkupServers: string[], replyEndpoint: Endpoint, count=1, maxAge=30, strictEndpoints=false) : AsyncStream<ObjectDiscoveryReply> {
295+
findObjectByHashSuffix(hashSuffix: string, linkupServers: string[], replyAddress: LinkupAddress, count=1, maxAge=30, strictEndpoints=false) : AsyncStream<ObjectDiscoveryReply> {
295296
const streamId = new RNGImpl().randomHexString(64);
296297

297298
const src = new BufferingAsyncStreamSource<ObjectDiscoveryReply>()
@@ -302,7 +303,8 @@ class MeshProxy {
302303
type: 'find-object-by-hash-suffix',
303304
hashSuffix: hashSuffix,
304305
linkupServers: linkupServers,
305-
replyEndpoint: replyEndpoint,
306+
replyEndpoint: replyAddress.url(),
307+
replyIdentity: replyAddress.identity === undefined? undefined: replyAddress.identity.toLiteralContext(),
306308
count: count,
307309
maxAge: maxAge,
308310
strictEndpoints: strictEndpoints,
@@ -321,25 +323,27 @@ class MeshProxy {
321323
this.commandForwardingFn(cmd);
322324
}
323325

324-
findObjectByHashRetry(hash: Hash, linkupServers: string[], replyEndpoint: Endpoint, count=1): void {
326+
findObjectByHashRetry(hash: Hash, linkupServers: string[], replyAddress: LinkupAddress, count=1): void {
325327
const cmd: FindObjectByHash = {
326328
type: 'find-object-by-hash',
327329
hash: hash,
328330
linkupServers: linkupServers,
329-
replyEndpoint: replyEndpoint,
331+
replyEndpoint: replyAddress.url(),
332+
replyIdentity: replyAddress.identity === undefined? undefined: replyAddress.identity.toLiteralContext(),
330333
count: count,
331334
retry: true,
332335
}
333336

334337
this.commandForwardingFn(cmd);
335338
}
336339

337-
findObjectByHashSuffixRetry(hashSuffix: string, linkupServers: string[], replyEndpoint: Endpoint, count=1): void {
340+
findObjectByHashSuffixRetry(hashSuffix: string, linkupServers: string[], replyAddress: LinkupAddress, count=1): void {
338341
const cmd: FindObjectByHashSuffix = {
339342
type: 'find-object-by-hash-suffix',
340343
hashSuffix: hashSuffix,
341344
linkupServers: linkupServers,
342-
replyEndpoint: replyEndpoint,
345+
replyEndpoint: replyAddress.url(),
346+
replyIdentity: replyAddress.identity === undefined? undefined: replyAddress.identity.toLiteralContext(),
343347
count: count,
344348
retry: true,
345349
}

0 commit comments

Comments
 (0)