@@ -16,10 +16,10 @@ use bitcoin::key::XOnlyPublicKey;
1616use bitcoin:: psbt:: Psbt ;
1717use bitcoin:: { Network , OutPoint , TxOut , WPubkeyHash } ;
1818use lightning:: chain:: chaininterface:: { BroadcasterInterface , ConfirmationTarget , FeeEstimator } ;
19- use lightning:: events:: bump_transaction:: Utxo ;
20- use lightning:: events:: bump_transaction:: sync:: WalletSourceSync ;
19+ use lightning:: events:: bump_transaction:: { Utxo , WalletSource } ;
2120use lightning:: log_error;
22- use lightning:: sign:: ChangeDestinationSourceSync ;
21+ use lightning:: sign:: ChangeDestinationSource ;
22+ use lightning:: util:: async_poll:: AsyncResult ;
2323use lightning:: util:: logger:: Logger ;
2424use lightning_block_sync:: http:: HttpEndpoint ;
2525use lightning_block_sync:: rpc:: RpcClient ;
@@ -32,7 +32,7 @@ use std::sync::atomic::{AtomicU32, Ordering};
3232use std:: sync:: Arc ;
3333use std:: time:: Duration ;
3434
35- use tokio:: runtime:: { self , Runtime } ;
35+ use tokio:: runtime:: Handle ;
3636
3737pub struct BitcoindClient {
3838 pub ( crate ) bitcoind_rpc_client : Arc < RpcClient > ,
@@ -42,8 +42,7 @@ pub struct BitcoindClient {
4242 rpc_user : String ,
4343 rpc_password : String ,
4444 fees : Arc < HashMap < ConfirmationTarget , AtomicU32 > > ,
45- main_runtime_handle : runtime:: Handle ,
46- inner_runtime : Arc < Runtime > ,
45+ main_runtime_handle : Handle ,
4746 logger : Arc < FilesystemLogger > ,
4847}
4948
@@ -71,7 +70,7 @@ const MIN_FEERATE: u32 = 253;
7170impl BitcoindClient {
7271 pub ( crate ) async fn new (
7372 host : String , port : u16 , rpc_user : String , rpc_password : String , network : Network ,
74- handle : runtime :: Handle , logger : Arc < FilesystemLogger > ,
73+ handle : Handle , logger : Arc < FilesystemLogger > ,
7574 ) -> std:: io:: Result < Self > {
7675 let http_endpoint = HttpEndpoint :: for_host ( host. clone ( ) ) . with_port ( port) ;
7776 let rpc_credentials =
@@ -100,15 +99,6 @@ impl BitcoindClient {
10099 fees. insert ( ConfirmationTarget :: ChannelCloseMinimum , AtomicU32 :: new ( MIN_FEERATE ) ) ;
101100 fees. insert ( ConfirmationTarget :: OutputSpendingFee , AtomicU32 :: new ( MIN_FEERATE ) ) ;
102101
103- let mut builder = runtime:: Builder :: new_multi_thread ( ) ;
104- let runtime =
105- builder. enable_all ( ) . worker_threads ( 1 ) . thread_name ( "rpc-worker" ) . build ( ) . unwrap ( ) ;
106- let inner_runtime = Arc :: new ( runtime) ;
107- // Tokio will panic if we drop a runtime while in another runtime. Because the entire
108- // application runs inside a tokio runtime, we have to ensure this runtime is never
109- // `drop`'d, which we do by leaking an Arc reference.
110- std:: mem:: forget ( Arc :: clone ( & inner_runtime) ) ;
111-
112102 let client = Self {
113103 bitcoind_rpc_client : Arc :: new ( bitcoind_rpc_client) ,
114104 host,
@@ -118,7 +108,6 @@ impl BitcoindClient {
118108 network,
119109 fees : Arc :: new ( fees) ,
120110 main_runtime_handle : handle. clone ( ) ,
121- inner_runtime,
122111 logger,
123112 } ;
124113 BitcoindClient :: poll_for_fee_estimates (
@@ -131,7 +120,7 @@ impl BitcoindClient {
131120
132121 fn poll_for_fee_estimates (
133122 fees : Arc < HashMap < ConfirmationTarget , AtomicU32 > > , rpc_client : Arc < RpcClient > ,
134- handle : tokio :: runtime :: Handle ,
123+ handle : Handle ,
135124 ) {
136125 handle. spawn ( async move {
137126 loop {
@@ -241,39 +230,6 @@ impl BitcoindClient {
241230 } ) ;
242231 }
243232
244- fn run_future_in_blocking_context < F : Future + Send + ' static > ( & self , future : F ) -> F :: Output
245- where
246- F :: Output : Send + ' static ,
247- {
248- // Tokio deliberately makes it nigh impossible to block on a future in a sync context that
249- // is running in an async task (which makes it really hard to interact with sync code that
250- // has callbacks in an async project).
251- //
252- // Reading the docs, it *seems* like
253- // `tokio::task::block_in_place(tokio::runtime::Handle::spawn(future))` should do the
254- // trick, and 99.999% of the time it does! But tokio has a "non-stealable I/O driver" - if
255- // the task we're running happens to, by sheer luck, be holding the "I/O driver" when we go
256- // into a `block_in_place` call, and the inner future requires I/O (which of course it
257- // does, its a future!), the whole thing will come to a grinding halt as no other thread is
258- // allowed to poll I/O until the blocked one finishes.
259- //
260- // This is, of course, nuts, and an almost trivial performance penalty of occasional
261- // additional wakeups would solve this, but tokio refuses to do so because any performance
262- // penalty at all would be too much (tokio issue #4730).
263- //
264- // Instead, we have to do a rather insane dance - we have to spawn the `future` we want to
265- // run on a *different* (threaded) tokio runtime (doing the `block_in_place` dance to avoid
266- // blocking too many threads on the main runtime). We want to block on that `future` being
267- // run on the other runtime's threads, but tokio only provides `block_on` to do so, which
268- // runs the `future` itself on the current thread, panicing if this thread is already a
269- // part of a tokio runtime (which in this case it is - the main tokio runtime). Thus, we
270- // have to `spawn` the `future` on the secondary runtime and then `block_on` the resulting
271- // `JoinHandle` on the main runtime.
272- tokio:: task:: block_in_place ( move || {
273- self . main_runtime_handle . block_on ( self . inner_runtime . spawn ( future) ) . unwrap ( )
274- } )
275- }
276-
277233 pub fn get_new_rpc_client ( & self ) -> RpcClient {
278234 let http_endpoint = HttpEndpoint :: for_host ( self . host . clone ( ) ) . with_port ( self . port ) ;
279235 let rpc_credentials = base64:: encode ( format ! ( "{}:{}" , self . rpc_user, self . rpc_password) ) ;
@@ -406,60 +362,64 @@ impl BroadcasterInterface for BitcoindClient {
406362 }
407363}
408364
409- impl ChangeDestinationSourceSync for BitcoindClient {
410- fn get_change_destination_script ( & self ) -> Result < ScriptBuf , ( ) > {
411- let future = self . get_new_address ( ) ;
412- Ok ( self . run_future_in_blocking_context ( async move { future. await . script_pubkey ( ) } ) )
365+ impl ChangeDestinationSource for BitcoindClient {
366+ fn get_change_destination_script < ' a > ( & ' a self ) -> AsyncResult < ' a , ScriptBuf > {
367+ Box :: pin ( async move {
368+ Ok ( self . get_new_address ( ) . await . script_pubkey ( ) )
369+ } )
413370 }
414371}
415372
416- impl WalletSourceSync for BitcoindClient {
417- fn list_confirmed_utxos ( & self ) -> Result < Vec < Utxo > , ( ) > {
418- let future = self . list_unspent ( ) ;
419- let utxos = self . run_future_in_blocking_context ( async move { future. await . 0 } ) ;
420- Ok ( utxos
421- . into_iter ( )
422- . filter_map ( |utxo| {
423- let outpoint = OutPoint { txid : utxo. txid , vout : utxo. vout } ;
424- let value = bitcoin:: Amount :: from_sat ( utxo. amount ) ;
425- match utxo. address . witness_program ( ) {
426- Some ( prog) if prog. is_p2wpkh ( ) => {
427- WPubkeyHash :: from_slice ( prog. program ( ) . as_bytes ( ) )
428- . map ( |wpkh| Utxo :: new_v0_p2wpkh ( outpoint, value, & wpkh) )
429- . ok ( )
430- } ,
431- Some ( prog) if prog. is_p2tr ( ) => {
432- // TODO: Add `Utxo::new_v1_p2tr` upstream.
433- XOnlyPublicKey :: from_slice ( prog. program ( ) . as_bytes ( ) )
434- . map ( |_| Utxo {
435- outpoint,
436- output : TxOut {
437- value,
438- script_pubkey : utxo. address . script_pubkey ( ) ,
439- } ,
440- satisfaction_weight : 1 /* empty script_sig */ * WITNESS_SCALE_FACTOR as u64 +
441- 1 /* witness items */ + 1 /* schnorr sig len */ + 64 , /* schnorr sig */
442- } )
443- . ok ( )
444- } ,
445- _ => None ,
446- }
447- } )
448- . collect ( ) )
373+ impl WalletSource for BitcoindClient {
374+ fn list_confirmed_utxos < ' a > ( & ' a self ) -> AsyncResult < ' a , Vec < Utxo > > {
375+ Box :: pin ( async move {
376+ let utxos = self . list_unspent ( ) . await . 0 ;
377+ Ok ( utxos
378+ . into_iter ( )
379+ . filter_map ( |utxo| {
380+ let outpoint = OutPoint { txid : utxo. txid , vout : utxo. vout } ;
381+ let value = bitcoin:: Amount :: from_sat ( utxo. amount ) ;
382+ match utxo. address . witness_program ( ) {
383+ Some ( prog) if prog. is_p2wpkh ( ) => {
384+ WPubkeyHash :: from_slice ( prog. program ( ) . as_bytes ( ) )
385+ . map ( |wpkh| Utxo :: new_v0_p2wpkh ( outpoint, value, & wpkh) )
386+ . ok ( )
387+ } ,
388+ Some ( prog) if prog. is_p2tr ( ) => {
389+ // TODO: Add `Utxo::new_v1_p2tr` upstream.
390+ XOnlyPublicKey :: from_slice ( prog. program ( ) . as_bytes ( ) )
391+ . map ( |_| Utxo {
392+ outpoint,
393+ output : TxOut {
394+ value,
395+ script_pubkey : utxo. address . script_pubkey ( ) ,
396+ } ,
397+ satisfaction_weight : 1 /* empty script_sig */ * WITNESS_SCALE_FACTOR as u64 +
398+ 1 /* witness items */ + 1 /* schnorr sig len */ + 64 , /* schnorr sig */
399+ } )
400+ . ok ( )
401+ } ,
402+ _ => None ,
403+ }
404+ } )
405+ . collect ( ) )
406+ } )
449407 }
450408
451- fn get_change_script ( & self ) -> Result < ScriptBuf , ( ) > {
452- let future = self . get_new_address ( ) ;
453- Ok ( self . run_future_in_blocking_context ( async move { future. await . script_pubkey ( ) } ) )
409+ fn get_change_script < ' a > ( & ' a self ) -> AsyncResult < ' a , ScriptBuf > {
410+ Box :: pin ( async move {
411+ Ok ( self . get_new_address ( ) . await . script_pubkey ( ) )
412+ } )
454413 }
455414
456- fn sign_psbt ( & self , tx : Psbt ) -> Result < Transaction , ( ) > {
457- let mut tx_bytes = Vec :: new ( ) ;
458- let _ = tx. unsigned_tx . consensus_encode ( & mut tx_bytes) . map_err ( |_| ( ) ) ;
459- let tx_hex = hex_utils:: hex_str ( & tx_bytes) ;
460- let future = self . sign_raw_transaction_with_wallet ( tx_hex) ;
461- let signed_tx = self . run_future_in_blocking_context ( async move { future. await } ) ;
462- let signed_tx_bytes = hex_utils:: to_vec ( & signed_tx. hex ) . ok_or ( ( ) ) ?;
463- Transaction :: consensus_decode ( & mut signed_tx_bytes. as_slice ( ) ) . map_err ( |_| ( ) )
415+ fn sign_psbt < ' a > ( & ' a self , tx : Psbt ) -> AsyncResult < ' a , Transaction > {
416+ Box :: pin ( async move {
417+ let mut tx_bytes = Vec :: new ( ) ;
418+ let _ = tx. unsigned_tx . consensus_encode ( & mut tx_bytes) . map_err ( |_| ( ) ) ;
419+ let tx_hex = hex_utils:: hex_str ( & tx_bytes) ;
420+ let signed_tx = self . sign_raw_transaction_with_wallet ( tx_hex) . await ;
421+ let signed_tx_bytes = hex_utils:: to_vec ( & signed_tx. hex ) . ok_or ( ( ) ) ?;
422+ Transaction :: consensus_decode ( & mut signed_tx_bytes. as_slice ( ) ) . map_err ( |_| ( ) )
423+ } )
464424 }
465425}
0 commit comments