Skip to content

Commit 4754a51

Browse files
andigmarcj
authored andcommitted
Asynchronously handle streamed responses (#40)
* Asynchronously handle streamed responses * Cleanup and add Content-Length
1 parent 09fb0ae commit 4754a51

File tree

1 file changed

+37
-17
lines changed

1 file changed

+37
-17
lines changed

Bridges/HttpKernel.php

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,9 @@ public function onRequest(ReactRequest $request, HttpResponse $response)
7878

7979
$syRequest = $this->mapRequest($request);
8080

81-
//start buffering the output, so cgi is not sending any http headers
82-
//this is necessary because it would break session handling since
83-
//headers_sent() returns true if any unbuffered output reaches cgi stdout.
81+
// start buffering the output, so cgi is not sending any http headers
82+
// this is necessary because it would break session handling since
83+
// headers_sent() returns true if any unbuffered output reaches cgi stdout.
8484
ob_start();
8585

8686
try {
@@ -92,9 +92,15 @@ public function onRequest(ReactRequest $request, HttpResponse $response)
9292
} catch (\Exception $exception) {
9393
$response->writeHead(500); // internal server error
9494
$response->end();
95+
96+
// end buffering if we need to throw
97+
@ob_end_clean();
9598
throw $exception;
9699
}
97100

101+
// should not receive output from application->handle()
102+
@ob_end_clean();
103+
98104
$this->mapResponse($response, $syResponse);
99105

100106
if ($this->application instanceof TerminableInterface) {
@@ -170,15 +176,10 @@ protected function mapRequest(ReactRequest $reactRequest)
170176
*/
171177
protected function mapResponse(HttpResponse $reactResponse, SymfonyResponse $syResponse)
172178
{
173-
//end active session
179+
// end active session
174180
if (PHP_SESSION_ACTIVE === session_status()) {
175181
session_write_close();
176-
session_unset(); //reset $_SESSION
177-
}
178-
179-
$content = $syResponse->getContent();
180-
if ($syResponse instanceof SymfonyStreamedResponse) {
181-
$syResponse->sendContent();
182+
session_unset(); // reset $_SESSION
182183
}
183184

184185
$nativeHeaders = [];
@@ -200,8 +201,8 @@ protected function mapResponse(HttpResponse $reactResponse, SymfonyResponse $syR
200201
}
201202
}
202203

203-
//after reading all headers we need to reset it, so next request
204-
//operates on a clean header.
204+
// after reading all headers we need to reset it, so next request
205+
// operates on a clean header.
205206
header_remove();
206207

207208
$headers = array_merge($nativeHeaders, $syResponse->headers->allPreserveCase());
@@ -238,14 +239,33 @@ protected function mapResponse(HttpResponse $reactResponse, SymfonyResponse $syR
238239
$headers['Set-Cookie'] = $cookies;
239240
}
240241

241-
$reactResponse->writeHead($syResponse->getStatusCode(), $headers);
242+
if ($syResponse instanceof SymfonyStreamedResponse) {
243+
$reactResponse->writeHead($syResponse->getStatusCode(), $headers);
244+
245+
// asynchronously get content
246+
ob_start(function($buffer) use ($reactResponse) {
247+
$reactResponse->write($buffer);
248+
return '';
249+
}, 4096);
250+
251+
$syResponse->sendContent();
242252

243-
$stdOut = '';
244-
while ($buffer = @ob_get_clean()) {
245-
$stdOut .= $buffer;
253+
// flush remaining content
254+
@ob_end_flush();
255+
$reactResponse->end();
246256
}
257+
else {
258+
ob_start();
259+
$content = $syResponse->getContent();
260+
@ob_end_flush();
247261

248-
$reactResponse->end($stdOut . $content);
262+
if (!isset($headers['Content-Length'])) {
263+
$headers['Content-Length'] = strlen($content);
264+
}
265+
266+
$reactResponse->writeHead($syResponse->getStatusCode(), $headers);
267+
$reactResponse->end($content);
268+
}
249269
}
250270

251271
/**

0 commit comments

Comments
 (0)