Skip to content

Commit 62ff445

Browse files
committed
Merge #182: Implement transaction broadcasting for Kyoto
811f614 fix(cbf): typo and cbf dir config when sqlite feature disabled (Steve Myers) b04fed2 feat(cbf): implement transaction broadcasting (rustaceanrob) Pull request description: The actual implementation comes down to listening for an info message that reports the transaction was sent to a peer. For simplicity I am ignoring any wallet updates, but if the user calls the `Sync` command they can catch them. Follows up #181 ### Checklists #### All Submissions: * [x] I've signed all my commits * [x] I followed the [contribution guidelines](https://github.com/bitcoindevkit/bdk-cli/blob/master/CONTRIBUTING.md) * [x] I ran `cargo fmt` and `cargo clippy` before committing #### New Features: * [ ] I've added tests for the new feature * [ ] I've added docs for the new feature * [ ] I've updated `CHANGELOG.md` #### Bugfixes: * [ ] This pull request breaks the existing API * [ ] I've added tests to reproduce the issue which are now passing * [ ] I'm linking the issue being fixed by this PR ACKs for top commit: notmandatory: ACK 811f614 Tree-SHA512: 6a1ae4cee58170be5ac444598ea8362e8bd7c77137e2f010e8120869d7491c0ad37798508cea04da6a755d4a1a85b283454daf11624a3bb485e61003ae194007
2 parents a7e6909 + 811f614 commit 62ff445

File tree

4 files changed

+95
-24
lines changed

4 files changed

+95
-24
lines changed

src/commands.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ pub struct ProxyOpts {
239239
#[derive(Debug, Args, Clone, PartialEq, Eq)]
240240
pub struct CompactFilterOpts {
241241
/// Sets the number of parallel node connections.
242-
#[clap(name = "CONNECTIONS", long = "cbf-conn-count", default_value = "4", value_parser = value_parser!(u8).range(1..=15))]
242+
#[clap(name = "CONNECTIONS", long = "cbf-conn-count", default_value = "2", value_parser = value_parser!(u8).range(1..=15))]
243243
pub conn_count: u8,
244244

245245
/// Optionally skip initial `skip_blocks` blocks.

src/error.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use bdk_wallet::bitcoin::hex::HexToBytesError;
2+
use bdk_wallet::bitcoin::psbt::ExtractTxError;
23
use bdk_wallet::bitcoin::{base64, consensus};
34
use thiserror::Error;
45

@@ -51,7 +52,7 @@ pub enum BDKCliError {
5152
ParseOutPointError(#[from] bdk_wallet::bitcoin::blockdata::transaction::ParseOutPointError),
5253

5354
#[error("PsbtExtractTxError: {0}")]
54-
PsbtExtractTxError(#[from] bdk_wallet::bitcoin::psbt::ExtractTxError),
55+
PsbtExtractTxError(Box<ExtractTxError>),
5556

5657
#[error("PsbtError: {0}")]
5758
PsbtError(#[from] bdk_wallet::bitcoin::psbt::Error),
@@ -90,3 +91,9 @@ pub enum BDKCliError {
9091
#[error("BDK-Kyoto error: {0}")]
9192
BuilderError(#[from] bdk_kyoto::builder::BuilderError),
9293
}
94+
95+
impl From<ExtractTxError> for BDKCliError {
96+
fn from(value: ExtractTxError) -> Self {
97+
BDKCliError::PsbtExtractTxError(Box::new(value))
98+
}
99+
}

src/handlers.rs

Lines changed: 78 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,17 @@ use std::collections::BTreeMap;
4242
#[cfg(any(feature = "electrum", feature = "esplora"))]
4343
use std::collections::HashSet;
4444
use std::convert::TryFrom;
45-
#[cfg(feature = "repl")]
45+
#[cfg(any(feature = "repl", feature = "electrum", feature = "esplora"))]
4646
use std::io::Write;
4747
use std::str::FromStr;
4848

4949
#[cfg(feature = "electrum")]
5050
use crate::utils::BlockchainClient::Electrum;
51+
#[cfg(feature = "cbf")]
52+
use bdk_kyoto::{Info, LightClient};
5153
use bdk_wallet::bitcoin::base64::prelude::*;
54+
#[cfg(feature = "cbf")]
55+
use tokio::select;
5256
#[cfg(any(
5357
feature = "electrum",
5458
feature = "esplora",
@@ -507,7 +511,6 @@ pub(crate) async fn handle_online_wallet_subcommand(
507511
(Some(_), Some(_)) => panic!("Both `psbt` and `tx` options not allowed"),
508512
(None, None) => panic!("Missing `psbt` and `tx` option"),
509513
};
510-
511514
let txid = match client {
512515
#[cfg(feature = "electrum")]
513516
Electrum {
@@ -531,8 +534,69 @@ pub(crate) async fn handle_online_wallet_subcommand(
531534
.map_err(|e| Error::Generic(e.to_string()))?,
532535

533536
#[cfg(feature = "cbf")]
534-
KyotoClient { client: _ } => {
535-
unimplemented!()
537+
KyotoClient { client } => {
538+
let LightClient {
539+
requester,
540+
mut log_subscriber,
541+
mut info_subscriber,
542+
mut warning_subscriber,
543+
update_subscriber: _,
544+
node,
545+
} = client;
546+
547+
let subscriber = tracing_subscriber::FmtSubscriber::new();
548+
tracing::subscriber::set_global_default(subscriber)
549+
.map_err(|e| Error::Generic(format!("SetGlobalDefault error: {}", e)))?;
550+
551+
tokio::task::spawn(async move { node.run().await });
552+
tokio::task::spawn(async move {
553+
select! {
554+
log = log_subscriber.recv() => {
555+
if let Some(log) = log {
556+
tracing::info!("{log}");
557+
}
558+
},
559+
warn = warning_subscriber.recv() => {
560+
if let Some(warn) = warn {
561+
tracing::warn!("{warn}");
562+
}
563+
}
564+
}
565+
});
566+
let txid = tx.compute_txid();
567+
tracing::info!("Waiting for connections to broadcast...");
568+
while let Some(info) = info_subscriber.recv().await {
569+
match info {
570+
Info::ConnectionsMet => {
571+
requester
572+
.broadcast_random(tx.clone())
573+
.map_err(|e| Error::Generic(format!("{}", e)))?;
574+
break;
575+
}
576+
_ => tracing::info!("{info}"),
577+
}
578+
}
579+
tokio::time::timeout(tokio::time::Duration::from_secs(15), async move {
580+
while let Some(info) = info_subscriber.recv().await {
581+
match info {
582+
Info::TxGossiped(wtxid) => {
583+
tracing::info!("Successfully broadcast WTXID: {wtxid}");
584+
break;
585+
}
586+
Info::ConnectionsMet => {
587+
tracing::info!("Rebroadcasting to new connections");
588+
requester.broadcast_random(tx.clone()).unwrap();
589+
}
590+
_ => tracing::info!("{info}"),
591+
}
592+
}
593+
})
594+
.await
595+
.map_err(|_| {
596+
tracing::warn!("Broadcast was unsuccessful");
597+
Error::Generic("Transaction broadcast timed out after 15 seconds".into())
598+
})?;
599+
txid
536600
}
537601
};
538602
Ok(json!({ "txid": txid }))
@@ -681,11 +745,11 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result<String, Error> {
681745
subcommand: WalletSubCommand::OnlineWalletSubCommand(online_subcommand),
682746
} => {
683747
let network = cli_opts.network;
748+
let home_dir = prepare_home_dir(cli_opts.datadir)?;
749+
let wallet_name = &wallet_opts.wallet;
750+
let database_path = prepare_wallet_db_dir(wallet_name, &home_dir)?;
684751
#[cfg(feature = "sqlite")]
685752
let result = {
686-
let home_dir = prepare_home_dir(cli_opts.datadir)?;
687-
let wallet_name = &wallet_opts.wallet;
688-
let database_path = prepare_wallet_db_dir(wallet_name, &home_dir)?;
689753
let mut persister = match &wallet_opts.database_type {
690754
#[cfg(feature = "sqlite")]
691755
DatabaseType::Sqlite => {
@@ -698,7 +762,7 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result<String, Error> {
698762

699763
let mut wallet = new_persisted_wallet(network, &mut persister, &wallet_opts)?;
700764
let blockchain_client =
701-
new_blockchain_client(&wallet_opts, &wallet, Some(database_path))?;
765+
new_blockchain_client(&wallet_opts, &wallet, database_path)?;
702766

703767
let result = handle_online_wallet_subcommand(
704768
&mut wallet,
@@ -711,6 +775,9 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result<String, Error> {
711775
};
712776
#[cfg(not(any(feature = "sqlite")))]
713777
let result = {
778+
let wallet = new_wallet(network, &wallet_opts)?;
779+
let blockchain_client =
780+
crate::utils::new_blockchain_client(&wallet_opts, &wallet, database_path)?;
714781
let mut wallet = new_wallet(network, &wallet_opts)?;
715782
handle_online_wallet_subcommand(&mut wallet, blockchain_client, online_subcommand)
716783
.await?
@@ -807,7 +874,7 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result<String, Error> {
807874
&mut wallet,
808875
&wallet_opts,
809876
line,
810-
Some(database_path.clone()),
877+
database_path.clone(),
811878
)
812879
.await;
813880
#[cfg(feature = "sqlite")]
@@ -840,7 +907,7 @@ async fn respond(
840907
wallet: &mut Wallet,
841908
wallet_opts: &WalletOpts,
842909
line: &str,
843-
_datadir: Option<std::path::PathBuf>,
910+
_datadir: std::path::PathBuf,
844911
) -> Result<bool, String> {
845912
use clap::Parser;
846913

@@ -857,7 +924,7 @@ async fn respond(
857924
subcommand: WalletSubCommand::OnlineWalletSubCommand(online_subcommand),
858925
} => {
859926
let blockchain =
860-
new_blockchain_client(wallet_opts, &wallet, _datadir).map_err(|e| e.to_string())?;
927+
new_blockchain_client(wallet_opts, wallet, _datadir).map_err(|e| e.to_string())?;
861928
let value = handle_online_wallet_subcommand(wallet, blockchain, online_subcommand)
862929
.await
863930
.map_err(|e| e.to_string())?;

src/utils.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
use crate::error::BDKCliError as Error;
1313
use std::str::FromStr;
1414

15-
#[cfg(feature = "sqlite")]
1615
use std::path::{Path, PathBuf};
1716

1817
use crate::commands::WalletOpts;
@@ -76,11 +75,11 @@ pub(crate) fn parse_address(address_str: &str) -> Result<Address, Error> {
7675
Ok(unchecked_address.assume_checked())
7776
}
7877

79-
#[cfg(feature = "sqlite")]
8078
/// Prepare bdk-cli home directory
8179
///
8280
/// This function is called to check if [`crate::CliOpts`] datadir is set.
8381
/// If not the default home directory is created at `~/.bdk-bitcoin`.
82+
#[allow(dead_code)]
8483
pub(crate) fn prepare_home_dir(home_path: Option<PathBuf>) -> Result<PathBuf, Error> {
8584
let dir = home_path.unwrap_or_else(|| {
8685
let mut dir = PathBuf::new();
@@ -101,11 +100,11 @@ pub(crate) fn prepare_home_dir(home_path: Option<PathBuf>) -> Result<PathBuf, Er
101100
}
102101

103102
/// Prepare wallet database directory.
104-
#[cfg(feature = "sqlite")]
103+
#[allow(dead_code)]
105104
pub(crate) fn prepare_wallet_db_dir(
106105
wallet_name: &Option<String>,
107106
home_path: &Path,
108-
) -> Result<PathBuf, Error> {
107+
) -> Result<std::path::PathBuf, Error> {
109108
let mut dir = home_path.to_owned();
110109
if let Some(wallet_name) = wallet_name {
111110
dir.push(wallet_name);
@@ -153,8 +152,8 @@ pub(crate) enum BlockchainClient {
153152
/// Create a new blockchain from the wallet configuration options.
154153
pub(crate) fn new_blockchain_client(
155154
wallet_opts: &WalletOpts,
156-
wallet: &Wallet,
157-
datadir: Option<std::path::PathBuf>,
155+
_wallet: &Wallet,
156+
_datadir: PathBuf,
158157
) -> Result<BlockchainClient, Error> {
159158
#[cfg(any(feature = "electrum", feature = "esplora", feature = "rpc"))]
160159
let url = wallet_opts.url.as_str();
@@ -200,14 +199,12 @@ pub(crate) fn new_blockchain_client(
200199
None => Sync,
201200
};
202201

203-
let mut builder = NodeBuilder::new(wallet.network());
202+
let builder = NodeBuilder::new(_wallet.network());
204203

205-
if let Some(datadir) = datadir {
206-
builder = builder.data_dir(&datadir);
207-
};
208204
let client = builder
209205
.required_peers(wallet_opts.compactfilter_opts.conn_count)
210-
.build_with_wallet(wallet, scan_type)?;
206+
.data_dir(&_datadir)
207+
.build_with_wallet(_wallet, scan_type)?;
211208

212209
BlockchainClient::KyotoClient { client }
213210
}

0 commit comments

Comments
 (0)