1616
1717package io .rsocket .internal ;
1818
19+ import static org .assertj .core .api .Assertions .assertThat ;
1920import static org .junit .Assert .assertEquals ;
2021
2122import io .netty .buffer .ByteBuf ;
2223import io .netty .buffer .ByteBufAllocator ;
2324import io .netty .buffer .Unpooled ;
2425import io .rsocket .buffer .LeaksTrackingByteBufAllocator ;
25- import io .rsocket .frame .*;
26+ import io .rsocket .frame .ErrorFrameCodec ;
27+ import io .rsocket .frame .KeepAliveFrameCodec ;
28+ import io .rsocket .frame .LeaseFrameCodec ;
29+ import io .rsocket .frame .MetadataPushFrameCodec ;
30+ import io .rsocket .frame .ResumeFrameCodec ;
31+ import io .rsocket .frame .ResumeOkFrameCodec ;
32+ import io .rsocket .frame .SetupFrameCodec ;
2633import io .rsocket .plugins .InitializingInterceptorRegistry ;
2734import io .rsocket .test .util .TestDuplexConnection ;
2835import io .rsocket .util .DefaultPayload ;
2936import java .util .concurrent .atomic .AtomicInteger ;
37+ import java .util .concurrent .atomic .AtomicReference ;
3038import org .junit .Before ;
3139import org .junit .Test ;
3240
@@ -68,44 +76,44 @@ public void clientSplits() {
6876 .doOnNext (f -> setupFrames .incrementAndGet ())
6977 .subscribe ();
7078
79+ source .addToReceivedBuffer (setupFrame ());
80+ assertEquals (0 , clientFrames .get ());
81+ assertEquals (0 , serverFrames .get ());
82+ assertEquals (1 , setupFrames .get ());
83+
7184 source .addToReceivedBuffer (errorFrame (1 ));
7285 assertEquals (1 , clientFrames .get ());
7386 assertEquals (0 , serverFrames .get ());
74- assertEquals (0 , setupFrames .get ());
87+ assertEquals (1 , setupFrames .get ());
7588
7689 source .addToReceivedBuffer (errorFrame (1 ));
7790 assertEquals (2 , clientFrames .get ());
7891 assertEquals (0 , serverFrames .get ());
79- assertEquals (0 , setupFrames .get ());
92+ assertEquals (1 , setupFrames .get ());
8093
8194 source .addToReceivedBuffer (leaseFrame ());
8295 assertEquals (3 , clientFrames .get ());
8396 assertEquals (0 , serverFrames .get ());
84- assertEquals (0 , setupFrames .get ());
97+ assertEquals (1 , setupFrames .get ());
8598
8699 source .addToReceivedBuffer (keepAliveFrame ());
87100 assertEquals (4 , clientFrames .get ());
88101 assertEquals (0 , serverFrames .get ());
89- assertEquals (0 , setupFrames .get ());
102+ assertEquals (1 , setupFrames .get ());
90103
91104 source .addToReceivedBuffer (errorFrame (2 ));
92105 assertEquals (4 , clientFrames .get ());
93106 assertEquals (1 , serverFrames .get ());
94- assertEquals (0 , setupFrames .get ());
107+ assertEquals (1 , setupFrames .get ());
95108
96109 source .addToReceivedBuffer (errorFrame (0 ));
97110 assertEquals (5 , clientFrames .get ());
98111 assertEquals (1 , serverFrames .get ());
99- assertEquals (0 , setupFrames .get ());
112+ assertEquals (1 , setupFrames .get ());
100113
101114 source .addToReceivedBuffer (metadataPushFrame ());
102115 assertEquals (5 , clientFrames .get ());
103116 assertEquals (2 , serverFrames .get ());
104- assertEquals (0 , setupFrames .get ());
105-
106- source .addToReceivedBuffer (setupFrame ());
107- assertEquals (5 , clientFrames .get ());
108- assertEquals (2 , serverFrames .get ());
109117 assertEquals (1 , setupFrames .get ());
110118
111119 source .addToReceivedBuffer (resumeFrame ());
@@ -141,44 +149,44 @@ public void serverSplits() {
141149 .doOnNext (f -> setupFrames .incrementAndGet ())
142150 .subscribe ();
143151
152+ source .addToReceivedBuffer (setupFrame ());
153+ assertEquals (0 , clientFrames .get ());
154+ assertEquals (0 , serverFrames .get ());
155+ assertEquals (1 , setupFrames .get ());
156+
144157 source .addToReceivedBuffer (errorFrame (1 ));
145158 assertEquals (1 , clientFrames .get ());
146159 assertEquals (0 , serverFrames .get ());
147- assertEquals (0 , setupFrames .get ());
160+ assertEquals (1 , setupFrames .get ());
148161
149162 source .addToReceivedBuffer (errorFrame (1 ));
150163 assertEquals (2 , clientFrames .get ());
151164 assertEquals (0 , serverFrames .get ());
152- assertEquals (0 , setupFrames .get ());
165+ assertEquals (1 , setupFrames .get ());
153166
154167 source .addToReceivedBuffer (leaseFrame ());
155168 assertEquals (2 , clientFrames .get ());
156169 assertEquals (1 , serverFrames .get ());
157- assertEquals (0 , setupFrames .get ());
170+ assertEquals (1 , setupFrames .get ());
158171
159172 source .addToReceivedBuffer (keepAliveFrame ());
160173 assertEquals (2 , clientFrames .get ());
161174 assertEquals (2 , serverFrames .get ());
162- assertEquals (0 , setupFrames .get ());
175+ assertEquals (1 , setupFrames .get ());
163176
164177 source .addToReceivedBuffer (errorFrame (2 ));
165178 assertEquals (2 , clientFrames .get ());
166179 assertEquals (3 , serverFrames .get ());
167- assertEquals (0 , setupFrames .get ());
180+ assertEquals (1 , setupFrames .get ());
168181
169182 source .addToReceivedBuffer (errorFrame (0 ));
170183 assertEquals (2 , clientFrames .get ());
171184 assertEquals (4 , serverFrames .get ());
172- assertEquals (0 , setupFrames .get ());
185+ assertEquals (1 , setupFrames .get ());
173186
174187 source .addToReceivedBuffer (metadataPushFrame ());
175188 assertEquals (3 , clientFrames .get ());
176189 assertEquals (4 , serverFrames .get ());
177- assertEquals (0 , setupFrames .get ());
178-
179- source .addToReceivedBuffer (setupFrame ());
180- assertEquals (3 , clientFrames .get ());
181- assertEquals (4 , serverFrames .get ());
182190 assertEquals (1 , setupFrames .get ());
183191
184192 source .addToReceivedBuffer (resumeFrame ());
@@ -192,6 +200,43 @@ public void serverSplits() {
192200 assertEquals (3 , setupFrames .get ());
193201 }
194202
203+ @ Test
204+ public void unexpectedFramesBeforeSetupFrame () {
205+ AtomicInteger clientFrames = new AtomicInteger ();
206+ AtomicInteger serverFrames = new AtomicInteger ();
207+ AtomicInteger setupFrames = new AtomicInteger ();
208+
209+ AtomicReference <Throwable > clientError = new AtomicReference <>();
210+ AtomicReference <Throwable > serverError = new AtomicReference <>();
211+ AtomicReference <Throwable > setupError = new AtomicReference <>();
212+
213+ serverMultiplexer
214+ .asClientConnection ()
215+ .receive ()
216+ .subscribe (bb -> clientFrames .incrementAndGet (), clientError ::set );
217+ serverMultiplexer
218+ .asServerConnection ()
219+ .receive ()
220+ .subscribe (bb -> serverFrames .incrementAndGet (), serverError ::set );
221+ serverMultiplexer
222+ .asSetupConnection ()
223+ .receive ()
224+ .subscribe (bb -> setupFrames .incrementAndGet (), setupError ::set );
225+
226+ source .addToReceivedBuffer (keepAliveFrame ());
227+
228+ assertThat (clientError .get ().getMessage ())
229+ .isEqualTo ("SETUP or LEASE frame must be received before any others." );
230+ assertThat (serverError .get ().getMessage ())
231+ .isEqualTo ("SETUP or LEASE frame must be received before any others." );
232+ assertThat (setupError .get ().getMessage ())
233+ .isEqualTo ("SETUP or LEASE frame must be received before any others." );
234+
235+ assertEquals (0 , clientFrames .get ());
236+ assertEquals (0 , serverFrames .get ());
237+ assertEquals (0 , setupFrames .get ());
238+ }
239+
195240 private ByteBuf resumeFrame () {
196241 return ResumeFrameCodec .encode (allocator , Unpooled .EMPTY_BUFFER , 0 , 0 );
197242 }
0 commit comments