@@ -101,19 +101,20 @@ impl CardanoTransactionsImporter {
101101 ) ;
102102 return Ok ( ( ) ) ;
103103 }
104-
105- // todo: temp algorithm, should be optimized to avoid loading all blocks & transactions
106- // at once in memory (probably using iterators)
107- let scanned_blocks = self . block_scanner . scan ( & self . dirpath , from, until) . await ?;
108- let parsed_transactions: Vec < CardanoTransaction > =
109- scanned_blocks. flat_map ( |b| b. into_transactions ( ) ) . collect ( ) ;
110104 debug ! (
111105 self . logger,
112- "TransactionsImporter retrieved '{}' Cardano transactions between immutables '{}' and '{until}'" ,
113- parsed_transactions. len( ) ,
106+ "TransactionsImporter will retrieve Cardano transactions between immutables '{}' and '{until}'" ,
114107 from. unwrap_or( 0 )
115108 ) ;
116109
110+ let mut streamer = self . block_scanner . scan ( & self . dirpath , from, until) . await ?;
111+ let parsed_transactions: Vec < CardanoTransaction > = streamer
112+ . poll_all ( )
113+ . await ?
114+ . into_iter ( )
115+ . flat_map ( |b| b. into_transactions ( ) )
116+ . collect ( ) ;
117+
117118 self . transaction_store
118119 . store_transactions ( parsed_transactions)
119120 . await ?;
@@ -178,7 +179,9 @@ impl TransactionsImporter for CardanoTransactionsImporter {
178179mod tests {
179180 use mockall:: mock;
180181
181- use mithril_common:: cardano_block_scanner:: { DumbBlockScanner , ScannedBlock } ;
182+ use mithril_common:: cardano_block_scanner:: {
183+ BlockStreamer , DumbBlockScanner , DumbBlockStreamer , ScannedBlock ,
184+ } ;
182185 use mithril_common:: crypto_helper:: MKTree ;
183186 use mithril_common:: entities:: { BlockNumber , BlockRangesSequence } ;
184187
@@ -197,7 +200,7 @@ mod tests {
197200 dirpath: & Path ,
198201 from_immutable: Option <ImmutableFileNumber >,
199202 until_immutable: ImmutableFileNumber ,
200- ) -> StdResult <Box <dyn Iterator < Item = ScannedBlock > >>;
203+ ) -> StdResult <Box <dyn BlockStreamer >>;
201204 }
202205 }
203206
@@ -265,7 +268,7 @@ mod tests {
265268 scanner_mock
266269 . expect_scan ( )
267270 . withf ( move |_, from, until| from. is_none ( ) && until == & up_to_beacon)
268- . return_once ( move |_, _, _| Ok ( Box :: new ( blocks . into_iter ( ) ) ) ) ;
271+ . return_once ( move |_, _, _| Ok ( Box :: new ( DumbBlockStreamer :: new ( vec ! [ blocks ] ) ) ) ) ;
269272 CardanoTransactionsImporter :: new_for_test ( Arc :: new ( scanner_mock) , repository. clone ( ) )
270273 } ;
271274
@@ -427,7 +430,9 @@ mod tests {
427430 scanner_mock
428431 . expect_scan ( )
429432 . withf ( move |_, from, until| from == & Some ( 12 ) && until == & up_to_beacon)
430- . return_once ( move |_, _, _| Ok ( Box :: new ( scanned_blocks. into_iter ( ) ) ) )
433+ . return_once ( move |_, _, _| {
434+ Ok ( Box :: new ( DumbBlockStreamer :: new ( vec ! [ scanned_blocks] ) ) )
435+ } )
431436 . once ( ) ;
432437 CardanoTransactionsImporter :: new_for_test ( Arc :: new ( scanner_mock) , repository. clone ( ) )
433438 } ;
@@ -605,13 +610,9 @@ mod tests {
605610 let transactions = into_transactions ( & blocks) ;
606611 let importer = {
607612 let connection = cardano_tx_db_connection ( ) . unwrap ( ) ;
608- let mut scanner = MockBlockScannerImpl :: new ( ) ;
609- scanner
610- . expect_scan ( )
611- . return_once ( move |_, _, _| Ok ( Box :: new ( blocks. into_iter ( ) ) ) ) ;
612613
613614 CardanoTransactionsImporter :: new_for_test (
614- Arc :: new ( scanner ) ,
615+ Arc :: new ( DumbBlockScanner :: new ( blocks . clone ( ) ) ) ,
615616 Arc :: new ( CardanoTransactionRepository :: new ( Arc :: new ( connection) ) ) ,
616617 )
617618 } ;
0 commit comments