@@ -469,8 +469,7 @@ where
469469 return Ok ( ( ) ) ;
470470 }
471471
472- let persist_fut = {
473- let mut state_lock = self . sweeper_state . lock ( ) . unwrap ( ) ;
472+ self . update_state ( |state_lock| -> Result < ( ( ) , bool ) , ( ) > {
474473 for descriptor in relevant_descriptors {
475474 let output_info = TrackedSpendableOutput {
476475 descriptor,
@@ -490,17 +489,12 @@ where
490489 }
491490
492491 state_lock. outputs . push ( output_info) ;
492+ state_lock. dirty = true ;
493493 }
494494
495- state_lock. dirty = false ;
496- self . persist_state ( & state_lock)
497- } ;
498-
499- persist_fut. await . map_err ( |e| {
500- self . sweeper_state . lock ( ) . unwrap ( ) . dirty = true ;
501-
502- log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
495+ Ok ( ( ( ) , true ) )
503496 } )
497+ . await
504498 }
505499
506500 /// Returns a list of the currently tracked spendable outputs.
@@ -556,29 +550,18 @@ where
556550 } ;
557551
558552 // See if there is anything to sweep before requesting a change address.
559- let ( persist_fut, has_respends) = {
560- let mut sweeper_state = self . sweeper_state . lock ( ) . unwrap ( ) ;
561-
562- let cur_height = sweeper_state. best_block . height ;
563- let has_respends = sweeper_state. outputs . iter ( ) . any ( |o| filter_fn ( o, cur_height) ) ;
564- // If there is nothing to sweep, we still persist the state if it is dirty.
565- let fut_opt = if !has_respends && sweeper_state. dirty {
566- sweeper_state. dirty = false ;
567- Some ( self . persist_state ( & sweeper_state) )
568- } else {
569- None
570- } ;
571-
572- ( fut_opt, has_respends)
573- } ;
553+ let has_respends = self
554+ . update_state ( |sweeper_state| -> Result < ( bool , bool ) , ( ) > {
555+ let cur_height = sweeper_state. best_block . height ;
556+ let has_respends = sweeper_state. outputs . iter ( ) . any ( |o| filter_fn ( o, cur_height) ) ;
574557
575- if let Some ( persist_fut ) = persist_fut {
576- persist_fut . await . map_err ( |e| {
577- self . sweeper_state . lock ( ) . unwrap ( ) . dirty = true ;
558+ // If there are respends, we don't need to persist a dirty state already now and can postpone it until
559+ // after the sweep.
560+ let persist_if_dirty = !has_respends ;
578561
579- log_error ! ( self . logger , "Error persisting OutputSweeper: {:?}" , e ) ;
580- } ) ? ;
581- } ;
562+ Ok ( ( has_respends , persist_if_dirty ) )
563+ } )
564+ . await ? ;
582565
583566 if !has_respends {
584567 return Ok ( ( ) ) ;
@@ -589,67 +572,56 @@ where
589572 self . change_destination_source . get_change_destination_script ( ) . await ?;
590573
591574 // Sweep the outputs.
592- let spending_tx;
593- let persist_fut = {
594- let mut sweeper_state = self . sweeper_state . lock ( ) . unwrap ( ) ;
595-
596- let cur_height = sweeper_state. best_block . height ;
597- let cur_hash = sweeper_state. best_block . block_hash ;
575+ let spending_tx = self
576+ . update_state ( |sweeper_state| -> Result < ( Option < Transaction > , bool ) , ( ) > {
577+ let cur_height = sweeper_state. best_block . height ;
578+ let cur_hash = sweeper_state. best_block . block_hash ;
598579
599- let respend_descriptors: Vec < & SpendableOutputDescriptor > = sweeper_state
600- . outputs
601- . iter ( )
602- . filter ( |o| filter_fn ( * o, cur_height) )
603- . map ( |o| & o. descriptor )
604- . collect ( ) ;
605-
606- // Exit if there is nothing to spend anymore and there also is no need to persist the state.
607- if respend_descriptors. is_empty ( ) && !sweeper_state. dirty {
608- return Ok ( ( ) ) ;
609- }
610-
611- // Generate the spending transaction and broadcast it.
612- spending_tx = if !respend_descriptors. is_empty ( ) {
613- let spending_tx = self
614- . spend_outputs ( & sweeper_state, & respend_descriptors, change_destination_script)
615- . map_err ( |e| {
616- log_error ! ( self . logger, "Error spending outputs: {:?}" , e) ;
617- } ) ?;
618-
619- log_debug ! (
620- self . logger,
621- "Generating and broadcasting sweeping transaction {}" ,
622- spending_tx. compute_txid( )
623- ) ;
580+ let respend_descriptors: Vec < & SpendableOutputDescriptor > = sweeper_state
581+ . outputs
582+ . iter ( )
583+ . filter ( |o| filter_fn ( * o, cur_height) )
584+ . map ( |o| & o. descriptor )
585+ . collect ( ) ;
586+
587+ // Generate the spending transaction and broadcast it.
588+ if !respend_descriptors. is_empty ( ) {
589+ let spending_tx = self
590+ . spend_outputs (
591+ & sweeper_state,
592+ & respend_descriptors,
593+ change_destination_script,
594+ )
595+ . map_err ( |e| {
596+ log_error ! ( self . logger, "Error spending outputs: {:?}" , e) ;
597+ } ) ?;
598+
599+ log_debug ! (
600+ self . logger,
601+ "Generating and broadcasting sweeping transaction {}" ,
602+ spending_tx. compute_txid( )
603+ ) ;
624604
625- // As we didn't modify the state so far, the same filter_fn yields the same elements as
626- // above.
627- let respend_outputs =
628- sweeper_state. outputs . iter_mut ( ) . filter ( |o| filter_fn ( & * * o, cur_height) ) ;
629- for output_info in respend_outputs {
630- if let Some ( filter) = self . chain_data_source . as_ref ( ) {
631- let watched_output = output_info. to_watched_output ( cur_hash) ;
632- filter. register_output ( watched_output) ;
605+ // As we didn't modify the state so far, the same filter_fn yields the same elements as
606+ // above.
607+ let respend_outputs =
608+ sweeper_state. outputs . iter_mut ( ) . filter ( |o| filter_fn ( & * * o, cur_height) ) ;
609+ for output_info in respend_outputs {
610+ if let Some ( filter) = self . chain_data_source . as_ref ( ) {
611+ let watched_output = output_info. to_watched_output ( cur_hash) ;
612+ filter. register_output ( watched_output) ;
613+ }
614+
615+ output_info. status . broadcast ( cur_hash, cur_height, spending_tx. clone ( ) ) ;
616+ sweeper_state. dirty = true ;
633617 }
634618
635- output_info. status . broadcast ( cur_hash, cur_height, spending_tx. clone ( ) ) ;
619+ Ok ( ( Some ( spending_tx) , true ) )
620+ } else {
621+ Ok ( ( None , true ) )
636622 }
637-
638- Some ( spending_tx)
639- } else {
640- None
641- } ;
642-
643- // Either the state was already dirty or we modified it above, so we persist it.
644- sweeper_state. dirty = false ;
645- self . persist_state ( & sweeper_state)
646- } ;
647-
648- persist_fut. await . map_err ( |e| {
649- self . sweeper_state . lock ( ) . unwrap ( ) . dirty = true ;
650-
651- log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
652- } ) ?;
623+ } )
624+ . await ?;
653625
654626 // Persistence completely successfully. If we have a spending transaction, we broadcast it.
655627 if let Some ( spending_tx) = spending_tx {
@@ -694,6 +666,34 @@ where
694666 )
695667 }
696668
669+ /// Updates the sweeper state by executing the given callback. Persists the state afterwards if it is marked dirty,
670+ /// but only if persist_if_dirty is also true. Returning false for persist_if_dirty allows the callback to postpone
671+ /// persisting a potentially dirty state.
672+ async fn update_state < X > (
673+ & self , callback : impl FnOnce ( & mut SweeperState ) -> Result < ( X , bool ) , ( ) > ,
674+ ) -> Result < X , ( ) > {
675+ let ( fut, res) = {
676+ let mut state_lock = self . sweeper_state . lock ( ) . unwrap ( ) ;
677+
678+ let ( res, persist_if_dirty) = callback ( & mut state_lock) ?;
679+ if !state_lock. dirty || !persist_if_dirty {
680+ return Ok ( res) ;
681+ }
682+
683+ state_lock. dirty = false ;
684+
685+ ( self . persist_state ( & state_lock) , res)
686+ } ;
687+
688+ fut. await . map_err ( |e| {
689+ self . sweeper_state . lock ( ) . unwrap ( ) . dirty = true ;
690+
691+ log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
692+ } ) ?;
693+
694+ Ok ( res)
695+ }
696+
697697 fn spend_outputs (
698698 & self , sweeper_state : & SweeperState , descriptors : & [ & SpendableOutputDescriptor ] ,
699699 change_destination_script : ScriptBuf ,
0 commit comments