11package com .graphql .spring .boot .test ;
22
33import static org .assertj .core .api .Assertions .assertThat ;
4+ import static org .awaitility .Awaitility .await ;
45import static org .junit .jupiter .api .Assertions .fail ;
56
67import com .fasterxml .jackson .core .JsonProcessingException ;
1819import java .util .Map ;
1920import java .util .Optional ;
2021import java .util .Queue ;
22+ import java .util .concurrent .TimeUnit ;
2123import java .util .concurrent .atomic .AtomicInteger ;
2224import java .util .function .Predicate ;
2325import javax .websocket .ClientEndpointConfig ;
@@ -49,7 +51,6 @@ public class GraphQLTestSubscription {
4951
5052 private static final WebSocketContainer WEB_SOCKET_CONTAINER = ContainerProvider
5153 .getWebSocketContainer ();
52- private static final int SLEEP_INTERVAL_MS = 100 ;
5354 private static final int ACKNOWLEDGEMENT_AND_CONNECTION_TIMEOUT = 60000 ;
5455 private static final AtomicInteger ID_COUNTER = new AtomicInteger (1 );
5556 private static final UriBuilderFactory URI_BUILDER_FACTORY = new DefaultUriBuilderFactory ();
@@ -115,7 +116,7 @@ public GraphQLTestSubscription init(@Nullable final Object payload) {
115116 sendMessage (message );
116117 state .setInitialized (true );
117118 awaitAcknowledgement ();
118- log .debug ("Subscription successfully initialized. " );
119+ log .debug ("Subscription successfully initialized" );
119120 return this ;
120121 }
121122
@@ -134,12 +135,12 @@ public GraphQLTestSubscription start(@NonNull final String graphQLResource) {
134135 /**
135136 * Sends the "start" message to the GraphQL Subscription.
136137 *
137- * @param graphGLResource the GraphQL resource, which contains the query for the subscription
138+ * @param graphQLResource the GraphQL resource, which contains the query for the subscription
138139 * start payload.
139140 * @param variables the variables needed for the query to be evaluated.
140141 * @return self reference
141142 */
142- public GraphQLTestSubscription start (@ NonNull final String graphGLResource ,
143+ public GraphQLTestSubscription start (@ NonNull final String graphQLResource ,
143144 @ Nullable final Object variables ) {
144145 if (!isInitialized ()) {
145146 init ();
@@ -149,7 +150,7 @@ public GraphQLTestSubscription start(@NonNull final String graphGLResource,
149150 }
150151 state .setStarted (true );
151152 ObjectNode payload = objectMapper .createObjectNode ();
152- payload .put ("query" , loadQuery (graphGLResource ));
153+ payload .put ("query" , loadQuery (graphQLResource ));
153154 payload .set ("variables" , getFinalPayload (variables ));
154155 ObjectNode message = objectMapper .createObjectNode ();
155156 message .put ("type" , "start" );
@@ -298,18 +299,11 @@ public List<GraphQLResponse> awaitAndGetNextResponses(
298299 if (isStopped ()) {
299300 fail ("Subscription already stopped. Forgot to call reset after test case?" );
300301 }
301- int elapsedTime = 0 ;
302- while (
303- ((state .getResponses ().size () < numExpectedResponses ) || numExpectedResponses <= 0 )
304- && elapsedTime < timeout
305- ) {
306- try {
307- Thread .sleep (SLEEP_INTERVAL_MS );
308- elapsedTime += SLEEP_INTERVAL_MS ;
309- } catch (InterruptedException e ) {
310- fail ("Test execution error - Thread.sleep failed." , e );
311- }
312- }
302+
303+ await ()
304+ .atMost (timeout , TimeUnit .MILLISECONDS )
305+ .until (() -> state .getResponses ().size () >= numExpectedResponses );
306+
313307 if (stopAfter ) {
314308 stop ();
315309 }
@@ -420,29 +414,21 @@ private void sendMessage(final Object message) {
420414 }
421415
422416 private void awaitAcknowledgement () {
423- await (GraphQLTestSubscription ::isAcknowledged ,
424- "Connection was not acknowledged by the GraphQL server." );
417+ awaitAcknowledgementOrConnection (GraphQLTestSubscription ::isAcknowledged ,
418+ "Connection was acknowledged by the GraphQL server." );
425419 }
426420
427421 private void awaitStop () {
428- await (GraphQLTestSubscription ::isStopped , "Connection was not stopped in time." );
422+ awaitAcknowledgementOrConnection (GraphQLTestSubscription ::isStopped ,
423+ "Connection was stopped in time." );
429424 }
430425
431- private void await (final Predicate <GraphQLTestSubscription > condition ,
426+ private void awaitAcknowledgementOrConnection (final Predicate <GraphQLTestSubscription > condition ,
432427 final String timeoutDescription ) {
433- int elapsedTime = 0 ;
434- while (!condition .test (this ) && elapsedTime < ACKNOWLEDGEMENT_AND_CONNECTION_TIMEOUT ) {
435- try {
436- Thread .sleep (SLEEP_INTERVAL_MS );
437- elapsedTime += SLEEP_INTERVAL_MS ;
438- } catch (InterruptedException e ) {
439- fail ("Test execution error - Thread.sleep failed." , e );
440- }
441- }
428+ await (timeoutDescription )
429+ .atMost (ACKNOWLEDGEMENT_AND_CONNECTION_TIMEOUT , TimeUnit .MILLISECONDS )
430+ .until (() -> condition .test (this ));
442431
443- if (!condition .test (this )) {
444- fail ("Timeout: " + timeoutDescription );
445- }
446432 }
447433
448434 @ RequiredArgsConstructor
@@ -460,28 +446,35 @@ public void onMessage(final String message) {
460446 assertThat (typeNode ).as ("GraphQL messages should have a type field." ).isNotNull ();
461447 assertThat (typeNode .isNull ()).as ("GraphQL messages type should not be null." ).isFalse ();
462448 final String type = typeNode .asText ();
463- if (type .equals ("complete" )) {
464- state .setCompleted (true );
465- log .debug ("Subscription completed." );
466- } else if (type .equals ("connection_ack" )) {
467- state .setAcknowledged (true );
468- log .debug ("WebSocket connection acknowledged by the GraphQL Server." );
469- } else if (type .equals ("data" ) || type .equals ("error" )) {
470- final JsonNode payload = jsonNode .get (PAYLOAD );
471- assertThat (payload ).as ("Data/error messages must have a payload." ).isNotNull ();
472- final String payloadString = objectMapper .writeValueAsString (payload );
473- final GraphQLResponse graphQLResponse = new GraphQLResponse (
474- ResponseEntity .ok (payloadString ),
475- objectMapper );
476- if (state .isStopped () || state .isCompleted ()) {
477- log .debug (
478- "Response discarded because subscription was stopped or completed in the meanwhile." );
479- } else {
480- synchronized (STATE_LOCK ) {
481- state .getResponses ().add (graphQLResponse );
449+ switch (type ) {
450+ case "complete" :
451+ state .setCompleted (true );
452+ log .debug ("Subscription completed." );
453+ break ;
454+ case "connection_ack" :
455+ state .setAcknowledged (true );
456+ log .debug ("WebSocket connection acknowledged by the GraphQL Server." );
457+ break ;
458+ case "data" :
459+ case "error" :
460+ final JsonNode payload = jsonNode .get (PAYLOAD );
461+ assertThat (payload ).as ("Data/error messages must have a payload." ).isNotNull ();
462+ final String payloadString = objectMapper .writeValueAsString (payload );
463+ final GraphQLResponse graphQLResponse = new GraphQLResponse (
464+ ResponseEntity .ok (payloadString ),
465+ objectMapper );
466+ if (state .isStopped () || state .isCompleted ()) {
467+ log .debug (
468+ "Response discarded because subscription was stopped or completed in the meanwhile." );
469+ } else {
470+ synchronized (STATE_LOCK ) {
471+ state .getResponses ().add (graphQLResponse );
472+ }
473+ log .debug ("New response recorded." );
482474 }
483- log .debug ("New response recorded." );
484- }
475+ break ;
476+ default :
477+ break ;
485478 }
486479 } catch (JsonProcessingException e ) {
487480 fail ("Exception while parsing server response. Response is not a valid GraphQL response." ,
0 commit comments