@@ -33,6 +33,8 @@ public class ApolloSubscriptionProtocolHandler extends SubscriptionProtocolHandl
3333 private static final CloseReason TERMINATE_CLOSE_REASON = new CloseReason (CloseReason .CloseCodes .NORMAL_CLOSURE , "client requested " + GQL_CONNECTION_TERMINATE .getType ());
3434
3535 private final SubscriptionHandlerInput input ;
36+ private final SubscriptionSender sender ;
37+ private final ApolloSubscriptionKeepAliveRunner keepAliveRunner ;
3638 private final ApolloSubscriptionConnectionListener connectionListener ;
3739
3840 public ApolloSubscriptionProtocolHandler (SubscriptionHandlerInput subscriptionHandlerInput ) {
@@ -41,6 +43,8 @@ public ApolloSubscriptionProtocolHandler(SubscriptionHandlerInput subscriptionHa
4143 .filter (ApolloSubscriptionConnectionListener .class ::isInstance )
4244 .map (ApolloSubscriptionConnectionListener .class ::cast )
4345 .orElse (new ApolloSubscriptionConnectionListener () {});
46+ this .sender = new SubscriptionSender (this .input .getGraphQLObjectMapper ().getJacksonMapper ());
47+ this .keepAliveRunner = new ApolloSubscriptionKeepAliveRunner (this .sender );
4448 }
4549
4650 @ Override
@@ -54,20 +58,20 @@ public void onMessage(HandshakeRequest request, Session session, WsSessionSubscr
5458 return ;
5559 }
5660
57- switch (message .getType ()) {
61+ switch (message .getType ()) {
5862 case GQL_CONNECTION_INIT :
5963 try {
6064 Optional <Object > connectionResponse = connectionListener .onConnect (message .getPayload ());
6165 connectionResponse .ifPresent (it -> session .getUserProperties ().put (ApolloSubscriptionConnectionListener .CONNECT_RESULT_KEY , it ));
6266 } catch (Throwable t ) {
63- sendMessage (session , OperationMessage .Type .GQL_CONNECTION_ERROR , t . getMessage () );
67+ sendMessage (session , OperationMessage .Type .GQL_CONNECTION_ERROR , message . getId (), t );
6468 return ;
6569 }
6670
6771 sendMessage (session , OperationMessage .Type .GQL_CONNECTION_ACK , message .getId ());
6872
6973 if (connectionListener .isKeepAliveEnabled ()) {
70- sendMessage ( session , OperationMessage . Type . GQL_CONNECTION_KEEP_ALIVE , message . getId () );
74+ keepAliveRunner . keepAlive ( session );
7175 }
7276 break ;
7377
@@ -82,10 +86,12 @@ public void onMessage(HandshakeRequest request, Session session, WsSessionSubscr
8286 break ;
8387
8488 case GQL_STOP :
89+ keepAliveRunner .abort (session );
8590 unsubscribe (subscriptions , message .id );
8691 break ;
8792
8893 case GQL_CONNECTION_TERMINATE :
94+ keepAliveRunner .abort (session );
8995 try {
9096 session .close (TERMINATE_CLOSE_REASON );
9197 } catch (IOException e ) {
@@ -112,7 +118,7 @@ private GraphQLSingleInvocationInput createInvocationInput(Session session, Oper
112118 private void handleSubscriptionStart (Session session , WsSessionSubscriptions subscriptions , String id , ExecutionResult executionResult ) {
113119 executionResult = input .getGraphQLObjectMapper ().sanitizeErrors (executionResult );
114120
115- if (input .getGraphQLObjectMapper ().areErrorsPresent (executionResult )) {
121+ if (input .getGraphQLObjectMapper ().areErrorsPresent (executionResult )) {
116122 sendMessage (session , OperationMessage .Type .GQL_ERROR , id , input .getGraphQLObjectMapper ().convertSanitizedExecutionResult (executionResult , false ));
117123 return ;
118124 }
@@ -127,11 +133,13 @@ protected void sendDataMessage(Session session, String id, Object payload) {
127133
128134 @ Override
129135 protected void sendErrorMessage (Session session , String id ) {
136+ keepAliveRunner .abort (session );
130137 sendMessage (session , GQL_ERROR , id );
131138 }
132139
133140 @ Override
134141 protected void sendCompleteMessage (Session session , String id ) {
142+ keepAliveRunner .abort (session );
135143 sendMessage (session , GQL_COMPLETE , id );
136144 }
137145
@@ -140,13 +148,7 @@ private void sendMessage(Session session, OperationMessage.Type type, String id)
140148 }
141149
142150 private void sendMessage (Session session , OperationMessage .Type type , String id , Object payload ) {
143- try {
144- session .getBasicRemote ().sendText (input .getGraphQLObjectMapper ().getJacksonMapper ().writeValueAsString (
145- new OperationMessage (type , id , payload )
146- ));
147- } catch (IOException e ) {
148- throw new RuntimeException ("Error sending subscription response" , e );
149- }
151+ sender .send (session , new OperationMessage (type , id , payload ));
150152 }
151153
152154 @ JsonInclude (JsonInclude .Include .NON_NULL )
@@ -164,6 +166,10 @@ public OperationMessage(Type type, String id, Object payload) {
164166 this .payload = payload ;
165167 }
166168
169+ static OperationMessage newKeepAliveMessage () {
170+ return new OperationMessage (Type .GQL_CONNECTION_KEEP_ALIVE , null , null );
171+ }
172+
167173 public Type getType () {
168174 return type ;
169175 }
0 commit comments