22
33namespace Rx \Websocket ;
44
5- use GuzzleHttp \Psr7 \Request ;
65use Psr \Http \Message \ServerRequestInterface ;
76use Ratchet \RFC6455 \Handshake \RequestVerifier ;
87use Ratchet \RFC6455 \Handshake \ServerNegotiator ;
98use React \EventLoop \LoopInterface ;
10- use React \Http \Response ;
11- use React \Http \Server as HttpServer ;
12- use React \Socket \Server as SocketServer ;
9+ use React \Http \Message \Response ;
10+ use React \Http \Middleware \StreamingRequestMiddleware as StreamingRequestMiddlewareAlias ;
11+ use React \Http \HttpServer ;
12+ use React \Socket \SocketServer ;
1313use React \Stream \CompositeStream ;
14- use React \Stream \ReadableStreamInterface ;
1514use React \Stream \ThroughStream ;
1615use Rx \Disposable \CallbackDisposable ;
16+ use Rx \Disposable \EmptyDisposable ;
1717use Rx \DisposableInterface ;
1818use Rx \Observable ;
1919use Rx \Observable \AnonymousObservable ;
2020use Rx \Observer \CallbackObserver ;
2121use Rx \ObserverInterface ;
22+ use function RingCentral \Psr7 \str ;
2223
2324class Server extends Observable
2425{
@@ -40,100 +41,94 @@ public function __construct(string $bindAddressOrPort, bool $useMessageObject =
4041
4142 public function _subscribe (ObserverInterface $ observer ): DisposableInterface
4243 {
43- $ socket = new SocketServer ($ this ->bindAddress , $ this ->loop );
44+ $ socket = new SocketServer ($ this ->bindAddress , [], $ this ->loop );
4445
4546 $ negotiator = new ServerNegotiator (new RequestVerifier ());
4647 if (!empty ($ this ->subProtocols )) {
4748 $ negotiator ->setSupportedSubProtocols ($ this ->subProtocols );
4849 }
4950
50- $ http = new HttpServer (function (ServerRequestInterface $ request ) use ($ negotiator , $ observer ) {
51- $ uri = $ request ->getUri ();
52-
53- $ psrRequest = new Request (
54- $ request ->getMethod (),
55- $ uri ,
56- $ request ->getHeaders ()
57- );
58-
59- // cram the remote address into the header in our own X- header so
60- // the user will have access to it
61- $ psrRequest = $ psrRequest ->withAddedHeader ('X-RxWebsocket-Remote-Address ' , $ request ->getServerParams ()['REMOTE_ADDR ' ] ?? '' );
62-
63- $ negotiatorResponse = $ negotiator ->handshake ($ psrRequest );
64-
65- /** @var ReadableStreamInterface $requestStream */
66- $ requestStream = new ThroughStream ();
67- $ responseStream = new ThroughStream ();
68-
69- $ response = new Response (
70- $ negotiatorResponse ->getStatusCode (),
71- array_merge (
72- $ negotiatorResponse ->getHeaders ()
73- ),
74- new CompositeStream (
75- $ responseStream ,
76- $ requestStream
77- )
78- );
79-
80-
81- if ($ negotiatorResponse ->getStatusCode () !== 101 ) {
82- $ responseStream ->end ();
83- return ;
84- }
85-
86- $ subProtocol = "" ;
87- if (count ($ negotiatorResponse ->getHeader ('Sec-WebSocket-Protocol ' )) > 0 ) {
88- $ subProtocol = $ negotiatorResponse ->getHeader ('Sec-WebSocket-Protocol ' )[0 ];
51+ $ http = new HttpServer (
52+ $ this ->loop ,
53+ new StreamingRequestMiddlewareAlias (),
54+ function (ServerRequestInterface $ request ) use ($ negotiator , $ observer ) {
55+ // cram the remote address into the header in our own X- header so
56+ // the user will have access to it
57+ $ request = $ request ->withAddedHeader ('X-RxWebsocket-Remote-Address ' , $ request ->getServerParams ()['REMOTE_ADDR ' ] ?? '' );
58+
59+ $ negotiatorResponse = $ negotiator ->handshake ($ request );
60+
61+ $ requestStream = new ThroughStream ();
62+ $ responseStream = new ThroughStream ();
63+
64+ $ response = new Response (
65+ $ negotiatorResponse ->getStatusCode (),
66+ array_merge (
67+ $ negotiatorResponse ->getHeaders ()
68+ ),
69+ new CompositeStream (
70+ $ responseStream ,
71+ $ requestStream
72+ )
73+ );
74+
75+ if ($ negotiatorResponse ->getStatusCode () !== 101 ) {
76+ $ responseStream ->end (str ($ negotiatorResponse ));
77+ return new EmptyDisposable ();
78+ }
79+
80+ $ subProtocol = "" ;
81+ if (count ($ negotiatorResponse ->getHeader ('Sec-WebSocket-Protocol ' )) > 0 ) {
82+ $ subProtocol = $ negotiatorResponse ->getHeader ('Sec-WebSocket-Protocol ' )[0 ];
83+ }
84+
85+ $ messageSubject = new MessageSubject (
86+ new AnonymousObservable (
87+ function (ObserverInterface $ observer ) use ($ requestStream ) {
88+ $ requestStream ->on ('data ' , function ($ data ) use ($ observer ) {
89+ $ observer ->onNext ($ data );
90+ });
91+ $ requestStream ->on ('error ' , function ($ error ) use ($ observer ) {
92+ $ observer ->onError ($ error );
93+ });
94+ $ requestStream ->on ('close ' , function () use ($ observer ) {
95+ $ observer ->onCompleted ();
96+ });
97+ $ requestStream ->on ('end ' , function () use ($ observer ) {
98+ $ observer ->onCompleted ();
99+ });
100+
101+ return new CallbackDisposable (
102+ function () use ($ requestStream ) {
103+ $ requestStream ->close ();
104+ }
105+ );
106+ }
107+ ),
108+ new CallbackObserver (
109+ function ($ x ) use ($ responseStream ) {
110+ $ responseStream ->write ($ x );
111+ },
112+ function ($ error ) use ($ responseStream ) {
113+ $ responseStream ->close ();
114+ },
115+ function () use ($ responseStream ) {
116+ $ responseStream ->end ();
117+ }
118+ ),
119+ false ,
120+ $ this ->useMessageObject ,
121+ $ subProtocol ,
122+ $ request ,
123+ $ negotiatorResponse ,
124+ $ this ->keepAlive
125+ );
126+
127+ $ observer ->onNext ($ messageSubject );
128+
129+ return $ response ;
89130 }
90-
91- $ messageSubject = new MessageSubject (
92- new AnonymousObservable (
93- function (ObserverInterface $ observer ) use ($ requestStream ) {
94- $ requestStream ->on ('data ' , function ($ data ) use ($ observer ) {
95- $ observer ->onNext ($ data );
96- });
97- $ requestStream ->on ('error ' , function ($ error ) use ($ observer ) {
98- $ observer ->onError ($ error );
99- });
100- $ requestStream ->on ('close ' , function () use ($ observer ) {
101- $ observer ->onCompleted ();
102- });
103- $ requestStream ->on ('end ' , function () use ($ observer ) {
104- $ observer ->onCompleted ();
105- });
106-
107- return new CallbackDisposable (
108- function () use ($ requestStream ) {
109- $ requestStream ->close ();
110- }
111- );
112- }
113- ),
114- new CallbackObserver (
115- function ($ x ) use ($ responseStream ) {
116- $ responseStream ->write ($ x );
117- },
118- function ($ error ) use ($ responseStream ) {
119- $ responseStream ->close ();
120- },
121- function () use ($ responseStream ) {
122- $ responseStream ->end ();
123- }
124- ),
125- false ,
126- $ this ->useMessageObject ,
127- $ subProtocol ,
128- $ psrRequest ,
129- $ negotiatorResponse ,
130- $ this ->keepAlive
131- );
132-
133- $ observer ->onNext ($ messageSubject );
134-
135- return $ response ;
136- });
131+ );
137132
138133 $ http ->listen ($ socket );
139134
0 commit comments