Skip to content

Commit 609795e

Browse files
committed
refactor: move broadcast logic to a helper function
Before introducing the payjoin sending logic (which will also make use of broadcasting), moving the broadcast logic out of the current Broadcast command since the same logic will be shared between both Broadcast and the SendPayjoin which is implemented later.
1 parent 7a71b14 commit 609795e

File tree

1 file changed

+89
-79
lines changed

1 file changed

+89
-79
lines changed

src/handlers.rs

Lines changed: 89 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -523,85 +523,7 @@ pub(crate) async fn handle_online_wallet_subcommand(
523523
(Some(_), Some(_)) => panic!("Both `psbt` and `tx` options not allowed"),
524524
(None, None) => panic!("Missing `psbt` and `tx` option"),
525525
};
526-
let txid = match client {
527-
#[cfg(feature = "electrum")]
528-
Electrum {
529-
client,
530-
batch_size: _,
531-
} => client
532-
.transaction_broadcast(&tx)
533-
.map_err(|e| Error::Generic(e.to_string()))?,
534-
#[cfg(feature = "esplora")]
535-
Esplora {
536-
client,
537-
parallel_requests: _,
538-
} => client
539-
.broadcast(&tx)
540-
.await
541-
.map(|()| tx.compute_txid())
542-
.map_err(|e| Error::Generic(e.to_string()))?,
543-
#[cfg(feature = "rpc")]
544-
RpcClient { client } => client
545-
.send_raw_transaction(&tx)
546-
.map_err(|e| Error::Generic(e.to_string()))?,
547-
548-
#[cfg(feature = "cbf")]
549-
KyotoClient { client } => {
550-
let LightClient {
551-
requester,
552-
mut log_subscriber,
553-
mut info_subscriber,
554-
mut warning_subscriber,
555-
update_subscriber: _,
556-
node,
557-
} = *client;
558-
559-
let subscriber = tracing_subscriber::FmtSubscriber::new();
560-
tracing::subscriber::set_global_default(subscriber)
561-
.map_err(|e| Error::Generic(format!("SetGlobalDefault error: {}", e)))?;
562-
563-
tokio::task::spawn(async move { node.run().await });
564-
tokio::task::spawn(async move {
565-
select! {
566-
log = log_subscriber.recv() => {
567-
if let Some(log) = log {
568-
tracing::info!("{log}");
569-
}
570-
},
571-
warn = warning_subscriber.recv() => {
572-
if let Some(warn) = warn {
573-
tracing::warn!("{warn}");
574-
}
575-
}
576-
}
577-
});
578-
let txid = tx.compute_txid();
579-
requester
580-
.broadcast_random(tx.clone())
581-
.map_err(|e| Error::Generic(format!("{}", e)))?;
582-
tokio::time::timeout(tokio::time::Duration::from_secs(30), async move {
583-
while let Some(info) = info_subscriber.recv().await {
584-
match info {
585-
Info::TxGossiped(wtxid) => {
586-
tracing::info!("Successfully broadcast WTXID: {wtxid}");
587-
break;
588-
}
589-
Info::ConnectionsMet => {
590-
tracing::info!("Rebroadcasting to new connections");
591-
requester.broadcast_random(tx.clone()).unwrap();
592-
}
593-
_ => tracing::info!("{info}"),
594-
}
595-
}
596-
})
597-
.await
598-
.map_err(|_| {
599-
tracing::warn!("Broadcast was unsuccessful");
600-
Error::Generic("Transaction broadcast timed out after 30 seconds".into())
601-
})?;
602-
txid
603-
}
604-
};
526+
let txid = broadcast_transaction(client, tx).await?;
605527
Ok(json!({ "txid": txid }))
606528
}
607529
}
@@ -958,6 +880,94 @@ async fn respond(
958880
}
959881
}
960882

883+
#[cfg(any(
884+
feature = "electrum",
885+
feature = "esplora",
886+
feature = "cbf",
887+
feature = "rpc"
888+
))]
889+
/// Broadcasts a given transaction using the blockchain client.
890+
async fn broadcast_transaction(client: BlockchainClient, tx: Transaction) -> Result<Txid, Error> {
891+
match client {
892+
#[cfg(feature = "electrum")]
893+
Electrum {
894+
client,
895+
batch_size: _,
896+
} => client
897+
.transaction_broadcast(&tx)
898+
.map_err(|e| Error::Generic(e.to_string())),
899+
#[cfg(feature = "esplora")]
900+
Esplora {
901+
client,
902+
parallel_requests: _,
903+
} => client
904+
.broadcast(&tx)
905+
.await
906+
.map(|()| tx.compute_txid())
907+
.map_err(|e| Error::Generic(e.to_string())),
908+
#[cfg(feature = "rpc")]
909+
RpcClient { client } => client
910+
.send_raw_transaction(&tx)
911+
.map_err(|e| Error::Generic(e.to_string())),
912+
#[cfg(feature = "cbf")]
913+
KyotoClient { client } => {
914+
let LightClient {
915+
requester,
916+
mut log_subscriber,
917+
mut info_subscriber,
918+
mut warning_subscriber,
919+
update_subscriber: _,
920+
node,
921+
} = *client;
922+
923+
let subscriber = tracing_subscriber::FmtSubscriber::new();
924+
tracing::subscriber::set_global_default(subscriber)
925+
.map_err(|e| Error::Generic(format!("SetGlobalDefault error: {}", e)))?;
926+
927+
tokio::task::spawn(async move { node.run().await });
928+
tokio::task::spawn(async move {
929+
select! {
930+
log = log_subscriber.recv() => {
931+
if let Some(log) = log {
932+
tracing::info!("{log}");
933+
}
934+
},
935+
warn = warning_subscriber.recv() => {
936+
if let Some(warn) = warn {
937+
tracing::warn!("{warn}");
938+
}
939+
}
940+
}
941+
});
942+
let txid = tx.compute_txid();
943+
requester
944+
.broadcast_random(tx.clone())
945+
.map_err(|e| Error::Generic(format!("{}", e)))?;
946+
tokio::time::timeout(tokio::time::Duration::from_secs(30), async move {
947+
while let Some(info) = info_subscriber.recv().await {
948+
match info {
949+
Info::TxGossiped(wtxid) => {
950+
tracing::info!("Successfully broadcast WTXID: {wtxid}");
951+
break;
952+
}
953+
Info::ConnectionsMet => {
954+
tracing::info!("Rebroadcasting to new connections");
955+
requester.broadcast_random(tx.clone()).unwrap();
956+
}
957+
_ => tracing::info!("{info}"),
958+
}
959+
}
960+
})
961+
.await
962+
.map_err(|_| {
963+
tracing::warn!("Broadcast was unsuccessful");
964+
Error::Generic("Transaction broadcast timed out after 30 seconds".into())
965+
})?;
966+
Ok(txid)
967+
}
968+
}
969+
}
970+
961971
#[cfg(feature = "repl")]
962972
fn readline() -> Result<String, Error> {
963973
write!(std::io::stdout(), "> ").map_err(|e| Error::Generic(e.to_string()))?;

0 commit comments

Comments
 (0)