22
33namespace Rx \Websocket ;
44
5- use GuzzleHttp \Psr7 \Request ;
65use Psr \Http \Message \ServerRequestInterface ;
76use Ratchet \RFC6455 \Handshake \RequestVerifier ;
87use Ratchet \RFC6455 \Handshake \ServerNegotiator ;
98use React \EventLoop \LoopInterface ;
109use React \Http \Message \Response ;
11- use React \Http \Middleware \StreamingRequestMiddleware ;
12- use React \Http \Server as HttpServer ;
13- use React \Socket \Server as SocketServer ;
10+ use React \Http \Middleware \StreamingRequestMiddleware as StreamingRequestMiddlewareAlias ;
11+ use React \Http \HttpServer ;
12+ use React \Socket \SocketServer ;
1413use React \Stream \CompositeStream ;
15- use React \Stream \ReadableStreamInterface ;
1614use React \Stream \ThroughStream ;
1715use Rx \Disposable \CallbackDisposable ;
16+ use Rx \Disposable \EmptyDisposable ;
1817use Rx \DisposableInterface ;
1918use Rx \Observable ;
2019use Rx \Observable \AnonymousObservable ;
2120use Rx \Observer \CallbackObserver ;
2221use Rx \ObserverInterface ;
22+ use function RingCentral \Psr7 \str ;
2323
2424class Server extends Observable
2525{
@@ -41,7 +41,7 @@ public function __construct(string $bindAddressOrPort, bool $useMessageObject =
4141
4242 public function _subscribe (ObserverInterface $ observer ): DisposableInterface
4343 {
44- $ socket = new SocketServer ($ this ->bindAddress , $ this ->loop );
44+ $ socket = new SocketServer ($ this ->bindAddress , [], $ this ->loop );
4545
4646 $ negotiator = new ServerNegotiator (new RequestVerifier ());
4747 if (!empty ($ this ->subProtocols )) {
@@ -50,15 +50,14 @@ public function _subscribe(ObserverInterface $observer): DisposableInterface
5050
5151 $ http = new HttpServer (
5252 $ this ->loop ,
53- new StreamingRequestMiddleware (),
53+ new StreamingRequestMiddlewareAlias (),
5454 function (ServerRequestInterface $ request ) use ($ negotiator , $ observer ) {
5555 // cram the remote address into the header in our own X- header so
5656 // the user will have access to it
5757 $ request = $ request ->withAddedHeader ('X-RxWebsocket-Remote-Address ' , $ request ->getServerParams ()['REMOTE_ADDR ' ] ?? '' );
5858
5959 $ negotiatorResponse = $ negotiator ->handshake ($ request );
6060
61- /** @var ReadableStreamInterface $requestStream */
6261 $ requestStream = new ThroughStream ();
6362 $ responseStream = new ThroughStream ();
6463
@@ -74,8 +73,8 @@ function (ServerRequestInterface $request) use ($negotiator, $observer) {
7473 );
7574
7675 if ($ negotiatorResponse ->getStatusCode () !== 101 ) {
77- $ responseStream ->close ( );
78- return ;
76+ $ responseStream ->end ( str ( $ negotiatorResponse ) );
77+ return new EmptyDisposable () ;
7978 }
8079
8180 $ subProtocol = "" ;
@@ -87,7 +86,6 @@ function (ServerRequestInterface $request) use ($negotiator, $observer) {
8786 new AnonymousObservable (
8887 function (ObserverInterface $ observer ) use ($ requestStream ) {
8988 $ requestStream ->on ('data ' , function ($ data ) use ($ observer ) {
90- var_export ($ data );
9189 $ observer ->onNext ($ data );
9290 });
9391 $ requestStream ->on ('error ' , function ($ error ) use ($ observer ) {
@@ -115,7 +113,7 @@ function ($error) use ($responseStream) {
115113 $ responseStream ->close ();
116114 },
117115 function () use ($ responseStream ) {
118- $ responseStream ->close ();
116+ $ responseStream ->end ();
119117 }
120118 ),
121119 false ,
0 commit comments