@@ -382,8 +382,10 @@ where
382382 output_spender : O , change_destination_source : D , kv_store : K , logger : L ,
383383 ) -> Self {
384384 let outputs = Vec :: new ( ) ;
385- let sweeper_state =
386- Mutex :: new ( SweeperState { persistent : PersistentSweeperState { outputs, best_block } } ) ;
385+ let sweeper_state = Mutex :: new ( SweeperState {
386+ persistent : PersistentSweeperState { outputs, best_block } ,
387+ dirty : false ,
388+ } ) ;
387389 Self {
388390 sweeper_state,
389391 pending_sweep : AtomicBool :: new ( false ) ,
@@ -450,7 +452,7 @@ where
450452
451453 state_lock. persistent . outputs . push ( output_info) ;
452454 }
453- self . persist_state ( & state_lock) . map_err ( |e| {
455+ self . flush_state ( & mut state_lock) . map_err ( |e| {
454456 log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
455457 } )
456458 }
@@ -478,7 +480,19 @@ where
478480 return Ok ( ( ) ) ;
479481 }
480482
481- let result = self . regenerate_and_broadcast_spend_if_necessary_internal ( ) . await ;
483+ let result = {
484+ self . regenerate_and_broadcast_spend_if_necessary_internal ( ) . await ?;
485+
486+ // If there is still dirty state, we need to persist it.
487+ let mut sweeper_state = self . sweeper_state . lock ( ) . unwrap ( ) ;
488+ if sweeper_state. dirty {
489+ self . flush_state ( & mut sweeper_state) . map_err ( |e| {
490+ log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
491+ } )
492+ } else {
493+ Ok ( ( ) )
494+ }
495+ } ;
482496
483497 // Release the pending sweep flag again, regardless of result.
484498 self . pending_sweep . store ( false , Ordering :: Release ) ;
@@ -571,7 +585,7 @@ where
571585 output_info. status . broadcast ( cur_hash, cur_height, spending_tx. clone ( ) ) ;
572586 }
573587
574- self . persist_state ( & sweeper_state) . map_err ( |e| {
588+ self . flush_state ( & mut sweeper_state) . map_err ( |e| {
575589 log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
576590 } ) ?;
577591
@@ -599,9 +613,12 @@ where
599613 }
600614 true
601615 } ) ;
616+
617+ sweeper_state. dirty = true ;
602618 }
603619
604- fn persist_state ( & self , sweeper_state : & SweeperState ) -> Result < ( ) , io:: Error > {
620+ /// Flushes the current state to the persistence layer and marks the state as clean.
621+ fn flush_state ( & self , sweeper_state : & mut SweeperState ) -> Result < ( ) , io:: Error > {
605622 self . kv_store
606623 . write (
607624 OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE ,
@@ -620,6 +637,9 @@ where
620637 ) ;
621638 e
622639 } )
640+ . map ( |_| {
641+ sweeper_state. dirty = false ;
642+ } )
623643 }
624644
625645 fn spend_outputs (
@@ -652,13 +672,17 @@ where
652672 }
653673 }
654674 }
675+
676+ sweeper_state. dirty = true ;
655677 }
656678
657679 fn best_block_updated_internal (
658680 & self , sweeper_state : & mut SweeperState , header : & Header , height : u32 ,
659681 ) {
660682 sweeper_state. persistent . best_block = BestBlock :: new ( header. block_hash ( ) , height) ;
661683 self . prune_confirmed_outputs ( sweeper_state) ;
684+
685+ sweeper_state. dirty = true ;
662686 }
663687}
664688
@@ -682,12 +706,8 @@ where
682706 assert_eq ! ( state_lock. persistent. best_block. height, height - 1 ,
683707 "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height" ) ;
684708
685- self . transactions_confirmed_internal ( & mut * state_lock, header, txdata, height) ;
686- self . best_block_updated_internal ( & mut * state_lock, header, height) ;
687-
688- let _ = self . persist_state ( & * state_lock) . map_err ( |e| {
689- log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
690- } ) ;
709+ self . transactions_confirmed_internal ( & mut state_lock, header, txdata, height) ;
710+ self . best_block_updated_internal ( & mut state_lock, header, height) ;
691711 }
692712
693713 fn block_disconnected ( & self , header : & Header , height : u32 ) {
@@ -709,9 +729,7 @@ where
709729 }
710730 }
711731
712- self . persist_state ( & * state_lock) . unwrap_or_else ( |e| {
713- log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
714- } ) ;
732+ state_lock. dirty = true ;
715733 }
716734}
717735
@@ -731,9 +749,6 @@ where
731749 ) {
732750 let mut state_lock = self . sweeper_state . lock ( ) . unwrap ( ) ;
733751 self . transactions_confirmed_internal ( & mut * state_lock, header, txdata, height) ;
734- self . persist_state ( & * state_lock) . unwrap_or_else ( |e| {
735- log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
736- } ) ;
737752 }
738753
739754 fn transaction_unconfirmed ( & self , txid : & Txid ) {
@@ -756,18 +771,13 @@ where
756771 . filter ( |o| o. status . confirmation_height ( ) >= Some ( unconf_height) )
757772 . for_each ( |o| o. status . unconfirmed ( ) ) ;
758773
759- self . persist_state ( & * state_lock) . unwrap_or_else ( |e| {
760- log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
761- } ) ;
774+ state_lock. dirty = true ;
762775 }
763776 }
764777
765778 fn best_block_updated ( & self , header : & Header , height : u32 ) {
766779 let mut state_lock = self . sweeper_state . lock ( ) . unwrap ( ) ;
767- self . best_block_updated_internal ( & mut * state_lock, header, height) ;
768- let _ = self . persist_state ( & * state_lock) . map_err ( |e| {
769- log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
770- } ) ;
780+ self . best_block_updated_internal ( & mut state_lock, header, height) ;
771781 }
772782
773783 fn get_relevant_txids ( & self ) -> Vec < ( Txid , u32 , Option < BlockHash > ) > {
@@ -796,6 +806,7 @@ where
796806#[ derive( Debug ) ]
797807struct SweeperState {
798808 persistent : PersistentSweeperState ,
809+ dirty : bool ,
799810}
800811
801812#[ derive( Debug , Clone ) ]
@@ -860,7 +871,7 @@ where
860871 }
861872 }
862873
863- let sweeper_state = Mutex :: new ( SweeperState { persistent : state } ) ;
874+ let sweeper_state = Mutex :: new ( SweeperState { persistent : state, dirty : false } ) ;
864875 Ok ( Self {
865876 sweeper_state,
866877 pending_sweep : AtomicBool :: new ( false ) ,
@@ -909,7 +920,7 @@ where
909920 }
910921 }
911922
912- let sweeper_state = Mutex :: new ( SweeperState { persistent : state } ) ;
923+ let sweeper_state = Mutex :: new ( SweeperState { persistent : state, dirty : false } ) ;
913924 Ok ( (
914925 best_block,
915926 OutputSweeper {
0 commit comments