33import com .google .common .io .ByteStreams ;
44import com .google .common .io .CharStreams ;
55import graphql .ExecutionResult ;
6+ import graphql .GraphQL ;
7+ import graphql .execution .reactive .SingleSubscriberPublisher ;
68import graphql .introspection .IntrospectionQuery ;
79import graphql .schema .GraphQLFieldDefinition ;
810import graphql .servlet .config .GraphQLConfiguration ;
1315import graphql .servlet .core .GraphQLServletListener ;
1416import graphql .servlet .core .internal .GraphQLRequest ;
1517import graphql .servlet .core .internal .VariableMapper ;
16- import graphql .servlet .input .BatchInputPreProcessResult ;
17- import graphql .servlet .input .BatchInputPreProcessor ;
18- import graphql .servlet .input .GraphQLBatchedInvocationInput ;
19- import graphql .servlet .input .GraphQLInvocationInputFactory ;
20- import graphql .servlet .input .GraphQLSingleInvocationInput ;
18+ import graphql .servlet .input .*;
2119import org .reactivestreams .Publisher ;
2220import org .reactivestreams .Subscriber ;
2321import org .reactivestreams .Subscription ;
2826import javax .servlet .AsyncEvent ;
2927import javax .servlet .AsyncListener ;
3028import javax .servlet .Servlet ;
31- import javax .servlet .ServletException ;
3229import javax .servlet .http .HttpServlet ;
3330import javax .servlet .http .HttpServletRequest ;
3431import javax .servlet .http .HttpServletResponse ;
3532import javax .servlet .http .Part ;
36- import java .io .BufferedInputStream ;
37- import java .io .ByteArrayOutputStream ;
38- import java .io .IOException ;
39- import java .io .InputStream ;
40- import java .io .Writer ;
41- import java .util .ArrayList ;
42- import java .util .Arrays ;
43- import java .util .HashMap ;
44- import java .util .Iterator ;
45- import java .util .List ;
46- import java .util .Map ;
47- import java .util .Objects ;
48- import java .util .Optional ;
33+ import java .io .*;
34+ import java .util .*;
4935import java .util .concurrent .CountDownLatch ;
5036import java .util .concurrent .atomic .AtomicReference ;
5137import java .util .function .BiConsumer ;
@@ -354,13 +340,13 @@ private void doRequest(HttpServletRequest request, HttpServletResponse response,
354340 }
355341
356342 @ Override
357- protected void doGet (HttpServletRequest req , HttpServletResponse resp ) throws ServletException , IOException {
343+ protected void doGet (HttpServletRequest req , HttpServletResponse resp ) {
358344 init ();
359345 doRequestAsync (req , resp , getHandler );
360346 }
361347
362348 @ Override
363- protected void doPost (HttpServletRequest req , HttpServletResponse resp ) throws ServletException , IOException {
349+ protected void doPost (HttpServletRequest req , HttpServletResponse resp ) {
364350 init ();
365351 doRequestAsync (req , resp , postHandler );
366352 }
@@ -373,7 +359,9 @@ private void query(GraphQLQueryInvoker queryInvoker, GraphQLObjectMapper graphQL
373359 HttpServletRequest req , HttpServletResponse resp ) throws IOException {
374360 ExecutionResult result = queryInvoker .query (invocationInput );
375361
376- if (!(result .getData () instanceof Publisher )) {
362+ boolean isDeferred = Objects .nonNull (result .getExtensions ()) && result .getExtensions ().containsKey (GraphQL .DEFERRED_RESULTS );
363+
364+ if (!(result .getData () instanceof Publisher || isDeferred )) {
377365 resp .setContentType (APPLICATION_JSON_UTF8 );
378366 resp .setStatus (STATUS_OK );
379367 resp .getWriter ().write (graphQLObjectMapper .serializeResultAsJson (result ));
@@ -390,7 +378,16 @@ private void query(GraphQLQueryInvoker queryInvoker, GraphQLObjectMapper graphQL
390378 AtomicReference <Subscription > subscriptionRef = new AtomicReference <>();
391379 asyncContext .addListener (new SubscriptionAsyncListener (subscriptionRef ));
392380 ExecutionResultSubscriber subscriber = new ExecutionResultSubscriber (subscriptionRef , asyncContext , graphQLObjectMapper );
393- ((Publisher <ExecutionResult >) result .getData ()).subscribe (subscriber );
381+ List <Publisher <ExecutionResult >> publishers = new ArrayList <>();
382+ if (result .getData () instanceof Publisher ) {
383+ publishers .add (result .getData ());
384+ } else {
385+ publishers .add (new StaticDataPublisher <>(result ));
386+ final Publisher <ExecutionResult > deferredResultsPublisher = (Publisher <ExecutionResult >) result .getExtensions ().get (GraphQL .DEFERRED_RESULTS );
387+ publishers .add (deferredResultsPublisher );
388+ }
389+ publishers .forEach (it -> it .subscribe (subscriber ));
390+
394391 if (isInAsyncThread ) {
395392 // We need to delay the completion of async context until after the subscription has terminated, otherwise the AsyncContext is prematurely closed.
396393 try {
@@ -537,7 +534,6 @@ public void onStartAsync(AsyncEvent event) {
537534 }
538535 }
539536
540-
541537 private static class ExecutionResultSubscriber implements Subscriber <ExecutionResult > {
542538
543539 private final AtomicReference <Subscription > subscriptionRef ;
@@ -584,4 +580,13 @@ public void await() throws InterruptedException {
584580 completedLatch .await ();
585581 }
586582 }
583+
584+ private static class StaticDataPublisher <T > extends SingleSubscriberPublisher <T > implements Publisher <T > {
585+ StaticDataPublisher (T data ) {
586+ super ();
587+ super .offer (data );
588+ super .noMoreData ();
589+ }
590+ }
591+
587592}
0 commit comments