Skip to content

Commit 1723461

Browse files
committed
refactor: move broadcast logic to a helper function
Before introducing the payjoin sending logic (which will also make use of broadcasting), moving the broadcast logic out of the current Broadcast command since the same logic will be shared between both Broadcast and the SendPayjoin which is implemented later.
1 parent f0b51cb commit 1723461

File tree

1 file changed

+102
-80
lines changed

1 file changed

+102
-80
lines changed

src/handlers.rs

Lines changed: 102 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -523,92 +523,15 @@ pub(crate) async fn handle_online_wallet_subcommand(
523523
(Some(_), Some(_)) => panic!("Both `psbt` and `tx` options not allowed"),
524524
(None, None) => panic!("Missing `psbt` and `tx` option"),
525525
};
526-
let txid = match client {
527-
#[cfg(feature = "electrum")]
528-
Electrum {
529-
client,
530-
batch_size: _,
531-
} => client
532-
.transaction_broadcast(&tx)
533-
.map_err(|e| Error::Generic(e.to_string()))?,
534-
#[cfg(feature = "esplora")]
535-
Esplora {
536-
client,
537-
parallel_requests: _,
538-
} => client
539-
.broadcast(&tx)
540-
.await
541-
.map(|()| tx.compute_txid())
542-
.map_err(|e| Error::Generic(e.to_string()))?,
543-
#[cfg(feature = "rpc")]
544-
RpcClient { client } => client
545-
.send_raw_transaction(&tx)
546-
.map_err(|e| Error::Generic(e.to_string()))?,
526+
let txid = broadcast_transaction(client, tx).await?;
527+
528+
547529

548-
#[cfg(feature = "cbf")]
549-
KyotoClient { client } => {
550-
let LightClient {
551-
requester,
552-
mut log_subscriber,
553-
mut info_subscriber,
554-
mut warning_subscriber,
555-
update_subscriber: _,
556-
node,
557-
} = client;
558-
559-
let subscriber = tracing_subscriber::FmtSubscriber::new();
560-
tracing::subscriber::set_global_default(subscriber)
561-
.map_err(|e| Error::Generic(format!("SetGlobalDefault error: {}", e)))?;
562-
563-
tokio::task::spawn(async move { node.run().await });
564-
tokio::task::spawn(async move {
565-
select! {
566-
log = log_subscriber.recv() => {
567-
if let Some(log) = log {
568-
tracing::info!("{log}");
569-
}
570-
},
571-
warn = warning_subscriber.recv() => {
572-
if let Some(warn) = warn {
573-
tracing::warn!("{warn}");
574530
}
575531
}
576532
}
577-
});
578-
let txid = tx.compute_txid();
579-
tracing::info!("Waiting for connections to broadcast...");
580-
while let Some(info) = info_subscriber.recv().await {
581-
match info {
582-
Info::ConnectionsMet => {
583-
requester
584-
.broadcast_random(tx.clone())
585-
.map_err(|e| Error::Generic(format!("{}", e)))?;
586-
break;
587-
}
588-
_ => tracing::info!("{info}"),
589533
}
590534
}
591-
tokio::time::timeout(tokio::time::Duration::from_secs(15), async move {
592-
while let Some(info) = info_subscriber.recv().await {
593-
match info {
594-
Info::TxGossiped(wtxid) => {
595-
tracing::info!("Successfully broadcast WTXID: {wtxid}");
596-
break;
597-
}
598-
Info::ConnectionsMet => {
599-
tracing::info!("Rebroadcasting to new connections");
600-
requester.broadcast_random(tx.clone()).unwrap();
601-
}
602-
_ => tracing::info!("{info}"),
603-
}
604-
}
605-
})
606-
.await
607-
.map_err(|_| {
608-
tracing::warn!("Broadcast was unsuccessful");
609-
Error::Generic("Transaction broadcast timed out after 15 seconds".into())
610-
})?;
611-
txid
612535
}
613536
};
614537
Ok(json!({ "txid": txid }))
@@ -967,6 +890,105 @@ async fn respond(
967890
}
968891
}
969892

893+
#[cfg(any(
894+
feature = "electrum",
895+
feature = "esplora",
896+
feature = "cbf",
897+
feature = "rpc"
898+
))]
899+
/// Broadcasts a given transaction using the blockchain client.
900+
async fn broadcast_transaction(client: BlockchainClient, tx: Transaction) -> Result<Txid, Error> {
901+
match client {
902+
#[cfg(feature = "electrum")]
903+
Electrum {
904+
client,
905+
batch_size: _,
906+
} => client
907+
.transaction_broadcast(&tx)
908+
.map_err(|e| Error::Generic(e.to_string())),
909+
#[cfg(feature = "esplora")]
910+
Esplora {
911+
client,
912+
parallel_requests: _,
913+
} => client
914+
.broadcast(&tx)
915+
.await
916+
.map(|()| tx.compute_txid())
917+
.map_err(|e| Error::Generic(e.to_string())),
918+
#[cfg(feature = "rpc")]
919+
RpcClient { client } => client
920+
.send_raw_transaction(&tx)
921+
.map_err(|e| Error::Generic(e.to_string())),
922+
923+
#[cfg(feature = "cbf")]
924+
KyotoClient { client } => {
925+
let LightClient {
926+
requester,
927+
mut log_subscriber,
928+
mut info_subscriber,
929+
mut warning_subscriber,
930+
update_subscriber: _,
931+
node,
932+
} = client;
933+
934+
let subscriber = tracing_subscriber::FmtSubscriber::new();
935+
tracing::subscriber::set_global_default(subscriber)
936+
.map_err(|e| Error::Generic(format!("SetGlobalDefault error: {}", e)))?;
937+
938+
tokio::task::spawn(async move { node.run().await });
939+
tokio::task::spawn(async move {
940+
select! {
941+
log = log_subscriber.recv() => {
942+
if let Some(log) = log {
943+
tracing::info!("{log}");
944+
}
945+
},
946+
warn = warning_subscriber.recv() => {
947+
if let Some(warn) = warn {
948+
tracing::warn!("{warn}");
949+
}
950+
}
951+
}
952+
});
953+
let txid = tx.compute_txid();
954+
tracing::info!("Waiting for connections to broadcast...");
955+
while let Some(info) = info_subscriber.recv().await {
956+
match info {
957+
Info::ConnectionsMet => {
958+
requester
959+
.broadcast_random(tx.clone())
960+
.map_err(|e| Error::Generic(format!("{}", e)))?;
961+
break;
962+
}
963+
_ => tracing::info!("{info}"),
964+
}
965+
}
966+
// TODO: This has been and still is (after moving to a helper function) swallowing the error. Need to fix.
967+
let _ = tokio::time::timeout(tokio::time::Duration::from_secs(15), async move {
968+
while let Some(info) = info_subscriber.recv().await {
969+
match info {
970+
Info::TxGossiped(wtxid) => {
971+
tracing::info!("Successfully broadcast WTXID: {wtxid}");
972+
break;
973+
}
974+
Info::ConnectionsMet => {
975+
tracing::info!("Rebroadcasting to new connections");
976+
requester.broadcast_random(tx.clone()).unwrap();
977+
}
978+
_ => tracing::info!("{info}"),
979+
}
980+
}
981+
})
982+
.await
983+
.map_err(|_| {
984+
tracing::warn!("Broadcast was unsuccessful");
985+
Error::Generic("Transaction broadcast timed out after 15 seconds".into())
986+
});
987+
Ok(txid)
988+
}
989+
}
990+
}
991+
970992
#[cfg(feature = "repl")]
971993
fn readline() -> Result<String, Error> {
972994
write!(std::io::stdout(), "> ").map_err(|e| Error::Generic(e.to_string()))?;

0 commit comments

Comments
 (0)