@@ -11,21 +11,20 @@ use graph::blockchain::block_stream::{
1111 BlockStream , BlockStreamError , BlockStreamEvent , BlockWithTriggers , ChainHeadUpdateStream ,
1212 FirehoseCursor , TriggersAdapterWrapper , BUFFERED_BLOCK_STREAM_SIZE ,
1313} ;
14- use graph:: blockchain:: { Block , BlockPtr , Blockchain , TriggerFilterWrapper } ;
14+ use graph:: blockchain:: { Block , BlockPtr , TriggerFilterWrapper } ;
1515use graph:: futures03:: { stream:: Stream , Future , FutureExt } ;
1616use graph:: prelude:: { ChainStore , CheapClone , DeploymentHash , NodeId , BLOCK_NUMBER_MAX } ;
1717use graph:: slog:: { debug, info, trace, warn, Logger } ;
1818
1919use graph:: components:: store:: BlockNumber ;
2020use graph:: data:: subgraph:: UnifiedMappingApiVersion ;
2121
22+ use crate :: Chain ;
23+
2224// A high number here forces a slow start.
2325const STARTING_PREVIOUS_TRIGGERS_PER_BLOCK : f64 = 1_000_000.0 ;
2426
25- enum BlockStreamState < C >
26- where
27- C : Blockchain ,
28- {
27+ enum BlockStreamState {
2928 /// Starting or restarting reconciliation.
3029 ///
3130 /// Valid next states: Reconciliation
@@ -34,13 +33,13 @@ where
3433 /// The BlockStream is reconciling the subgraph store state with the chain store state.
3534 ///
3635 /// Valid next states: YieldingBlocks, Idle, BeginReconciliation (in case of revert)
37- Reconciliation ( Pin < Box < dyn Future < Output = Result < NextBlocks < C > , Error > > + Send > > ) ,
36+ Reconciliation ( Pin < Box < dyn Future < Output = Result < NextBlocks , Error > > + Send > > ) ,
3837
3938 /// The BlockStream is emitting blocks that must be processed in order to bring the subgraph
4039 /// store up to date with the chain store.
4140 ///
4241 /// Valid next states: BeginReconciliation
43- YieldingBlocks ( Box < VecDeque < BlockWithTriggers < C > > > ) ,
42+ YieldingBlocks ( Box < VecDeque < BlockWithTriggers < Chain > > > ) ,
4443
4544 /// The BlockStream experienced an error and is pausing before attempting to produce
4645 /// blocks again.
@@ -57,16 +56,13 @@ where
5756
5857/// A single next step to take in reconciling the state of the subgraph store with the state of the
5958/// chain store.
60- enum ReconciliationStep < C >
61- where
62- C : Blockchain ,
63- {
59+ enum ReconciliationStep {
6460 /// Revert(to) the block the subgraph should be reverted to, so it becomes the new subgraph
6561 /// head.
6662 Revert ( BlockPtr ) ,
6763
6864 /// Move forwards, processing one or more blocks. Second element is the block range size.
69- ProcessDescendantBlocks ( Vec < BlockWithTriggers < C > > , BlockNumber ) ,
65+ ProcessDescendantBlocks ( Vec < BlockWithTriggers < Chain > > , BlockNumber ) ,
7066
7167 /// This step is a no-op, but we need to check again for a next step.
7268 Retry ,
@@ -76,18 +72,15 @@ where
7672 Done ,
7773}
7874
79- struct PollingBlockStreamContext < C >
80- where
81- C : Blockchain ,
82- {
75+ struct PollingBlockStreamContext {
8376 chain_store : Arc < dyn ChainStore > ,
84- adapter : Arc < TriggersAdapterWrapper < C > > ,
77+ adapter : Arc < TriggersAdapterWrapper < Chain > > ,
8578 node_id : NodeId ,
8679 subgraph_id : DeploymentHash ,
8780 // This is not really a block number, but the (unsigned) difference
8881 // between two block numbers
8982 reorg_threshold : BlockNumber ,
90- filter : Arc < TriggerFilterWrapper < C > > ,
83+ filter : Arc < TriggerFilterWrapper < Chain > > ,
9184 start_blocks : Vec < BlockNumber > ,
9285 logger : Logger ,
9386 previous_triggers_per_block : f64 ,
10093 current_block : Option < BlockPtr > ,
10194}
10295
103- impl < C : Blockchain > Clone for PollingBlockStreamContext < C > {
96+ impl Clone for PollingBlockStreamContext {
10497 fn clone ( & self ) -> Self {
10598 Self {
10699 chain_store : self . chain_store . cheap_clone ( ) ,
@@ -121,37 +114,31 @@ impl<C: Blockchain> Clone for PollingBlockStreamContext<C> {
121114 }
122115}
123116
124- pub struct PollingBlockStream < C : Blockchain > {
125- state : BlockStreamState < C > ,
117+ pub struct PollingBlockStream {
118+ state : BlockStreamState ,
126119 consecutive_err_count : u32 ,
127120 chain_head_update_stream : ChainHeadUpdateStream ,
128- ctx : PollingBlockStreamContext < C > ,
121+ ctx : PollingBlockStreamContext ,
129122}
130123
131124// This is the same as `ReconciliationStep` but without retries.
132- enum NextBlocks < C >
133- where
134- C : Blockchain ,
135- {
125+ enum NextBlocks {
136126 /// Blocks and range size
137- Blocks ( VecDeque < BlockWithTriggers < C > > , BlockNumber ) ,
127+ Blocks ( VecDeque < BlockWithTriggers < Chain > > , BlockNumber ) ,
138128
139129 // The payload is block the subgraph should be reverted to, so it becomes the new subgraph head.
140130 Revert ( BlockPtr ) ,
141131 Done ,
142132}
143133
144- impl < C > PollingBlockStream < C >
145- where
146- C : Blockchain ,
147- {
134+ impl PollingBlockStream {
148135 pub fn new (
149136 chain_store : Arc < dyn ChainStore > ,
150137 chain_head_update_stream : ChainHeadUpdateStream ,
151- adapter : Arc < TriggersAdapterWrapper < C > > ,
138+ adapter : Arc < TriggersAdapterWrapper < Chain > > ,
152139 node_id : NodeId ,
153140 subgraph_id : DeploymentHash ,
154- filter : Arc < TriggerFilterWrapper < C > > ,
141+ filter : Arc < TriggerFilterWrapper < Chain > > ,
155142 start_blocks : Vec < BlockNumber > ,
156143 reorg_threshold : BlockNumber ,
157144 logger : Logger ,
@@ -184,12 +171,9 @@ where
184171 }
185172}
186173
187- impl < C > PollingBlockStreamContext < C >
188- where
189- C : Blockchain ,
190- {
174+ impl PollingBlockStreamContext {
191175 /// Perform reconciliation steps until there are blocks to yield or we are up-to-date.
192- async fn next_blocks ( & self ) -> Result < NextBlocks < C > , Error > {
176+ async fn next_blocks ( & self ) -> Result < NextBlocks , Error > {
193177 let ctx = self . clone ( ) ;
194178
195179 loop {
@@ -214,7 +198,7 @@ where
214198 }
215199
216200 /// Determine the next reconciliation step. Does not modify Store or ChainStore.
217- async fn get_next_step ( & self ) -> Result < ReconciliationStep < C > , Error > {
201+ async fn get_next_step ( & self ) -> Result < ReconciliationStep , Error > {
218202 let ctx = self . clone ( ) ;
219203 let start_blocks = self . start_blocks . clone ( ) ;
220204 let max_block_range_size = self . max_block_range_size ;
@@ -500,14 +484,14 @@ where
500484 }
501485}
502486
503- impl < C : Blockchain > BlockStream < C > for PollingBlockStream < C > {
487+ impl BlockStream < Chain > for PollingBlockStream {
504488 fn buffer_size_hint ( & self ) -> usize {
505489 BUFFERED_BLOCK_STREAM_SIZE
506490 }
507491}
508492
509- impl < C : Blockchain > Stream for PollingBlockStream < C > {
510- type Item = Result < BlockStreamEvent < C > , BlockStreamError > ;
493+ impl Stream for PollingBlockStream {
494+ type Item = Result < BlockStreamEvent < Chain > , BlockStreamError > ;
511495
512496 fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
513497 let result = loop {
0 commit comments