Skip to content

Commit ed2e80e

Browse files
committed
Initially implement AsyncPooledSocket
1 parent 6b78cf5 commit ed2e80e

File tree

2 files changed

+563
-6
lines changed

2 files changed

+563
-6
lines changed
Lines changed: 381 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,381 @@
1+
//#define DEBUG_IO
2+
using System;
3+
using System.Linq;
4+
using System.Collections.Generic;
5+
using System.Diagnostics;
6+
using System.IO;
7+
using System.Net;
8+
using System.Net.Sockets;
9+
using System.Text;
10+
using System.Threading;
11+
using System.Threading.Tasks;
12+
using Microsoft.Extensions.Logging;
13+
using Dawn.Net.Sockets;
14+
using System.Runtime.InteropServices;
15+
using System.Reflection;
16+
using System.Runtime.CompilerServices;
17+
18+
namespace Enyim.Caching.Memcached
19+
{
20+
[DebuggerDisplay("[ Address: {endpoint}, IsAlive = {IsAlive} ]")]
21+
public partial class AsyncPooledSocket : IDisposable
22+
{
23+
private readonly ILogger _logger;
24+
private bool _isAlive;
25+
private Socket _socket;
26+
private Stream _inputStream;
27+
private AsyncSocketHelper _helper;
28+
29+
public AsyncPooledSocket(ILogger logger)
30+
{
31+
_logger = logger;
32+
_isAlive = true;
33+
}
34+
35+
private async Task CreateSocketAsync(DnsEndPoint endpoint, TimeSpan connectionTimeout, TimeSpan receiveTimeout)
36+
{
37+
CancellationTokenSource cancellationConnTimeout = null;
38+
if (connectionTimeout != TimeSpan.MaxValue)
39+
{
40+
cancellationConnTimeout = new CancellationTokenSource(connectionTimeout);
41+
}
42+
43+
var args = new ConnectEventArgs();
44+
args.RemoteEndPoint = endpoint;
45+
46+
if (Socket.ConnectAsync(SocketType.Stream, ProtocolType.Tcp, args))
47+
{
48+
if (cancellationConnTimeout != null)
49+
{
50+
using (cancellationConnTimeout.Token.Register(s => Socket.CancelConnectAsync((SocketAsyncEventArgs)s), args))
51+
{
52+
await args.Builder.Task.ConfigureAwait(false);
53+
}
54+
}
55+
}
56+
else if (args.SocketError != SocketError.Success)
57+
{
58+
throw new SocketException((int)args.SocketError);
59+
}
60+
61+
_socket = args.ConnectSocket;
62+
_socket.ReceiveTimeout = receiveTimeout == TimeSpan.MaxValue
63+
? Timeout.Infinite
64+
: (int)receiveTimeout.TotalMilliseconds;
65+
_socket.NoDelay = true;
66+
_socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
67+
68+
_inputStream = new NetworkStream(_socket, ownsSocket: true);
69+
}
70+
71+
//From https://github.com/dotnet/corefx/blob/master/src/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ConnectHelper.cs
72+
private sealed class ConnectEventArgs : SocketAsyncEventArgs
73+
{
74+
public AsyncTaskMethodBuilder Builder { get; private set; }
75+
public CancellationToken CancellationToken { get; private set; }
76+
77+
public void Initialize(CancellationToken cancellationToken)
78+
{
79+
CancellationToken = cancellationToken;
80+
var b = new AsyncTaskMethodBuilder();
81+
var ignored = b.Task;
82+
Builder = b;
83+
}
84+
85+
protected override void OnCompleted(SocketAsyncEventArgs _)
86+
{
87+
switch (SocketError)
88+
{
89+
case SocketError.Success:
90+
Builder.SetResult();
91+
break;
92+
93+
case SocketError.OperationAborted:
94+
case SocketError.ConnectionAborted:
95+
if (CancellationToken.IsCancellationRequested)
96+
{
97+
Builder.SetException(new TaskCanceledException());
98+
break;
99+
}
100+
goto default;
101+
102+
default:
103+
Builder.SetException(new SocketException((int)SocketError));
104+
break;
105+
}
106+
}
107+
}
108+
109+
public Action<AsyncPooledSocket> CleanupCallback { get; set; }
110+
111+
public int Available
112+
{
113+
get { return _socket.Available; }
114+
}
115+
116+
public void Reset()
117+
{
118+
// discard any buffered data
119+
_inputStream.Flush();
120+
121+
if (_helper != null) _helper.DiscardBuffer();
122+
123+
int available = _socket.Available;
124+
125+
if (available > 0)
126+
{
127+
if (_logger.IsEnabled(LogLevel.Warning))
128+
_logger.LogWarning("Socket bound to {0} has {1} unread data! This is probably a bug in the code. InstanceID was {2}.", _socket.RemoteEndPoint, available, InstanceId);
129+
130+
byte[] data = new byte[available];
131+
132+
Read(data, 0, available);
133+
134+
if (_logger.IsEnabled(LogLevel.Warning))
135+
_logger.LogWarning(Encoding.ASCII.GetString(data));
136+
}
137+
138+
if (_logger.IsEnabled(LogLevel.Debug))
139+
_logger.LogDebug("Socket {0} was reset", InstanceId);
140+
}
141+
142+
/// <summary>
143+
/// The ID of this instance. Used by the <see cref="T:MemcachedServer"/> to identify the instance in its inner lists.
144+
/// </summary>
145+
public readonly Guid InstanceId = Guid.NewGuid();
146+
147+
public bool IsAlive
148+
{
149+
get { return _isAlive; }
150+
}
151+
152+
/// <summary>
153+
/// Releases all resources used by this instance and shuts down the inner <see cref="T:Socket"/>. This instance will not be usable anymore.
154+
/// </summary>
155+
/// <remarks>Use the IDisposable.Dispose method if you want to release this instance back into the pool.</remarks>
156+
public void Destroy()
157+
{
158+
Dispose(true);
159+
}
160+
161+
~AsyncPooledSocket()
162+
{
163+
try { Dispose(true); }
164+
catch { }
165+
}
166+
167+
protected void Dispose(bool disposing)
168+
{
169+
if (disposing)
170+
{
171+
GC.SuppressFinalize(this);
172+
173+
try
174+
{
175+
if (_socket != null)
176+
try { _socket.Dispose(); }
177+
catch { }
178+
179+
if (_inputStream != null)
180+
_inputStream.Dispose();
181+
182+
_inputStream = null;
183+
_socket = null;
184+
CleanupCallback = null;
185+
}
186+
catch (Exception e)
187+
{
188+
_logger.LogError(nameof(PooledSocket), e);
189+
}
190+
}
191+
else
192+
{
193+
Action<AsyncPooledSocket> cc = CleanupCallback;
194+
195+
if (cc != null)
196+
cc(this);
197+
}
198+
}
199+
200+
void IDisposable.Dispose()
201+
{
202+
Dispose(false);
203+
}
204+
205+
private void CheckDisposed()
206+
{
207+
if (_socket == null)
208+
throw new ObjectDisposedException("PooledSocket");
209+
}
210+
211+
/// <summary>
212+
/// Reads the next byte from the server's response.
213+
/// </summary>
214+
/// <remarks>This method blocks and will not return until the value is read.</remarks>
215+
public int ReadByte()
216+
{
217+
CheckDisposed();
218+
219+
try
220+
{
221+
return _inputStream.ReadByte();
222+
}
223+
catch (IOException)
224+
{
225+
_isAlive = false;
226+
227+
throw;
228+
}
229+
}
230+
231+
public async Task<byte[]> ReadBytesAsync(int count)
232+
{
233+
using (var awaitable = new SocketAwaitable())
234+
{
235+
awaitable.Buffer = new ArraySegment<byte>(new byte[count], 0, count);
236+
await _socket.ReceiveAsync(awaitable);
237+
return awaitable.Transferred.Array;
238+
}
239+
}
240+
241+
/// <summary>
242+
/// Reads data from the server into the specified buffer.
243+
/// </summary>
244+
/// <param name="buffer">An array of <see cref="T:System.Byte"/> that is the storage location for the received data.</param>
245+
/// <param name="offset">The location in buffer to store the received data.</param>
246+
/// <param name="count">The number of bytes to read.</param>
247+
/// <remarks>This method blocks and will not return until the specified amount of bytes are read.</remarks>
248+
public void Read(byte[] buffer, int offset, int count)
249+
{
250+
CheckDisposed();
251+
252+
int read = 0;
253+
int shouldRead = count;
254+
255+
while (read < count)
256+
{
257+
try
258+
{
259+
int currentRead = _inputStream.Read(buffer, offset, shouldRead);
260+
if (currentRead < 1)
261+
continue;
262+
263+
read += currentRead;
264+
offset += currentRead;
265+
shouldRead -= currentRead;
266+
}
267+
catch (IOException)
268+
{
269+
_isAlive = false;
270+
throw;
271+
}
272+
}
273+
}
274+
275+
public void Write(byte[] data, int offset, int length)
276+
{
277+
CheckDisposed();
278+
279+
SocketError status;
280+
281+
_socket.Send(data, offset, length, SocketFlags.None, out status);
282+
283+
if (status != SocketError.Success)
284+
{
285+
_isAlive = false;
286+
287+
ThrowHelper.ThrowSocketWriteError(_socket.RemoteEndPoint, status);
288+
}
289+
}
290+
291+
public void Write(IList<ArraySegment<byte>> buffers)
292+
{
293+
CheckDisposed();
294+
295+
SocketError status;
296+
297+
#if DEBUG
298+
int total = 0;
299+
for (int i = 0, C = buffers.Count; i < C; i++)
300+
total += buffers[i].Count;
301+
302+
if (_socket.Send(buffers, SocketFlags.None, out status) != total)
303+
System.Diagnostics.Debugger.Break();
304+
#else
305+
_socket.Send(buffers, SocketFlags.None, out status);
306+
#endif
307+
308+
if (status != SocketError.Success)
309+
{
310+
_isAlive = false;
311+
312+
ThrowHelper.ThrowSocketWriteError(_socket.RemoteEndPoint, status);
313+
}
314+
}
315+
316+
public async Task WriteSync(IList<ArraySegment<byte>> buffers)
317+
{
318+
using (var awaitable = new SocketAwaitable())
319+
{
320+
awaitable.Arguments.BufferList = buffers;
321+
try
322+
{
323+
await _socket.SendAsync(awaitable);
324+
}
325+
catch
326+
{
327+
_isAlive = false;
328+
ThrowHelper.ThrowSocketWriteError(_socket.RemoteEndPoint, awaitable.Arguments.SocketError);
329+
}
330+
331+
if (awaitable.Arguments.SocketError != SocketError.Success)
332+
{
333+
_isAlive = false;
334+
ThrowHelper.ThrowSocketWriteError(_socket.RemoteEndPoint, awaitable.Arguments.SocketError);
335+
}
336+
}
337+
}
338+
339+
/// <summary>
340+
/// Receives data asynchronously. Returns true if the IO is pending. Returns false if the socket already failed or the data was available in the buffer.
341+
/// p.Next will only be called if the call completes asynchronously.
342+
/// </summary>
343+
public bool ReceiveAsync(AsyncIOArgs p)
344+
{
345+
CheckDisposed();
346+
347+
if (!_isAlive)
348+
{
349+
p.Fail = true;
350+
p.Result = null;
351+
352+
return false;
353+
}
354+
355+
if (_helper == null)
356+
_helper = new AsyncSocketHelper(this);
357+
358+
return _helper.Read(p);
359+
}
360+
}
361+
}
362+
363+
#region [ License information ]
364+
/* ************************************************************
365+
*
366+
* Copyright (c) 2010 Attila Kisk? enyim.com
367+
*
368+
* Licensed under the Apache License, Version 2.0 (the "License");
369+
* you may not use this file except in compliance with the License.
370+
* You may obtain a copy of the License at
371+
*
372+
* http://www.apache.org/licenses/LICENSE-2.0
373+
*
374+
* Unless required by applicable law or agreed to in writing, software
375+
* distributed under the License is distributed on an "AS IS" BASIS,
376+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
377+
* See the License for the specific language governing permissions and
378+
* limitations under the License.
379+
*
380+
* ************************************************************/
381+
#endregion

0 commit comments

Comments
 (0)