Skip to content

Commit 871f015

Browse files
committed
Replaced "int TimeOut" by "TimeSpan Timeout" and other improvements
1 parent 6a50ce4 commit 871f015

13 files changed

+107
-57
lines changed

Parse/Abstractions/Infrastructure/ILiveQueryServerConnectionData.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
1-
namespace Parse.Abstractions.Infrastructure;
1+
using System;
2+
3+
namespace Parse.Abstractions.Infrastructure;
24

35
public interface ILiveQueryServerConnectionData : IServerConnectionData
46
{
57
/// <summary>
68
/// Represents the default timeout duration, in milliseconds.
79
/// </summary>
8-
public const int DefaultTimeOut = 5000; // 5 seconds
10+
public static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(5);
911

1012
/// <summary>
1113
/// The timeout duration, in milliseconds, used for various operations, such as
1214
/// establishing a connection or completing a subscription.
1315
/// </summary>
14-
int TimeOut { get; set; }
16+
TimeSpan Timeout { get; set; }
1517

1618
/// <summary>
1719
/// The default buffer size, in bytes.

Parse/Infrastructure/Execution/MessageReceivedEventArgs.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@ public class MessageReceivedEventArgs(string message) : EventArgs
1010
/// <summary>
1111
/// Gets the message content that was received.
1212
/// </summary>
13-
public string Message { get; } = message;
13+
public string Message { get; } = message ?? throw new ArgumentNullException(nameof(message));
1414
}

Parse/Infrastructure/Execution/TextWebSocketClient.cs

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ namespace Parse.Infrastructure.Execution;
1313
/// Represents a WebSocket client that allows connecting to a WebSocket server, sending messages, and receiving messages.
1414
/// Implements the <c>IWebSocketClient</c> interface for WebSocket operations.
1515
/// </summary>
16-
class TextWebSocketClient(int bufferSize) : IWebSocketClient
16+
class TextWebSocketClient(int bufferSize) : IWebSocketClient, IDisposable
1717
{
1818
/// <summary>
1919
/// A private instance of the ClientWebSocket class used to manage the WebSocket connection.
@@ -68,7 +68,7 @@ public async Task OpenAsync(string serverUri, CancellationToken cancellationToke
6868
webSocketToConnect = webSocket;
6969
}
7070
}
71-
71+
listeningCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
7272
if (webSocketToConnect is not null)
7373
{
7474
await webSocketToConnect.ConnectAsync(new Uri(serverUri), cancellationToken);
@@ -86,10 +86,7 @@ public async Task OpenAsync(string serverUri, CancellationToken cancellationToke
8686
/// </returns>
8787
public async Task CloseAsync(CancellationToken cancellationToken = default)
8888
{
89-
if (webSocket is not null)
90-
{
91-
await webSocket?.CloseAsync(WebSocketCloseStatus.NormalClosure, String.Empty, cancellationToken)!;
92-
}
89+
await webSocket?.CloseAsync(WebSocketCloseStatus.NormalClosure, String.Empty, cancellationToken)!;
9390
}
9491

9592
private async Task ListenForMessages(CancellationToken cancellationToken)
@@ -107,7 +104,6 @@ private async Task ListenForMessages(CancellationToken cancellationToken)
107104

108105
if (result.MessageType == WebSocketMessageType.Close)
109106
{
110-
await CloseAsync(cancellationToken);
111107
break;
112108
}
113109

@@ -118,17 +114,17 @@ private async Task ListenForMessages(CancellationToken cancellationToken)
118114
}
119115
else
120116
{
121-
// Handle partial messages by accumulating data until EndOfMessage is true
122-
StringBuilder messageBuilder = new StringBuilder();
123-
messageBuilder.Append(Encoding.UTF8.GetString(buffer, 0, result.Count));
117+
// Accumulate bytes to handle UTF-8 characters split across boundaries
118+
using var messageStream = new MemoryStream();
119+
messageStream.Write(buffer, 0, result.Count);
124120
while (!result.EndOfMessage)
125121
{
126122
result = await webSocket.ReceiveAsync(
127123
new ArraySegment<byte>(buffer),
128124
cancellationToken);
129-
messageBuilder.Append(Encoding.UTF8.GetString(buffer, 0, result.Count));
125+
messageStream.Write(buffer, 0, result.Count);
130126
}
131-
string fullMessage = messageBuilder.ToString();
127+
string fullMessage = Encoding.UTF8.GetString(messageStream.ToArray());
132128
MessageReceived?.Invoke(this, new MessageReceivedEventArgs(fullMessage));
133129
}
134130
}
@@ -206,4 +202,33 @@ public async Task SendAsync(string message, CancellationToken cancellationToken
206202
}
207203
await webSocket.SendAsync(Encoding.UTF8.GetBytes(message), WebSocketMessageType.Text, true, cancellationToken);
208204
}
205+
206+
private CancellationTokenSource listeningCts;
207+
private bool disposed;
208+
209+
public void Dispose()
210+
{
211+
Dispose(true);
212+
GC.SuppressFinalize(this);
213+
}
214+
215+
protected virtual void Dispose(bool disposing)
216+
{
217+
if (!disposed)
218+
{
219+
if (disposing)
220+
{
221+
listeningCts?.Cancel();
222+
try
223+
{
224+
listeningTask?.Wait(TimeSpan.FromSeconds(5));
225+
}
226+
catch { }
227+
228+
webSocket?.Dispose();
229+
listeningCts?.Dispose();
230+
}
231+
disposed = true;
232+
}
233+
}
209234
}

Parse/Infrastructure/LateInitializedMutableServiceHub.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ public IWebSocketClient WebSocketClient
171171

172172
public IParseLiveQueryController LiveQueryController
173173
{
174-
get => LateInitializer.GetValue<IParseLiveQueryController>(() => LiveQueryServerConnectionData is null ? null : new ParseLiveQueryController(LiveQueryServerConnectionData.TimeOut, WebSocketClient, Decoder));
174+
get => LateInitializer.GetValue<IParseLiveQueryController>(() => LiveQueryServerConnectionData is null ? null : new ParseLiveQueryController(LiveQueryServerConnectionData.Timeout, WebSocketClient, Decoder));
175175
set => LateInitializer.SetValue(value);
176176
}
177177

Parse/Infrastructure/LiveQueryServerConnectionData.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System;
12
using System.Collections.Generic;
23
using Parse.Abstractions.Infrastructure;
34

@@ -16,7 +17,7 @@ public LiveQueryServerConnectionData() { }
1617
/// The timeout duration, in milliseconds, used for various operations, such as
1718
/// establishing a connection or completing a subscription.
1819
/// </summary>
19-
public int TimeOut { get; set; } = ILiveQueryServerConnectionData.DefaultTimeOut;
20+
public TimeSpan Timeout { get; set; } = ILiveQueryServerConnectionData.DefaultTimeout;
2021

2122
/// <summary>
2223
/// The buffer size, in bytes, used by the WebSocket client for communication operations.

Parse/Infrastructure/MutableServiceHub.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,11 @@ public MutableServiceHub SetDefaults(IServerConnectionData connectionData = defa
109109
PushChannelsController ??= new ParsePushChannelsController(CurrentInstallationController);
110110
InstallationDataFinalizer ??= new ParseInstallationDataFinalizer { };
111111

112-
WebSocketClient ??= LiveQueryServerConnectionData is null ? null : new TextWebSocketClient(LiveQueryServerConnectionData.MessageBufferSize);
113-
LiveQueryController ??= LiveQueryServerConnectionData is null ? null : new ParseLiveQueryController(LiveQueryServerConnectionData.TimeOut, WebSocketClient, Decoder);
112+
if (LiveQueryServerConnectionData is not null)
113+
{
114+
WebSocketClient ??= new TextWebSocketClient(LiveQueryServerConnectionData.MessageBufferSize);
115+
LiveQueryController ??= new ParseLiveQueryController(LiveQueryServerConnectionData.Timeout, WebSocketClient, Decoder);
116+
}
114117

115118
return this;
116119
}

Parse/Infrastructure/ServiceHub.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public class ServiceHub : IServiceHub
7272
public IParseInstallationDataFinalizer InstallationDataFinalizer => LateInitializer.GetValue(() => new ParseInstallationDataFinalizer { });
7373

7474
public IWebSocketClient WebSocketClient => LateInitializer.GetValue(() => LiveQueryServerConnectionData is null ? null : new TextWebSocketClient(LiveQueryServerConnectionData.MessageBufferSize));
75-
public IParseLiveQueryController LiveQueryController => LateInitializer.GetValue(() => LiveQueryServerConnectionData is null ? null : new ParseLiveQueryController(LiveQueryServerConnectionData.TimeOut, WebSocketClient, Decoder));
75+
public IParseLiveQueryController LiveQueryController => LateInitializer.GetValue(() => LiveQueryServerConnectionData is null ? null : new ParseLiveQueryController(LiveQueryServerConnectionData.Timeout, WebSocketClient, Decoder));
7676

7777
public bool Reset()
7878
{

Parse/Platform/LiveQueries/ParseLiveQuery.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,11 @@ private ParseLiveQuery(ParseLiveQuery<T> source, IEnumerable<string> watchedKeys
8787
/// </summary>
8888
/// <param name="watch">The key that should be watched.</param>
8989
/// <returns>A new query with the additional constraint.</returns>
90-
public ParseLiveQuery<T> Watch(string watch) => new(this, new List<string> { watch });
90+
public ParseLiveQuery<T> Watch(string watch)
91+
{
92+
ArgumentException.ThrowIfNullOrWhiteSpace(watch);
93+
return new ParseLiveQuery<T>(this, [ watch ]);
94+
}
9195

9296
internal IDictionary<string, object> BuildParameters()
9397
{

Parse/Platform/LiveQueries/ParseLiveQueryController.cs

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class ParseLiveQueryController : IParseLiveQueryController, IDisposable,
4343
/// - Unsubscribing from a query.
4444
/// Ensure that the value is configured appropriately to avoid premature timeout errors in network-dependent processes.
4545
/// </remarks>
46-
private int TimeOut { get; }
46+
private TimeSpan Timeout { get; }
4747

4848
/// <summary>
4949
/// Event triggered when an error occurs during the operation of the ParseLiveQueryController.
@@ -110,19 +110,19 @@ public enum ParseLiveQueryState
110110
/// <summary>
111111
/// Initializes a new instance of the <see cref="ParseLiveQueryController"/> class.
112112
/// </summary>
113-
/// <param name="timeOut"></param>
113+
/// <param name="timeout"></param>
114114
/// <param name="webSocketClient">
115115
/// The <see cref="IWebSocketClient"/> implementation to use for the live query connection.
116116
/// </param>
117117
/// <param name="decoder"></param>
118118
/// <remarks>
119119
/// This constructor is used to initialize a new instance of the <see cref="ParseLiveQueryController"/> class
120120
/// </remarks>
121-
public ParseLiveQueryController(int timeOut, IWebSocketClient webSocketClient, IParseDataDecoder decoder)
121+
public ParseLiveQueryController(TimeSpan timeout, IWebSocketClient webSocketClient, IParseDataDecoder decoder)
122122
{
123123
WebSocketClient = webSocketClient ?? throw new ArgumentNullException(nameof(webSocketClient));
124124
Decoder = decoder ?? throw new ArgumentNullException(nameof(decoder));
125-
TimeOut = timeOut;
125+
Timeout = timeout;
126126
_state = ParseLiveQueryState.Closed;
127127
}
128128

@@ -360,17 +360,24 @@ private async Task OpenAsync(CancellationToken cancellationToken = default)
360360

361361
private void WebSocketClientOnMessageReceived(object sender, MessageReceivedEventArgs args)
362362
{
363-
object parsed = JsonUtilities.Parse(args.Message);
364-
if (parsed is IDictionary<string, object> message)
363+
try
365364
{
366-
ProcessMessage(message);
365+
object parsed = JsonUtilities.Parse(args.Message);
366+
if (parsed is IDictionary<string, object> message)
367+
{
368+
ProcessMessage(message);
369+
}
370+
else
371+
{
372+
Debug.WriteLine($"Invalid message format received: {args.Message}");
373+
}
367374
}
368-
else
375+
catch (ArgumentException ex)
369376
{
370-
Debug.WriteLine($"Invalid message format received: {args.Message}");
377+
Debug.WriteLine($"Error parsing message: {ex.Message}");
378+
Error?.Invoke(this, new ParseLiveQueryErrorEventArgs(31, $"Failed to parse message: {ex.Message}", true, ex));
371379
}
372380
}
373-
374381
/// <summary>
375382
/// Establishes a connection to the live query server asynchronously.
376383
/// </summary>
@@ -385,6 +392,7 @@ private void WebSocketClientOnMessageReceived(object sender, MessageReceivedEven
385392
/// </exception>
386393
public async Task ConnectAsync(CancellationToken cancellationToken = default)
387394
{
395+
ThrowIfDisposed();
388396
if (_state == ParseLiveQueryState.Closed)
389397
{
390398
_state = ParseLiveQueryState.Connecting;
@@ -405,10 +413,9 @@ public async Task ConnectAsync(CancellationToken cancellationToken = default)
405413
await SendMessage(await AppendSessionToken(message), cancellationToken);
406414

407415
using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
408-
cts.CancelAfter(TimeOut);
416+
cts.CancelAfter(Timeout);
409417

410418
await ConnectionSignal.Task.WaitAsync(cts.Token);
411-
_state = ParseLiveQueryState.Connected;
412419
}
413420
catch (OperationCanceledException)
414421
{
@@ -458,7 +465,7 @@ private async Task SendAndWaitForSignalAsync(IDictionary<string, object> message
458465
await SendMessage(message, cancellationToken);
459466

460467
using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
461-
cts.CancelAfter(TimeOut);
468+
cts.CancelAfter(Timeout);
462469

463470
await tcs.Task.WaitAsync(cts.Token);
464471
}
@@ -496,6 +503,7 @@ private async Task SendAndWaitForSignalAsync(IDictionary<string, object> message
496503
/// </exception>
497504
public async Task<IParseLiveQuerySubscription> SubscribeAsync<T>(ParseLiveQuery<T> liveQuery, CancellationToken cancellationToken = default) where T : ParseObject
498505
{
506+
ThrowIfDisposed();
499507
if (_state == ParseLiveQueryState.Closed)
500508
{
501509
throw new InvalidOperationException("Cannot subscribe to a live query when the connection is closed.");
@@ -535,6 +543,7 @@ public async Task<IParseLiveQuerySubscription> SubscribeAsync<T>(ParseLiveQuery<
535543
/// </returns>
536544
public async Task UpdateSubscriptionAsync<T>(ParseLiveQuery<T> liveQuery, int requestId, CancellationToken cancellationToken = default) where T : ParseObject
537545
{
546+
ThrowIfDisposed();
538547
Dictionary<string, object> message = new Dictionary<string, object>
539548
{
540549
{ "op", "update" },
@@ -561,6 +570,7 @@ public async Task UpdateSubscriptionAsync<T>(ParseLiveQuery<T> liveQuery, int re
561570
/// </exception>
562571
public async Task UnsubscribeAsync(int requestId, CancellationToken cancellationToken = default)
563572
{
573+
ThrowIfDisposed();
564574
Dictionary<string, object> message = new Dictionary<string, object>
565575
{
566576
{ "op", "unsubscribe" },
@@ -581,6 +591,7 @@ public async Task UnsubscribeAsync(int requestId, CancellationToken cancellation
581591
/// </returns>
582592
public async Task CloseAsync(CancellationToken cancellationToken = default)
583593
{
594+
ThrowIfDisposed();
584595
WebSocketClient.MessageReceived -= WebSocketClientOnMessageReceived;
585596
WebSocketClient.WebsocketError -= WebSocketClientOnWebsocketError;
586597
WebSocketClient.UnknownError -= WebSocketClientOnUnknownError;
@@ -635,18 +646,17 @@ private void Dispose(bool disposing)
635646
if (disposing)
636647
{
637648
// For sync disposal, the best effort cleanup without waiting
638-
_ = Task.Run(async () =>
649+
try
650+
{
651+
CloseAsync().GetAwaiter().GetResult();
652+
}
653+
catch (Exception ex)
639654
{
640-
try
641-
{
642-
await CloseAsync();
643-
}
644-
catch (Exception ex)
645-
{
646-
Debug.WriteLine($"Error during disposal: {ex}");
647-
}
648-
});
655+
Debug.WriteLine($"Error during disposal: {ex}");
656+
}
649657
}
650658
disposed = true;
651659
}
660+
661+
private void ThrowIfDisposed() => ObjectDisposedException.ThrowIf(disposed, nameof(ParseLiveQueryController));
652662
}

Parse/Platform/LiveQueries/ParseLiveQueryErrorEventArgs.cs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ namespace Parse.Platform.LiveQueries;
88
public class ParseLiveQueryErrorEventArgs : EventArgs
99
{
1010
/// <summary>
11-
/// Gets or sets the error message associated with a live query operation.
11+
/// Gets the error message associated with a live query operation.
1212
/// </summary>
1313
/// <remarks>
1414
/// The <see cref="Error"/> property contains a description of the error that occurred during
@@ -18,7 +18,7 @@ public class ParseLiveQueryErrorEventArgs : EventArgs
1818
public string Error { get; }
1919

2020
/// <summary>
21-
/// Gets or sets the error code associated with a live query operation.
21+
/// Gets the error code associated with a live query operation.
2222
/// </summary>
2323
/// <remarks>
2424
/// The <see cref="Code"/> property contains a numerical identifier that represents
@@ -28,7 +28,7 @@ public class ParseLiveQueryErrorEventArgs : EventArgs
2828
public int Code { get; }
2929

3030
/// <summary>
31-
/// Gets or sets a value indicating whether the client should attempt to reconnect
31+
/// Gets a value indicating whether the client should attempt to reconnect
3232
/// after an error occurs during a live query operation.
3333
/// </summary>
3434
/// <remarks>
@@ -50,10 +50,13 @@ public class ParseLiveQueryErrorEventArgs : EventArgs
5050
public Exception LocalException { get; }
5151

5252
/// <summary>
53-
/// Represents the arguments for an error event that occurs during a live query in the Parse platform.
53+
/// Initializes a new instance of the <see cref="ParseLiveQueryErrorEventArgs"/> class.
5454
/// </summary>
55-
internal ParseLiveQueryErrorEventArgs(int code, string error, bool reconnect, Exception localException = null)
56-
{
55+
/// <param name="code">The error code associated with the live query operation.</param>
56+
/// <param name="error">The error message associated with the live query operation.</param>
57+
/// <param name="reconnect">A value indicating whether the client should attempt to reconnect.</param>
58+
/// <param name="localException">The local exception that occurred, if any.</param>
59+
internal ParseLiveQueryErrorEventArgs(int code, string error, bool reconnect, Exception localException = null) {
5760
Error = error;
5861
Code = code;
5962
Reconnect = reconnect;

0 commit comments

Comments
 (0)