1111
1212namespace Enyim . Caching . Memcached
1313{
14- public partial class AsyncPooledSocket
15- {
16- /// <summary>
17- /// Supports exactly one reader and writer, but they can do IO concurrently
18- /// </summary>
19- private class AsyncSocketHelper
20- {
21- private const int ChunkSize = 65536 ;
22-
23- private AsyncPooledSocket socket ;
24- private SlidingBuffer asyncBuffer ;
25-
26- private SocketAsyncEventArgs readEvent ;
14+ public partial class AsyncPooledSocket
15+ {
16+ /// <summary>
17+ /// Supports exactly one reader and writer, but they can do IO concurrently
18+ /// </summary>
19+ private class AsyncSocketHelper
20+ {
21+ private const int ChunkSize = 65536 ;
22+
23+ private readonly PooledSocket socket ;
24+ private readonly SlidingBuffer asyncBuffer ;
25+
26+ private readonly SocketAsyncEventArgs readEvent ;
2727#if DEBUG_IO
2828 private int doingIO ;
2929#endif
30- private int remainingRead ;
31- private int expectedToRead ;
32- private AsyncIOArgs pendingArgs ;
33-
34- private int isAborted ;
35- private ManualResetEvent readInProgressEvent ;
36-
37- public AsyncSocketHelper ( AsyncPooledSocket socket )
38- {
39- this . socket = socket ;
40- this . asyncBuffer = new SlidingBuffer ( ChunkSize ) ;
41-
42- this . readEvent = new SocketAsyncEventArgs ( ) ;
43- this . readEvent . Completed += new EventHandler < SocketAsyncEventArgs > ( AsyncReadCompleted ) ;
44- this . readEvent . SetBuffer ( new byte [ ChunkSize ] , 0 , ChunkSize ) ;
45-
46- this . readInProgressEvent = new ManualResetEvent ( false ) ;
47- }
48-
49- /// <summary>
50- /// returns true if io is pending
51- /// </summary>
52- /// <param name="p"></param>
53- /// <returns></returns>
54- public bool Read ( AsyncIOArgs p )
55- {
56- var count = p . Count ;
57- if ( count < 1 ) throw new ArgumentOutOfRangeException ( "count" , "count must be > 0" ) ;
30+ private int remainingRead ;
31+ private int expectedToRead ;
32+ private AsyncIOArgs pendingArgs ;
33+
34+ private int isAborted ;
35+ private readonly ManualResetEvent readInProgressEvent ;
36+
37+ public AsyncSocketHelper ( PooledSocket socket )
38+ {
39+ this . socket = socket ;
40+ this . asyncBuffer = new SlidingBuffer ( ChunkSize ) ;
41+
42+ this . readEvent = new SocketAsyncEventArgs ( ) ;
43+ this . readEvent . Completed += new EventHandler < SocketAsyncEventArgs > ( AsyncReadCompleted ) ;
44+ this . readEvent . SetBuffer ( new byte [ ChunkSize ] , 0 , ChunkSize ) ;
45+
46+ this . readInProgressEvent = new ManualResetEvent ( false ) ;
47+ }
48+
49+ /// <summary>
50+ /// returns true if io is pending
51+ /// </summary>
52+ /// <param name="p"></param>
53+ /// <returns></returns>
54+ public bool Read ( AsyncIOArgs p )
55+ {
56+ var count = p . Count ;
57+ if ( count < 1 ) throw new ArgumentOutOfRangeException ( "count" , "count must be > 0" ) ;
5858#if DEBUG_IO
5959 if ( Interlocked . CompareExchange ( ref this . doingIO , 1 , 0 ) != 0 )
6060 throw new InvalidOperationException ( "Receive is already in progress" ) ;
6161#endif
62- this . expectedToRead = p . Count ;
63- this . pendingArgs = p ;
64-
65- p . Fail = false ;
66- p . Result = null ;
67-
68- if ( this . asyncBuffer . Available >= count )
69- {
70- PublishResult ( false ) ;
71-
72- return false ;
73- }
74- else
75- {
76- this . remainingRead = count - this . asyncBuffer . Available ;
77- this . isAborted = 0 ;
78-
79- this . BeginReceive ( ) ;
80-
81- return true ;
82- }
83- }
84-
85- public void DiscardBuffer ( )
86- {
87- this . asyncBuffer . UnsafeClear ( ) ;
88- }
89-
90- private void BeginReceive ( )
91- {
92- while ( this . remainingRead > 0 )
93- {
94- this . readInProgressEvent . Reset ( ) ;
95-
96- if ( this . socket . _socket . ReceiveAsync ( this . readEvent ) )
97- {
98- // wait until the timeout elapses, then abort this reading process
99- // EndREceive will be triggered sooner or later but its timeout
100- // may be higher than our read timeout, so it's not reliable
101- if ( ! readInProgressEvent . WaitOne ( this . socket . _socket . ReceiveTimeout ) )
102- this . AbortReadAndTryPublishError ( false ) ;
103-
104- return ;
105- }
106-
107- this . EndReceive ( ) ;
108- }
109- }
110-
111- void AsyncReadCompleted ( object sender , SocketAsyncEventArgs e )
112- {
113- if ( this . EndReceive ( ) )
114- this . BeginReceive ( ) ;
115- }
116-
117- private void AbortReadAndTryPublishError ( bool markAsDead )
118- {
119- if ( markAsDead )
120- this . socket . _isAlive = false ;
121-
122- // we've been already aborted, so quit
123- // both the EndReceive and the wait on the event can abort the read
124- // but only one should of them should continue the async call chain
125- if ( Interlocked . CompareExchange ( ref this . isAborted , 1 , 0 ) != 0 )
126- return ;
127-
128- this . remainingRead = 0 ;
129- var p = this . pendingArgs ;
62+ this . expectedToRead = p . Count ;
63+ this . pendingArgs = p ;
64+
65+ p . Fail = false ;
66+ p . Result = null ;
67+
68+ if ( this . asyncBuffer . Available >= count )
69+ {
70+ PublishResult ( false ) ;
71+
72+ return false ;
73+ }
74+ else
75+ {
76+ this . remainingRead = count - this . asyncBuffer . Available ;
77+ this . isAborted = 0 ;
78+
79+ this . BeginReceive ( ) ;
80+
81+ return true ;
82+ }
83+ }
84+
85+ public void DiscardBuffer ( )
86+ {
87+ this . asyncBuffer . UnsafeClear ( ) ;
88+ }
89+
90+ private void BeginReceive ( )
91+ {
92+ throw new NotImplementedException ( ) ;
93+ //while (this.remainingRead > 0)
94+ //{
95+ // this.readInProgressEvent.Reset();
96+
97+ // if (this.socket.ReceiveAsync(this.readEvent))
98+ // {
99+ // // wait until the timeout elapses, then abort this reading process
100+ // // EndREceive will be triggered sooner or later but its timeout
101+ // // may be higher than our read timeout, so it's not reliable
102+ // if (!readInProgressEvent.WaitOne(this.socket._socket.ReceiveTimeout))
103+ // this.AbortReadAndTryPublishError(false);
104+
105+ // return;
106+ // }
107+
108+ // this.EndReceive();
109+ //}
110+ }
111+
112+ void AsyncReadCompleted ( object sender , SocketAsyncEventArgs e )
113+ {
114+ if ( this . EndReceive ( ) )
115+ this . BeginReceive ( ) ;
116+ }
117+
118+ private void AbortReadAndTryPublishError ( bool markAsDead )
119+ {
120+ if ( markAsDead )
121+ this . socket . IsAlive = false ;
122+
123+ // we've been already aborted, so quit
124+ // both the EndReceive and the wait on the event can abort the read
125+ // but only one should of them should continue the async call chain
126+ if ( Interlocked . CompareExchange ( ref this . isAborted , 1 , 0 ) != 0 )
127+ return ;
128+
129+ this . remainingRead = 0 ;
130+ var p = this . pendingArgs ;
130131#if DEBUG_IO
131132 Thread . MemoryBarrier ( ) ;
132133
133134 this . doingIO = 0 ;
134135#endif
135136
136- p . Fail = true ;
137- p . Result = null ;
137+ p . Fail = true ;
138+ p . Result = null ;
138139
139- this . pendingArgs . Next ( p ) ;
140- }
140+ this . pendingArgs . Next ( p ) ;
141+ }
141142
142- /// <summary>
143- /// returns true when io is pending
144- /// </summary>
145- /// <returns></returns>
146- private bool EndReceive ( )
147- {
148- this . readInProgressEvent . Set ( ) ;
143+ /// <summary>
144+ /// returns true when io is pending
145+ /// </summary>
146+ /// <returns></returns>
147+ private bool EndReceive ( )
148+ {
149+ this . readInProgressEvent . Set ( ) ;
149150
150- var read = this . readEvent . BytesTransferred ;
151- if ( this . readEvent . SocketError != SocketError . Success
152- || read == 0 )
153- {
154- this . AbortReadAndTryPublishError ( true ) ; //new IOException("Remote end has been closed"));
151+ var read = this . readEvent . BytesTransferred ;
152+ if ( this . readEvent . SocketError != SocketError . Success
153+ || read == 0 )
154+ {
155+ this . AbortReadAndTryPublishError ( true ) ; //new IOException("Remote end has been closed"));
155156
156- return false ;
157- }
157+ return false ;
158+ }
158159
159- this . remainingRead -= read ;
160- this . asyncBuffer . Append ( this . readEvent . Buffer , 0 , read ) ;
160+ this . remainingRead -= read ;
161+ this . asyncBuffer . Append ( this . readEvent . Buffer , 0 , read ) ;
161162
162- if ( this . remainingRead <= 0 )
163- {
164- this . PublishResult ( true ) ;
163+ if ( this . remainingRead <= 0 )
164+ {
165+ this . PublishResult ( true ) ;
165166
166- return false ;
167- }
167+ return false ;
168+ }
168169
169- return true ;
170- }
170+ return true ;
171+ }
171172
172- private void PublishResult ( bool isAsync )
173- {
174- var retval = this . pendingArgs ;
173+ private void PublishResult ( bool isAsync )
174+ {
175+ var retval = this . pendingArgs ;
175176
176- var data = new byte [ this . expectedToRead ] ;
177- this . asyncBuffer . Read ( data , 0 , retval . Count ) ;
178- pendingArgs . Result = data ;
177+ var data = new byte [ this . expectedToRead ] ;
178+ this . asyncBuffer . Read ( data , 0 , retval . Count ) ;
179+ pendingArgs . Result = data ;
179180#if DEBUG_IO
180181 Thread . MemoryBarrier ( ) ;
181182 this . doingIO = 0 ;
182183#endif
183184
184- if ( isAsync )
185- pendingArgs . Next ( pendingArgs ) ;
186- }
187- }
188- }
185+ if ( isAsync )
186+ pendingArgs . Next ( pendingArgs ) ;
187+ }
188+ }
189+ }
189190
190191 public partial class PooledSocket
191192 {
@@ -196,10 +197,10 @@ private class AsyncSocketHelper
196197 {
197198 private const int ChunkSize = 65536 ;
198199
199- private PooledSocket socket ;
200- private SlidingBuffer asyncBuffer ;
200+ private readonly PooledSocket socket ;
201+ private readonly SlidingBuffer asyncBuffer ;
201202
202- private SocketAsyncEventArgs readEvent ;
203+ private readonly SocketAsyncEventArgs readEvent ;
203204#if DEBUG_IO
204205 private int doingIO ;
205206#endif
0 commit comments