@@ -17,7 +17,7 @@ use bdk::{SignOptions, SyncOptions};
1717use bitcoin:: { BlockHash , Script , Transaction , Txid } ;
1818
1919use std:: collections:: HashSet ;
20- use std:: sync:: { Arc , Mutex , MutexGuard } ;
20+ use std:: sync:: { Arc , Mutex } ;
2121
2222/// The minimum feerate we are allowed to send, as specify by LDK.
2323const MIN_FEERATE : u32 = 253 ;
3232 watched_transactions : Mutex < Vec < Txid > > ,
3333 queued_outputs : Mutex < Vec < WatchedOutput > > ,
3434 watched_outputs : Mutex < Vec < WatchedOutput > > ,
35- last_sync_height : Mutex < Option < u32 > > ,
35+ last_sync_height : tokio :: sync :: Mutex < Option < u32 > > ,
3636 logger : Arc < FilesystemLogger > ,
3737}
3838
4848 let queued_transactions = Mutex :: new ( Vec :: new ( ) ) ;
4949 let watched_outputs = Mutex :: new ( Vec :: new ( ) ) ;
5050 let queued_outputs = Mutex :: new ( Vec :: new ( ) ) ;
51- let last_sync_height = Mutex :: new ( None ) ;
51+ let last_sync_height = tokio :: sync :: Mutex :: new ( None ) ;
5252 Self {
5353 blockchain,
5454 wallet,
6161 }
6262 }
6363
64- pub ( crate ) fn sync_wallet ( & self ) -> Result < ( ) , Error > {
64+ pub ( crate ) async fn sync_wallet ( & self ) -> Result < ( ) , Error > {
6565 let sync_options = SyncOptions { progress : None } ;
6666
6767 self . wallet
@@ -73,73 +73,70 @@ where
7373 Ok ( ( ) )
7474 }
7575
76- pub ( crate ) fn sync ( & self , confirmables : Vec < & ( dyn Confirm + Sync ) > ) -> Result < ( ) , Error > {
76+ pub ( crate ) async fn sync ( & self , confirmables : Vec < & ( dyn Confirm + Sync ) > ) -> Result < ( ) , Error > {
7777 let client = & * self . blockchain ;
7878
79- let cur_height = client. get_height ( ) ?;
79+ let cur_height = client. get_height ( ) . await ?;
8080
81- let mut locked_last_sync_height = self . last_sync_height . lock ( ) . unwrap ( ) ;
81+ let mut locked_last_sync_height = self . last_sync_height . lock ( ) . await ;
8282 if cur_height >= locked_last_sync_height. unwrap_or ( 0 ) {
83- self . sync_best_block_updated (
84- confirmables. clone ( ) ,
85- cur_height,
86- & mut locked_last_sync_height,
87- ) ?;
88- self . sync_transactions_confirmed ( confirmables. clone ( ) ) ?;
89- self . sync_transaction_unconfirmed ( confirmables. clone ( ) ) ?;
83+ self . sync_best_block_updated ( & confirmables, cur_height, & mut locked_last_sync_height)
84+ . await ?;
85+ self . sync_transactions_confirmed ( & confirmables) . await ?;
86+ self . sync_transaction_unconfirmed ( & confirmables) . await ?;
9087 }
9188 // TODO: check whether new outputs have been registered by now and process them
9289 Ok ( ( ) )
9390 }
9491
95- fn sync_best_block_updated (
96- & self , confirmables : Vec < & ( dyn Confirm + Sync ) > , cur_height : u32 ,
97- locked_last_sync_height : & mut MutexGuard < Option < u32 > > ,
92+ async fn sync_best_block_updated (
93+ & self , confirmables : & Vec < & ( dyn Confirm + Sync ) > , cur_height : u32 ,
94+ locked_last_sync_height : & mut tokio :: sync :: MutexGuard < ' _ , Option < u32 > > ,
9895 ) -> Result < ( ) , Error > {
9996 let client = & * self . blockchain ;
10097
10198 // Inform the interface of the new block.
102- let cur_block_header = client. get_header ( cur_height) ?;
103- for c in & confirmables {
99+ let cur_block_header = client. get_header ( cur_height) . await ?;
100+ for c in confirmables {
104101 c. best_block_updated ( & cur_block_header, cur_height) ;
105102 }
106103
107104 * * locked_last_sync_height = Some ( cur_height) ;
108105 Ok ( ( ) )
109106 }
110107
111- fn sync_transactions_confirmed (
112- & self , confirmables : Vec < & ( dyn Confirm + Sync ) > ,
108+ async fn sync_transactions_confirmed (
109+ & self , confirmables : & Vec < & ( dyn Confirm + Sync ) > ,
113110 ) -> Result < ( ) , Error > {
114111 let client = & * self . blockchain ;
115112
116113 // First, check the confirmation status of registered transactions as well as the
117114 // status of dependent transactions of registered outputs.
118- let mut locked_queued_transactions = self . queued_transactions . lock ( ) . unwrap ( ) ;
119- let mut locked_queued_outputs = self . queued_outputs . lock ( ) . unwrap ( ) ;
120- let mut locked_watched_transactions = self . watched_transactions . lock ( ) . unwrap ( ) ;
121- let mut locked_watched_outputs = self . watched_outputs . lock ( ) . unwrap ( ) ;
122115
123116 let mut confirmed_txs = Vec :: new ( ) ;
124117
125118 // Check in the current queue, as well as in registered transactions leftover from
126119 // previous iterations.
127- let registered_txs: HashSet < Txid > = locked_watched_transactions
128- . iter ( )
129- . chain ( locked_queued_transactions. iter ( ) )
130- . cloned ( )
131- . collect ( ) ;
120+ let registered_txs: HashSet < Txid > = {
121+ let locked_queued_transactions = self . queued_transactions . lock ( ) . unwrap ( ) ;
122+ let locked_watched_transactions = self . watched_transactions . lock ( ) . unwrap ( ) ;
123+ locked_watched_transactions
124+ . iter ( )
125+ . chain ( locked_queued_transactions. iter ( ) )
126+ . cloned ( )
127+ . collect ( )
128+ } ;
132129
133130 // Remember all registered but unconfirmed transactions for future processing.
134131 let mut unconfirmed_registered_txs = Vec :: new ( ) ;
135132
136133 for txid in registered_txs {
137- if let Some ( tx_status) = client. get_tx_status ( & txid) ? {
134+ if let Some ( tx_status) = client. get_tx_status ( & txid) . await ? {
138135 if tx_status. confirmed {
139- if let Some ( tx) = client. get_tx ( & txid) ? {
136+ if let Some ( tx) = client. get_tx ( & txid) . await ? {
140137 if let Some ( block_height) = tx_status. block_height {
141- let block_header = client. get_header ( block_height) ?;
142- if let Some ( merkle_proof) = client. get_merkle_proof ( & txid) ? {
138+ let block_header = client. get_header ( block_height) . await ?;
139+ if let Some ( merkle_proof) = client. get_merkle_proof ( & txid) . await ? {
143140 confirmed_txs. push ( (
144141 tx,
145142 block_height,
@@ -156,25 +153,29 @@ where
156153 }
157154
158155 // Check all registered outputs for dependent spending transactions.
159- let registered_outputs: Vec < WatchedOutput > =
160- locked_watched_outputs. iter ( ) . chain ( locked_queued_outputs. iter ( ) ) . cloned ( ) . collect ( ) ;
156+ let registered_outputs: Vec < WatchedOutput > = {
157+ let locked_queued_outputs = self . queued_outputs . lock ( ) . unwrap ( ) ;
158+ let locked_watched_outputs = self . watched_outputs . lock ( ) . unwrap ( ) ;
159+ locked_watched_outputs. iter ( ) . chain ( locked_queued_outputs. iter ( ) ) . cloned ( ) . collect ( )
160+ } ;
161161
162162 // Remember all registered outputs that haven't been spent for future processing.
163163 let mut unspent_registered_outputs = Vec :: new ( ) ;
164164
165165 for output in registered_outputs {
166- if let Some ( output_status) =
167- client. get_output_status ( & output. outpoint . txid , output. outpoint . index as u64 ) ?
166+ if let Some ( output_status) = client
167+ . get_output_status ( & output. outpoint . txid , output. outpoint . index as u64 )
168+ . await ?
168169 {
169170 if output_status. spent {
170171 if let Some ( spending_tx_status) = output_status. status {
171172 if spending_tx_status. confirmed {
172173 let spending_txid = output_status. txid . unwrap ( ) ;
173- if let Some ( spending_tx) = client. get_tx ( & spending_txid) ? {
174+ if let Some ( spending_tx) = client. get_tx ( & spending_txid) . await ? {
174175 let block_height = spending_tx_status. block_height . unwrap ( ) ;
175- let block_header = client. get_header ( block_height) ?;
176+ let block_header = client. get_header ( block_height) . await ?;
176177 if let Some ( merkle_proof) =
177- client. get_merkle_proof ( & spending_txid) ?
178+ client. get_merkle_proof ( & spending_txid) . await ?
178179 {
179180 confirmed_txs. push ( (
180181 spending_tx,
@@ -200,41 +201,38 @@ where
200201 } ,
201202 ) ;
202203 for ( tx, block_height, block_header, pos) in confirmed_txs {
203- for c in & confirmables {
204+ for c in confirmables {
204205 c. transactions_confirmed ( & block_header, & [ ( pos, & tx) ] , block_height) ;
205206 }
206207 }
207208
208- * locked_watched_transactions = unconfirmed_registered_txs ;
209- * locked_queued_transactions = Vec :: new ( ) ;
210- * locked_watched_outputs = unspent_registered_outputs ;
211- * locked_queued_outputs = Vec :: new ( ) ;
209+ * self . queued_transactions . lock ( ) . unwrap ( ) = Vec :: new ( ) ;
210+ * self . watched_transactions . lock ( ) . unwrap ( ) = unconfirmed_registered_txs ;
211+ * self . queued_outputs . lock ( ) . unwrap ( ) = Vec :: new ( ) ;
212+ * self . watched_outputs . lock ( ) . unwrap ( ) = unspent_registered_outputs ;
212213
213214 Ok ( ( ) )
214215 }
215216
216- fn sync_transaction_unconfirmed (
217- & self , confirmables : Vec < & ( dyn Confirm + Sync ) > ,
217+ async fn sync_transaction_unconfirmed (
218+ & self , confirmables : & Vec < & ( dyn Confirm + Sync ) > ,
218219 ) -> Result < ( ) , Error > {
219220 let client = & * self . blockchain ;
220221 // Query the interface for relevant txids and check whether they have been
221222 // reorged-out of the chain.
222- let unconfirmed_txids = confirmables
223- . iter ( )
224- . flat_map ( |c| c. get_relevant_txids ( ) )
225- . filter ( |txid| {
226- client
227- . get_tx_status ( txid)
228- . ok ( )
229- . unwrap_or ( None )
230- . map_or ( true , |status| !status. confirmed )
231- } )
232- . collect :: < Vec < Txid > > ( ) ;
233-
234- // Mark all relevant unconfirmed transactions as unconfirmed.
235- for txid in & unconfirmed_txids {
236- for c in & confirmables {
237- c. transaction_unconfirmed ( txid) ;
223+ let relevant_txids =
224+ confirmables. iter ( ) . flat_map ( |c| c. get_relevant_txids ( ) ) . collect :: < HashSet < Txid > > ( ) ;
225+ for txid in relevant_txids {
226+ let tx_unconfirmed = client
227+ . get_tx_status ( & txid)
228+ . await
229+ . ok ( )
230+ . unwrap_or ( None )
231+ . map_or ( true , |status| !status. confirmed ) ;
232+ if tx_unconfirmed {
233+ for c in confirmables {
234+ c. transaction_unconfirmed ( & txid) ;
235+ }
238236 }
239237 }
240238
0 commit comments