@@ -119,97 +119,6 @@ impl Chain {
119119 is_ingestible,
120120 }
121121 }
122-
123- async fn new_polling_block_stream (
124- & self ,
125- deployment : DeploymentLocator ,
126- start_blocks : Vec < BlockNumber > ,
127- adapter : Arc < TriggersAdapter > ,
128- filter : Arc < TriggerFilter > ,
129- metrics : Arc < BlockStreamMetrics > ,
130- unified_api_version : UnifiedMappingApiVersion ,
131- ) -> Result < Box < dyn BlockStream < Self > > , Error > {
132- let logger = self
133- . logger_factory
134- . subgraph_logger ( & deployment)
135- . new ( o ! ( "component" => "BlockStream" ) ) ;
136- let chain_store = self . chain_store ( ) . clone ( ) ;
137- let writable = self
138- . subgraph_store
139- . cheap_clone ( )
140- . writable ( logger. clone ( ) , deployment. id )
141- . await
142- . with_context ( || format ! ( "no store for deployment `{}`" , deployment. hash) ) ?;
143- let chain_head_update_stream = self
144- . chain_head_update_listener
145- . subscribe ( self . name . clone ( ) , logger. clone ( ) ) ;
146-
147- // Special case: Detect Celo and set the threshold to 0, so that eth_getLogs is always used.
148- // This is ok because Celo blocks are always final. And we _need_ to do this because
149- // some events appear only in eth_getLogs but not in transaction receipts.
150- // See also ca0edc58-0ec5-4c89-a7dd-2241797f5e50.
151- let chain_id = self . eth_adapters . cheapest ( ) . unwrap ( ) . chain_id ( ) . await ?;
152- let reorg_threshold = match CELO_CHAIN_IDS . contains ( & chain_id) {
153- false => self . reorg_threshold ,
154- true => 0 ,
155- } ;
156-
157- let start_block = writable. block_ptr ( ) ?;
158-
159- Ok ( Box :: new ( PollingBlockStream :: new (
160- writable,
161- chain_store,
162- chain_head_update_stream,
163- adapter,
164- self . node_id . clone ( ) ,
165- deployment. hash ,
166- filter,
167- start_blocks,
168- reorg_threshold,
169- logger,
170- metrics,
171- * MAX_BLOCK_RANGE_SIZE ,
172- * TARGET_TRIGGERS_PER_BLOCK_RANGE ,
173- unified_api_version,
174- start_block,
175- ) ) )
176- }
177-
178- async fn new_firehose_block_stream (
179- & self ,
180- deployment : DeploymentLocator ,
181- start_blocks : Vec < BlockNumber > ,
182- adapter : Arc < TriggersAdapter > ,
183- filter : Arc < TriggerFilter > ,
184- ) -> Result < Box < dyn BlockStream < Self > > , Error > {
185- let firehose_endpoint = match self . firehose_endpoints . random ( ) {
186- Some ( e) => e. clone ( ) ,
187- None => return Err ( anyhow:: format_err!( "no firehose endpoint available" , ) ) ,
188- } ;
189-
190- let logger = self
191- . logger_factory
192- . subgraph_logger ( & deployment)
193- . new ( o ! ( "component" => "FirehoseBlockStream" ) ) ;
194-
195- let firehose_mapper = Arc :: new ( FirehoseMapper { } ) ;
196- let firehose_cursor = self
197- . subgraph_store
198- . cheap_clone ( )
199- . writable ( logger. clone ( ) , deployment. id )
200- . await ?
201- . block_cursor ( ) ?;
202-
203- Ok ( Box :: new ( FirehoseBlockStream :: new (
204- firehose_endpoint,
205- firehose_cursor,
206- firehose_mapper,
207- adapter,
208- filter,
209- start_blocks,
210- logger,
211- ) ) )
212- }
213122}
214123
215124#[ async_trait]
@@ -279,11 +188,12 @@ impl Blockchain for Chain {
279188 Ok ( Arc :: new ( adapter) )
280189 }
281190
282- async fn new_block_stream (
191+ async fn new_firehose_block_stream (
283192 & self ,
284193 deployment : DeploymentLocator ,
285194 start_blocks : Vec < BlockNumber > ,
286- filter : Arc < TriggerFilter > ,
195+ firehose_cursor : Option < String > ,
196+ filter : Arc < Self :: TriggerFilter > ,
287197 metrics : Arc < BlockStreamMetrics > ,
288198 unified_api_version : UnifiedMappingApiVersion ,
289199 ) -> Result < Box < dyn BlockStream < Self > > , Error > {
@@ -300,20 +210,93 @@ impl Blockchain for Chain {
300210 self . name, requirements
301211 ) ) ;
302212
303- if self . firehose_endpoints . len ( ) > 0 {
304- self . new_firehose_block_stream ( deployment, start_blocks, adapter, filter)
305- . await
306- } else {
307- self . new_polling_block_stream (
308- deployment,
309- start_blocks,
310- adapter,
311- filter,
312- metrics,
313- unified_api_version,
213+ let firehose_endpoint = match self . firehose_endpoints . random ( ) {
214+ Some ( e) => e. clone ( ) ,
215+ None => return Err ( anyhow:: format_err!( "no firehose endpoint available" , ) ) ,
216+ } ;
217+
218+ let logger = self
219+ . logger_factory
220+ . subgraph_logger ( & deployment)
221+ . new ( o ! ( "component" => "FirehoseBlockStream" ) ) ;
222+
223+ let firehose_mapper = Arc :: new ( FirehoseMapper { } ) ;
224+
225+ Ok ( Box :: new ( FirehoseBlockStream :: new (
226+ firehose_endpoint,
227+ firehose_cursor,
228+ firehose_mapper,
229+ adapter,
230+ filter,
231+ start_blocks,
232+ logger,
233+ ) ) )
234+ }
235+
236+ async fn new_polling_block_stream (
237+ & self ,
238+ deployment : DeploymentLocator ,
239+ start_blocks : Vec < BlockNumber > ,
240+ subgraph_start_block : Option < BlockPtr > ,
241+ filter : Arc < Self :: TriggerFilter > ,
242+ metrics : Arc < BlockStreamMetrics > ,
243+ unified_api_version : UnifiedMappingApiVersion ,
244+ ) -> Result < Box < dyn BlockStream < Self > > , Error > {
245+ let requirements = filter. node_capabilities ( ) ;
246+ let adapter = self
247+ . triggers_adapter (
248+ & deployment,
249+ & requirements,
250+ unified_api_version. clone ( ) ,
251+ metrics. stopwatch . clone ( ) ,
314252 )
253+ . expect ( & format ! (
254+ "no adapter for network {} with capabilities {}" ,
255+ self . name, requirements
256+ ) ) ;
257+
258+ let logger = self
259+ . logger_factory
260+ . subgraph_logger ( & deployment)
261+ . new ( o ! ( "component" => "BlockStream" ) ) ;
262+ let chain_store = self . chain_store ( ) . clone ( ) ;
263+ let writable = self
264+ . subgraph_store
265+ . cheap_clone ( )
266+ . writable ( logger. clone ( ) , deployment. id )
315267 . await
316- }
268+ . with_context ( || format ! ( "no store for deployment `{}`" , deployment. hash) ) ?;
269+ let chain_head_update_stream = self
270+ . chain_head_update_listener
271+ . subscribe ( self . name . clone ( ) , logger. clone ( ) ) ;
272+
273+ // Special case: Detect Celo and set the threshold to 0, so that eth_getLogs is always used.
274+ // This is ok because Celo blocks are always final. And we _need_ to do this because
275+ // some events appear only in eth_getLogs but not in transaction receipts.
276+ // See also ca0edc58-0ec5-4c89-a7dd-2241797f5e50.
277+ let chain_id = self . eth_adapters . cheapest ( ) . unwrap ( ) . chain_id ( ) . await ?;
278+ let reorg_threshold = match CELO_CHAIN_IDS . contains ( & chain_id) {
279+ false => self . reorg_threshold ,
280+ true => 0 ,
281+ } ;
282+
283+ Ok ( Box :: new ( PollingBlockStream :: new (
284+ writable,
285+ chain_store,
286+ chain_head_update_stream,
287+ adapter,
288+ self . node_id . clone ( ) ,
289+ deployment. hash ,
290+ filter,
291+ start_blocks,
292+ reorg_threshold,
293+ logger,
294+ metrics,
295+ * MAX_BLOCK_RANGE_SIZE ,
296+ * TARGET_TRIGGERS_PER_BLOCK_RANGE ,
297+ unified_api_version,
298+ subgraph_start_block,
299+ ) ) )
317300 }
318301
319302 fn ingestor_adapter ( & self ) -> Arc < Self :: IngestorAdapter > {
@@ -365,6 +348,10 @@ impl Blockchain for Chain {
365348 call_cache : self . call_cache . cheap_clone ( ) ,
366349 } )
367350 }
351+
352+ fn is_firehose_supported ( & self ) -> bool {
353+ self . firehose_endpoints . len ( ) > 0
354+ }
368355}
369356
370357/// This is used in `EthereumAdapter::triggers_in_block`, called when re-processing a block for
0 commit comments