1- use crate :: { BlobFetcherError , Blobs , FetchResult } ;
1+ use crate :: { BlobFetcher , BlobberError , BlobberResult , Blobs , FetchResult } ;
22use alloy:: consensus:: { SidecarCoder , SimpleCoder , Transaction as _} ;
33use alloy:: eips:: eip7691:: MAX_BLOBS_PER_BLOCK_ELECTRA ;
44use alloy:: eips:: merge:: EPOCH_SLOTS ;
55use alloy:: primitives:: { B256 , Bytes , keccak256} ;
6+ use core:: fmt;
67use reth:: transaction_pool:: TransactionPool ;
78use reth:: { network:: cache:: LruMap , primitives:: Receipt } ;
89use signet_extract:: ExtractedEvent ;
910use signet_zenith:: Zenith :: BlockSubmitted ;
1011use signet_zenith:: ZenithBlock ;
12+ use std:: marker:: PhantomData ;
1113use std:: {
1214 sync:: { Arc , Mutex } ,
1315 time:: Duration ,
1416} ;
1517use tokio:: sync:: { mpsc, oneshot} ;
16- use tracing:: { Instrument , debug , error, info, instrument} ;
18+ use tracing:: { Instrument , debug_span , error, info, instrument, trace } ;
1719
1820const BLOB_CACHE_SIZE : u32 = ( MAX_BLOBS_PER_BLOCK_ELECTRA * EPOCH_SLOTS ) as u32 ;
1921const CACHE_REQUEST_CHANNEL_SIZE : usize = ( MAX_BLOBS_PER_BLOCK_ELECTRA * 2 ) as usize ;
@@ -37,11 +39,13 @@ enum CacheInst {
3739
3840/// Handle for the cache.
3941#[ derive( Debug , Clone ) ]
40- pub struct CacheHandle {
42+ pub struct CacheHandle < Coder = SimpleCoder > {
4143 sender : mpsc:: Sender < CacheInst > ,
44+
45+ _coder : PhantomData < Coder > ,
4246}
4347
44- impl CacheHandle {
48+ impl < Coder > CacheHandle < Coder > {
4549 /// Sends a cache instruction.
4650 async fn send ( & self , inst : CacheInst ) {
4751 let _ = self . sender . send ( inst) . await ;
@@ -54,7 +58,7 @@ impl CacheHandle {
5458 slot : usize ,
5559 tx_hash : B256 ,
5660 version_hashes : Vec < B256 > ,
57- ) -> FetchResult < Blobs > {
61+ ) -> BlobberResult < Blobs > {
5862 let ( resp, receiver) = oneshot:: channel ( ) ;
5963
6064 self . send ( CacheInst :: Retrieve {
@@ -66,89 +70,87 @@ impl CacheHandle {
6670 } )
6771 . await ;
6872
69- receiver. await . map_err ( |_| BlobFetcherError :: missing_sidecar ( tx_hash) )
73+ receiver. await . map_err ( |_| BlobberError :: missing_sidecar ( tx_hash) )
7074 }
7175
7276 /// Fetch the blobs using [`Self::fetch_blobs`] and decode them to get the
7377 /// Zenith block data using the provided coder.
74- pub async fn fetch_and_decode_with_coder < C : SidecarCoder > (
78+ pub async fn fetch_and_decode (
7579 & self ,
7680 slot : usize ,
7781 extract : & ExtractedEvent < ' _ , Receipt , BlockSubmitted > ,
78- mut coder : C ,
79- ) -> FetchResult < Bytes > {
82+ ) -> BlobberResult < Bytes >
83+ where
84+ Coder : SidecarCoder + Default ,
85+ {
8086 let tx_hash = extract. tx_hash ( ) ;
8187 let versioned_hashes = extract
8288 . tx
8389 . as_eip4844 ( )
84- . ok_or_else ( BlobFetcherError :: non_4844_transaction) ?
90+ . ok_or_else ( BlobberError :: non_4844_transaction) ?
8591 . blob_versioned_hashes ( )
8692 . expect ( "tx is eip4844" ) ;
8793
8894 let blobs = self . fetch_blobs ( slot, tx_hash, versioned_hashes. to_owned ( ) ) . await ?;
8995
90- coder
96+ Coder :: default ( )
9197 . decode_all ( blobs. as_ref ( ) )
92- . ok_or_else ( BlobFetcherError :: blob_decode_error) ?
98+ . ok_or_else ( BlobberError :: blob_decode_error) ?
9399 . into_iter ( )
94100 . find ( |data| keccak256 ( data) == extract. block_data_hash ( ) )
95101 . map ( Into :: into)
96- . ok_or_else ( || BlobFetcherError :: block_data_not_found ( tx_hash) )
97- }
98-
99- /// Fetch the blobs using [`Self::fetch_blobs`] and decode them using
100- /// [`SimpleCoder`] to get the Zenith block data.
101- pub async fn fech_and_decode (
102- & self ,
103- slot : usize ,
104- extract : & ExtractedEvent < ' _ , Receipt , BlockSubmitted > ,
105- ) -> FetchResult < Bytes > {
106- self . fetch_and_decode_with_coder ( slot, extract, SimpleCoder :: default ( ) ) . await
102+ . ok_or_else ( || BlobberError :: block_data_not_found ( extract. block_data_hash ( ) ) )
107103 }
108104
109105 /// Fetch the blobs, decode them using the provided coder, and construct a
110106 /// Zenith block from the header and data.
111- pub async fn signet_block_with_coder < C : SidecarCoder > (
112- & self ,
113- host_block_number : u64 ,
114- slot : usize ,
115- extract : & ExtractedEvent < ' _ , Receipt , BlockSubmitted > ,
116- coder : C ,
117- ) -> FetchResult < ZenithBlock > {
118- let header = extract. ru_header ( host_block_number) ;
119- self . fetch_and_decode_with_coder ( slot, extract, coder)
120- . await
121- . map ( |buf| ZenithBlock :: from_header_and_data ( header, buf) )
122- }
123-
124- /// Fetch the blobs, decode them using [`SimpleCoder`], and construct a
125- /// Zenith block from the header and data.
107+ ///
108+ /// # Returns
109+ ///
110+ /// - `Ok(ZenithBlock)` if the block was successfully fetched and
111+ /// decoded.
112+ /// - `Ok(ZenithBlock)` with an EMPTY BLOCK if the block_data could not be
113+ /// decoded (e.g., due to a malformatted blob).
114+ /// - `Err(FetchError)` if there was an unrecoverable error fetching the
115+ /// blobs.
126116 pub async fn signet_block (
127117 & self ,
128118 host_block_number : u64 ,
129119 slot : usize ,
130120 extract : & ExtractedEvent < ' _ , Receipt , BlockSubmitted > ,
131- ) -> FetchResult < ZenithBlock > {
132- self . signet_block_with_coder ( host_block_number, slot, extract, SimpleCoder :: default ( ) ) . await
121+ ) -> FetchResult < ZenithBlock >
122+ where
123+ Coder : SidecarCoder + Default ,
124+ {
125+ let header = extract. ru_header ( host_block_number) ;
126+ let block_data = match self . fetch_and_decode ( slot, extract) . await {
127+ Ok ( buf) => buf,
128+ Err ( BlobberError :: Decode ( _) ) => {
129+ trace ! ( "Failed to decode block data" ) ;
130+ Bytes :: default ( )
131+ }
132+ Err ( BlobberError :: Fetch ( err) ) => return Err ( err) ,
133+ } ;
134+ Ok ( ZenithBlock :: from_header_and_data ( header, block_data) )
133135 }
134136}
135137
136138/// Retrieves blobs and stores them in a cache for later use.
137139pub struct BlobCacher < Pool > {
138- fetcher : crate :: BlobFetcher < Pool > ,
140+ fetcher : BlobFetcher < Pool > ,
139141
140142 cache : Mutex < LruMap < ( usize , B256 ) , Blobs > > ,
141143}
142144
143- impl < Pool : core :: fmt:: Debug > core :: fmt:: Debug for BlobCacher < Pool > {
144- fn fmt ( & self , f : & mut core :: fmt:: Formatter < ' _ > ) -> core :: fmt:: Result {
145+ impl < Pool : fmt:: Debug > fmt:: Debug for BlobCacher < Pool > {
146+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
145147 f. debug_struct ( "BlobCacher" ) . field ( "fetcher" , & self . fetcher ) . finish_non_exhaustive ( )
146148 }
147149}
148150
149151impl < Pool : TransactionPool + ' static > BlobCacher < Pool > {
150152 /// Creates a new `BlobCacher` with the provided extractor and cache size.
151- pub fn new ( fetcher : crate :: BlobFetcher < Pool > ) -> Self {
153+ pub fn new ( fetcher : BlobFetcher < Pool > ) -> Self {
152154 Self { fetcher, cache : LruMap :: new ( BLOB_CACHE_SIZE ) . into ( ) }
153155 }
154156
@@ -159,7 +161,7 @@ impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
159161 slot : usize ,
160162 tx_hash : B256 ,
161163 versioned_hashes : Vec < B256 > ,
162- ) -> FetchResult < Blobs > {
164+ ) -> BlobberResult < Blobs > {
163165 // Cache hit
164166 if let Some ( blobs) = self . cache . lock ( ) . unwrap ( ) . get ( & ( slot, tx_hash) ) {
165167 info ! ( target: "signet_blobber::BlobCacher" , "Cache hit" ) ;
@@ -169,23 +171,21 @@ impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
169171 // Cache miss, use the fetcher to retrieve blobs
170172 // Retry fetching blobs up to `FETCH_RETRIES` times
171173 for attempt in 1 ..=FETCH_RETRIES {
172- let blobs = self . fetcher . fetch_blobs ( slot, tx_hash, & versioned_hashes) . await ;
173-
174- match blobs {
175- Ok ( blobs) => {
176- self . cache . lock ( ) . unwrap ( ) . insert ( ( slot, tx_hash) , blobs. clone ( ) ) ;
177- return Ok ( blobs) ;
178- }
179- Err ( BlobFetcherError :: Ignorable ( e) ) => {
180- debug ! ( target: "signet_blobber::BlobCacher" , attempt, %e, "Blob fetch attempt failed." ) ;
181- tokio:: time:: sleep ( BETWEEN_RETRIES ) . await ;
182- continue ;
183- }
184- Err ( e) => return Err ( e) , // unrecoverable error
185- }
174+ let Ok ( blobs) = self
175+ . fetcher
176+ . fetch_blobs ( slot, tx_hash, & versioned_hashes)
177+ . instrument ( debug_span ! ( "fetch_blobs_loop" , attempt) )
178+ . await
179+ else {
180+ tokio:: time:: sleep ( BETWEEN_RETRIES ) . await ;
181+ continue ;
182+ } ;
183+
184+ self . cache . lock ( ) . unwrap ( ) . insert ( ( slot, tx_hash) , blobs. clone ( ) ) ;
185+ return Ok ( blobs) ;
186186 }
187187 error ! ( target: "signet_blobber::BlobCacher" , "All fetch attempts failed" ) ;
188- Err ( BlobFetcherError :: missing_sidecar ( tx_hash) )
188+ Err ( BlobberError :: missing_sidecar ( tx_hash) )
189189 }
190190
191191 /// Processes the cache instructions.
@@ -215,10 +215,10 @@ impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
215215 ///
216216 /// # Panics
217217 /// This function will panic if the cache task fails to spawn.
218- pub fn spawn ( self ) -> CacheHandle {
218+ pub fn spawn < C : SidecarCoder + Default > ( self ) -> CacheHandle < C > {
219219 let ( sender, inst) = mpsc:: channel ( CACHE_REQUEST_CHANNEL_SIZE ) ;
220220 tokio:: spawn ( Arc :: new ( self ) . task_future ( inst) ) ;
221- CacheHandle { sender }
221+ CacheHandle { sender, _coder : PhantomData }
222222 }
223223}
224224
@@ -234,7 +234,6 @@ mod tests {
234234 rlp:: encode,
235235 signers:: { SignerSync , local:: PrivateKeySigner } ,
236236 } ;
237- use init4_bin_base:: utils:: calc:: SlotCalculator ;
238237 use reth:: primitives:: Transaction ;
239238 use reth_transaction_pool:: {
240239 PoolTransaction , TransactionOrigin ,
@@ -250,7 +249,6 @@ mod tests {
250249 let test = signet_constants:: KnownChains :: Test ;
251250
252251 let constants: SignetSystemConstants = test. try_into ( ) . unwrap ( ) ;
253- let calc = SlotCalculator :: new ( 0 , 0 , 12 ) ;
254252
255253 let explorer_url = "https://api.holesky.blobscan.com/" ;
256254 let client = reqwest:: Client :: builder ( ) . use_rustls_tls ( ) ;
@@ -286,9 +284,8 @@ mod tests {
286284 . with_explorer_url ( explorer_url)
287285 . with_client_builder ( client)
288286 . unwrap ( )
289- . with_slot_calculator ( calc)
290287 . build_cache ( ) ?;
291- let handle = cache. spawn ( ) ;
288+ let handle = cache. spawn :: < SimpleCoder > ( ) ;
292289
293290 let got = handle
294291 . fetch_blobs (
0 commit comments