@@ -12,8 +12,6 @@ use graph::data::subgraph::Graft;
1212use graph:: data:: value:: Word ;
1313use graph:: futures03;
1414use graph:: futures03:: future:: TryFutureExt ;
15- use graph:: futures03:: stream;
16- use graph:: futures03:: stream:: TryStreamExt ;
1715use graph:: futures03:: Stream ;
1816use graph:: futures03:: StreamExt ;
1917use graph:: prelude:: {
@@ -90,68 +88,47 @@ where
9088 //
9189 // The discrepancy between the start time of the event stream and the table read can result
9290 // in some extraneous events on start up. Examples:
93- // - The event stream sees an Add event for subgraph A, but the table query finds that
91+ // - The event stream sees an 'set' event for subgraph A, but the table query finds that
9492 // subgraph A is already in the table.
95- // - The event stream sees a Remove event for subgraph B, but the table query finds that
93+ // - The event stream sees a 'removed' event for subgraph B, but the table query finds that
9694 // subgraph B has already been removed.
97- // The `handle_assignment_events` function handles these cases by ignoring AlreadyRunning
98- // (on subgraph start) which makes the operations idempotent. Subgraph stop is already idempotent.
99-
100- fn panic_on_cancel (
101- logger : & Logger ,
102- e : CancelableError < SubgraphAssignmentProviderError > ,
103- ) -> ! {
104- match e {
105- CancelableError :: Cancel => {
106- panic ! ( "assignment event stream canceled" )
107- }
108- CancelableError :: Error ( e) => {
109- error ! ( logger, "Assignment event stream failed: {}" , e) ;
110- panic ! ( "assignment event stream failed: {}" , e) ;
111- }
112- }
113- }
95+ // The `change_assignment` function handles these cases by ignoring
96+ // such cases which makes the operations idempotent
11497
11598 // Start event stream
11699 let assignment_event_stream = self . cheap_clone ( ) . assignment_events ( ) . await ;
117100
118101 // Deploy named subgraphs found in store
119102 self . start_assigned_subgraphs ( ) . await ?;
120103
104+ let cancel_handle = self . assignment_event_stream_cancel_guard . handle ( ) ;
105+
121106 // Spawn a task to handle assignment events.
122- let assignment_event_stream_cancel_handle =
123- self . assignment_event_stream_cancel_guard . handle ( ) ;
124-
125- let fut =
126- Box :: pin ( assignment_event_stream. map_err ( SubgraphAssignmentProviderError :: Unknown ) )
127- . cancelable ( & assignment_event_stream_cancel_handle)
128- . for_each ( {
129- move |event| {
130- let this = self . cheap_clone ( ) ;
131- let provider = self . provider . clone ( ) ;
132- async move {
133- match event {
134- Ok ( event) => {
135- assert_eq ! ( event. node_id( ) , & this. node_id) ;
136- handle_assignment_event ( event, provider. clone ( ) ) . await
137- }
138- Err ( e) => panic_on_cancel ( & this. logger , e) ,
139- } ;
140- }
141- }
142- } ) ;
107+ let fut = assignment_event_stream. for_each ( {
108+ move |event| {
109+ // The assignment stream should run forever. If it gets
110+ // cancelled, that probably indicates a serious problem and
111+ // we panic
112+ if cancel_handle. is_canceled ( ) {
113+ panic ! ( "assignment event stream canceled" ) ;
114+ }
115+
116+ let this = self . cheap_clone ( ) ;
117+ async move {
118+ this. change_assignment ( event) . await ;
119+ }
120+ }
121+ } ) ;
143122
144123 graph:: spawn ( fut) ;
145124 Ok ( ( ) )
146125 }
147126
148- /// Maps an assignment change to an assignment event by checking the
149- /// current state in the database, ignoring changes that do not affect
150- /// this node or do not require anything to change.
151- async fn map_assignment (
152- & self ,
153- change : AssignmentChange ,
154- ) -> Result < Option < AssignmentEvent > , Error > {
127+ /// Start/stop subgraphs as needed, considering the current assignment
128+ /// state in the database, ignoring changes that do not affect this
129+ /// node, do not require anything to change, or for which we can not
130+ /// find the assignment status from the database
131+ async fn change_assignment ( & self , change : AssignmentChange ) {
155132 let ( deployment, operation) = change. into_parts ( ) ;
156133
157134 trace ! ( self . logger, "Received assignment change" ;
@@ -161,75 +138,53 @@ where
161138
162139 match operation {
163140 AssignmentOperation :: Set => {
164- let assigned = self
165- . store
166- . assignment_status ( & deployment)
167- . await
168- . map_err ( |e| anyhow ! ( "Failed to get subgraph assignment entity: {}" , e) ) ?;
141+ let assigned = match self . store . assignment_status ( & deployment) . await {
142+ Ok ( assigned) => assigned,
143+ Err ( e) => {
144+ error ! (
145+ self . logger,
146+ "Failed to get subgraph assignment entity" ; "deployment" => deployment, "error" => e. to_string( )
147+ ) ;
148+ return ;
149+ }
150+ } ;
169151
170152 let logger = self . logger . new ( o ! ( "subgraph_id" => deployment. hash. to_string( ) , "node_id" => self . node_id. to_string( ) ) ) ;
171153 if let Some ( ( assigned, is_paused) ) = assigned {
172154 if & assigned == & self . node_id {
173155 if is_paused {
174156 // Subgraph is paused, so we don't start it
175157 debug ! ( logger, "Deployment assignee is this node" ; "assigned_to" => assigned, "paused" => is_paused, "action" => "ignore" ) ;
176- return Ok ( None ) ;
158+ return ;
177159 }
178160
179161 // Start subgraph on this node
180162 debug ! ( logger, "Deployment assignee is this node" ; "assigned_to" => assigned, "action" => "add" ) ;
181- Ok ( Some ( AssignmentEvent :: Add {
182- deployment,
183- node_id : self . node_id . clone ( ) ,
184- } ) )
163+ self . provider . start ( deployment, None ) . await ;
185164 } else {
186165 // Ensure it is removed from this node
187166 debug ! ( logger, "Deployment assignee is not this node" ; "assigned_to" => assigned, "action" => "remove" ) ;
188- Ok ( Some ( AssignmentEvent :: Remove {
189- deployment,
190- node_id : self . node_id . clone ( ) ,
191- } ) )
167+ self . provider . stop ( deployment) . await
192168 }
193169 } else {
194170 // Was added/updated, but is now gone.
195171 debug ! ( self . logger, "Deployment assignee not found in database" ; "action" => "ignore" ) ;
196- Ok ( None )
197172 }
198173 }
199174 AssignmentOperation :: Removed => {
200175 // Send remove event without checking node ID.
201176 // If node ID does not match, then this is a no-op when handled in
202177 // assignment provider.
203- Ok ( Some ( AssignmentEvent :: Remove {
204- deployment,
205- node_id : self . node_id . clone ( ) ,
206- } ) )
178+ self . provider . stop ( deployment) . await ;
207179 }
208180 }
209181 }
210182
211- pub async fn assignment_events (
212- self : Arc < Self > ,
213- ) -> impl Stream < Item = Result < AssignmentEvent , Error > > + Send {
183+ pub async fn assignment_events ( self : Arc < Self > ) -> impl Stream < Item = AssignmentChange > + Send {
214184 self . subscription_manager
215185 . subscribe ( )
216186 . map ( |event| futures03:: stream:: iter ( event. changes . clone ( ) ) )
217187 . flatten ( )
218- . then ( {
219- let this = self . cheap_clone ( ) ;
220- move |change| {
221- let this = this. cheap_clone ( ) ;
222-
223- async move {
224- match this. map_assignment ( change) . await {
225- Ok ( Some ( event) ) => stream:: once ( futures03:: future:: ok ( event) ) . boxed ( ) ,
226- Ok ( None ) => stream:: empty ( ) . boxed ( ) ,
227- Err ( e) => stream:: once ( futures03:: future:: err ( e) ) . boxed ( ) ,
228- }
229- }
230- }
231- } )
232- . flatten ( )
233188 }
234189
235190 async fn start_assigned_subgraphs ( & self ) -> Result < ( ) , Error > {
@@ -456,22 +411,6 @@ where
456411 }
457412}
458413
459- async fn handle_assignment_event (
460- event : AssignmentEvent ,
461- provider : Arc < impl SubgraphAssignmentProviderTrait > ,
462- ) {
463- match event {
464- AssignmentEvent :: Add {
465- deployment,
466- node_id : _,
467- } => provider. start ( deployment, None ) . await ,
468- AssignmentEvent :: Remove {
469- deployment,
470- node_id : _,
471- } => provider. stop ( deployment) . await ,
472- }
473- }
474-
475414/// Resolves the subgraph's earliest block
476415async fn resolve_start_block (
477416 manifest : & SubgraphManifest < impl Blockchain > ,
0 commit comments