@@ -470,8 +470,7 @@ where
470470 return Ok ( ( ) ) ;
471471 }
472472
473- let persist_fut = {
474- let mut state_lock = self . sweeper_state . lock ( ) . unwrap ( ) ;
473+ self . update_state ( |state_lock| -> Result < ( ( ) , bool ) , ( ) > {
475474 for descriptor in relevant_descriptors {
476475 let output_info = TrackedSpendableOutput {
477476 descriptor,
@@ -491,17 +490,12 @@ where
491490 }
492491
493492 state_lock. outputs . push ( output_info) ;
493+ state_lock. dirty = true ;
494494 }
495495
496- state_lock. dirty = false ;
497- self . persist_state ( & state_lock)
498- } ;
499-
500- persist_fut. await . map_err ( |e| {
501- self . sweeper_state . lock ( ) . unwrap ( ) . dirty = true ;
502-
503- log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
496+ Ok ( ( ( ) , true ) )
504497 } )
498+ . await
505499 }
506500
507501 /// Returns a list of the currently tracked spendable outputs.
@@ -557,29 +551,18 @@ where
557551 } ;
558552
559553 // See if there is anything to sweep before requesting a change address.
560- let ( persist_fut, has_respends) = {
561- let mut sweeper_state = self . sweeper_state . lock ( ) . unwrap ( ) ;
562-
563- let cur_height = sweeper_state. best_block . height ;
564- let has_respends = sweeper_state. outputs . iter ( ) . any ( |o| filter_fn ( o, cur_height) ) ;
565- // If there is nothing to sweep, we still persist the state if it is dirty.
566- let fut_opt = if !has_respends && sweeper_state. dirty {
567- sweeper_state. dirty = false ;
568- Some ( self . persist_state ( & sweeper_state) )
569- } else {
570- None
571- } ;
572-
573- ( fut_opt, has_respends)
574- } ;
554+ let has_respends = self
555+ . update_state ( |sweeper_state| -> Result < ( bool , bool ) , ( ) > {
556+ let cur_height = sweeper_state. best_block . height ;
557+ let has_respends = sweeper_state. outputs . iter ( ) . any ( |o| filter_fn ( o, cur_height) ) ;
575558
576- if let Some ( persist_fut ) = persist_fut {
577- persist_fut . await . map_err ( |e| {
578- self . sweeper_state . lock ( ) . unwrap ( ) . dirty = true ;
559+ // If there are respends, we don't need to persist a dirty state already now and can postpone it until
560+ // after the sweep.
561+ let persist_if_dirty = !has_respends ;
579562
580- log_error ! ( self . logger , "Error persisting OutputSweeper: {:?}" , e ) ;
581- } ) ? ;
582- } ;
563+ Ok ( ( has_respends , persist_if_dirty ) )
564+ } )
565+ . await ? ;
583566
584567 if !has_respends {
585568 return Ok ( ( ) ) ;
@@ -590,67 +573,56 @@ where
590573 self . change_destination_source . get_change_destination_script ( ) . await ?;
591574
592575 // Sweep the outputs.
593- let spending_tx;
594- let persist_fut = {
595- let mut sweeper_state = self . sweeper_state . lock ( ) . unwrap ( ) ;
596-
597- let cur_height = sweeper_state. best_block . height ;
598- let cur_hash = sweeper_state. best_block . block_hash ;
576+ let spending_tx = self
577+ . update_state ( |sweeper_state| -> Result < ( Option < Transaction > , bool ) , ( ) > {
578+ let cur_height = sweeper_state. best_block . height ;
579+ let cur_hash = sweeper_state. best_block . block_hash ;
599580
600- let respend_descriptors: Vec < & SpendableOutputDescriptor > = sweeper_state
601- . outputs
602- . iter ( )
603- . filter ( |o| filter_fn ( * o, cur_height) )
604- . map ( |o| & o. descriptor )
605- . collect ( ) ;
606-
607- // Exit if there is nothing to spend anymore and there also is no need to persist the state.
608- if respend_descriptors. is_empty ( ) && !sweeper_state. dirty {
609- return Ok ( ( ) ) ;
610- }
611-
612- // Generate the spending transaction and broadcast it.
613- spending_tx = if !respend_descriptors. is_empty ( ) {
614- let spending_tx = self
615- . spend_outputs ( & sweeper_state, & respend_descriptors, change_destination_script)
616- . map_err ( |e| {
617- log_error ! ( self . logger, "Error spending outputs: {:?}" , e) ;
618- } ) ?;
619-
620- log_debug ! (
621- self . logger,
622- "Generating and broadcasting sweeping transaction {}" ,
623- spending_tx. compute_txid( )
624- ) ;
581+ let respend_descriptors: Vec < & SpendableOutputDescriptor > = sweeper_state
582+ . outputs
583+ . iter ( )
584+ . filter ( |o| filter_fn ( * o, cur_height) )
585+ . map ( |o| & o. descriptor )
586+ . collect ( ) ;
587+
588+ // Generate the spending transaction and broadcast it.
589+ if !respend_descriptors. is_empty ( ) {
590+ let spending_tx = self
591+ . spend_outputs (
592+ & sweeper_state,
593+ & respend_descriptors,
594+ change_destination_script,
595+ )
596+ . map_err ( |e| {
597+ log_error ! ( self . logger, "Error spending outputs: {:?}" , e) ;
598+ } ) ?;
599+
600+ log_debug ! (
601+ self . logger,
602+ "Generating and broadcasting sweeping transaction {}" ,
603+ spending_tx. compute_txid( )
604+ ) ;
625605
626- // As we didn't modify the state so far, the same filter_fn yields the same elements as
627- // above.
628- let respend_outputs =
629- sweeper_state. outputs . iter_mut ( ) . filter ( |o| filter_fn ( & * * o, cur_height) ) ;
630- for output_info in respend_outputs {
631- if let Some ( filter) = self . chain_data_source . as_ref ( ) {
632- let watched_output = output_info. to_watched_output ( cur_hash) ;
633- filter. register_output ( watched_output) ;
606+ // As we didn't modify the state so far, the same filter_fn yields the same elements as
607+ // above.
608+ let respend_outputs =
609+ sweeper_state. outputs . iter_mut ( ) . filter ( |o| filter_fn ( & * * o, cur_height) ) ;
610+ for output_info in respend_outputs {
611+ if let Some ( filter) = self . chain_data_source . as_ref ( ) {
612+ let watched_output = output_info. to_watched_output ( cur_hash) ;
613+ filter. register_output ( watched_output) ;
614+ }
615+
616+ output_info. status . broadcast ( cur_hash, cur_height, spending_tx. clone ( ) ) ;
617+ sweeper_state. dirty = true ;
634618 }
635619
636- output_info. status . broadcast ( cur_hash, cur_height, spending_tx. clone ( ) ) ;
620+ Ok ( ( Some ( spending_tx) , true ) )
621+ } else {
622+ Ok ( ( None , true ) )
637623 }
638-
639- Some ( spending_tx)
640- } else {
641- None
642- } ;
643-
644- // Either the state was already dirty or we modified it above, so we persist it.
645- sweeper_state. dirty = false ;
646- self . persist_state ( & sweeper_state)
647- } ;
648-
649- persist_fut. await . map_err ( |e| {
650- self . sweeper_state . lock ( ) . unwrap ( ) . dirty = true ;
651-
652- log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
653- } ) ?;
624+ } )
625+ . await ?;
654626
655627 // Persistence completely successfully. If we have a spending transaction, we broadcast it.
656628 if let Some ( spending_tx) = spending_tx {
@@ -695,6 +667,37 @@ where
695667 )
696668 }
697669
670+ /// Updates the sweeper state by executing the given callback. Persists the state afterwards if it is marked dirty,
671+ /// but only if persist_if_dirty is also true. Returning false for persist_if_dirty allows the callback to postpone
672+ /// persisting a potentially dirty state.
673+ async fn update_state < X > (
674+ & self , callback : impl FnOnce ( & mut SweeperState ) -> Result < ( X , bool ) , ( ) > ,
675+ ) -> Result < X , ( ) > {
676+ let ( fut, res) ;
677+ {
678+ let mut state_lock = self . sweeper_state . lock ( ) . unwrap ( ) ;
679+
680+ let persist_if_dirty;
681+ ( res, persist_if_dirty) = callback ( & mut state_lock) ?;
682+
683+ if !state_lock. dirty || !persist_if_dirty {
684+ return Ok ( res) ;
685+ }
686+
687+ state_lock. dirty = false ;
688+
689+ fut = self . persist_state ( & state_lock) ;
690+ } ;
691+
692+ fut. await . map_err ( |e| {
693+ self . sweeper_state . lock ( ) . unwrap ( ) . dirty = true ;
694+
695+ log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
696+ } ) ?;
697+
698+ Ok ( res)
699+ }
700+
698701 fn spend_outputs (
699702 & self , sweeper_state : & SweeperState , descriptors : & [ & SpendableOutputDescriptor ] ,
700703 change_destination_script : ScriptBuf ,
0 commit comments