1+ import { Hash , HashedObject , MutableObject } from 'data/model' ;
2+ import { Agent } from 'mesh/service' ;
3+ import { AgentId } from 'mesh/service/Agent' ;
4+ import { AgentPod , AgentEvent , AgentPodEventType , AgentSetChangeEvent , AgentSetChange } from 'mesh/service/AgentPod' ;
5+ import { Event , EventRelay , Observer } from 'util/events' ;
6+ import { Endpoint } from '../network' ;
7+ import { StateSyncAgent } from './StateSyncAgent' ;
8+
9+ /*
10+ * Sync Observers
11+ * ==============
12+ *
13+ * All sync agents have an EventRelay that sends an event whenver the sync state changes
14+ * (new mutation ops are discovered and need to be fetched, etc.). Users of the mesh can
15+ * observe the sync state by using the methods addSyncObserver / removeSyncObserver.
16+ *
17+ * Note: we want that if an observer is added, and the object stops being synchronized
18+ * (and the sync agent is stopped), but later sync is resumed, this works transparently
19+ * for any observers that have been added. This is accomplished by having the observers
20+ * at the mesh level, and chaining an observer from the sync agent when one becomes
21+ * available. The sequence then looks like this:
22+ *
23+ * - syncObjectWithPeerGroup() is called
24+ * - addSyncObserver() is called -> ... to be continued
25+ */
26+
27+ type SyncObserver = Observer < HashedObject > ;
28+ type SyncEvent = Event < HashedObject , SyncState > ;
29+
30+ type SyncState = { remoteStateHashes : { [ key : Endpoint ] : Hash } , localStateHash ?: Hash , inSync : boolean , opsToFetch : number , synchronizing : boolean }
31+
32+ enum SyncObserverEventTypes {
33+ SyncStateUpdate = 'sync-state-update'
34+ } ;
35+
36+ type SyncStateUpdateEvent = { emitter : HashedObject , action : SyncObserverEventTypes . SyncStateUpdate , data : SyncState } ;
37+
38+ type PeerGroupId = string ;
39+
40+ class SyncObserverAgent implements Agent {
41+
42+ static AgentId = 'sync-observer-agent' ;
43+
44+ pod ?: AgentPod ;
45+
46+ relays : Map < AgentId , EventRelay < HashedObject > > ;
47+
48+ constructor ( ) {
49+ this . relays = new Map ( ) ;
50+ }
51+
52+ getAgentId ( ) : string {
53+ return SyncObserverAgent . AgentId ;
54+ }
55+
56+ ready ( pod : AgentPod ) : void {
57+ this . pod = pod ;
58+ }
59+
60+ addSyncObserver ( obs : SyncObserver , mut : MutableObject , peerGroupId : PeerGroupId ) {
61+
62+ if ( this . pod === undefined ) {
63+ throw new Error ( 'Trying to add a sync observer, but the SyncObserverAgent is not ready.' ) ;
64+ }
65+
66+ const syncAgentId = mut . getSyncAgentId ( peerGroupId ) ;
67+
68+ let relay = this . relays . get ( syncAgentId ) ;
69+
70+ if ( relay === undefined ) {
71+ const upstream = new Map < string , EventRelay < HashedObject > > ( ) ;
72+
73+ const syncAgent = this . pod . getAgent ( syncAgentId ) as StateSyncAgent ;
74+ if ( syncAgent !== undefined ) {
75+ upstream . set ( 'agent' , syncAgent . getSyncEventSource ( ) ) ;
76+ }
77+
78+ relay = new EventRelay < HashedObject > ( mut , upstream ) ;
79+ this . relays . set ( syncAgentId , relay ) ;
80+ }
81+
82+ relay . addObserver ( obs ) ;
83+ }
84+
85+ removeSyncObserver ( obs : SyncObserver , mut : MutableObject , peerGroupId : PeerGroupId ) {
86+ if ( this . pod === undefined ) {
87+ throw new Error ( 'Trying to remove a sync observer, but the SyncObserverAgent is not ready.' ) ;
88+ }
89+
90+ const syncAgentId = mut . getSyncAgentId ( peerGroupId ) ;
91+ let relay = this . relays . get ( syncAgentId ) ;
92+
93+ if ( relay !== undefined ) {
94+ relay . removeObserver ( obs ) ;
95+
96+ if ( relay . observers . size === 0 ) {
97+ relay . removeAllUpstreamRelays ( ) ;
98+ this . relays . delete ( syncAgentId ) ;
99+ }
100+ }
101+ }
102+
103+ receiveLocalEvent ( ev : AgentEvent ) : void {
104+
105+ if ( ev . type === AgentPodEventType . AgentSetChange ) {
106+ const agentEv = ev as AgentSetChangeEvent ;
107+
108+ const syncAgentId = agentEv . content . agentId ;
109+ const relay = this . relays . get ( syncAgentId ) ;
110+
111+ if ( relay !== undefined ) {
112+ if ( agentEv . content . change === AgentSetChange . Addition ) {
113+ const syncAgent = this . pod ?. getAgent ( syncAgentId ) as StateSyncAgent ;
114+ relay . addUpstreamRelay ( 'agent' , syncAgent . getSyncEventSource ( ) ) ;
115+ } else if ( agentEv . content . change === AgentSetChange . Removal ) {
116+ relay . removeUpstreamRelay ( 'agent' ) ;
117+ }
118+
119+ }
120+ }
121+ }
122+
123+ shutdown ( ) : void {
124+
125+ }
126+ }
127+
128+ export { SyncObserverAgent , SyncObserverEventTypes } ;
129+ export type { SyncState , SyncObserver , SyncEvent , SyncStateUpdateEvent } ;
0 commit comments