11use std:: collections:: HashSet ;
2- use std:: time:: Instant ;
32
43use async_trait:: async_trait;
54use graph:: blockchain:: Blockchain ;
@@ -12,7 +11,6 @@ use graph::data::subgraph::schema::DeploymentCreate;
1211use graph:: data:: subgraph:: Graft ;
1312use graph:: data:: value:: Word ;
1413use graph:: futures03;
15- use graph:: futures03:: future:: FutureExt ;
1614use graph:: futures03:: future:: TryFutureExt ;
1715use graph:: futures03:: stream;
1816use graph:: futures03:: stream:: TryStreamExt ;
@@ -238,7 +236,6 @@ where
238236 }
239237
240238 async fn start_assigned_subgraphs ( & self ) -> Result < ( ) , Error > {
241- let provider = self . provider . clone ( ) ;
242239 let logger = self . logger . clone ( ) ;
243240 let node_id = self . node_id . clone ( ) ;
244241
@@ -258,9 +255,12 @@ where
258255 let ( sender, receiver) = futures03:: channel:: mpsc:: channel :: < ( ) > ( 1 ) ;
259256 for id in deployments {
260257 let sender = sender. clone ( ) ;
261- let logger = logger . clone ( ) ;
258+ let provider = self . provider . cheap_clone ( ) ;
262259
263- graph:: spawn ( start_subgraph ( id, provider. clone ( ) , logger) . map ( move |( ) | drop ( sender) ) ) ;
260+ graph:: spawn ( async move {
261+ provider. start ( id, None ) . await ;
262+ drop ( sender)
263+ } ) ;
264264 }
265265 drop ( sender) ;
266266 let _: Vec < _ > = receiver. collect ( ) . await ;
@@ -473,7 +473,7 @@ async fn handle_assignment_event(
473473 deployment,
474474 node_id : _,
475475 } => {
476- start_subgraph ( deployment , provider. clone ( ) , logger ) . await ;
476+ provider. start ( deployment , None ) . await ;
477477 Ok ( ( ) )
478478 }
479479 AssignmentEvent :: Remove {
@@ -486,39 +486,6 @@ async fn handle_assignment_event(
486486 }
487487}
488488
489- async fn start_subgraph (
490- deployment : DeploymentLocator ,
491- provider : Arc < impl SubgraphAssignmentProviderTrait > ,
492- logger : Logger ,
493- ) {
494- let logger = logger
495- . new ( o ! ( "subgraph_id" => deployment. hash. to_string( ) , "sgd" => deployment. id. to_string( ) ) ) ;
496-
497- trace ! ( logger, "Start subgraph" ) ;
498-
499- let start_time = Instant :: now ( ) ;
500- let result = provider. start ( deployment. clone ( ) , None ) . await ;
501-
502- debug ! (
503- logger,
504- "Subgraph started" ;
505- "start_ms" => start_time. elapsed( ) . as_millis( )
506- ) ;
507-
508- match result {
509- Ok ( ( ) ) => ( ) ,
510- Err ( SubgraphAssignmentProviderError :: AlreadyRunning ( _) ) => ( ) ,
511- Err ( e) => {
512- // Errors here are likely an issue with the subgraph.
513- error ! (
514- logger,
515- "Subgraph instance failed to start" ;
516- "error" => e. to_string( )
517- ) ;
518- }
519- }
520- }
521-
522489/// Resolves the subgraph's earliest block
523490async fn resolve_start_block (
524491 manifest : & SubgraphManifest < impl Blockchain > ,
0 commit comments