Skip to content

Commit eb7e998

Browse files
committed
refactor: move broadcast logic to a helper function
Before introducing the payjoin sending logic (which will also make use of broadcasting), moving the broadcast logic out of the current Broadcast command since the same logic will be shared between both Broadcast and the SendPayjoin which is implemented later.
1 parent a3fcf8a commit eb7e998

File tree

1 file changed

+89
-60
lines changed

1 file changed

+89
-60
lines changed

src/handlers.rs

Lines changed: 89 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -783,66 +783,7 @@ pub(crate) async fn handle_online_wallet_subcommand(
783783
(Some(_), Some(_)) => panic!("Both `psbt` and `tx` options not allowed"),
784784
(None, None) => panic!("Missing `psbt` and `tx` option"),
785785
};
786-
let txid = match client {
787-
#[cfg(feature = "electrum")]
788-
Electrum {
789-
client,
790-
batch_size: _,
791-
} => client
792-
.transaction_broadcast(&tx)
793-
.map_err(|e| Error::Generic(e.to_string()))?,
794-
#[cfg(feature = "esplora")]
795-
Esplora {
796-
client,
797-
parallel_requests: _,
798-
} => client
799-
.broadcast(&tx)
800-
.await
801-
.map(|()| tx.compute_txid())
802-
.map_err(|e| Error::Generic(e.to_string()))?,
803-
#[cfg(feature = "rpc")]
804-
RpcClient { client } => client
805-
.send_raw_transaction(&tx)
806-
.map_err(|e| Error::Generic(e.to_string()))?,
807-
808-
#[cfg(feature = "cbf")]
809-
KyotoClient { client } => {
810-
let LightClient {
811-
requester,
812-
mut info_subscriber,
813-
mut warning_subscriber,
814-
update_subscriber: _,
815-
node,
816-
} = *client;
817-
818-
let subscriber = tracing_subscriber::FmtSubscriber::new();
819-
tracing::subscriber::set_global_default(subscriber)
820-
.map_err(|e| Error::Generic(format!("SetGlobalDefault error: {e}")))?;
821-
822-
tokio::task::spawn(async move { node.run().await });
823-
tokio::task::spawn(async move {
824-
select! {
825-
info = info_subscriber.recv() => {
826-
if let Some(info) = info {
827-
tracing::info!("{info}");
828-
}
829-
},
830-
warn = warning_subscriber.recv() => {
831-
if let Some(warn) = warn {
832-
tracing::warn!("{warn}");
833-
}
834-
}
835-
}
836-
});
837-
let txid = tx.compute_txid();
838-
let wtxid = requester.broadcast_random(tx.clone()).await.map_err(|_| {
839-
tracing::warn!("Broadcast was unsuccessful");
840-
Error::Generic("Transaction broadcast timed out after 30 seconds".into())
841-
})?;
842-
tracing::info!("Successfully broadcast WTXID: {wtxid}");
843-
txid
844-
}
845-
};
786+
let txid = broadcast_transaction(client, tx).await?;
846787
Ok(serde_json::to_string_pretty(&json!({ "txid": txid }))?)
847788
}
848789
}
@@ -1321,6 +1262,94 @@ async fn respond(
13211262
}
13221263
}
13231264

1265+
#[cfg(any(
1266+
feature = "electrum",
1267+
feature = "esplora",
1268+
feature = "cbf",
1269+
feature = "rpc"
1270+
))]
1271+
/// Broadcasts a given transaction using the blockchain client.
1272+
async fn broadcast_transaction(client: BlockchainClient, tx: Transaction) -> Result<Txid, Error> {
1273+
match client {
1274+
#[cfg(feature = "electrum")]
1275+
Electrum {
1276+
client,
1277+
batch_size: _,
1278+
} => client
1279+
.transaction_broadcast(&tx)
1280+
.map_err(|e| Error::Generic(e.to_string())),
1281+
#[cfg(feature = "esplora")]
1282+
Esplora {
1283+
client,
1284+
parallel_requests: _,
1285+
} => client
1286+
.broadcast(&tx)
1287+
.await
1288+
.map(|()| tx.compute_txid())
1289+
.map_err(|e| Error::Generic(e.to_string())),
1290+
#[cfg(feature = "rpc")]
1291+
RpcClient { client } => client
1292+
.send_raw_transaction(&tx)
1293+
.map_err(|e| Error::Generic(e.to_string())),
1294+
#[cfg(feature = "cbf")]
1295+
KyotoClient { client } => {
1296+
let LightClient {
1297+
requester,
1298+
mut log_subscriber,
1299+
mut info_subscriber,
1300+
mut warning_subscriber,
1301+
update_subscriber: _,
1302+
node,
1303+
} = *client;
1304+
1305+
let subscriber = tracing_subscriber::FmtSubscriber::new();
1306+
tracing::subscriber::set_global_default(subscriber)
1307+
.map_err(|e| Error::Generic(format!("SetGlobalDefault error: {}", e)))?;
1308+
1309+
tokio::task::spawn(async move { node.run().await });
1310+
tokio::task::spawn(async move {
1311+
select! {
1312+
log = log_subscriber.recv() => {
1313+
if let Some(log) = log {
1314+
tracing::info!("{log}");
1315+
}
1316+
},
1317+
warn = warning_subscriber.recv() => {
1318+
if let Some(warn) = warn {
1319+
tracing::warn!("{warn}");
1320+
}
1321+
}
1322+
}
1323+
});
1324+
let txid = tx.compute_txid();
1325+
requester
1326+
.broadcast_random(tx.clone())
1327+
.map_err(|e| Error::Generic(format!("{}", e)))?;
1328+
tokio::time::timeout(tokio::time::Duration::from_secs(30), async move {
1329+
while let Some(info) = info_subscriber.recv().await {
1330+
match info {
1331+
Info::TxGossiped(wtxid) => {
1332+
tracing::info!("Successfully broadcast WTXID: {wtxid}");
1333+
break;
1334+
}
1335+
Info::ConnectionsMet => {
1336+
tracing::info!("Rebroadcasting to new connections");
1337+
requester.broadcast_random(tx.clone()).unwrap();
1338+
}
1339+
_ => tracing::info!("{info}"),
1340+
}
1341+
}
1342+
})
1343+
.await
1344+
.map_err(|_| {
1345+
tracing::warn!("Broadcast was unsuccessful");
1346+
Error::Generic("Transaction broadcast timed out after 30 seconds".into())
1347+
})?;
1348+
Ok(txid)
1349+
}
1350+
}
1351+
}
1352+
13241353
#[cfg(feature = "repl")]
13251354
fn readline() -> Result<String, Error> {
13261355
write!(std::io::stdout(), "> ").map_err(|e| Error::Generic(e.to_string()))?;

0 commit comments

Comments
 (0)