@@ -101,24 +101,25 @@ impl CardanoTransactionsImporter {
101101 ) ;
102102 return Ok ( ( ) ) ;
103103 }
104-
105- // todo: temp algorithm, should be optimized to avoid loading all blocks & transactions
106- // at once in memory (probably using iterators)
107- let scanned_blocks = self . block_scanner . scan ( & self . dirpath , from, until) . await ?;
108- let parsed_transactions: Vec < CardanoTransaction > = scanned_blocks
109- . into_iter ( )
110- . flat_map ( |b| b. into_transactions ( ) )
111- . collect ( ) ;
112104 debug ! (
113105 self . logger,
114- "TransactionsImporter retrieved '{}' Cardano transactions between immutables '{}' and '{until}'" ,
115- parsed_transactions. len( ) ,
106+ "TransactionsImporter will retrieve Cardano transactions between immutables '{}' and '{until}'" ,
116107 from. unwrap_or( 0 )
117108 ) ;
118109
119- self . transaction_store
120- . store_transactions ( parsed_transactions)
121- . await ?;
110+ let mut streamer = self . block_scanner . scan ( & self . dirpath , from, until) . await ?;
111+
112+ while let Some ( blocks) = streamer. poll_next ( ) . await ? {
113+ let parsed_transactions: Vec < CardanoTransaction > = blocks
114+ . into_iter ( )
115+ . flat_map ( |b| b. into_transactions ( ) )
116+ . collect ( ) ;
117+
118+ self . transaction_store
119+ . store_transactions ( parsed_transactions)
120+ . await ?;
121+ }
122+
122123 Ok ( ( ) )
123124 }
124125
@@ -180,7 +181,9 @@ impl TransactionsImporter for CardanoTransactionsImporter {
180181mod tests {
181182 use mockall:: mock;
182183
183- use mithril_common:: cardano_block_scanner:: { DumbBlockScanner , ScannedBlock } ;
184+ use mithril_common:: cardano_block_scanner:: {
185+ BlockStreamer , DumbBlockScanner , DumbBlockStreamer , ScannedBlock ,
186+ } ;
184187 use mithril_common:: crypto_helper:: MKTree ;
185188 use mithril_common:: entities:: { BlockNumber , BlockRangesSequence } ;
186189
@@ -199,7 +202,7 @@ mod tests {
199202 dirpath: & Path ,
200203 from_immutable: Option <ImmutableFileNumber >,
201204 until_immutable: ImmutableFileNumber ,
202- ) -> StdResult <Vec < ScannedBlock >>;
205+ ) -> StdResult <Box <dyn BlockStreamer >>;
203206 }
204207 }
205208
@@ -267,7 +270,7 @@ mod tests {
267270 scanner_mock
268271 . expect_scan ( )
269272 . withf ( move |_, from, until| from. is_none ( ) && until == & up_to_beacon)
270- . return_once ( move |_, _, _| Ok ( blocks) ) ;
273+ . return_once ( move |_, _, _| Ok ( Box :: new ( DumbBlockStreamer :: new ( vec ! [ blocks] ) ) ) ) ;
271274 CardanoTransactionsImporter :: new_for_test ( Arc :: new ( scanner_mock) , repository. clone ( ) )
272275 } ;
273276
@@ -429,7 +432,9 @@ mod tests {
429432 scanner_mock
430433 . expect_scan ( )
431434 . withf ( move |_, from, until| from == & Some ( 12 ) && until == & up_to_beacon)
432- . return_once ( move |_, _, _| Ok ( scanned_blocks) )
435+ . return_once ( move |_, _, _| {
436+ Ok ( Box :: new ( DumbBlockStreamer :: new ( vec ! [ scanned_blocks] ) ) )
437+ } )
433438 . once ( ) ;
434439 CardanoTransactionsImporter :: new_for_test ( Arc :: new ( scanner_mock) , repository. clone ( ) )
435440 } ;
@@ -607,11 +612,9 @@ mod tests {
607612 let transactions = into_transactions ( & blocks) ;
608613 let importer = {
609614 let connection = cardano_tx_db_connection ( ) . unwrap ( ) ;
610- let mut scanner = MockBlockScannerImpl :: new ( ) ;
611- scanner. expect_scan ( ) . return_once ( move |_, _, _| Ok ( blocks) ) ;
612615
613616 CardanoTransactionsImporter :: new_for_test (
614- Arc :: new ( scanner ) ,
617+ Arc :: new ( DumbBlockScanner :: new ( blocks . clone ( ) ) ) ,
615618 Arc :: new ( CardanoTransactionRepository :: new ( Arc :: new ( connection) ) ) ,
616619 )
617620 } ;
0 commit comments