Skip to content

Commit ab0bc0a

Browse files
committed
bug fixes!
1 parent b0b73fa commit ab0bc0a

File tree

11 files changed

+117
-47
lines changed

11 files changed

+117
-47
lines changed

src/mesh/agents/network/NetworkAgent.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ class NetworkAgent implements Agent {
373373

374374
proxy?.connectionEventIngestFn(ev);
375375

376-
if (ev.type === 'connection-status-change' && ev.status === 'closed') {
376+
if (ev.type === 'connection-status-change' && ev.channelStatus === 'closed') {
377377
this.connProxies?.delete(ev.connId);
378378
}
379379
};

src/mesh/agents/peer/PeerGroupAgent.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,13 +196,16 @@ class PeerGroupAgent implements Agent {
196196
peerConnectionTimeout: params.peerConnectionTimeout || 20,
197197
peerConnectionAttemptInterval: params.peerConnectionAttemptInterval || 10,
198198
peerDiscoveryAttemptInterval: params.peerDiscoveryAttemptInterval || 15,
199-
tickInterval: params.tickInterval || 1
199+
tickInterval: params.tickInterval || 10
200200
};
201201

202202
this.tick = async () => {
203203

204204
if (this.tickLock.acquire()) {
205205
try {
206+
207+
console.log(this.peerGroupId + ' has ' + this.getPeers().length + ' peers')
208+
206209
this.cleanUp();
207210
this.queryForOnlinePeers();
208211
this.deduplicateConnections();
@@ -280,6 +283,10 @@ class PeerGroupAgent implements Agent {
280283
return unique;
281284
}
282285

286+
isPeer(ep: Endpoint){
287+
return this.findWorkingConnectionId(ep, false) !== undefined;
288+
}
289+
283290
validateConnectedPeer(ep: Endpoint) : boolean {
284291
let connId = this.findWorkingConnectionId(ep);
285292
return connId !== undefined;
@@ -627,7 +634,7 @@ class PeerGroupAgent implements Agent {
627634
// Connection handling: find a working connecton to an ep, decide whether to connect to or accept a
628635
// connection from a potential peer.
629636

630-
private findWorkingConnectionId(ep: Endpoint) : ConnectionId | undefined {
637+
private findWorkingConnectionId(ep: Endpoint, validate=true) : ConnectionId | undefined {
631638
let connIds = this.connectionsPerEndpoint.get(ep);
632639

633640
if (connIds !== undefined) {
@@ -638,7 +645,7 @@ class PeerGroupAgent implements Agent {
638645

639646
if (pc !== undefined &&
640647
pc.status === PeerConnectionStatus.Ready &&
641-
this.getNetworkAgent().checkConnection(connId)) {
648+
(!validate || this.getNetworkAgent().checkConnection(connId))) {
642649
return connId;
643650
}
644651

src/mesh/agents/state/HeaderBasedSyncAgent.ts

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import { Hash, HashedObject, HashedSet, Literal, LiteralUtils, MutableObject, MutationOp } from 'data/model';
2-
import { AgentPod } from 'mesh/service/AgentPod';
2+
import { AgentEvent, AgentPod } from 'mesh/service/AgentPod';
33
import { Store } from 'storage/store';
44
import { Logger, LogLevel } from 'util/logging';
55
import { Endpoint } from '../network/NetworkAgent';
6-
import { PeerGroupAgent } from '../peer/PeerGroupAgent';
6+
import { LostPeerEvent, NewPeerEvent, PeerGroupAgent, PeerMeshEventType } from '../peer/PeerGroupAgent';
77
import { PeeringAgentBase } from '../peer/PeeringAgentBase';
88
import { HeaderBasedState } from './history/HeaderBasedState';
99
import { AgentStateUpdateEvent, GossipEventTypes, StateGossipAgent } from './StateGossipAgent';
@@ -159,12 +159,10 @@ class HeaderBasedSyncAgent extends PeeringAgentBase implements StateSyncAgent {
159159
if (isNew) {
160160
this.synchronizer.onNewHistory(sender, unknown);
161161
}
162-
163-
this.emitSyncState();
164-
165-
166162
}
167163

164+
this.emitSyncState();
165+
168166
}
169167

170168
return isNew;
@@ -219,8 +217,6 @@ class HeaderBasedSyncAgent extends PeeringAgentBase implements StateSyncAgent {
219217
await this.synchronizer.onNewLocalOp(op);
220218
await this.updateStateFromStore();
221219
}
222-
223-
this.emitSyncState();
224220
};
225221

226222
literalIsValidOp(literal?: Literal, log=false): boolean {
@@ -306,7 +302,9 @@ class HeaderBasedSyncAgent extends PeeringAgentBase implements StateSyncAgent {
306302
type: GossipEventTypes.AgentStateUpdate,
307303
content: { agentId: this.getAgentId(), state }
308304
}
305+
309306
this.pod?.broadcastEvent(stateUpdate);
307+
this.emitSyncState();
310308
}
311309

312310
}
@@ -357,12 +355,13 @@ class HeaderBasedSyncAgent extends PeeringAgentBase implements StateSyncAgent {
357355
emitSyncState() {
358356

359357
if (this.syncEventSource !== undefined) {
358+
360359
const ev: SyncStateUpdateEvent = {
361360
emitter: this.mutableObj,
362361
action: SyncObserverEventTypes.SyncStateUpdate,
363362
data: this.getSyncState()
364363
};
365-
364+
366365
this.syncEventSource.emit(ev);
367366
}
368367
}
@@ -374,6 +373,14 @@ class HeaderBasedSyncAgent extends PeeringAgentBase implements StateSyncAgent {
374373
const remoteStateHashes = gossipAgent.getAllRemoteStateForAgent(this.getAgentId());
375374
const localStateHash = gossipAgent.getLocalStateForAgent(this.getAgentId());
376375

376+
/*console.log('PEER GROUP AGENT', this.peerGroupAgent);
377+
console.log('GOSSIP AGENT', gossipAgent);
378+
console.log('GOSSIP AGENT, agent id', this.getAgentId());
379+
console.log('GOSSIP AGENT, local state', localStateHash);
380+
console.log('GOSSIP AGENT, remote states', remoteStateHashes);
381+
console.log('GOSSIP AGENT, available local states', Array.from(gossipAgent.localState.keys()))
382+
console.log('GOSSIP AGENT', new Error());*/
383+
377384
let inSync = true;
378385
for (const remoteHash of Object.values(remoteStateHashes)) {
379386
if (remoteHash !== localStateHash) {
@@ -394,6 +401,22 @@ class HeaderBasedSyncAgent extends PeeringAgentBase implements StateSyncAgent {
394401
getPeerGroupId(): string {
395402
return this.peerGroupAgent.peerGroupId;
396403
}
404+
405+
receiveLocalEvent(ev: AgentEvent): void {
406+
if (ev.type === PeerMeshEventType.LostPeer) {
407+
let lostPeerEv = ev as LostPeerEvent;
408+
409+
if (lostPeerEv.content.peerGroupId === this.peerGroupAgent.peerGroupId) {
410+
this.emitSyncState();
411+
}
412+
} else if (ev.type === PeerMeshEventType.NewPeer) {
413+
let lostPeerEv = ev as NewPeerEvent;
414+
415+
if (lostPeerEv.content.peerGroupId === this.peerGroupAgent.peerGroupId) {
416+
this.emitSyncState();
417+
}
418+
}
419+
}
397420
}
398421

399422
export { SyncMsg as HistoryMsg, HeaderBasedSyncAgent, StateFilter }

src/mesh/agents/state/StateGossipAgent.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ class StateGossipAgent extends PeeringAgentBase {
317317

318318
if (this.trackedAgentIds.has(agentId)) {
319319
const hash = state.hash();
320-
320+
321321
const currentState = this.localState.get(agentId);
322322

323323
if (currentState !== undefined && hash !== currentState) {
@@ -692,10 +692,13 @@ class StateGossipAgent extends PeeringAgentBase {
692692
let state: {[key: Endpoint]: Hash} = {};
693693

694694
for (const [endpoint, peerState] of this.remoteState.entries()) {
695-
const stateHash = peerState.get(agentId);
695+
696+
if (this.peerGroupAgent.isPeer(endpoint)) { // <-- this needs to work even if we still haven't received the
697+
const stateHash = peerState.get(agentId); // event telling us that endpoint is offline, so call directly
696698

697-
if (stateHash !== undefined) {
698-
state[endpoint] = stateHash;
699+
if (stateHash !== undefined) {
700+
state[endpoint] = stateHash;
701+
}
699702
}
700703
}
701704

src/mesh/agents/state/SyncObserverAgent.ts

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { Agent } from 'mesh/service';
33
import { AgentId } from 'mesh/service/Agent';
44
import { AgentPod, AgentEvent, AgentPodEventType, AgentSetChangeEvent, AgentSetChange } from 'mesh/service/AgentPod';
55
import { Event, EventRelay, Observer } from 'util/events';
6+
import { MultiMap } from 'util/multimap';
67
import { Endpoint } from '../network';
78
import { StateSyncAgent } from './StateSyncAgent';
89

@@ -45,9 +46,11 @@ class SyncObserverAgent implements Agent {
4546

4647
pod?: AgentPod;
4748

48-
relays: Map<AgentId, [EventRelay<HashedObject>, SyncObserver]>;
49+
observers : MultiMap<AgentId, SyncObserver>;
50+
relays : Map<AgentId, [EventRelay<HashedObject>, SyncObserver, MutableObject]>;
4951

5052
constructor() {
53+
this.observers = new MultiMap();
5154
this.relays = new Map();
5255
}
5356

@@ -68,47 +71,53 @@ class SyncObserverAgent implements Agent {
6871
const syncAgentId = mut.getSyncAgentId(peerGroupId);
6972
const syncAgent = this.pod.getAgent(syncAgentId) as StateSyncAgent|undefined;
7073

71-
let pair = this.relays.get(syncAgentId);
74+
let tuple = this.relays.get(syncAgentId);
7275

73-
74-
if (pair === undefined) {
76+
if (tuple === undefined) {
7577

7678
const relay = new EventRelay<HashedObject>(mut);
7779
const syncAgentObs = (ev: SyncEvent) => {
7880
relay.emit(ev);
7981
}
8082

81-
pair = [relay, syncAgentObs];
83+
tuple = [relay, syncAgentObs, mut];
8284

83-
this.relays.set(syncAgentId, [relay, syncAgentObs]);
85+
this.relays.set(syncAgentId, tuple);
8486

8587

8688
if (syncAgent !== undefined) {
8789
syncAgent.getSyncEventSource().addObserver(syncAgentObs);
8890
}
8991
}
9092

91-
pair[0].addObserver(obs);
93+
tuple[0].addObserver(obs);
9294

9395
if (syncAgent !== undefined) {
9496
obs({
9597
emitter: mut,
9698
action: SyncObserverEventTypes.SyncStateUpdate,
9799
data: syncAgent.getSyncState()
98100
});
101+
} else {
102+
obs({
103+
emitter: mut,
104+
action: SyncObserverEventTypes.SyncStateUpdate,
105+
data: {allPeersInSync: false, opsToFetch: 0, remoteStateHashes: {}, synchronizing: false}
106+
});
99107
}
100108
}
101109

102110
removeSyncObserver(obs: SyncObserver, mut: MutableObject, peerGroupId: PeerGroupId) {
111+
103112
if (this.pod === undefined) {
104113
throw new Error('Trying to remove a sync observer, but the SyncObserverAgent is not ready.');
105114
}
106115

107116
const syncAgentId = mut.getSyncAgentId(peerGroupId);
108-
const pair = this.relays.get(syncAgentId);
117+
const tuple = this.relays.get(syncAgentId);
109118

110-
if (pair !== undefined) {
111-
const [relay, syncAgentObs] = pair;
119+
if (tuple !== undefined) {
120+
const [relay, syncAgentObs, _mut] = tuple;
112121
relay.removeObserver(obs);
113122

114123
if (relay.observers.size === 0) {
@@ -130,19 +139,34 @@ class SyncObserverAgent implements Agent {
130139
const agentEv = ev as AgentSetChangeEvent;
131140

132141
const syncAgentId = agentEv.content.agentId;
133-
const pair = this.relays.get(syncAgentId);
142+
const tuple = this.relays.get(syncAgentId);
134143

135-
if (pair !== undefined) {
144+
if (tuple !== undefined) {
145+
const [relay, syncAgentObserver, mut] = tuple;
136146
if (agentEv.content.change === AgentSetChange.Addition) {
147+
137148
const syncAgent = this.pod?.getAgent(syncAgentId) as StateSyncAgent;
138-
const [_relay, syncAgentObserver] = pair;
139149
syncAgent.getSyncEventSource().addObserver(syncAgentObserver);
150+
151+
relay.emit({
152+
emitter: mut,
153+
action: SyncObserverEventTypes.SyncStateUpdate,
154+
data: syncAgent.getSyncState()
155+
});
156+
140157
} else if (agentEv.content.change === AgentSetChange.Removal) {
158+
159+
console.log('SENDING LAST EV FOR ' + mut.getLastHash());
160+
141161
const syncAgent = this.pod?.getAgent(syncAgentId) as StateSyncAgent|undefined;
142-
const [_relay, syncAgentObserver] = pair;
143-
syncAgent?.getSyncEventSource().addObserver(syncAgentObserver);
162+
syncAgent?.getSyncEventSource().removeObserver(syncAgentObserver); // I think this will have no effect
163+
relay.emit({
164+
emitter: mut,
165+
action: SyncObserverEventTypes.SyncStateUpdate,
166+
data: {allPeersInSync: false, opsToFetch: 0, remoteStateHashes: {}, synchronizing: false}
167+
});
168+
144169
}
145-
146170
}
147171
}
148172
}

src/mesh/service/Mesh.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -297,8 +297,6 @@ class Mesh implements MeshInterface {
297297
}
298298
}
299299
}
300-
301-
302300
}
303301

304302

@@ -835,7 +833,7 @@ class Mesh implements MeshInterface {
835833
if (usageInfo.type === 'peer-group') {
836834
return usageInfo.type + '-' + usageInfo.peerGroupId.replace(/[-]/g, '--');
837835
} else if (usageInfo.type === 'object-sync') {
838-
return usageInfo.type + '-' + usageInfo.peerGroupId.replace(/[-]/g, '--');
836+
return usageInfo.type + '-' + usageInfo.objHash + '-' + usageInfo.peerGroupId.replace(/[-]/g, '--');
839837
} else {
840838
return usageInfo.type + '-' + usageInfo.objHash + '-' + usageInfo.broadcastedSuffixBits;
841839
}

src/mesh/service/remoting/MeshHost.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,8 @@ class MeshHost {
411411

412412
reply.state = state;
413413

414+
console.log('SENDING SYNC STATE reply=', reply)
415+
414416
this.streamedReplyCb(reply);
415417

416418
}).catch((reason: any) => {

src/mesh/service/remoting/MeshProxy.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ class MeshProxy implements MeshInterface {
151151
window.clearTimeout(to);
152152
}
153153

154-
if (reply.state !== undefined) {
154+
if (reply.errorType === undefined && reply.error === undefined) {
155155

156156
if (cb !== undefined) {
157157
cb(reply.state);
@@ -689,6 +689,7 @@ class MeshProxy implements MeshInterface {
689689
}
690690

691691
removeSyncObserver(obs: SyncObserver, mut: MutableObject, peerGroupId?: string | undefined, timeout=10000): Promise<void> {
692+
692693
const p = new Promise<void>((resolve: () => void, reject: (reason: any) => void) => {
693694
const observerId = this.syncObserverIds.get(obs);
694695

src/net/transport/WebRTCConnection.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,15 @@ class WebRTCConnection implements Connection {
3939

4040
onmessage : (ev: MessageEvent) => void;
4141
onready : () => void;
42-
channelStatusChangeCallback : ((status: string, conn: Connection) => void) | undefined;
42+
channelStatusChangeCallback : ((channelStatus: string, connectionStatus: string, conn: Connection) => void) | undefined;
4343

4444
remoteInstanceId?: string;
4545

4646
private handleSignallingMessage : MessageCallback;
4747

4848
startup = Date.now();
4949

50-
constructor(linkupManager: LinkupManager, local: LinkupAddress, remote: LinkupAddress, callId: string, readyCallback : (conn: Connection) => void, channelStatusChangeCallback?: (status: string, conn: Connection) => void) {
50+
constructor(linkupManager: LinkupManager, local: LinkupAddress, remote: LinkupAddress, callId: string, readyCallback : (conn: Connection) => void, channelStatusChangeCallback?: (channelStatus: string, connectionStatus: string, conn: Connection) => void) {
5151

5252
this.linkupManager = linkupManager;
5353
this.localAddress = local;
@@ -274,7 +274,7 @@ class WebRTCConnection implements Connection {
274274
WebRTCConnection.logger.debug(this.callId + ' connectionState now is ' + this?.connection?.connectionState);
275275

276276
if (this.channelStatusChangeCallback !== undefined) {
277-
this.channelStatusChangeCallback(this.channel?.readyState || 'unknown', this);
277+
this.channelStatusChangeCallback(this.channel?.readyState || 'unknown', this.connection?.connectionState || 'unknown', this);
278278
}
279279
};
280280

@@ -380,7 +380,7 @@ class WebRTCConnection implements Connection {
380380
};
381381

382382
if (this.channelStatusChangeCallback !== undefined) {
383-
this.channelStatusChangeCallback(this.channel?.readyState || 'unknown', this);
383+
this.channelStatusChangeCallback(this.channel?.readyState || 'unknown', this.connection?.connectionState || 'unknown', this);
384384
}
385385
};
386386

0 commit comments

Comments
 (0)