99 */
1010namespace Adoy \FastCGI ;
1111
12+ class TimedOutException extends \Exception {}
13+
1214/**
1315 * Handles communication with a FastCGI application
1416 *
@@ -47,6 +49,11 @@ class Client
4749
4850 const HEADER_LEN = 8 ;
4951
52+ const REQ_STATE_WRITTEN = 1 ;
53+ const REQ_STATE_OK = 2 ;
54+ const REQ_STATE_ERR = 3 ;
55+ const REQ_STATE_TIMED_OUT = 4 ;
56+
5057 /**
5158 * Socket
5259 * @var Resource
@@ -71,6 +78,38 @@ class Client
7178 */
7279 private $ _keepAlive = false ;
7380
81+ /**
82+ * Outstanding request statuses keyed by request id
83+ *
84+ * Each request is an array with following form:
85+ *
86+ * array(
87+ * 'state' => REQ_STATE_*
88+ * 'response' => null | string
89+ * )
90+ *
91+ * @var array
92+ */
93+ private $ _requests = array ();
94+
95+ /**
96+ * Use persistent sockets to connect to backend
97+ * @var Boolean
98+ */
99+ private $ _persistentSocket = false ;
100+
101+ /**
102+ * Connect timeout in milliseconds
103+ * @var Integer
104+ */
105+ private $ _connectTimeout = 5000 ;
106+
107+ /**
108+ * Read/Write timeout in milliseconds
109+ * @var Integer
110+ */
111+ private $ _readWriteTimeout = 5000 ;
112+
74113 /**
75114 * Constructor
76115 *
@@ -107,15 +146,105 @@ public function getKeepAlive()
107146 return $ this ->_keepAlive ;
108147 }
109148
149+ /**
150+ * Define whether or not PHP should attempt to re-use sockets opened by previous
151+ * request for efficiency
152+ *
153+ * @param Boolean $b true if persistent socket should be used, false otherwise
154+ */
155+ public function setPersistentSocket ($ b )
156+ {
157+ $ was_persistent = ($ this ->_sock && $ this ->_persistentSocket );
158+ $ this ->_persistentSocket = (boolean )$ b ;
159+ if (!$ this ->_persistentSocket && $ was_persistent ) {
160+ fclose ($ this ->_sock );
161+ }
162+ }
163+
164+ /**
165+ * Get the pesistent socket status
166+ *
167+ * @return Boolean true if the socket should be persistent, false otherwise
168+ */
169+ public function getPersistentSocket ()
170+ {
171+ return $ this ->_persistentSocket ;
172+ }
173+
174+
175+ /**
176+ * Set the connect timeout
177+ *
178+ * @param Integer number of milliseconds before connect will timeout
179+ */
180+ public function setConnectTimeout ($ timeoutMs )
181+ {
182+ $ this ->_connectTimeout = $ timeoutMs ;
183+ }
184+
185+ /**
186+ * Get the connect timeout
187+ *
188+ * @return Integer number of milliseconds before connect will timeout
189+ */
190+ public function getConnectTimeout ()
191+ {
192+ return $ this ->_connectTimeout ;
193+ }
194+
195+ /**
196+ * Set the read/write timeout
197+ *
198+ * @param Integer number of milliseconds before read or write call will timeout
199+ */
200+ public function setReadWriteTimeout ($ timeoutMs )
201+ {
202+ $ this ->_readWriteTimeout = $ timeoutMs ;
203+ $ this ->set_ms_timeout ($ this ->_readWriteTimeout );
204+ }
205+
206+ /**
207+ * Get the read timeout
208+ *
209+ * @return Integer number of milliseconds before read will timeout
210+ */
211+ public function getReadWriteTimeout ()
212+ {
213+ return $ this ->_readWriteTimeout ;
214+ }
215+
216+ /**
217+ * Helper to avoid duplicating milliseconds to secs/usecs in a few places
218+ *
219+ * @param Integer millisecond timeout
220+ * @return Boolean
221+ */
222+ private function set_ms_timeout ($ timeoutMs ) {
223+ if (!$ this ->_sock ) {
224+ return false ;
225+ }
226+ return stream_set_timeout ($ this ->_sock , floor ($ timeoutMs / 1000 ), ($ timeoutMs % 1000 ) * 1000 );
227+ }
228+
229+
110230 /**
111231 * Create a connection to the FastCGI application
112232 */
113233 private function connect ()
114234 {
115235 if (!$ this ->_sock ) {
116- $ this ->_sock = fsockopen ($ this ->_host , $ this ->_port , $ errno , $ errstr , 5 );
236+ if ($ this ->_persistentSocket ) {
237+ $ this ->_sock = pfsockopen ($ this ->_host , $ this ->_port , $ errno , $ errstr , $ this ->_connectTimeout /1000 );
238+ } else {
239+ $ this ->_sock = fsockopen ($ this ->_host , $ this ->_port , $ errno , $ errstr , $ this ->_connectTimeout /1000 );
240+ }
241+
117242 if (!$ this ->_sock ) {
118- throw new \Exception ('Unable to connect to FastCGI application ' );
243+ throw new \Exception ('Unable to connect to FastCGI application: ' . $ errstr );
244+ }
245+
246+ if (!$ this ->set_ms_timeout ($ this ->_readWriteTimeout )) {
247+ throw new \Exception ('Unable to set timeout on socket ' );
119248 }
120249 }
121250 }
@@ -245,7 +374,7 @@ private function readPacket()
245374 }
246375 }
247376 if ($ resp ['paddingLength ' ]) {
248- $ buf= fread ($ this ->_sock , $ resp ['paddingLength ' ]);
377+ $ buf = fread ($ this ->_sock , $ resp ['paddingLength ' ]);
249378 }
250379 return $ resp ;
251380 } else {
@@ -286,38 +415,144 @@ public function getValues(array $requestedInfo)
286415 */
287416 public function request (array $ params , $ stdin )
288417 {
289- $ response = '' ;
418+ $ id = $ this ->async_request ($ params , $ stdin );
419+ return $ this ->wait_for_response ($ id );
420+ }
421+
422+ /**
423+ * Execute a request to the FastCGI application asyncronously
424+ *
425+ * This sends request to application and returns the assigned ID for that request.
426+ *
427+ * You should keep this id for later use with wait_for_response(). Ids are chosen randomly
428+ * rather than seqentially to guard against false-positives when using persistent sockets.
429+ * In that case it is possible that a delayed response to a request made by a previous script
430+ * invocation comes back on this socket and is mistaken for response to request made with same ID
431+ * during this request.
432+ *
433+ * @param array $params Array of parameters
434+ * @param String $stdin Content
435+ * @return Integer
436+ */
437+ public function async_request (array $ params , $ stdin )
438+ {
290439 $ this ->connect();
291440
292- $ request = $ this ->buildPacket (self ::BEGIN_REQUEST , chr (0 ) . chr (self ::RESPONDER ) . chr ((int ) $ this ->_keepAlive ) . str_repeat (chr (0 ), 5 ));
441+ // Pick random number between 1 and max 16 bit unsigned int 65535
442+ $ id = mt_rand (1 , (1 << 16 ) - 1 );
443+
444+ // Using persistent sockets implies you want them keept alive by server!
445+ $ keepAlive = intval ($ this ->_keepAlive || $ this ->_persistentSocket );
446+
447+ $ request = $ this ->buildPacket (self ::BEGIN_REQUEST
448+ ,chr (0 ) . chr (self ::RESPONDER ) . chr ($ keepAlive ) . str_repeat (chr (0 ), 5 )
449+ ,$ id
450+ );
293451
294452 $ paramsRequest = '' ;
295453 foreach ($ params as $ key => $ value ) {
296- $ paramsRequest .= $ this ->buildNvpair ($ key , $ value );
454+ $ paramsRequest .= $ this ->buildNvpair ($ key , $ value, $ id );
297455 }
298456 if ($ paramsRequest ) {
299- $ request .= $ this ->buildPacket (self ::PARAMS , $ paramsRequest );
457+ $ request .= $ this ->buildPacket (self ::PARAMS , $ paramsRequest, $ id );
300458 }
301- $ request .= $ this ->buildPacket (self ::PARAMS , '' );
459+ $ request .= $ this ->buildPacket (self ::PARAMS , '' , $ id );
302460
303461 if ($ stdin ) {
304- $ request .= $ this ->buildPacket (self ::STDIN , $ stdin );
462+ $ request .= $ this ->buildPacket (self ::STDIN , $ stdin , $ id );
463+ }
464+ $ request .= $ this ->buildPacket (self ::STDIN , '' , $ id );
465+
466+ if (fwrite ($ this ->_sock , $ request ) === false || fflush ($ this ->_sock ) === false ) {
467+
468+ $ info = stream_get_meta_data ($ this ->_sock );
469+
470+ if ($ info ['timed_out ' ]) {
471+ throw new TimedOutException ('Write timed out ' );
472+ }
473+
474+ // Broken pipe, tear down so future requests might succeed
475+ fclose ($ this ->_sock );
476+ throw new \Exception ('Failed to write request to socket ' );
477+ }
478+
479+ $ this ->_requests [$ id ] = array (
480+ 'state ' => self ::REQ_STATE_WRITTEN ,
481+ 'response ' => null
482+ );
483+
484+ return $ id;
485+ }
486+
487+ /**
488+ * Blocking call that waits for response to specific request
489+ *
490+ * @param Integer $requestId
491+ * @param Integer $timeoutMs [optional] the number of milliseconds to wait. Defaults to the ReadWriteTimeout value set.
492+ * @return string response body
493+ */
494+ public function wait_for_response ($ requestId , $ timeoutMs = 0 ) {
495+
496+ if (!isset ($ this ->_requests [$ requestId ])) {
497+ throw new \Exception ('Invalid request id given ' );
498+ }
499+
500+ // If we already read the response during an earlier call for different id, just return it
501+ if ($ this ->_requests [$ requestId ]['state ' ] == self ::REQ_STATE_OK
502+ || $ this ->_requests [$ requestId ]['state ' ] == self ::REQ_STATE_ERR
503+ ) {
504+ return $ this ->_requests [$ requestId ]['response ' ];
505+ }
506+
507+ if ($ timeoutMs > 0 ) {
508+ // Reset timeout on socket for now
509+ $ this ->set_ms_timeout ($ timeoutMs );
510+ } else {
511+ $ timeoutMs = $ this ->_readWriteTimeout ;
305512 }
306- $ request .= $ this ->buildPacket (self ::STDIN , '' );
307513
308- fwrite ($ this ->_sock , $ request );
514+ // Need to manually check since we might do several reads none of which timeout themselves
515+ // but still not get the response requested
516+ $ startTime = microtime (true );
309517
310518 do {
311519 $ resp = $ this ->readPacket ();
520+
312521 if ($ resp ['type ' ] == self ::STDOUT || $ resp ['type ' ] == self ::STDERR ) {
313- $ response .= $ resp ['content ' ];
522+ if ($ resp ['type ' ] == self ::STDERR ) {
523+ $ this ->_requests [$ resp ['requestId ' ]]['state ' ] = self ::REQ_STATE_ERR ;
524+ }
525+ $ this ->_requests [$ resp ['requestId ' ]]['response ' ] .= $ resp ['content ' ];
526+ }
527+ if ($ resp ['type ' ] == self ::END_REQUEST ) {
528+ $ this ->_requests [$ resp ['requestId ' ]]['state ' ] = self ::REQ_STATE_OK ;
529+ if ($ resp ['requestId ' ] == $ requestId ) {
530+ break ;
531+ }
532+ }
533+ if (microtime (true ) - $ startTime >= ($ timeoutMs * 1000 )) {
534+ // Reset
535+ $ this ->set_ms_timeout ($ this ->_readWriteTimeout );
536+ throw new \Exception ('Timed out ' );
314537 }
315- } while ($ resp && $ resp [ ' type ' ] != self :: END_REQUEST );
538+ } while ($ resp );
316539
317540 if (!is_array ($ resp )) {
318- throw new \Exception ('Bad request ' );
541+ $ info = stream_get_meta_data ($ this ->_sock );
542+
543+ // We must reset timeout but it must be AFTER we get info
544+ $ this ->set_ms_timeout ($ this ->_readWriteTimeout );
545+
546+ if ($ info ['timed_out ' ]) {
547+ throw new TimedOutException ('Read timed out ' );
548+ }
549+
550+ throw new \Exception ('Read failed ' );
319551 }
320552
553+ // Reset timeout
554+ $ this ->set_ms_timeout ($ this ->_readWriteTimeout );
555+
321556 switch (ord($ resp ['content ' ]{4 })) {
322557 case self ::CANT_MPX_CONN :
323558 throw new \Exception ('This app can \'t multiplex [CANT_MPX_CONN] ' );
@@ -329,7 +564,7 @@ public function request(array $params, $stdin)
329564 throw new \Exception ('Role value not known [UNKNOWN_ROLE] ' );
330565 break ;
331566 case self ::REQUEST_COMPLETE :
332- return $ response ;
567+ return $ this -> _requests [ $ requestId ][ ' response ' ] ;
333568 }
334569 }
335570}
0 commit comments