@@ -30,6 +30,7 @@ use std::time::{Duration, Instant};
3030use tokio:: task;
3131
3232const MINUTE : Duration = Duration :: from_secs ( 60 ) ;
33+
3334const BUFFERED_BLOCK_STREAM_SIZE : usize = 100 ;
3435const BUFFERED_FIREHOSE_STREAM_SIZE : usize = 1 ;
3536
@@ -481,7 +482,6 @@ async fn new_block_stream<C: Blockchain>(
481482
482483 chain. new_polling_block_stream (
483484 inputs. deployment . clone ( ) ,
484- inputs. store . clone ( ) ,
485485 inputs. start_blocks . clone ( ) ,
486486 start_block,
487487 Arc :: new ( filter. clone ( ) ) ,
@@ -514,6 +514,7 @@ where
514514 let id_for_err = inputs. deployment . hash . clone ( ) ;
515515 let mut should_try_unfail_deterministic = true ;
516516 let mut should_try_unfail_non_deterministic = true ;
517+ let mut synced = false ;
517518
518519 // Exponential backoff that starts with two minutes and keeps
519520 // increasing its timeout exponentially until it reaches the ceiling.
@@ -532,6 +533,8 @@ where
532533 . await ?
533534 . map_err ( CancelableError :: Error )
534535 . cancelable ( & block_stream_canceler, || Err ( CancelableError :: Cancel ) ) ;
536+ let chain = inputs. chain . clone ( ) ;
537+ let chain_store = chain. chain_store ( ) ;
535538
536539 // Keep the stream's cancel guard around to be able to shut it down
537540 // when the subgraph deployment is unassigned
@@ -706,6 +709,20 @@ where
706709
707710 match res {
708711 Ok ( needs_restart) => {
712+ // Once synced, no need to try to update the status again.
713+ if !synced && is_deployment_synced ( & block_ptr, chain_store. cached_head_ptr ( ) ?) {
714+ // Updating the sync status is an one way operation.
715+ // This state change exists: not synced -> synced
716+ // This state change does NOT: synced -> not synced
717+ inputs. store . deployment_synced ( ) ?;
718+
719+ // Stop trying to update the sync status.
720+ synced = true ;
721+
722+ // Stop recording time-to-sync metrics.
723+ ctx. block_stream_metrics . stopwatch . disable ( ) ;
724+ }
725+
709726 // Keep trying to unfail subgraph for everytime it advances block(s) until it's
710727 // health is not Failed anymore.
711728 if should_try_unfail_non_deterministic {
@@ -1293,3 +1310,35 @@ fn persist_dynamic_data_sources<T: RuntimeHostBuilder<C>, C: Blockchain>(
12931310 // Merge filters from data sources into the block stream builder
12941311 ctx. state . filter . extend ( data_sources. iter ( ) ) ;
12951312}
1313+
1314+ /// Checks if the Deployment BlockPtr is at least one block behind to the chain head.
1315+ fn is_deployment_synced ( deployment_head_ptr : & BlockPtr , chain_head_ptr : Option < BlockPtr > ) -> bool {
1316+ matches ! ( ( deployment_head_ptr, & chain_head_ptr) , ( b1, Some ( b2) ) if b1. number >= ( b2. number - 1 ) )
1317+ }
1318+
1319+ #[ test]
1320+ fn test_is_deployment_synced ( ) {
1321+ let block_0 = BlockPtr :: try_from ( (
1322+ "bd34884280958002c51d3f7b5f853e6febeba33de0f40d15b0363006533c924f" ,
1323+ 0 ,
1324+ ) )
1325+ . unwrap ( ) ;
1326+ let block_1 = BlockPtr :: try_from ( (
1327+ "8511fa04b64657581e3f00e14543c1d522d5d7e771b54aa3060b662ade47da13" ,
1328+ 1 ,
1329+ ) )
1330+ . unwrap ( ) ;
1331+ let block_2 = BlockPtr :: try_from ( (
1332+ "b98fb783b49de5652097a989414c767824dff7e7fd765a63b493772511db81c1" ,
1333+ 2 ,
1334+ ) )
1335+ . unwrap ( ) ;
1336+
1337+ assert ! ( !is_deployment_synced( & block_0, None ) ) ;
1338+ assert ! ( !is_deployment_synced( & block_2, None ) ) ;
1339+
1340+ assert ! ( !is_deployment_synced( & block_0, Some ( block_2. clone( ) ) ) ) ;
1341+
1342+ assert ! ( is_deployment_synced( & block_1, Some ( block_2. clone( ) ) ) ) ;
1343+ assert ! ( is_deployment_synced( & block_2, Some ( block_2. clone( ) ) ) ) ;
1344+ }
0 commit comments