1+ using System ;
2+ using System . IO ;
3+ using System . Net ;
4+ using System . Text ;
5+ using System . Threading ;
6+ using System . Threading . Tasks ;
7+ using Nest ;
8+ using Nest . Domain . Connection ;
9+
10+ namespace ProtocolLoadTest
11+ {
12+ public class AsyncRequestOperation : TaskCompletionSource < ConnectionStatus > , IDisposable
13+ {
14+ private readonly HttpWebRequest m_request ;
15+ private readonly string m_requestData ;
16+ private readonly IConnectionSettings m_connectionSettings ;
17+ private ConnectionStatusTracer m_tracer ;
18+ private WebResponse m_response ;
19+ private Stream m_responseStream ;
20+
21+ public AsyncRequestOperation ( HttpWebRequest request , string requestData , IConnectionSettings connectionSettings , ConnectionStatusTracer tracer )
22+ {
23+ m_request = request ;
24+ m_requestData = requestData ;
25+ m_connectionSettings = connectionSettings ;
26+ m_tracer = tracer ;
27+ Start ( ) ;
28+ }
29+
30+ private void Start ( )
31+ {
32+ if ( this . m_requestData != null )
33+ WriteRequestDataAsync ( ) ;
34+ else
35+ GetResponseAsync ( ) ;
36+ }
37+
38+ private void WriteRequestDataAsync ( )
39+ {
40+ this . m_request . BeginGetRequestStream ( this . Monitor ( ar =>
41+ {
42+ var r = this . m_request . EndGetRequestStream ( ar ) ;
43+ var buffer = Encoding . UTF8 . GetBytes ( this . m_requestData ) ;
44+ r . BeginWrite ( buffer , 0 , buffer . Length , this . Monitor ( writeIar =>
45+ {
46+ r . EndWrite ( writeIar ) ;
47+ GetResponseAsync ( ) ;
48+ } ) , null ) ;
49+ } ) , null ) ;
50+ }
51+
52+ private void GetResponseAsync ( )
53+ {
54+ this . m_request . BeginGetResponse ( this . Monitor ( iarResponse =>
55+ {
56+ m_response = m_request . EndGetResponse ( iarResponse ) ;
57+ m_responseStream = m_response . GetResponseStream ( ) ;
58+
59+ var buffer = new byte [ 8192 ] ;
60+ var result = new MemoryStream ( buffer . Length ) ;
61+ ReadResponseStreamAsync ( this . m_responseStream , buffer , result ) ;
62+
63+ } ) , null ) ;
64+ }
65+
66+ private void ReadResponseStreamAsync ( Stream stream , byte [ ] buffer , MemoryStream result )
67+ {
68+ stream . BeginRead ( buffer , 0 , buffer . Length , this . Monitor ( iar =>
69+ {
70+ var bytes = stream . EndRead ( iar ) ;
71+ if ( bytes == 0 )
72+ {
73+ Done ( result ) ;
74+ return ;
75+ }
76+
77+ result . Write ( buffer , 0 , bytes ) ;
78+ ReadResponseStreamAsync ( stream , buffer , result ) ;
79+
80+ } ) , null ) ;
81+ }
82+
83+ private void Done ( ConnectionStatus connectionStatus )
84+ {
85+ m_tracer . SetResult ( connectionStatus ) ;
86+ TrySetResult ( connectionStatus ) ;
87+ Dispose ( ) ;
88+ }
89+
90+ private void Done ( Stream result )
91+ {
92+ result . Position = 0 ;
93+ var reader = new StreamReader ( result ) ;
94+ Done ( new ConnectionStatus ( reader . ReadToEnd ( ) )
95+ {
96+ Request = this . m_requestData ,
97+ RequestUrl = this . m_request . RequestUri . ToString ( ) ,
98+ RequestMethod = this . m_request . Method
99+ } ) ;
100+
101+ }
102+
103+ private AsyncCallback Monitor ( AsyncCallback callback )
104+ {
105+ return ar =>
106+ {
107+ try
108+ {
109+ callback ( ar ) ;
110+ }
111+ catch ( WebException webException )
112+ {
113+ var connectionStatus = new ConnectionStatus ( webException )
114+ {
115+ Request = this . m_requestData ,
116+ RequestUrl = this . m_request . RequestUri . ToString ( ) ,
117+ RequestMethod = this . m_request . Method
118+ } ;
119+ m_connectionSettings . ConnectionStatusHandler ( connectionStatus ) ;
120+ Done ( connectionStatus ) ;
121+ }
122+ catch ( Exception e )
123+ {
124+ TrySetException ( e ) ;
125+ Dispose ( ) ;
126+ }
127+ } ;
128+ }
129+
130+ public void Dispose ( )
131+ {
132+ Dispose ( ref m_response ) ;
133+ Dispose ( ref m_responseStream ) ;
134+ Dispose ( ref m_tracer ) ;
135+ }
136+
137+ private static void Dispose < T > ( ref T disposable ) where T : class , IDisposable
138+ {
139+ var d = Interlocked . Exchange ( ref disposable , null ) ;
140+ if ( d != null )
141+ d . Dispose ( ) ;
142+ }
143+ }
144+ }
0 commit comments