55use GuzzleHttp \Psr7 \Request ;
66use GuzzleHttp \Psr7 \Response ;
77use Ratchet \RFC6455 \Messaging \Frame ;
8+ use Rx \Exception \TimeoutException ;
89use Rx \Observer \CallbackObserver ;
910use Rx \Subject \Subject ;
11+ use Rx \Testing \MockObserver ;
1012use Rx \Websocket \MessageSubject ;
1113use Rx \Websocket \WebsocketErrorException ;
1214
13- class MessageSubjectTest extends \PHPUnit_Framework_TestCase
15+ class MessageSubjectTest extends TestCase
1416{
1517 public function testCloseCodeSentToOnError ()
1618 {
@@ -50,4 +52,135 @@ function () use (&$closeCode) {
5052
5153 $ this ->assertEquals (4000 , $ closeCode );
5254 }
55+
56+ public function testPingPongTimeout ()
57+ {
58+ $ dataIn = $ this ->createHotObservable ([
59+ onNext (200 , (new Frame ('' , true , Frame::OP_TEXT ))->getContents ()),
60+ onNext (205 , (new Frame ('' , true , Frame::OP_TEXT ))->getContents ()),
61+ ]);
62+
63+ $ dataOut = new Subject ();
64+
65+ $ ms = new MessageSubject (
66+ $ dataIn ,
67+ $ dataOut ,
68+ true ,
69+ false ,
70+ '' ,
71+ new Request ('GET ' , '/ws ' ),
72+ new Response (),
73+ 300
74+ );
75+
76+ $ result = $ this ->scheduler ->startWithCreate (function () use ($ dataOut ) {
77+ return $ dataOut ;
78+ });
79+
80+ $ this ->assertMessages ([
81+ onNext (650 , (new Frame ('' , true , Frame::OP_PING ))->getContents ()),
82+ onError (950 , new TimeoutException ())
83+ ], $ result ->getMessages ());
84+ }
85+
86+ public function testPingPong ()
87+ {
88+ $ dataIn = $ this ->createHotObservable ([
89+ onNext (200 , (new Frame ('' , true , Frame::OP_TEXT ))->getContents ()),
90+ onNext (205 , (new Frame ('' , true , Frame::OP_TEXT ))->getContents ()),
91+ onNext (651 , (new Frame ('' , true , Frame::OP_PONG ))->getContents ())
92+ ]);
93+
94+ $ dataOut = new Subject ();
95+
96+ $ ms = new MessageSubject (
97+ $ dataIn ,
98+ $ dataOut ,
99+ true ,
100+ false ,
101+ '' ,
102+ new Request ('GET ' , '/ws ' ),
103+ new Response (),
104+ 300
105+ );
106+
107+ $ result = $ this ->scheduler ->startWithDispose (function () use ($ dataOut ) {
108+ return $ dataOut ;
109+ }, 2000 );
110+
111+ $ this ->assertMessages ([
112+ onNext (650 , (new Frame ('' , true , Frame::OP_PING ))->getContents ()),
113+ onNext (951 , (new Frame ('' , true , Frame::OP_PING ))->getContents ()),
114+ onError (1251 , new TimeoutException ())
115+ ], $ result ->getMessages ());
116+ }
117+
118+ public function testPingPongDataSuppressesPing ()
119+ {
120+ $ dataIn = $ this ->createHotObservable ([
121+ onNext (201 , (new Frame ('' , true , Frame::OP_TEXT ))->getContents ()),
122+ onNext (205 , (new Frame ('' , true , Frame::OP_TEXT ))->getContents ()),
123+ onNext (649 , (new Frame ('' , true , Frame::OP_TEXT ))->getContents ())
124+ ]);
125+
126+ $ dataOut = new Subject ();
127+
128+ $ ms = new MessageSubject (
129+ $ dataIn ,
130+ $ dataOut ,
131+ true ,
132+ false ,
133+ '' ,
134+ new Request ('GET ' , '/ws ' ),
135+ new Response (),
136+ 300
137+ );
138+
139+ $ result = $ this ->scheduler ->startWithDispose (function () use ($ dataOut ) {
140+ return $ dataOut ;
141+ }, 2000 );
142+
143+ $ this ->assertMessages ([
144+ onNext (949 , (new Frame ('' , true , Frame::OP_PING ))->getContents ()),
145+ onError (1249 , new TimeoutException ())
146+ ], $ result ->getMessages ());
147+ }
148+
149+ public function testDisposeOnMessageSubjectClosesConnection ()
150+ {
151+ $ dataIn = $ this ->createHotObservable ([
152+ onNext (201 , (new Frame ('' , true , Frame::OP_TEXT ))->getContents ()),
153+ onNext (205 , (new Frame ('' , true , Frame::OP_TEXT ))->getContents ()),
154+ ]);
155+
156+ $ dataOut = new MockObserver ($ this ->scheduler );
157+
158+ $ ms = new MessageSubject (
159+ $ dataIn ,
160+ $ dataOut ,
161+ true ,
162+ false ,
163+ '' ,
164+ new Request ('GET ' , '/ws ' ),
165+ new Response (),
166+ 300
167+ );
168+
169+ $ result = $ this ->scheduler ->startWithDispose (function () use ($ ms ) {
170+ return $ ms ;
171+ }, 300 );
172+
173+ $ this ->assertMessages ([
174+ onNext (201 , '' ),
175+ onNext (205 , '' ),
176+ ], $ result ->getMessages ());
177+
178+ $ this ->assertSubscriptions ([
179+ subscribe (0 ,300 )
180+ ], $ dataIn ->getSubscriptions ());
181+
182+ $ this ->assertMessages ([
183+ onCompleted (300 )
184+ ], $ dataOut ->getMessages ());
185+ }
53186}
0 commit comments