@@ -3,6 +3,7 @@ use std::iter;
33use std:: result:: Result ;
44use std:: time:: { Duration , Instant } ;
55
6+ use graph:: components:: store:: UnitStream ;
67use graph:: { components:: store:: SubscriptionManager , prelude:: * } ;
78
89use crate :: runner:: ResultSizeMetrics ;
@@ -40,7 +41,7 @@ pub struct SubscriptionExecutionOptions {
4041 pub result_size : Arc < ResultSizeMetrics > ,
4142}
4243
43- pub async fn execute_subscription (
44+ pub fn execute_subscription (
4445 subscription : Subscription ,
4546 schema : Arc < ApiSchema > ,
4647 options : SubscriptionExecutionOptions ,
@@ -53,10 +54,10 @@ pub async fn execute_subscription(
5354 options. max_complexity ,
5455 options. max_depth ,
5556 ) ?;
56- execute_prepared_subscription ( query, options) . await
57+ execute_prepared_subscription ( query, options)
5758}
5859
59- pub ( crate ) async fn execute_prepared_subscription (
60+ pub ( crate ) fn execute_prepared_subscription (
6061 query : Arc < crate :: execution:: Query > ,
6162 options : SubscriptionExecutionOptions ,
6263) -> Result < SubscriptionResult , SubscriptionError > {
@@ -72,15 +73,15 @@ pub(crate) async fn execute_prepared_subscription(
7273 "query" => & query. query_text,
7374 ) ;
7475
75- let source_stream = create_source_event_stream ( query. clone ( ) , & options) . await ?;
76+ let source_stream = create_source_event_stream ( query. clone ( ) , & options) ?;
7677 let response_stream = map_source_to_response_stream ( query, options, source_stream) ;
7778 Ok ( response_stream)
7879}
7980
80- async fn create_source_event_stream (
81+ fn create_source_event_stream (
8182 query : Arc < crate :: execution:: Query > ,
8283 options : & SubscriptionExecutionOptions ,
83- ) -> Result < StoreEventStreamBox , SubscriptionError > {
84+ ) -> Result < UnitStream , SubscriptionError > {
8485 let resolver = StoreResolver :: for_subscription (
8586 & options. logger ,
8687 query. schema . id ( ) . clone ( ) ,
@@ -123,35 +124,31 @@ async fn create_source_event_stream(
123124 let field = fields. 1 [ 0 ] ;
124125 let argument_values = coerce_argument_values ( & ctx. query , subscription_type. as_ref ( ) , field) ?;
125126
126- resolve_field_stream ( & ctx, & subscription_type, field, argument_values) . await
127+ resolve_field_stream ( & ctx, & subscription_type, field, argument_values)
127128}
128129
129- async fn resolve_field_stream (
130+ fn resolve_field_stream (
130131 ctx : & ExecutionContext < impl Resolver > ,
131132 object_type : & s:: ObjectType ,
132133 field : & q:: Field ,
133134 _argument_values : HashMap < & str , r:: Value > ,
134- ) -> Result < StoreEventStreamBox , SubscriptionError > {
135+ ) -> Result < UnitStream , SubscriptionError > {
135136 ctx. resolver
136137 . resolve_field_stream ( & ctx. query . schema . document ( ) , object_type, field)
137- . await
138138 . map_err ( SubscriptionError :: from)
139139}
140140
141141fn map_source_to_response_stream (
142142 query : Arc < crate :: execution:: Query > ,
143143 options : SubscriptionExecutionOptions ,
144- source_stream : StoreEventStreamBox ,
144+ source_stream : UnitStream ,
145145) -> QueryResultStream {
146146 // Create a stream with a single empty event. By chaining this in front
147147 // of the real events, we trick the subscription into executing its query
148148 // at least once. This satisfies the GraphQL over Websocket protocol
149149 // requirement of "respond[ing] with at least one GQL_DATA message", see
150150 // https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md#gql_data
151- let trigger_stream = futures03:: stream:: iter ( vec ! [ Ok ( Arc :: new( StoreEvent {
152- tag: 0 ,
153- changes: Default :: default ( ) ,
154- } ) ) ] ) ;
151+ let trigger_stream = futures03:: stream:: once ( async { } ) ;
155152
156153 let SubscriptionExecutionOptions {
157154 logger,
@@ -165,43 +162,34 @@ fn map_source_to_response_stream(
165162 result_size,
166163 } = options;
167164
168- Box :: new (
169- trigger_stream
170- . chain ( source_stream. compat ( ) )
171- . then ( move |res| match res {
172- Err ( ( ) ) => {
173- futures03:: future:: ready ( Arc :: new ( QueryExecutionError :: EventStreamError . into ( ) ) )
174- . boxed ( )
175- }
176- Ok ( event) => execute_subscription_event (
177- logger. clone ( ) ,
178- store. clone ( ) ,
179- subscription_manager. cheap_clone ( ) ,
180- query. clone ( ) ,
181- event,
182- timeout,
183- max_first,
184- max_skip,
185- result_size. cheap_clone ( ) ,
186- )
187- . boxed ( ) ,
188- } ) ,
189- )
165+ trigger_stream
166+ . chain ( source_stream)
167+ . then ( move |( ) | {
168+ execute_subscription_event (
169+ logger. clone ( ) ,
170+ store. clone ( ) ,
171+ subscription_manager. cheap_clone ( ) ,
172+ query. clone ( ) ,
173+ timeout,
174+ max_first,
175+ max_skip,
176+ result_size. cheap_clone ( ) ,
177+ )
178+ . boxed ( )
179+ } )
180+ . boxed ( )
190181}
191182
192183async fn execute_subscription_event (
193184 logger : Logger ,
194185 store : Arc < dyn QueryStore > ,
195186 subscription_manager : Arc < dyn SubscriptionManager > ,
196187 query : Arc < crate :: execution:: Query > ,
197- event : Arc < StoreEvent > ,
198188 timeout : Option < Duration > ,
199189 max_first : u32 ,
200190 max_skip : u32 ,
201191 result_size : Arc < ResultSizeMetrics > ,
202192) -> Arc < QueryResult > {
203- debug ! ( logger, "Execute subscription event" ; "event" => format!( "{:?}" , event) ) ;
204-
205193 let resolver = match StoreResolver :: at_block (
206194 & logger,
207195 store,
0 commit comments