Skip to content

Commit 7205373

Browse files
committed
added a "instanceId" field to connectionInfo in the NetworkAgent
this field is generated in the Linkup layer, and distinguishes from different devices that are listening on the same address (e.g. several devices owner by the same person, and using the same identity)
1 parent 99ea09d commit 7205373

File tree

14 files changed

+153
-76
lines changed

14 files changed

+153
-76
lines changed

src/mesh/agents/network/NetworkAgent.ts

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { WebRTCConnection } from 'net/transport/WebRTCConnection';
88
import { RNGImpl } from 'crypto/random';
99
import { SignallingServerConnection } from 'net/linkup/SignallingServerConnection';
1010
import { WebSocketConnection } from 'net/transport/WebSocketConnection';
11-
import { LinkupManagerHost, LinkupManagerEvent } from 'net/linkup';
11+
import { LinkupManagerHost, LinkupManagerEvent, NewCallMessageCallback } from 'net/linkup';
1212
import { WebRTCConnectionCommand, WebRTCConnectionEvent, WebRTCConnectionProxy } from 'net/transport';
1313
import { Identity } from 'data/identity';
1414

@@ -78,6 +78,7 @@ type ConnectionInfo = {
7878
localEndpoint: Endpoint,
7979
remoteEndpoint: Endpoint,
8080
connId : ConnectionId,
81+
remoteInstanceId?: string, // see the note in instanceIds in class SignallingServerConnection
8182
status: ConnectionStatus,
8283
timestamp: number,
8384
requestedBy: Set<AgentId>
@@ -121,13 +122,13 @@ class NetworkAgent implements Agent {
121122
connections : Map<ConnectionId, Connection>;
122123

123124
connectionInfo : Map<ConnectionId, ConnectionInfo>;
124-
deferredInitialMessages : Map<ConnectionId, Array<any>>;
125+
deferredInitialMessages : Map<ConnectionId, Array<{instanceId: string, message: any}>>;
125126

126127
messageCallback : (data: any, conn: Connection) => void;
127128

128129
connectionReadyCallback : (conn: Connection) => void;
129130

130-
newConnectionRequestCallback : (sender: LinkupAddress, receiver: LinkupAddress, callId: string, message: any) => void;
131+
newConnectionRequestCallback : NewCallMessageCallback;
131132

132133
linkupMessageCallback : (sender: LinkupAddress, receiver: LinkupAddress, message: any) => void;
133134

@@ -209,6 +210,9 @@ class NetworkAgent implements Agent {
209210
if (connInfo.status !== ConnectionStatus.Ready) {
210211
this.connections.set(connectionId, conn);
211212
connInfo.status = ConnectionStatus.Ready;
213+
if (connInfo.remoteInstanceId === undefined) {
214+
connInfo.remoteInstanceId = conn.remoteInstanceId;
215+
}
212216
const ev: ConnectionStatusChangeEvent = {
213217
type: NetworkEventType.ConnectionStatusChange,
214218
content: {
@@ -227,7 +231,7 @@ class NetworkAgent implements Agent {
227231
}
228232
}
229233

230-
this.newConnectionRequestCallback = (sender: LinkupAddress, receiver: LinkupAddress, connectionId: string, message: any) => {
234+
this.newConnectionRequestCallback = (sender: LinkupAddress, receiver: LinkupAddress, connectionId: string, instanceId: string, message: any) => {
231235

232236
let connInfo = this.connectionInfo.get(connectionId);
233237

@@ -237,7 +241,8 @@ class NetworkAgent implements Agent {
237241
connInfo = {
238242
localEndpoint: receiver.url(),
239243
remoteEndpoint: sender.url(),
240-
connId: connectionId,
244+
connId: connectionId,
245+
remoteInstanceId: instanceId,
241246
status: ConnectionStatus.Received,
242247
timestamp: Date.now(),
243248
requestedBy: new Set()
@@ -246,13 +251,22 @@ class NetworkAgent implements Agent {
246251
this.connectionInfo.set(connectionId, connInfo);
247252
}
248253

254+
/*if (connInfo.localEndpoint === receiver.url() &&
255+
connInfo.remoteEndpoint === sender.url() &&
256+
connInfo.remoteEndpoint !== instanceId) {
257+
258+
console.log('MISMATCH')
259+
CONSOL
260+
}*/
261+
249262
if (connInfo.localEndpoint === receiver.url() &&
250-
connInfo.remoteEndpoint === sender.url()) {
263+
connInfo.remoteEndpoint === sender.url() &&
264+
connInfo.remoteInstanceId === instanceId) {
251265

252266
if (connInfo.status === ConnectionStatus.Establishing) {
253-
this.acceptReceivedConnectionMessages(connectionId, message);
267+
this.acceptReceivedConnectionMessages(connectionId, instanceId, message);
254268
} else if (connInfo.status === ConnectionStatus.Received) {
255-
this.deferReceivedConnectionMessage(connectionId, message);
269+
this.deferReceivedConnectionMessage(connectionId, instanceId, message);
256270

257271
if (isNew) {
258272
let ev: ConnectionStatusChangeEvent = {
@@ -389,26 +403,27 @@ class NetworkAgent implements Agent {
389403
}
390404
*/
391405

392-
private acceptReceivedConnectionMessages(connId: ConnectionId, message?: any) {
406+
private acceptReceivedConnectionMessages(connId: ConnectionId, instanceId?: string, message?: any) {
393407

394408
let messages = this.deferredInitialMessages.get(connId);
395409

396410
if (messages === undefined) {
397411
messages = [];
398412
}
399413

400-
if (message !== undefined) {
401-
messages.push(message);
414+
if (message !== undefined && instanceId !== undefined) {
415+
messages.push({instanceId: instanceId, message: message});
402416
}
403417

404418

405-
for (const message of messages) {
419+
for (const {message, instanceId } of messages) {
406420
let conn = this.connections.get(connId);
407421

408422
if (conn === undefined) {
409423
let connInfo = this.connectionInfo.get(connId) as ConnectionInfo;
410424

411425
if (connInfo !== undefined) {
426+
412427
const receiver = LinkupAddress.fromURL(connInfo.localEndpoint);
413428
const sender = LinkupAddress.fromURL(connInfo.remoteEndpoint);
414429

@@ -434,13 +449,13 @@ class NetworkAgent implements Agent {
434449

435450
if (conn instanceof WebRTCConnection || conn instanceof WebRTCConnectionProxy || conn instanceof WebSocketConnection) {
436451
conn.setMessageCallback(this.messageCallback);
437-
conn.answer(message);
452+
conn.answer(instanceId, message);
438453
}
439454
} else {
440455
if (conn instanceof WebRTCConnection || conn instanceof WebRTCConnectionProxy) {
441-
conn.receiveSignallingMessage(message);
456+
conn.receiveSignallingMessage(instanceId, message);
442457
} else if (conn instanceof WebSocketConnection) {
443-
conn.answer(message);
458+
conn.answer(instanceId, message);
444459
}
445460
}
446461

@@ -450,16 +465,16 @@ class NetworkAgent implements Agent {
450465
}
451466
}
452467

453-
private deferReceivedConnectionMessage(connId: ConnectionId, message: any) {
468+
private deferReceivedConnectionMessage(connId: ConnectionId, instanceId: string, message: any) {
454469

455470
let messages = this.deferredInitialMessages.get(connId);
456471

457472
if (messages === undefined) {
458-
messages = new Array<any>();
473+
messages = new Array<{instanceId: string, message: any}>();
459474
this.deferredInitialMessages.set(connId, messages);
460475
}
461476

462-
messages.push(message);
477+
messages.push({message: message, instanceId: instanceId});
463478
}
464479

465480
// Network listen, shutdown
@@ -534,7 +549,8 @@ class NetworkAgent implements Agent {
534549
{
535550
localEndpoint: local,
536551
remoteEndpoint: remote,
537-
connId: callId, status: ConnectionStatus.Establishing,
552+
connId: callId,
553+
status: ConnectionStatus.Establishing,
538554
timestamp: Date.now(),
539555
requestedBy: new Set([requestedBy])
540556
});

src/mesh/agents/peer/PeerGroupAgent.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,12 @@ class PeerGroupAgent implements Agent {
452452

453453
// Connection deduplication logic.
454454

455+
// Note: ATM the PeerGroupAgent will aggressively deduplicate all connections that go to the
456+
// same endpoint. In the future, we could use the "instanceId" field in the NetworkAgent's
457+
// connectionInfo to tell when these connections are actually going to the same devices
458+
// (besides belonging to the same identity), and may not prune connections up to a given
459+
// number of different devices.
460+
455461
private deduplicateConnections() {
456462

457463
for (const [endpoint, connIds] of this.connectionsPerEndpoint.entries()) {

src/net/linkup/LinkupServer.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { LinkupAddress } from './LinkupAddress';
22

3-
type NewCallMessageCallback = (sender: LinkupAddress, recipient: LinkupAddress, callId: string, message: any) => void;
4-
type MessageCallback = (message: any) => void;
3+
type NewCallMessageCallback = (sender: LinkupAddress, recipient: LinkupAddress, callId: string, instanceId: string, message: any) => void;
4+
type MessageCallback = (instanceId: string, message: any) => void;
55

66
type ListeningAddressesQueryCallback = (queryId: string, matches: Array<LinkupAddress>) => void;
77

src/net/linkup/SignallingServerConnection.ts

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,20 @@ import { Logger, LogLevel } from 'util/logging';
22

33
import { LinkupServer, RawMessageCallback, NewCallMessageCallback, MessageCallback, ListeningAddressesQueryCallback } from './LinkupServer';
44
import { LinkupAddress } from './LinkupAddress';
5+
import { RNGImpl } from 'crypto/random';
56

67
const CONN_BACKOFF_TIME = 15000;
78

9+
/*
10+
* On instanceIds: when connecting to a linkup server, the SignallingServerConnection class will
11+
* create a randm "instanceId" value. When several peers are listening for
12+
* connections on the same endpoint (e.g. several devices owner by the same person)
13+
* and another peer initiates a connection, each of these peers will include a
14+
* different instanceId. This enables the initiator to tell them appart in the
15+
* WebRTC signalling phase, and to choose if he wants to connect to just one of
16+
* them, a few, etc.
17+
*/
18+
819
class SignallingServerConnection implements LinkupServer {
920

1021
static logger = new Logger(SignallingServerConnection.name, LogLevel.ERROR);
@@ -19,7 +30,8 @@ class SignallingServerConnection implements LinkupServer {
1930
return serverURL.slice(SignallingServerConnection.WRTC_URL_PREFIX.length);
2031
}
2132

22-
readonly serverURL : string;
33+
readonly serverURL : string;
34+
readonly instanceId : string; // see note above
2335

2436
ws : WebSocket | null;
2537

@@ -44,6 +56,7 @@ class SignallingServerConnection implements LinkupServer {
4456
}
4557

4658
this.serverURL = serverURL;
59+
this.instanceId = new RNGImpl().randomHexString(128);
4760

4861
this.ws = null;
4962

@@ -153,6 +166,7 @@ class SignallingServerConnection implements LinkupServer {
153166
'action' : 'send',
154167
'linkupId' : recipient.linkupId,
155168
'callId' : callId,
169+
'instanceId' : this.instanceId,
156170
'data' : data,
157171
'replyServerUrl' : sender.serverURL,
158172
'replyLinkupId' : sender.linkupId,
@@ -269,7 +283,7 @@ class SignallingServerConnection implements LinkupServer {
269283
if (callMessageCallbacks !== undefined) {
270284
callMessageCallbacks.forEach((callback: MessageCallback) => {
271285
SignallingServerConnection.logger.debug('Delivering linkup message to ' + linkupId + ' on call ' + message['callId']);
272-
callback(message['data']);
286+
callback(message['instanceId'], message['data']);
273287
found = true;
274288
});
275289
}
@@ -281,7 +295,7 @@ class SignallingServerConnection implements LinkupServer {
281295
if (linkupIdCallbacks !== undefined) {
282296
linkupIdCallbacks.forEach((callback: NewCallMessageCallback) => {
283297
SignallingServerConnection.logger.debug('Calling default callback for linkupId ' + linkupId + ', unlistened callId is ' + callId);
284-
callback(new LinkupAddress(message['replyServerUrl'], message['replyLinkupId']), new LinkupAddress(this.serverURL, linkupId), callId, message['data']);
298+
callback(new LinkupAddress(message['replyServerUrl'], message['replyLinkupId']), new LinkupAddress(this.serverURL, linkupId), callId, message['instanceId'], message['data']);
285299
found = true;
286300
})
287301
}

src/net/linkup/WebSocketListener.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ class WebSocketListener implements LinkupServer {
8989

9090
if (callbacks.size > 0) {
9191
for (const callback of callbacks) {
92-
callback(sender, recipient, connId, {ws: socket, reverse: reverse});
92+
callback(sender, recipient, connId, undefined as any, {ws: socket, reverse: reverse});
9393
parseOK = true;
9494
}
9595
} else {

src/net/linkup/remoting/LinkupManagerHost.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ type NewCallMessageEvent = {
6464
sender: Endpoint,
6565
recipient: Endpoint,
6666
callId: string,
67+
instanceId: string
6768
message: any
6869
}
6970

@@ -84,6 +85,7 @@ type MessageOnCallEvent = {
8485
type: 'message-on-call',
8586
recipient: Endpoint,
8687
callId: string,
88+
instanceId: string,
8789
message: any
8890
}
8991

@@ -128,13 +130,14 @@ class LinkupManagerHost {
128130
this.eventCallback = eventCallback;
129131

130132
this.newCallMessageCallback =
131-
(sender: LinkupAddress, recipient: LinkupAddress, callId: string, message: any) => {
133+
(sender: LinkupAddress, recipient: LinkupAddress, callId: string, instanceId: string, message: any) => {
132134

133135
const ev: NewCallMessageEvent = {
134136
type: 'new-call-message',
135137
sender: sender.url(),
136138
recipient: recipient.url(),
137139
callId: callId,
140+
instanceId: instanceId,
138141
message: message
139142
}
140143

@@ -186,11 +189,12 @@ class LinkupManagerHost {
186189

187190
const listen = cmd as ListenForMessagesOnCall;
188191

189-
const callback = (msg: any) => {
192+
const callback: MessageCallback = (instanceId: string, msg: any) => {
190193
const ev: MessageOnCallEvent = {
191194
type: 'message-on-call',
192195
recipient: listen.recipient,
193196
callId: listen.callId,
197+
instanceId: instanceId,
194198
message: msg
195199
};
196200

src/net/linkup/remoting/LinkupManagerProxy.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class LinkupManagerProxy {
4040
const recipient = LinkupAddress.fromURL(newCall.recipient);
4141

4242
try {
43-
cb(sender, recipient, newCall.callId, newCall.message);
43+
cb(sender, recipient, newCall.callId, newCall.instanceId, newCall.message);
4444
} catch (e) {
4545
console.log('Error in callback invocation within LinkupManagerProxy: ', e);
4646
}
@@ -82,7 +82,7 @@ class LinkupManagerProxy {
8282
for (const cb of this.messagesOnCallCallbacks.get(msg.recipient + '/' + msg.callId)) {
8383

8484
try {
85-
cb(msg.message);
85+
cb(msg.instanceId, msg.message);
8686
} catch (e) {
8787
console.log('Error in callback invocation within LinkupManagerProxy: ', e);
8888
}

src/net/transport/Connection.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ interface Connection {
44

55
readonly localAddress: LinkupAddress;
66
readonly remoteAddress: LinkupAddress;
7+
readonly remoteInstanceId?: string;
78

89
getConnectionId() : string;
910
initiatedLocally(): boolean;

0 commit comments

Comments
 (0)