1+ using System ;
2+ using System . Diagnostics ;
3+ using System . Net . WebSockets ;
4+ using System . Text ;
5+ using System . Threading ;
6+ using System . Threading . Tasks ;
7+ using Parse . Abstractions . Infrastructure . Execution ;
8+
9+ namespace Parse . Infrastructure . Execution ;
10+
11+ /// <summary>
12+ /// Represents a WebSocket client that allows connecting to a WebSocket server, sending messages, and receiving messages.
13+ /// Implements the <c>IWebSocketClient</c> interface for WebSocket operations.
14+ /// </summary>
15+ class TextWebSocketClient : IWebSocketClient
16+ {
17+ /// <summary>
18+ /// A private instance of the ClientWebSocket class used to manage the WebSocket connection.
19+ /// This variable is responsible for handling the low-level WebSocket communication, including
20+ /// connecting, sending, and receiving data from the WebSocket server. It is initialized
21+ /// when establishing a connection and is used internally for operations such as sending messages
22+ /// and listening for incoming data.
23+ /// </summary>
24+ private ClientWebSocket _webSocket ;
25+
26+ /// <summary>
27+ /// A private instance of the Task class representing the background operation
28+ /// responsible for continuously listening for incoming WebSocket messages.
29+ /// This task is used to manage the asynchronous listening process, ensuring that
30+ /// messages are received from the WebSocket server without blocking the main thread.
31+ /// It is initialized when the listening process starts and monitored to prevent
32+ /// multiple concurrent listeners from being created.
33+ /// </summary>
34+ private Task _listeningTask ;
35+
36+ /// <summary>
37+ /// An event triggered whenever a message is received from the WebSocket server.
38+ /// This event is used to notify subscribers with the content of the received message,
39+ /// represented as a string. Handlers for this event can process or respond to the message
40+ /// based on the application's requirements.
41+ /// </summary>
42+ public event EventHandler < string > MessageReceived ;
43+
44+ /// <summary>
45+ /// Opens a WebSocket connection to the specified server URI and starts listening for messages.
46+ /// If the connection is already open or in a connecting state, this method does nothing.
47+ /// </summary>
48+ /// <param name="serverUri">The URI of the WebSocket server to connect to.</param>
49+ /// <param name="cancellationToken">A cancellation token that can be used to cancel the connect operation.</param>
50+ /// <returns>
51+ /// A task representing the asynchronous operation of connecting to the WebSocket server.
52+ /// </returns>
53+ public async Task OpenAsync ( string serverUri , CancellationToken cancellationToken = default )
54+ {
55+ _webSocket ??= new ClientWebSocket ( ) ;
56+
57+ if ( _webSocket . State != WebSocketState . Open && _webSocket . State != WebSocketState . Connecting )
58+ {
59+ await _webSocket . ConnectAsync ( new Uri ( serverUri ) , cancellationToken ) ;
60+ StartListening ( cancellationToken ) ;
61+ }
62+ }
63+
64+ /// <summary>
65+ /// Closes the WebSocket connection gracefully with a normal closure status.
66+ /// Ensures that the WebSocket connection is properly terminated and resources are released.
67+ /// </summary>
68+ /// <param name="cancellationToken">A cancellation token that can be used to cancel the close operation.</param>
69+ /// <returns>
70+ /// A task representing the asynchronous operation of closing the WebSocket connection.
71+ /// </returns>
72+ public async Task CloseAsync ( CancellationToken cancellationToken = default )
73+ => await _webSocket . CloseAsync ( WebSocketCloseStatus . NormalClosure , String . Empty , cancellationToken ) ;
74+
75+ private async Task ListenForMessages ( CancellationToken cancellationToken )
76+ {
77+ byte [ ] buffer = new byte [ 1024 * 4 ] ;
78+
79+ try
80+ {
81+ while ( ! cancellationToken . IsCancellationRequested &&
82+ _webSocket . State == WebSocketState . Open )
83+ {
84+ WebSocketReceiveResult result = await _webSocket . ReceiveAsync (
85+ new ArraySegment < byte > ( buffer ) ,
86+ cancellationToken ) ;
87+
88+ if ( result . MessageType == WebSocketMessageType . Close )
89+ {
90+ await CloseAsync ( cancellationToken ) ;
91+ break ;
92+ }
93+
94+ string message = Encoding . UTF8 . GetString ( buffer , 0 , result . Count ) ;
95+ MessageReceived ? . Invoke ( this , message ) ;
96+ }
97+ }
98+ catch ( OperationCanceledException ex )
99+ {
100+ // Normal cancellation, no need to handle
101+ Debug . WriteLine ( $ "ClientWebsocket connection was closed: { ex . Message } ") ;
102+ }
103+ }
104+
105+
106+ /// <summary>
107+ /// Starts listening for incoming messages from the WebSocket connection. This method ensures that only one listener task is running at a time.
108+ /// </summary>
109+ /// <param name="cancellationToken">A cancellation token to signal the listener task to stop.</param>
110+ private void StartListening ( CancellationToken cancellationToken )
111+ {
112+ // Make sure we don't start multiple listeners
113+ if ( _listeningTask is { IsCompleted : false } )
114+ {
115+ return ;
116+ }
117+
118+ // Start the listener task
119+ _listeningTask = Task . Run ( async ( ) =>
120+ {
121+ if ( cancellationToken . IsCancellationRequested )
122+ {
123+ cancellationToken . ThrowIfCancellationRequested ( ) ;
124+ }
125+
126+ await ListenForMessages ( cancellationToken ) ;
127+ } , cancellationToken ) ;
128+ }
129+
130+ /// <summary>
131+ /// Sends a text message to the connected WebSocket server asynchronously.
132+ /// The message is encoded in UTF-8 format before being sent.
133+ /// </summary>
134+ /// <param name="message">The message to be sent to the WebSocket server.</param>
135+ /// <param name="cancellationToken">A cancellation token that can be used to cancel the send operation.</param>
136+ /// <returns>
137+ /// A task representing the asynchronous operation of sending the message to the WebSocket server.
138+ /// </returns>
139+ public async Task SendAsync ( string message , CancellationToken cancellationToken = default )
140+ {
141+ if ( _webSocket is not null && _webSocket . State == WebSocketState . Open )
142+ await _webSocket . SendAsync ( Encoding . UTF8 . GetBytes ( message ) , WebSocketMessageType . Text , true , cancellationToken ) ;
143+ }
144+ }
0 commit comments