44using System . Linq ;
55using System . Threading ;
66using System . Threading . Tasks ;
7+ using Parse . Abstractions . Infrastructure . Data ;
78using Parse . Abstractions . Infrastructure . Execution ;
89using Parse . Abstractions . Platform . LiveQueries ;
10+ using Parse . Infrastructure . Data ;
911using Parse . Infrastructure . Utilities ;
1012
1113namespace Parse . Platform . LiveQueries ;
@@ -16,6 +18,7 @@ namespace Parse.Platform.LiveQueries;
1618/// </summary>
1719public class ParseLiveQueryController : IParseLiveQueryController
1820{
21+ private IParseDataDecoder Decoder { get ; }
1922 private IWebSocketClient WebSocketClient { get ; }
2023
2124 private int LastRequestId { get ; set ; } = 0 ;
@@ -62,7 +65,19 @@ public enum ParseLiveQueryState
6265 /// has been terminated, and no data updates are being received.
6366 /// </summary>
6467 Closed ,
68+
69+ /// <summary>
70+ /// Represents the state where the live query connection is in the process of being established.
71+ /// This indicates that the client is actively attempting to connect to the live query server,
72+ /// but the connection has not yet been fully established.
73+ /// </summary>
6574 Connecting ,
75+
76+ /// <summary>
77+ /// Represents the state where the live query connection has been successfully established.
78+ /// This state indicates that the client is actively connected to the Parse LiveQuery server
79+ /// and is receiving real-time data updates.
80+ /// </summary>
6681 Connected
6782 }
6883
@@ -79,34 +94,33 @@ public enum ParseLiveQueryState
7994 /// </remarks>
8095 public ParseLiveQueryState State { get ; private set ; }
8196
82- HashSet < int > SubscriptionIds { get ; } = new HashSet < int > { } ;
83-
8497 CancellationTokenSource ConnectionSignal { get ; set ; }
8598 private IDictionary < int , CancellationTokenSource > SubscriptionSignals { get ; } = new Dictionary < int , CancellationTokenSource > { } ;
8699 private IDictionary < int , CancellationTokenSource > UnsubscriptionSignals { get ; } = new Dictionary < int , CancellationTokenSource > { } ;
87- private IDictionary < int , CancellationTokenSource > SubscriptionUpdateSignals { get ; } = new Dictionary < int , CancellationTokenSource > { } ;
88-
89- private IDictionary < int , ParseLiveQuerySubscription > Subscriptions { get ; set ; } = new Dictionary < int , ParseLiveQuerySubscription > { } ;
100+ private IDictionary < int , IParseLiveQuerySubscription > Subscriptions { get ; set ; } = new Dictionary < int , IParseLiveQuerySubscription > { } ;
90101
91102 /// <summary>
92103 /// Initializes a new instance of the <see cref="ParseLiveQueryController"/> class.
93104 /// </summary>
94105 /// <param name="webSocketClient">
95106 /// The <see cref="IWebSocketClient"/> implementation to use for the live query connection.
96107 /// </param>
108+ /// <param name="decoder"></param>
97109 /// <remarks>
98110 /// This constructor is used to initialize a new instance of the <see cref="ParseLiveQueryController"/> class
99- public ParseLiveQueryController ( IWebSocketClient webSocketClient )
111+ /// </remarks>
112+ public ParseLiveQueryController ( IWebSocketClient webSocketClient , IParseDataDecoder decoder )
100113 {
101114 WebSocketClient = webSocketClient ;
102115 State = ParseLiveQueryState . Closed ;
116+ Decoder = decoder ;
103117 }
104118
105119 private void ProcessMessage ( IDictionary < string , object > message )
106120 {
107121 int requestId ;
108122 string clientId ;
109- ParseLiveQuerySubscription subscription ;
123+ IParseLiveQuerySubscription subscription ;
110124 switch ( message [ "op" ] as string )
111125 {
112126 case "connected" :
@@ -120,7 +134,6 @@ private void ProcessMessage(IDictionary<string, object> message)
120134 if ( clientId == ClientId )
121135 {
122136 requestId = Convert . ToInt32 ( message [ "requestId" ] ) ;
123- SubscriptionIds . Add ( requestId ) ;
124137 if ( SubscriptionSignals . TryGetValue ( requestId , out CancellationTokenSource subscriptionSignal ) )
125138 {
126139 subscriptionSignal ? . Cancel ( ) ;
@@ -133,7 +146,6 @@ private void ProcessMessage(IDictionary<string, object> message)
133146 if ( clientId == ClientId )
134147 {
135148 requestId = Convert . ToInt32 ( message [ "requestId" ] ) ;
136- SubscriptionIds . Remove ( requestId ) ;
137149 if ( UnsubscriptionSignals . TryGetValue ( requestId , out CancellationTokenSource unsubscriptionSignal ) )
138150 {
139151 unsubscriptionSignal ? . Cancel ( ) ;
@@ -156,8 +168,7 @@ private void ProcessMessage(IDictionary<string, object> message)
156168 requestId = Convert . ToInt32 ( message [ "requestId" ] ) ;
157169 if ( Subscriptions . TryGetValue ( requestId , out subscription ) )
158170 {
159- ParseLiveQueryEventArgs args = new ParseLiveQueryEventArgs ( message [ "object" ] ) ;
160- subscription . OnCreate ( args ) ;
171+ subscription . OnCreate ( ParseObjectCoder . Instance . Decode ( message [ "object" ] as IDictionary < string , object > , Decoder , ParseClient . Instance . Services ) ) ;
161172 }
162173 }
163174 break ;
@@ -169,10 +180,9 @@ private void ProcessMessage(IDictionary<string, object> message)
169180 requestId = Convert . ToInt32 ( message [ "requestId" ] ) ;
170181 if ( Subscriptions . TryGetValue ( requestId , out subscription ) )
171182 {
172- ParseLiveQueryEventArgs args = new ParseLiveQueryEventArgs (
173- message [ "object" ] ,
174- message [ "original" ] ) ;
175- subscription . OnEnter ( args ) ;
183+ subscription . OnEnter (
184+ ParseObjectCoder . Instance . Decode ( message [ "object" ] as IDictionary < string , object > , Decoder , ParseClient . Instance . Services ) ,
185+ ParseObjectCoder . Instance . Decode ( message [ "original" ] as IDictionary < string , object > , Decoder , ParseClient . Instance . Services ) ) ;
176186 }
177187 }
178188 break ;
@@ -184,10 +194,9 @@ private void ProcessMessage(IDictionary<string, object> message)
184194 requestId = Convert . ToInt32 ( message [ "requestId" ] ) ;
185195 if ( Subscriptions . TryGetValue ( requestId , out subscription ) )
186196 {
187- ParseLiveQueryEventArgs args = new ParseLiveQueryEventArgs (
188- message [ "object" ] ,
189- message [ "original" ] ) ;
190- subscription . OnUpdate ( args ) ;
197+ subscription . OnUpdate (
198+ ParseObjectCoder . Instance . Decode ( message [ "object" ] as IDictionary < string , object > , Decoder , ParseClient . Instance . Services ) ,
199+ ParseObjectCoder . Instance . Decode ( message [ "original" ] as IDictionary < string , object > , Decoder , ParseClient . Instance . Services ) ) ;
191200 }
192201 }
193202 break ;
@@ -199,10 +208,9 @@ private void ProcessMessage(IDictionary<string, object> message)
199208 requestId = Convert . ToInt32 ( message [ "requestId" ] ) ;
200209 if ( Subscriptions . TryGetValue ( requestId , out subscription ) )
201210 {
202- ParseLiveQueryEventArgs args = new ParseLiveQueryEventArgs (
203- message [ "object" ] ,
204- message [ "original" ] ) ;
205- subscription . OnLeave ( args ) ;
211+ subscription . OnLeave (
212+ ParseObjectCoder . Instance . Decode ( message [ "object" ] as IDictionary < string , object > , Decoder , ParseClient . Instance . Services ) ,
213+ ParseObjectCoder . Instance . Decode ( message [ "original" ] as IDictionary < string , object > , Decoder , ParseClient . Instance . Services ) ) ;
206214 }
207215 }
208216 break ;
@@ -214,8 +222,7 @@ private void ProcessMessage(IDictionary<string, object> message)
214222 requestId = Convert . ToInt32 ( message [ "requestId" ] ) ;
215223 if ( Subscriptions . TryGetValue ( requestId , out subscription ) )
216224 {
217- ParseLiveQueryEventArgs args = new ParseLiveQueryEventArgs ( message [ "object" ] ) ;
218- subscription . OnDelete ( args ) ;
225+ subscription . OnDelete ( ParseObjectCoder . Instance . Decode ( message [ "object" ] as IDictionary < string , object > , Decoder , ParseClient . Instance . Services ) ) ;
219226 }
220227 }
221228 break ;
@@ -351,7 +358,7 @@ public async Task<IParseLiveQuerySubscription> SubscribeAsync<T>(ParseLiveQuery<
351358 completionSignal . Dispose ( ) ;
352359 if ( signalReceived )
353360 {
354- ParseLiveQuerySubscription subscription = new ParseLiveQuerySubscription ( liveQuery . Services , requestId ) ;
361+ ParseLiveQuerySubscription < T > subscription = new ParseLiveQuerySubscription < T > ( liveQuery . Services , liveQuery . ClassName , requestId ) ;
355362 Subscriptions . Add ( requestId , subscription ) ;
356363 return subscription ;
357364 }
@@ -386,6 +393,15 @@ public async Task UpdateSubscriptionAsync<T>(ParseLiveQuery<T> liveQuery, int re
386393 { "query" , liveQuery . BuildParameters ( ) }
387394 } ;
388395 await SendMessage ( await AppendSessionToken ( message ) , cancellationToken ) ;
396+ CancellationTokenSource completionSignal = new CancellationTokenSource ( ) ;
397+ SubscriptionSignals . Add ( requestId , completionSignal ) ;
398+ bool signalReceived = completionSignal . Token . WaitHandle . WaitOne ( TimeOut ) ;
399+ SubscriptionSignals . Remove ( requestId ) ;
400+ completionSignal . Dispose ( ) ;
401+ if ( ! signalReceived )
402+ {
403+ throw new TimeoutException ( ) ;
404+ }
389405 }
390406
391407 /// <summary>
@@ -437,7 +453,6 @@ public async Task CloseAsync(CancellationToken cancellationToken = default)
437453 State = ParseLiveQueryState . Closed ;
438454 SubscriptionSignals . Clear ( ) ;
439455 UnsubscriptionSignals . Clear ( ) ;
440- SubscriptionUpdateSignals . Clear ( ) ;
441456 Subscriptions . Clear ( ) ;
442457 }
443458}
0 commit comments