Skip to content

Commit b04fed2

Browse files
rustaceanrobnotmandatory
authored andcommitted
feat(cbf): implement transaction broadcasting
For the highest reliability, we wait for the connection requirement to be met by the node. Once met, we can broadcast and wait for confirmation. The function will either timeout after 15 seconds or successfully finish with gossip confirmation.
1 parent a7e6909 commit b04fed2

File tree

2 files changed

+69
-5
lines changed

2 files changed

+69
-5
lines changed

src/commands.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ pub struct ProxyOpts {
239239
#[derive(Debug, Args, Clone, PartialEq, Eq)]
240240
pub struct CompactFilterOpts {
241241
/// Sets the number of parallel node connections.
242-
#[clap(name = "CONNECTIONS", long = "cbf-conn-count", default_value = "4", value_parser = value_parser!(u8).range(1..=15))]
242+
#[clap(name = "CONNECTIONS", long = "cbf-conn-count", default_value = "2", value_parser = value_parser!(u8).range(1..=15))]
243243
pub conn_count: u8,
244244

245245
/// Optionally skip initial `skip_blocks` blocks.

src/handlers.rs

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,11 @@ use std::str::FromStr;
4848

4949
#[cfg(feature = "electrum")]
5050
use crate::utils::BlockchainClient::Electrum;
51+
#[cfg(feature = "cbf")]
52+
use bdk_kyoto::{Info, LightClient};
5153
use bdk_wallet::bitcoin::base64::prelude::*;
54+
#[cfg(feature = "cbf")]
55+
use tokio::select;
5256
#[cfg(any(
5357
feature = "electrum",
5458
feature = "esplora",
@@ -507,7 +511,6 @@ pub(crate) async fn handle_online_wallet_subcommand(
507511
(Some(_), Some(_)) => panic!("Both `psbt` and `tx` options not allowed"),
508512
(None, None) => panic!("Missing `psbt` and `tx` option"),
509513
};
510-
511514
let txid = match client {
512515
#[cfg(feature = "electrum")]
513516
Electrum {
@@ -531,8 +534,69 @@ pub(crate) async fn handle_online_wallet_subcommand(
531534
.map_err(|e| Error::Generic(e.to_string()))?,
532535

533536
#[cfg(feature = "cbf")]
534-
KyotoClient { client: _ } => {
535-
unimplemented!()
537+
KyotoClient { client } => {
538+
let LightClient {
539+
requester,
540+
mut log_subscriber,
541+
mut info_subscriber,
542+
mut warning_subscriber,
543+
update_subscriber: _,
544+
node,
545+
} = client;
546+
547+
let subscriber = tracing_subscriber::FmtSubscriber::new();
548+
tracing::subscriber::set_global_default(subscriber)
549+
.map_err(|e| Error::Generic(format!("SetGlobalDefault error: {}", e)))?;
550+
551+
tokio::task::spawn(async move { node.run().await });
552+
tokio::task::spawn(async move {
553+
select! {
554+
log = log_subscriber.recv() => {
555+
if let Some(log) = log {
556+
tracing::info!("{log}");
557+
}
558+
},
559+
warn = warning_subscriber.recv() => {
560+
if let Some(warn) = warn {
561+
tracing::warn!("{warn}");
562+
}
563+
}
564+
}
565+
});
566+
let txid = tx.compute_txid();
567+
tracing::info!("Waiting for connections to broadcast...");
568+
while let Some(info) = info_subscriber.recv().await {
569+
match info {
570+
Info::ConnectionsMet => {
571+
requester
572+
.broadcast_random(tx.clone())
573+
.map_err(|e| Error::Generic(format!("{}", e)))?;
574+
break;
575+
}
576+
_ => tracing::info!("{info}"),
577+
}
578+
}
579+
tokio::time::timeout(tokio::time::Duration::from_secs(15), async move {
580+
while let Some(info) = info_subscriber.recv().await {
581+
match info {
582+
Info::TxGossiped(wtxid) => {
583+
tracing::info!("Succuessfully broadcast WTXID: {wtxid}");
584+
break;
585+
}
586+
Info::ConnectionsMet => {
587+
tracing::info!("Rebroadcasting to new connections");
588+
requester.broadcast_random(tx.clone()).unwrap();
589+
}
590+
_ => tracing::info!("{info}"),
591+
}
592+
}
593+
})
594+
.await
595+
.map_err(|_| {
596+
tracing::warn!("Broadcast was unsuccessful");
597+
Error::Generic("Transaction broadcast timed out after 15 seconds".into())
598+
})?;
599+
txid
536600
}
537601
};
538602
Ok(json!({ "txid": txid }))
@@ -857,7 +921,7 @@ async fn respond(
857921
subcommand: WalletSubCommand::OnlineWalletSubCommand(online_subcommand),
858922
} => {
859923
let blockchain =
860-
new_blockchain_client(wallet_opts, &wallet, _datadir).map_err(|e| e.to_string())?;
924+
new_blockchain_client(wallet_opts, wallet, _datadir).map_err(|e| e.to_string())?;
861925
let value = handle_online_wallet_subcommand(wallet, blockchain, online_subcommand)
862926
.await
863927
.map_err(|e| e.to_string())?;

0 commit comments

Comments
 (0)