@@ -4,35 +4,47 @@ use crate::logger::{
44 log_error, log_given_level, log_info, log_internal, log_trace, log_warn, FilesystemLogger ,
55 Logger ,
66} ;
7+ use crate :: { scid_utils, LdkLiteConfig } ;
78
89use lightning:: chain:: chaininterface:: { BroadcasterInterface , ConfirmationTarget , FeeEstimator } ;
910use lightning:: chain:: WatchedOutput ;
10- use lightning:: chain:: { Confirm , Filter } ;
11+ use lightning:: chain:: { Access , AccessError , Confirm , Filter } ;
1112
1213use bdk:: blockchain:: { Blockchain , EsploraBlockchain , GetBlockHash , GetHeight , GetTx } ;
1314use bdk:: database:: BatchDatabase ;
15+ use bdk:: esplora_client;
1416use bdk:: wallet:: AddressIndex ;
1517use bdk:: { SignOptions , SyncOptions } ;
1618
17- use bitcoin:: { BlockHash , Script , Transaction , Txid } ;
19+ use bitcoin:: { BlockHash , Script , Transaction , TxOut , Txid } ;
1820
1921use std:: collections:: HashSet ;
20- use std:: sync:: { Arc , Mutex } ;
22+ use std:: sync:: { Arc , Mutex , RwLock } ;
2123
2224/// The minimum feerate we are allowed to send, as specify by LDK.
2325const MIN_FEERATE : u32 = 253 ;
2426
27+ // The used 'stop gap' parameter used by BDK's wallet sync. This seems to configure the threshold
28+ // number of blocks after which BDK stops looking for scripts belonging to the wallet.
29+ const BDK_CLIENT_STOP_GAP : usize = 20 ;
30+
31+ // The number of concurrent requests made against the API provider.
32+ const BDK_CLIENT_CONCURRENCY : u8 = 8 ;
33+
2534pub struct LdkLiteChainAccess < D >
2635where
2736 D : BatchDatabase ,
2837{
2938 blockchain : EsploraBlockchain ,
39+ client : Arc < esplora_client:: AsyncClient > ,
3040 wallet : Mutex < bdk:: Wallet < D > > ,
3141 queued_transactions : Mutex < Vec < Txid > > ,
3242 watched_transactions : Mutex < Vec < Txid > > ,
3343 queued_outputs : Mutex < Vec < WatchedOutput > > ,
3444 watched_outputs : Mutex < Vec < WatchedOutput > > ,
3545 last_sync_height : tokio:: sync:: Mutex < Option < u32 > > ,
46+ tokio_runtime : RwLock < Option < Arc < tokio:: runtime:: Runtime > > > ,
47+ config : Arc < LdkLiteConfig > ,
3648 logger : Arc < FilesystemLogger > ,
3749}
3850
@@ -41,26 +53,45 @@ where
4153 D : BatchDatabase ,
4254{
4355 pub ( crate ) fn new (
44- blockchain : EsploraBlockchain , wallet : bdk:: Wallet < D > , logger : Arc < FilesystemLogger > ,
56+ wallet : bdk:: Wallet < D > , config : Arc < LdkLiteConfig > , logger : Arc < FilesystemLogger > ,
4557 ) -> Self {
4658 let wallet = Mutex :: new ( wallet) ;
4759 let watched_transactions = Mutex :: new ( Vec :: new ( ) ) ;
4860 let queued_transactions = Mutex :: new ( Vec :: new ( ) ) ;
4961 let watched_outputs = Mutex :: new ( Vec :: new ( ) ) ;
5062 let queued_outputs = Mutex :: new ( Vec :: new ( ) ) ;
5163 let last_sync_height = tokio:: sync:: Mutex :: new ( None ) ;
64+ let tokio_runtime = RwLock :: new ( None ) ;
65+ // TODO: Check that we can be sure that the Esplora client re-connects in case of failure
66+ // and and exits cleanly on drop. Otherwise we need to handle this/move it to the runtime?
67+ let blockchain = EsploraBlockchain :: new ( & config. esplora_server_url , BDK_CLIENT_STOP_GAP )
68+ . with_concurrency ( BDK_CLIENT_CONCURRENCY ) ;
69+ let client_builder =
70+ esplora_client:: Builder :: new ( & format ! ( "http://{}" , & config. esplora_server_url) ) ;
71+ let client = Arc :: new ( client_builder. build_async ( ) . unwrap ( ) ) ;
5272 Self {
5373 blockchain,
74+ client,
5475 wallet,
5576 queued_transactions,
5677 watched_transactions,
5778 queued_outputs,
5879 watched_outputs,
5980 last_sync_height,
81+ tokio_runtime,
82+ config,
6083 logger,
6184 }
6285 }
6386
87+ pub ( crate ) fn set_runtime ( & self , tokio_runtime : Arc < tokio:: runtime:: Runtime > ) {
88+ * self . tokio_runtime . write ( ) . unwrap ( ) = Some ( tokio_runtime) ;
89+ }
90+
91+ pub ( crate ) fn drop_runtime ( & self ) {
92+ * self . tokio_runtime . write ( ) . unwrap ( ) = None ;
93+ }
94+
6495 pub ( crate ) async fn sync_wallet ( & self ) -> Result < ( ) , Error > {
6596 let sync_options = SyncOptions { progress : None } ;
6697
74105 }
75106
76107 pub ( crate ) async fn sync ( & self , confirmables : Vec < & ( dyn Confirm + Sync ) > ) -> Result < ( ) , Error > {
77- let client = & * self . blockchain ;
78-
79- let cur_height = client. get_height ( ) . await ?;
108+ let cur_height = self . client . get_height ( ) . await ?;
80109
81110 let mut locked_last_sync_height = self . last_sync_height . lock ( ) . await ;
82111 if cur_height >= locked_last_sync_height. unwrap_or ( 0 ) {
@@ -93,10 +122,8 @@ where
93122 & self , confirmables : & Vec < & ( dyn Confirm + Sync ) > , cur_height : u32 ,
94123 locked_last_sync_height : & mut tokio:: sync:: MutexGuard < ' _ , Option < u32 > > ,
95124 ) -> Result < ( ) , Error > {
96- let client = & * self . blockchain ;
97-
98125 // Inform the interface of the new block.
99- let cur_block_header = client. get_header ( cur_height) . await ?;
126+ let cur_block_header = self . client . get_header ( cur_height) . await ?;
100127 for c in confirmables {
101128 c. best_block_updated ( & cur_block_header, cur_height) ;
102129 }
@@ -108,8 +135,6 @@ where
108135 async fn sync_transactions_confirmed (
109136 & self , confirmables : & Vec < & ( dyn Confirm + Sync ) > ,
110137 ) -> Result < ( ) , Error > {
111- let client = & * self . blockchain ;
112-
113138 // First, check the confirmation status of registered transactions as well as the
114139 // status of dependent transactions of registered outputs.
115140
@@ -131,12 +156,12 @@ where
131156 let mut unconfirmed_registered_txs = Vec :: new ( ) ;
132157
133158 for txid in registered_txs {
134- if let Some ( tx_status) = client. get_tx_status ( & txid) . await ? {
159+ if let Some ( tx_status) = self . client . get_tx_status ( & txid) . await ? {
135160 if tx_status. confirmed {
136- if let Some ( tx) = client. get_tx ( & txid) . await ? {
161+ if let Some ( tx) = self . client . get_tx ( & txid) . await ? {
137162 if let Some ( block_height) = tx_status. block_height {
138- let block_header = client. get_header ( block_height) . await ?;
139- if let Some ( merkle_proof) = client. get_merkle_proof ( & txid) . await ? {
163+ let block_header = self . client . get_header ( block_height) . await ?;
164+ if let Some ( merkle_proof) = self . client . get_merkle_proof ( & txid) . await ? {
140165 confirmed_txs. push ( (
141166 tx,
142167 block_height,
@@ -163,19 +188,20 @@ where
163188 let mut unspent_registered_outputs = Vec :: new ( ) ;
164189
165190 for output in registered_outputs {
166- if let Some ( output_status) = client
191+ if let Some ( output_status) = self
192+ . client
167193 . get_output_status ( & output. outpoint . txid , output. outpoint . index as u64 )
168194 . await ?
169195 {
170196 if output_status. spent {
171197 if let Some ( spending_tx_status) = output_status. status {
172198 if spending_tx_status. confirmed {
173199 let spending_txid = output_status. txid . unwrap ( ) ;
174- if let Some ( spending_tx) = client. get_tx ( & spending_txid) . await ? {
200+ if let Some ( spending_tx) = self . client . get_tx ( & spending_txid) . await ? {
175201 let block_height = spending_tx_status. block_height . unwrap ( ) ;
176- let block_header = client. get_header ( block_height) . await ?;
202+ let block_header = self . client . get_header ( block_height) . await ?;
177203 if let Some ( merkle_proof) =
178- client. get_merkle_proof ( & spending_txid) . await ?
204+ self . client . get_merkle_proof ( & spending_txid) . await ?
179205 {
180206 confirmed_txs. push ( (
181207 spending_tx,
@@ -217,13 +243,13 @@ where
217243 async fn sync_transaction_unconfirmed (
218244 & self , confirmables : & Vec < & ( dyn Confirm + Sync ) > ,
219245 ) -> Result < ( ) , Error > {
220- let client = & * self . blockchain ;
221246 // Query the interface for relevant txids and check whether they have been
222247 // reorged-out of the chain.
223248 let relevant_txids =
224249 confirmables. iter ( ) . flat_map ( |c| c. get_relevant_txids ( ) ) . collect :: < HashSet < Txid > > ( ) ;
225250 for txid in relevant_txids {
226- let tx_unconfirmed = client
251+ let tx_unconfirmed = self
252+ . client
227253 . get_tx_status ( & txid)
228254 . await
229255 . ok ( )
@@ -300,6 +326,63 @@ where
300326 }
301327}
302328
329+ impl < D > Access for LdkLiteChainAccess < D >
330+ where
331+ D : BatchDatabase ,
332+ {
333+ fn get_utxo (
334+ & self , genesis_hash : & BlockHash , short_channel_id : u64 ,
335+ ) -> Result < TxOut , AccessError > {
336+ if genesis_hash
337+ != & bitcoin:: blockdata:: constants:: genesis_block ( self . config . network )
338+ . header
339+ . block_hash ( )
340+ {
341+ return Err ( AccessError :: UnknownChain ) ;
342+ }
343+
344+ let locked_runtime = self . tokio_runtime . read ( ) . unwrap ( ) ;
345+ if locked_runtime. as_ref ( ) . is_none ( ) {
346+ return Err ( AccessError :: UnknownTx ) ;
347+ }
348+
349+ let block_height = scid_utils:: block_from_scid ( & short_channel_id) ;
350+ let tx_index = scid_utils:: tx_index_from_scid ( & short_channel_id) ;
351+ let vout = scid_utils:: vout_from_scid ( & short_channel_id) ;
352+
353+ let block_hash = self
354+ . blockchain
355+ . get_block_hash ( block_height. into ( ) )
356+ . map_err ( |_| AccessError :: UnknownTx ) ?;
357+
358+ let client_tokio = Arc :: clone ( & self . client ) ;
359+ let txout_opt: Arc < Mutex < Option < TxOut > > > = Arc :: new ( Mutex :: new ( None ) ) ;
360+ let txout_opt_tokio = Arc :: clone ( & txout_opt) ;
361+
362+ locked_runtime. as_ref ( ) . unwrap ( ) . spawn ( async move {
363+ let txid_res =
364+ client_tokio. get_txid_at_block_index ( & block_hash, tx_index as usize ) . await ;
365+
366+ if let Some ( txid) = txid_res. unwrap_or ( None ) {
367+ let tx_res = client_tokio. get_tx ( & txid) . await ;
368+
369+ if let Some ( tx) = tx_res. unwrap_or ( None ) {
370+ if let Some ( tx_out) = tx. output . get ( vout as usize ) {
371+ * txout_opt_tokio. lock ( ) . unwrap ( ) = Some ( tx_out. clone ( ) ) ;
372+ }
373+ }
374+ }
375+ } ) ;
376+
377+ let locked_opt = txout_opt. lock ( ) . unwrap ( ) ;
378+ if let Some ( tx_out) = & * locked_opt {
379+ return Ok ( tx_out. clone ( ) ) ;
380+ } else {
381+ return Err ( AccessError :: UnknownTx ) ;
382+ }
383+ }
384+ }
385+
303386impl < D > Filter for LdkLiteChainAccess < D >
304387where
305388 D : BatchDatabase ,
0 commit comments