Skip to content

Commit 0363cf2

Browse files
committed
Update to react/http v1
1 parent 920a8a0 commit 0363cf2

File tree

7 files changed

+107
-108
lines changed

7 files changed

+107
-108
lines changed

composer.json

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,14 @@
3737
]
3838
},
3939
"require": {
40-
"react/http": "^0.7.3 | ^0.8",
41-
"react/http-client": "^0.5.3",
40+
"react/http": "^1",
4241
"voryx/event-loop": "^3.0 || ^2.0.2",
43-
"ratchet/rfc6455": "^0.2.2",
44-
"reactivex/rxphp": "^2.0.1"
42+
"ratchet/rfc6455": "^0.3.0",
43+
"reactivex/rxphp": "^2.0.1",
44+
"ratchet/pawl": "^0.3.5",
45+
"react/http-client": "^0.5.10"
4546
},
4647
"require-dev": {
47-
"phpunit/phpunit": "~5.7.0"
48+
"phpunit/phpunit": "^9"
4849
}
4950
}

phpunit.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
convertWarningsToExceptions="true"
99
processIsolation="false"
1010
stopOnFailure="false"
11-
syntaxCheck="false"
1211
bootstrap="test/bootstrap.php">
1312
<testsuites>
1413
<testsuite name="RxWebsocket Test">

src/Client.php

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55
use GuzzleHttp\Psr7\Request as Psr7Request;
66
use GuzzleHttp\Psr7\Response as Psr7Response;
77
use GuzzleHttp\Psr7\Uri;
8+
use Psr\Http\Message\ResponseInterface;
89
use Ratchet\RFC6455\Handshake\ClientNegotiator;
910
use React\EventLoop\LoopInterface;
10-
use React\HttpClient\Client as HttpClient;
11-
use React\HttpClient\Request;
12-
use React\HttpClient\Response;
11+
use React\Http\Client\Client as HttpClient;
12+
use React\Http\Client\Request;
13+
use React\Http\Message\Response;
14+
use React\Socket\ConnectionInterface;
1315
use React\Socket\ConnectorInterface;
1416
use Rx\Disposable\CallbackDisposable;
1517
use Rx\DisposableInterface;
@@ -78,17 +80,17 @@ public function _subscribe(ObserverInterface $clientObserver): DisposableInterfa
7880
$clientObserver->onError($error);
7981
});
8082

81-
$request->on('response', function (Response $response, Request $request) use ($flatHeaders, $cNegotiator, $nRequest, $clientObserver) {
82-
if ($response->getCode() !== 101) {
83-
$clientObserver->onError(new \Exception('Unexpected response code ' . $response->getCode()));
83+
$request->on('response', function (ResponseInterface $response, ConnectionInterface $request) use ($flatHeaders, $cNegotiator, $nRequest, $clientObserver) {
84+
if ($response->getStatusCode() !== 101) {
85+
$clientObserver->onError(new \Exception('Unexpected response code ' . $response->getStatusCode()));
8486
return;
8587
}
8688

8789
$psr7Response = new Psr7Response(
88-
$response->getCode(),
90+
$response->getStatusCode(),
8991
$response->getHeaders(),
9092
null,
91-
$response->getVersion()
93+
$response->getProtocolVersion()
9294
);
9395

9496
$psr7Request = new Psr7Request('GET', $this->url, $flatHeaders);
@@ -103,19 +105,19 @@ public function _subscribe(ObserverInterface $clientObserver): DisposableInterfa
103105
$clientObserver->onNext(new MessageSubject(
104106
new AnonymousObservable(function (ObserverInterface $observer) use ($response, $request, $clientObserver) {
105107

106-
$response->on('data', function ($data) use ($observer) {
108+
$request->on('data', function ($data) use ($observer) {
107109
$observer->onNext($data);
108110
});
109111

110-
$response->on('error', function ($e) use ($observer) {
112+
$request->on('error', function ($e) use ($observer) {
111113
$observer->onError($e);
112114
});
113115

114-
$response->on('close', function () use ($observer) {
116+
$request->on('close', function () use ($observer) {
115117
$observer->onCompleted();
116118
});
117119

118-
$response->on('end', function () use ($observer, $clientObserver) {
120+
$request->on('end', function () use ($observer, $clientObserver) {
119121
$observer->onCompleted();
120122

121123
// complete the parent observer - we only do 1 connection

src/Server.php

Lines changed: 84 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
use Ratchet\RFC6455\Handshake\RequestVerifier;
88
use Ratchet\RFC6455\Handshake\ServerNegotiator;
99
use React\EventLoop\LoopInterface;
10-
use React\Http\Response;
10+
use React\Http\Message\Response;
11+
use React\Http\Middleware\StreamingRequestMiddleware;
1112
use React\Http\Server as HttpServer;
1213
use React\Socket\Server as SocketServer;
1314
use React\Stream\CompositeStream;
@@ -47,93 +48,89 @@ public function _subscribe(ObserverInterface $observer): DisposableInterface
4748
$negotiator->setSupportedSubProtocols($this->subProtocols);
4849
}
4950

50-
$http = new HttpServer(function (ServerRequestInterface $request) use ($negotiator, $observer) {
51-
$uri = $request->getUri();
52-
53-
$psrRequest = new Request(
54-
$request->getMethod(),
55-
$uri,
56-
$request->getHeaders()
57-
);
58-
59-
// cram the remote address into the header in our own X- header so
60-
// the user will have access to it
61-
$psrRequest = $psrRequest->withAddedHeader('X-RxWebsocket-Remote-Address', $request->getServerParams()['REMOTE_ADDR'] ?? '');
62-
63-
$negotiatorResponse = $negotiator->handshake($psrRequest);
64-
65-
/** @var ReadableStreamInterface $requestStream */
66-
$requestStream = new ThroughStream();
67-
$responseStream = new ThroughStream();
68-
69-
$response = new Response(
70-
$negotiatorResponse->getStatusCode(),
71-
array_merge(
72-
$negotiatorResponse->getHeaders()
73-
),
74-
new CompositeStream(
75-
$responseStream,
76-
$requestStream
77-
)
78-
);
79-
80-
81-
if ($negotiatorResponse->getStatusCode() !== 101) {
82-
$responseStream->end();
83-
return;
84-
}
85-
86-
$subProtocol = "";
87-
if (count($negotiatorResponse->getHeader('Sec-WebSocket-Protocol')) > 0) {
88-
$subProtocol = $negotiatorResponse->getHeader('Sec-WebSocket-Protocol')[0];
51+
$http = new HttpServer(
52+
$this->loop,
53+
new StreamingRequestMiddleware(),
54+
function (ServerRequestInterface $request) use ($negotiator, $observer) {
55+
// cram the remote address into the header in our own X- header so
56+
// the user will have access to it
57+
$request = $request->withAddedHeader('X-RxWebsocket-Remote-Address', $request->getServerParams()['REMOTE_ADDR'] ?? '');
58+
59+
$negotiatorResponse = $negotiator->handshake($request);
60+
61+
/** @var ReadableStreamInterface $requestStream */
62+
$requestStream = new ThroughStream();
63+
$responseStream = new ThroughStream();
64+
65+
$response = new Response(
66+
$negotiatorResponse->getStatusCode(),
67+
array_merge(
68+
$negotiatorResponse->getHeaders()
69+
),
70+
new CompositeStream(
71+
$responseStream,
72+
$requestStream
73+
)
74+
);
75+
76+
if ($negotiatorResponse->getStatusCode() !== 101) {
77+
$responseStream->close();
78+
return;
79+
}
80+
81+
$subProtocol = "";
82+
if (count($negotiatorResponse->getHeader('Sec-WebSocket-Protocol')) > 0) {
83+
$subProtocol = $negotiatorResponse->getHeader('Sec-WebSocket-Protocol')[0];
84+
}
85+
86+
$messageSubject = new MessageSubject(
87+
new AnonymousObservable(
88+
function (ObserverInterface $observer) use ($requestStream) {
89+
$requestStream->on('data', function ($data) use ($observer) {
90+
var_export($data);
91+
$observer->onNext($data);
92+
});
93+
$requestStream->on('error', function ($error) use ($observer) {
94+
$observer->onError($error);
95+
});
96+
$requestStream->on('close', function () use ($observer) {
97+
$observer->onCompleted();
98+
});
99+
$requestStream->on('end', function () use ($observer) {
100+
$observer->onCompleted();
101+
});
102+
103+
return new CallbackDisposable(
104+
function () use ($requestStream) {
105+
$requestStream->close();
106+
}
107+
);
108+
}
109+
),
110+
new CallbackObserver(
111+
function ($x) use ($responseStream) {
112+
$responseStream->write($x);
113+
},
114+
function ($error) use ($responseStream) {
115+
$responseStream->close();
116+
},
117+
function () use ($responseStream) {
118+
$responseStream->close();
119+
}
120+
),
121+
false,
122+
$this->useMessageObject,
123+
$subProtocol,
124+
$request,
125+
$negotiatorResponse,
126+
$this->keepAlive
127+
);
128+
129+
$observer->onNext($messageSubject);
130+
131+
return $response;
89132
}
90-
91-
$messageSubject = new MessageSubject(
92-
new AnonymousObservable(
93-
function (ObserverInterface $observer) use ($requestStream) {
94-
$requestStream->on('data', function ($data) use ($observer) {
95-
$observer->onNext($data);
96-
});
97-
$requestStream->on('error', function ($error) use ($observer) {
98-
$observer->onError($error);
99-
});
100-
$requestStream->on('close', function () use ($observer) {
101-
$observer->onCompleted();
102-
});
103-
$requestStream->on('end', function () use ($observer) {
104-
$observer->onCompleted();
105-
});
106-
107-
return new CallbackDisposable(
108-
function () use ($requestStream) {
109-
$requestStream->close();
110-
}
111-
);
112-
}
113-
),
114-
new CallbackObserver(
115-
function ($x) use ($responseStream) {
116-
$responseStream->write($x);
117-
},
118-
function ($error) use ($responseStream) {
119-
$responseStream->close();
120-
},
121-
function () use ($responseStream) {
122-
$responseStream->end();
123-
}
124-
),
125-
false,
126-
$this->useMessageObject,
127-
$subProtocol,
128-
$psrRequest,
129-
$negotiatorResponse,
130-
$this->keepAlive
131-
);
132-
133-
$observer->onNext($messageSubject);
134-
135-
return $response;
136-
});
133+
);
137134

138135
$http->listen($socket);
139136

test/ABResultsTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
namespace Rx\Websocket\Test;
44

5-
class ABResultsTest extends \PHPUnit_Framework_TestCase
5+
class ABResultsTest extends \PHPUnit\Framework\TestCase
66
{
77
private function verifyAutobahnResults($fileName)
88
{

test/ClientTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
use React\EventLoop\Factory;
66

7-
class ClientTest extends \PHPUnit_Framework_TestCase
7+
class ClientTest extends \PHPUnit\Framework\TestCase
88
{
99
public function testErrorBeforeRequest()
1010
{

test/TestCase.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
class TestCase extends FunctionalTestCase
99
{
10-
public function setup()
10+
public function setup(): void
1111
{
1212
parent::setup();
1313

0 commit comments

Comments
 (0)